Nodes
A node is the unit of work in a pipeline. Each node has an ID, a type, typed input ports, and typed output ports. The type determines what the node does and which config fields it accepts.
Anatomy
```yaml
score-leads:                      # node ID (human-readable slug)
  type: data.sql                  # determines behavior
  label: "Score by engagement"    # optional display name
  query: |                        # type-specific config
    SELECT *, clicks * 0.3 + opens * 0.5 AS score FROM input
```

| Field | Required | Description |
|---|---|---|
| type | yes | Node type string |
| label | no | Display name (cosmetic only, ID is the key) |
| config | varies | Type-specific fields (see below) |
The node ID is the YAML key. It must be a valid slug: lowercase, hyphens, no spaces. This ID is used in edge references and file paths.
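The slug rule above can be expressed as a simple check. This is an illustrative sketch only; the pattern below (single hyphens between lowercase alphanumeric words, no leading or trailing hyphen) is an assumption about what the runtime enforces:

```python
import re

# Hypothetical validator mirroring the documented rule:
# lowercase, hyphens, no spaces.
SLUG_RE = re.compile(r"^[a-z0-9]+(-[a-z0-9]+)*$")

def is_valid_node_id(node_id: str) -> bool:
    """Return True if node_id is a valid lowercase hyphenated slug."""
    return bool(SLUG_RE.fullmatch(node_id))

print(is_valid_node_id("score-leads"))   # valid slug
print(is_valid_node_id("Score Leads"))   # uppercase and space: invalid
```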
Node types
file.source — read a file
Reads a local file and outputs it as table data. No input ports.
```yaml
read-data:
  type: file.source
  path: data/input.csv   # relative to project root
  format: csv            # ndjson | csv | json
  csvOptions:            # only for csv format
    delimiter: ","
    hasHeader: true
    skipRows: 0
    encoding: utf-8
```

Output port: `data` (table).
value.literal — constant value
Outputs a single constant. No input ports.
```yaml
threshold:
  type: value.literal
  valueType: number   # string | number | boolean
  value: 0.75
```

Output port: `value` (value).
data.* — built-in transforms
DuckDB-backed operations. Single input port, single output port (with exceptions below).
```yaml
filter-active:
  type: data.filter
  expression: "status = 'active'"
```
```yaml
sort-by-score:
  type: data.sort
  by: score
  order: desc
```
```yaml
dedup-emails:
  type: data.dedup
  key: email
```

Available data ops: `data.filter`, `data.map`, `data.sort`, `data.limit`, `data.dedup`, `data.join`, `data.group`, `data.sql`, `data.concat`, `data.partition`, `data.pull`, `data.collect`.
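For intuition, `data.dedup` keeps one row per distinct key value. A minimal sketch of that semantics follows; keeping the *first* occurrence is an assumption, as the engine's tie-breaking rule is not documented here:

```python
def dedup(rows: list[dict], key: str) -> list[dict]:
    """Keep the first row seen for each distinct value of `key`."""
    seen = set()
    out = []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            out.append(row)
    return out

rows = [
    {"email": "a@x.com", "score": 1},
    {"email": "b@x.com", "score": 2},
    {"email": "a@x.com", "score": 3},  # duplicate email, dropped
]
print(dedup(rows, "email"))
```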
Special port layouts:
| Type | Input ports | Output ports |
|---|---|---|
| data.join | left, right | output |
| data.concat | inputs[0..N] | output |
| data.partition | input | matching, not_matching |
| data.pull | input | value (value type) |
| data.collect | value_0, value_1… | list (value type) |
| all others | input | output |
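`data.partition` is the only op in the table with two data outputs: conceptually it splits the input on a predicate. The sketch below illustrates that contract, using a Python callable in place of the engine's expression syntax:

```python
from typing import Callable

def partition(rows: list[dict], predicate: Callable[[dict], bool]):
    """Split rows into (matching, not_matching) by a predicate."""
    matching = [r for r in rows if predicate(r)]
    not_matching = [r for r in rows if not predicate(r)]
    return matching, not_matching

rows = [{"tier": "pro"}, {"tier": "free"}, {"tier": "pro"}]
matching, not_matching = partition(rows, lambda r: r["tier"] == "pro")
print(len(matching), len(not_matching))  # 2 1
```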
data.sql — DuckDB query
Runs arbitrary SQL against input data. The input table is available as `input` in the query.
```yaml
transform:
  type: data.sql
  query: |
    SELECT name, email, score * 100 AS pct
    FROM input
    WHERE score > 0.5
```

router — conditional routing
Sends records to different output ports based on conditions. One input port, one output port per route.
```yaml
route-by-source:
  type: router
  input: input
  routes:
    - condition: "tier = 'enterprise'"
      output: enterprise
    - condition: "tier = 'startup'"
      output: startup
    - default: true
      output: other
```

api.call — external API
Makes HTTP requests. Supports templated URLs, auth, retries, and response parsing.
```yaml
fetch-enrichment:
  type: api.call
  method: POST
  url: "https://api.example.com/enrich"
  auth:
    type: bearer
    bearerToken: "$secrets.API_KEY"
  body:
    email: "{{ email }}"
  responseParsing:
    fields:
      - sourcePath: "data.company"
        outputField: company
        type: string
```

Input port: `input` (table). Output port: `output` (table).
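The `{{ email }}` placeholder suggests per-record templating of the request body. A rough sketch of that substitution is below; the actual template engine, whitespace rules, and escaping behavior are assumptions, not documented here:

```python
import re

def render_template(template: str, record: dict) -> str:
    """Replace {{ field }} placeholders with values from the record."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda m: str(record[m.group(1)]),
        template,
    )

print(render_template("{{ email }}", {"email": "ada@example.com"}))
```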
custom — user-defined code
Runs a script defined by a node-spec.yaml. Ports come from the spec.
```yaml
my-transform:
  type: custom
  spec: nodes/my-transform/node-spec.yaml
```

node-spec.yaml
Custom nodes declare their contract in a node-spec.yaml file:
```yaml
name: score-calculator
description: Calculate engagement score from interaction data
runtime: node   # node | python

inputs:
  records:
    type: table
    schema:
      email:
        type: string
        required: true
      clicks:
        type: number
      opens:
        type: number

outputs:
  scored:
    type: table
    schema:
      email:
        type: string
      score:
        type: number
      tier:
        type: string
        enum: [high, medium, low]
```

The spec defines the contract. The runtime validates data against it before and after execution.
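To make the validation step concrete, here is a minimal illustration of the kind of check a node-spec schema implies: required fields and primitive types on each record. This is a toy sketch; the runtime's real validator is certainly richer (enums, nested types, coercion):

```python
TYPE_MAP = {"string": str, "number": (int, float), "boolean": bool}

def validate_record(record: dict, schema: dict) -> list[str]:
    """Return a list of validation errors (empty if the record conforms)."""
    errors = []
    for field, rules in schema.items():
        if field not in record:
            if rules.get("required"):
                errors.append(f"missing required field: {field}")
            continue
        expected = TYPE_MAP[rules["type"]]
        if not isinstance(record[field], expected):
            errors.append(f"{field}: expected {rules['type']}")
    return errors

schema = {
    "email": {"type": "string", "required": True},
    "clicks": {"type": "number"},
}
print(validate_record({"email": "a@x.com", "clicks": 3}, schema))  # []
print(validate_record({"clicks": "3"}, schema))
```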
Promoted fields
Data op config values can be promoted to input ports. This lets a value from an upstream node feed into a config parameter at runtime.
```yaml
filter-by-threshold:
  type: data.filter
  expression: "score >= $threshold"
  promotedFields:
    threshold:
      type: number
      default: 0.5
```

The `threshold` field becomes a value input port. An upstream `value.literal` or `data.pull` node can feed it.
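Conceptually, a promoted field is resolved at runtime (from the connected port, or the declared default) and substituted into the config before the op runs. A toy sketch of that resolution; the plain string substitution of `$name` placeholders shown here is an assumption about the engine's behavior:

```python
def resolve_promoted(expression: str, promoted: dict, inputs: dict) -> str:
    """Substitute $name placeholders with a port value or the declared default."""
    for name, spec in promoted.items():
        value = inputs.get(name, spec.get("default"))
        expression = expression.replace(f"${name}", str(value))
    return expression

promoted = {"threshold": {"type": "number", "default": 0.5}}
# No upstream value connected: the default applies.
print(resolve_promoted("score >= $threshold", promoted, {}))
# An upstream value.literal feeds 0.75 instead:
print(resolve_promoted("score >= $threshold", promoted, {"threshold": 0.75}))
```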
Parallel execution
Any node can declare parallel fan-out. The runtime splits the input, runs the node per chunk, and merges outputs. The node code stays unchanged.
```yaml
enrich:
  type: custom
  spec: nodes/enrich/node-spec.yaml
  parallel:
    over: records    # which input port to split
    chunks: auto     # auto or a positive integer
    merge: enriched  # which output port to concatenate
```

See Execution Model for details.
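The fan-out contract amounts to: split one input port into chunks, run the node per chunk, and concatenate one output port. A simplified sequential sketch of that contract; the real runtime executes chunks concurrently, and `chunks: auto` sizing is engine-defined:

```python
def run_parallel(rows: list[dict], node_fn, chunks: int) -> list[dict]:
    """Split rows into `chunks` pieces, run node_fn on each, merge outputs."""
    size = max(1, -(-len(rows) // chunks))  # ceiling division
    pieces = [rows[i:i + size] for i in range(0, len(rows), size)]
    merged: list[dict] = []
    for piece in pieces:  # the real runtime runs these concurrently
        merged.extend(node_fn(piece))
    return merged

# A stand-in for the node's per-chunk work.
enrich = lambda rows: [{**r, "enriched": True} for r in rows]
out = run_parallel([{"id": i} for i in range(5)], enrich, chunks=2)
print(len(out), out[0])
```

Because the merge is a plain concatenation in chunk order, output ordering is preserved in this sketch; whether the real engine guarantees ordering is not stated here.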