# Nodes
A node is a unit of work with typed inputs and outputs. Each node has an ID, a type, zero or more input ports, and one or more output ports. The type determines what the node does and which config fields it accepts.
## Anatomy of a node

```yaml
score-leads:                      # node ID (human-readable slug)
  type: data.sql                  # determines behavior
  label: "Score by engagement"    # optional display name
  query: |                        # type-specific config
    SELECT *, clicks * 0.3 + opens * 0.5 AS score FROM input
```

| Field | Required | Description |
|---|---|---|
| type | yes | Node type string (see below) |
| label | no | Display name (cosmetic only — the ID is the key) |
| (config) | varies | Type-specific fields determined by the node type |
The node ID is the YAML key. It must be a valid slug: lowercase, hyphens, no spaces. This ID is used in edge references, file paths (nodes/<slug>/), and logs.
## Node types

### file.source — read a file

Reads a local file and outputs it as table data. No input ports.

```yaml
read-data:
  type: file.source
  path: data/input.csv   # relative to project root
  format: csv            # ndjson | csv | json
  csvOptions:            # only for csv format
    delimiter: ","
    hasHeader: true
    skipRows: 0
    encoding: utf-8
```

When to use: Loading CSV, NDJSON, or JSON data files at the start of a pipeline.
Output port: data (Table)
### value.literal — constant value

Outputs a single constant. No input ports.

```yaml
threshold:
  type: value.literal
  valueType: number   # string | number | boolean
  value: 0.75
```

When to use: Thresholds, feature flags, and configuration parameters that feed into downstream nodes via promoted fields.
Output port: value (Value)
### data.* — built-in transforms

DuckDB-backed operations for filtering, sorting, joining, grouping, and more. Most take a single input port and produce a single output port.

```yaml
filter-active:
  type: data.filter
  expression: "status = 'active'"

sort-by-score:
  type: data.sort
  by: score
  order: desc

dedup-emails:
  type: data.dedup
  key: email
```

When to use: Standard data transformations that map to SQL operations. No code to write — configure with YAML.
Available operations: data.filter, data.map, data.sort, data.limit, data.dedup, data.join, data.group, data.sql, data.concat, data.partition, data.pull, data.collect. See Data Operations for details on each.
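These operations chain naturally. A minimal sketch of a three-node chain, assuming node definitions sit under a top-level `nodes:` key alongside `edges:` (the exact pipeline-file layout may differ in your project; node and file names are illustrative):

```yaml
nodes:
  read-data:
    type: file.source
    path: data/leads.csv
    format: csv
  filter-active:
    type: data.filter
    expression: "status = 'active'"
  sort-by-score:
    type: data.sort
    by: score
    order: desc

edges:
  - "read-data.data -> filter-active.input"
  - "filter-active.output -> sort-by-score.input"
```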
### Port layout by type

| Type | Input ports | Output ports |
|---|---|---|
| data.join | left, right | output |
| data.concat | inputs[0..N] | output |
| data.partition | input | matching, not_matching |
| data.pull | input | value (Value type) |
| data.collect | value_0, value_1, … | list (Value type) |
| all others | input | output |
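For a multi-input op like data.join, each input port gets its own edge. A sketch under assumptions: the `on:` join-key field is hypothetical (see Data Operations for the actual join config), while the left/right port names come from the table above:

```yaml
customers:
  type: file.source
  path: data/customers.csv
  format: csv
orders:
  type: file.source
  path: data/orders.csv
  format: csv
join-orders:
  type: data.join
  on: email   # assumed join-key field name — check Data Operations

edges:
  - "customers.data -> join-orders.left"
  - "orders.data -> join-orders.right"
```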
### data.sql — DuckDB query

Runs arbitrary SQL against input data. The input table is available as `input` in the query.

```yaml
transform:
  type: data.sql
  query: |
    SELECT name, email, score * 100 AS pct
    FROM input
    WHERE score > 0.5
```

When to use: Transformations that combine multiple SQL operations in a single step, or that use features like window functions and CTEs.
### router — conditional routing

Sends records to different output ports based on conditions. One input port; one output port per route.

```yaml
route-by-source:
  type: router
  input: input
  routes:
    - condition: "tier = 'enterprise'"
      output: enterprise
    - condition: "tier = 'startup'"
      output: startup
    - default: true
      output: other
```

When to use: Splitting data into separate branches for different downstream processing.
### api.call — external API

Makes HTTP requests. Supports templated URLs, auth, retries, and response parsing.

```yaml
fetch-enrichment:
  type: api.call
  method: POST
  url: "https://api.example.com/enrich"
  auth:
    type: bearer
    bearerToken: "$secrets.API_KEY"
  body:
    email: "{{ email }}"
  responseParsing:
    fields:
      - sourcePath: "data.company"
        outputField: company
        type: string
```

When to use: Calling external APIs for enrichment, webhooks, or data submission.
Input port: input (Table). Output port: output (Table).
### custom — user-defined code

Runs a script defined by a node-spec.yaml. Ports come from the spec.

```yaml
my-transform:
  type: custom
  spec: nodes/my-transform/node-spec.yaml
```

When to use: Logic that requires external libraries, API calls with complex handling, ML inference, or domain-specific transformations that cannot be expressed as SQL.
### node-spec.yaml

Custom nodes declare their contract in a node-spec.yaml file:

```yaml
name: score-calculator
description: Calculate engagement score from interaction data
runtime: node   # node | python

inputs:
  records:
    type: table
    schema:
      email:
        type: string
        required: true
      clicks:
        type: number
      opens:
        type: number

outputs:
  scored:
    type: table
    schema:
      email:
        type: string
      score:
        type: number
      tier:
        type: string
        enum: [high, medium, low]
```

The spec defines the contract. The runtime validates data against it before and after execution. See Schemas for schema details.
## Node configuration fields

| Field | Type | Required | Description |
|---|---|---|---|
| type | string | yes | Node type |
| label | string | no | Display name |
| spec | string | no | Path to node-spec.yaml (required for custom) |
| parallel | object | no | Fan-out configuration |
| promotedFields | object | no | Config values that become input ports |
| (type config) | varies | varies | Fields specific to the node type |
### Promoted fields

Data op config values can be promoted to input ports. This lets a value from an upstream node feed a config parameter at runtime.

```yaml
filter-by-threshold:
  type: data.filter
  expression: "score >= $threshold"
  promotedFields:
    threshold:
      type: number
      default: 0.5
```

The threshold field becomes a Value input port. An upstream value.literal or data.pull node can feed it.
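Wiring that port up might look like this. A sketch: the value.literal output port `value` matches the docs above, while addressing the promoted port by its field name (`threshold`) is an assumption based on the general edge syntax:

```yaml
threshold:
  type: value.literal
  valueType: number
  value: 0.8

edges:
  - "threshold.value -> filter-by-threshold.threshold"
```

With no edge connected, the declared `default: 0.5` presumably applies instead.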
### Parallel execution

Any node can declare parallel fan-out. The runtime splits the input, runs the node per chunk, and merges the outputs. The node code stays unchanged.

```yaml
enrich:
  type: custom
  spec: nodes/enrich/node-spec.yaml
  parallel:
    over: records    # which input port to split
    chunks: auto     # auto or a positive integer
    merge: enriched  # which output port to concatenate
```

See Execution Model for details on how parallel execution works.
## Node lifecycle

Every node goes through the same lifecycle during pipeline execution:

```
idle --> running --> success
                \--> error
```

| Status | Meaning |
|---|---|
| idle | Not yet executed |
| running | Currently executing |
| success | Completed, outputs written |
| error | Failed (timeout, runtime error, validation) |
When a node enters error, all of its downstream dependents are skipped.
## How nodes relate to edges and ports

Edges connect output ports to input ports. A node’s output port can feed multiple downstream nodes (fan-out). A node’s input port receives data from exactly one upstream output port.

```yaml
edges:
  - "read-csv.data -> filter.input"      # read-csv's data port feeds filter's input
  - "read-csv.data -> summarize.input"   # the same output feeds a second consumer
  - "filter.output -> write.records"     # filter's output feeds write's records port
```

Port names are defined by the node type. Built-in ops use standardized names (input, output, left, right). Custom nodes define their own port names in node-spec.yaml.