
Execution Model

The executor takes a parsed graph and runs it deterministically. Same input, same output, every time. No LLM in the loop at runtime.

Five phases, in order:

validate → resolve → sort → execute → collect

Validate: type-check all edges, comparing each source port's schema against the destination port's schema. Errors block execution; warnings are logged.
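
As a sketch, the schema comparison in the validate phase could look like this (the field-by-field dict model and function name are illustrative assumptions, not the executor's real API):

```python
def validate_edges(edges, ports):
    """Compare each edge's source port schema to its destination port schema.

    edges: list of (srcNode, srcPort, destNode, destPort) tuples.
    ports: {(node, port): schema} where a schema is a {field: type} dict.
    Returns (errors, warnings); errors block execution, warnings are logged.
    """
    errors, warnings = [], []
    for src, src_port, dest, dest_port in edges:
        src_schema = ports[(src, src_port)]
        dest_schema = ports[(dest, dest_port)]
        label = f"{src}.{src_port} -> {dest}.{dest_port}"
        # Every field the destination expects must exist with the same type.
        for field, wanted in dest_schema.items():
            got = src_schema.get(field)
            if got is None:
                errors.append(f"{label}: missing field '{field}'")
            elif got != wanted:
                errors.append(f"{label}: field '{field}' is {got}, expected {wanted}")
        # Extra source fields are harmless, so they only warn.
        for field in src_schema:
            if field not in dest_schema:
                warnings.append(f"{label}: extra field '{field}' ignored")
    return errors, warnings
```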

Resolve: determine which nodes to run:

  • Full run: all nodes execute.
  • --node <id>: target node + its transitive upstream dependencies.
  • --from <id>: start at node, run it and all downstream dependents.

Sort: topological sort using Kahn’s algorithm (BFS). Produces execution levels — groups of nodes with no dependencies on each other.

Level 0: [read-csv, read-api] # no dependencies
Level 1: [filter, transform] # depend on level 0
Level 2: [join] # depends on level 1
Level 3: [write-output] # depends on level 2

Cycle detection is built in. If the graph has cycles, execution fails with the list of nodes involved.
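
The level-producing sort and its cycle check can be sketched with Kahn's algorithm (function and variable names here are illustrative):

```python
from collections import deque

def level_sort(nodes, edges):
    """Kahn's algorithm (BFS) producing execution levels.

    nodes: iterable of node ids; edges: list of (src, dest) pairs.
    Returns a list of levels; raises ValueError naming the cycle nodes.
    """
    indegree = {n: 0 for n in nodes}
    downstream = {n: [] for n in nodes}
    for src, dest in edges:
        indegree[dest] += 1
        downstream[src].append(dest)

    # Each BFS "wave" of zero-indegree nodes is one execution level.
    current = sorted(n for n, d in indegree.items() if d == 0)
    levels, seen = [], 0
    while current:
        levels.append(current)
        seen += len(current)
        nxt = []
        for n in current:
            for d in downstream[n]:
                indegree[d] -= 1
                if indegree[d] == 0:
                    nxt.append(d)
        current = sorted(nxt)  # alphabetical for deterministic ordering
    if seen != len(indegree):  # leftovers still have indegree > 0: a cycle
        cycle = sorted(n for n, d in indegree.items() if d > 0)
        raise ValueError(f"graph has a cycle involving: {cycle}")
    return levels
```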

Execute: run nodes level by level. Within each level, nodes run in parallel up to maxParallel (default: 4). Each node reads its input files, runs its operation, and writes its output files.

Collect: write execution results to .gain/executions/<timestamp>.json. Each result contains per-node status, timing, row counts, and any errors.
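
A minimal sketch of the collect step, assuming results are plain JSON-serializable dicts (write_result is a hypothetical helper, not the real API):

```python
import json
import os
import time

def write_result(result, root=".gain/executions"):
    """Persist an execution result under a timestamped JSON path."""
    os.makedirs(root, exist_ok=True)
    path = os.path.join(root, time.strftime("%Y-%m-%dT%H-%M-%S") + ".json")
    with open(path, "w") as f:
        json.dump(result, f, indent=2)
    return path
```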

The sort produces two outputs:

Sorted order: a flat list of node IDs in valid execution order.

Execution levels: groups of nodes that can run concurrently.

# Given this graph:
nodes:
  a: { type: file.source, path: data/a.csv, format: csv }
  b: { type: file.source, path: data/b.csv, format: csv }
  c: { type: data.filter, expression: "x > 0" }
  d: { type: data.filter, expression: "y > 0" }
  e: { type: data.join, leftKey: id, rightKey: id }
edges:
  - "a.data -> c.input"
  - "b.data -> d.input"
  - "c.output -> e.left"
  - "d.output -> e.right"

Result:

levels:
  - [a, b] # independent sources, run in parallel
  - [c, d] # independent filters, run in parallel
  - [e]    # join waits for both c and d

Nodes within a level are sorted alphabetically for deterministic ordering.

Nodes at the same execution level run concurrently, limited by maxParallel. This is implicit — you do not configure it. Independent branches automatically parallelize.

a ──→ c ──┐
          ├──→ e
b ──→ d ──┘

Nodes a and b run in parallel. Nodes c and d run in parallel after their inputs complete. Node e waits for both.

When a level has more nodes than maxParallel, nodes are split into batches. Each batch runs concurrently, then the next batch starts.
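
The batching behavior can be sketched as follows, assuming each node can be run by an arbitrary run_node callable (a simplification of the real runner):

```python
from concurrent.futures import ThreadPoolExecutor

def run_level(node_ids, run_node, max_parallel=4):
    """Run one execution level in batches of at most max_parallel nodes.

    Each batch runs concurrently; the next batch starts only after the
    previous one has fully completed, as described above.
    """
    results = {}
    for i in range(0, len(node_ids), max_parallel):
        batch = node_ids[i:i + max_parallel]
        with ThreadPoolExecutor(max_workers=len(batch)) as pool:
            results.update(zip(batch, pool.map(run_node, batch)))
    return results
```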

A single node can process its input in parallel chunks using the parallel config:

enrich:
  type: custom
  spec: nodes/enrich/node-spec.yaml
  parallel:
    over: records    # input port to split
    chunks: auto     # or a fixed integer
    merge: enriched  # output port to concatenate

The runtime:

  1. Reads the NDJSON file for the over port.
  2. Splits it into chunk files (auto = min(rows, maxParallel)).
  3. Runs the node once per chunk in an isolated working directory.
  4. Concatenates the merge port outputs in order.

The node code is unchanged. It reads a smaller input and writes a smaller output. It does not know it is running in parallel.
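
The four chunking steps above can be sketched like this (shown sequentially for clarity, where the real runtime runs chunks in parallel; run_node_on and the file names are illustrative):

```python
import os
import tempfile

def run_chunked(input_path, chunks, run_node_on, output_name="output.ndjson"):
    """Split an NDJSON input into chunk files, run the node once per chunk
    in an isolated working directory, then concatenate outputs in order."""
    with open(input_path) as f:
        rows = f.readlines()  # NDJSON: one record per line
    n = min(len(rows), chunks) or 1       # never more chunks than rows
    size = -(-len(rows) // n)             # ceiling division
    merged = []
    for i in range(n):
        workdir = tempfile.mkdtemp(prefix=f"chunk-{i}-")
        with open(os.path.join(workdir, "input.ndjson"), "w") as f:
            f.writelines(rows[i * size:(i + 1) * size])
        run_node_on(workdir)              # node reads its input, writes output
        with open(os.path.join(workdir, output_name)) as f:
            merged.extend(f.readlines())  # concatenate in chunk order
    return merged
```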

chunks value   Behavior
auto           min(total rows, maxParallel)
integer N      min(N, total rows) — never more chunks than rows
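
The table reduces to a small function (the name is illustrative):

```python
def resolve_chunk_count(chunks, total_rows, max_parallel=4):
    """Apply the chunk-count rules: 'auto' caps at maxParallel, an
    integer caps at the row count — never more chunks than rows."""
    if chunks == "auto":
        return min(total_rows, max_parallel)
    return min(chunks, total_rows)
```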

If some chunks fail and others succeed, the node produces a merged output from the successful chunks. The execution result logs which chunks failed.

Edges define data flow. A node’s input port receives data from whichever upstream node’s output port is connected via an edge.

At execution time, the executor resolves input paths:

  1. Find all edges where destNode is the current node.
  2. Look up the output file path from each sourceNode.
  3. Map sourcePort output file to destPort input path.

The node reads its inputs from these resolved paths. There is no global variable scope — data flows only through edges.
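
A sketch of the input-path resolution, assuming edges are (srcNode, srcPort, destNode, destPort) tuples and upstream outputs are tracked in a path map:

```python
def resolve_inputs(node_id, edges, output_paths):
    """Map each of a node's input ports to its upstream output file.

    output_paths: {(node, port): path} for already-executed nodes.
    """
    return {
        dest_port: output_paths[(src, src_port)]
        for src, src_port, dest, dest_port in edges
        if dest == node_id  # step 1: edges targeting the current node
    }
```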

Run a single node and everything upstream of it. The executor walks the dependency graph backward (BFS from target) to find all transitive dependencies.

a → b → c → d → e

--node d runs: a, b, c, d. Skips e.

Run from a node onward. The executor walks forward to find all downstream dependents.

--from c runs: c, d, e. Skips a, b (assumes their outputs already exist).
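
Both selection modes are the same BFS walk with the edge direction flipped; a sketch with illustrative names:

```python
from collections import deque

def select_nodes(target, edges, direction="upstream"):
    """BFS from target: 'upstream' gathers transitive dependencies
    (--node), 'downstream' gathers transitive dependents (--from).

    edges: list of (src, dest) node-id pairs.
    """
    step = {}
    for src, dest in edges:
        # Walking upstream follows edges backward; downstream, forward.
        key, val = (dest, src) if direction == "upstream" else (src, dest)
        step.setdefault(key, []).append(val)
    selected, queue = {target}, deque([target])
    while queue:
        for nxt in step.get(queue.popleft(), []):
            if nxt not in selected:
                selected.add(nxt)
                queue.append(nxt)
    return selected
```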

Validate and sort but do not execute. Reports which nodes would run.

When a node fails, all downstream nodes are skipped with status skipped and the message “upstream node failed”. The executor does not attempt partial execution of dependent nodes.

Nodes with validation errors (missing required fields, bad config) fail immediately with status failed and error code INVALID_CONFIG. Their downstream nodes are skipped.
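
Failure propagation can be sketched as a forward walk that marks every transitive dependent skipped (apply_failure is a hypothetical helper):

```python
def apply_failure(failed_node, edges, statuses):
    """Mark a failed node and skip all of its transitive dependents.

    edges: list of (src, dest) node-id pairs; statuses is mutated in place.
    """
    downstream = {}
    for src, dest in edges:
        downstream.setdefault(src, []).append(dest)
    statuses[failed_node] = "failed"
    stack = [failed_node]
    while stack:
        for dep in downstream.get(stack.pop(), []):
            if statuses.get(dep) != "skipped":
                statuses[dep] = "skipped"  # "upstream node failed"
                stack.append(dep)
    return statuses
```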

The overall execution result has one of three statuses:

Status    Meaning
success   All nodes completed successfully
partial   Some nodes succeeded, some failed or were skipped
failed    No nodes succeeded

Each individual node result has its own status:

Status    Meaning
success   Node completed, outputs written
failed    Node threw an error or timed out
skipped   Not executed (upstream failed or dry run)

Each node has a per-node timeout (default: 300 seconds). If a node exceeds it, the node is killed and marked failed.
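
One way a per-node timeout could be enforced, assuming nodes run as child processes (the real runtime's process model is not documented here, so this is only a sketch):

```python
import subprocess

def run_with_timeout(cmd, timeout_s=300):
    """Run a node process; on timeout the process is killed and the
    node is marked failed. cmd is an illustrative command list."""
    try:
        proc = subprocess.run(cmd, capture_output=True, timeout=timeout_s)
        return "success" if proc.returncode == 0 else "failed"
    except subprocess.TimeoutExpired:
        return "failed"  # subprocess.run kills the child on expiry
```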

Written to .gain/executions/<timestamp>.json:

{
  "graph": "lead-scoring",
  "status": "success",
  "startedAt": "2025-01-15T10:00:00.000Z",
  "completedAt": "2025-01-15T10:00:02.500Z",
  "durationMs": 2500,
  "nodes": {
    "read-leads": {
      "status": "success",
      "startedAt": "2025-01-15T10:00:00.010Z",
      "completedAt": "2025-01-15T10:00:00.150Z",
      "durationMs": 140,
      "outputs": {
        "data": {
          "path": "nodes/read-leads/output/data.ndjson",
          "rowCount": 5000,
          "bytes": 312000
        }
      }
    },
    "filter-active": {
      "status": "success",
      "startedAt": "2025-01-15T10:00:00.155Z",
      "completedAt": "2025-01-15T10:00:00.300Z",
      "durationMs": 145,
      "outputs": {
        "output": {
          "path": "nodes/filter-active/output/output.ndjson",
          "rowCount": 3200,
          "bytes": 198400
        }
      }
    }
  }
}

Each node result includes timing, output file paths, row counts, byte sizes, and any error details.