Nodes

A node is the unit of work in a pipeline. Each node has an ID, a type, typed input ports, and typed output ports. The type determines what the node does and which config fields it accepts.

```yaml
score-leads:                      # node ID (human-readable slug)
  type: data.sql                  # determines behavior
  label: "Score by engagement"    # optional display name
  query: |                        # type-specific config
    SELECT *, clicks * 0.3 + opens * 0.5 AS score
    FROM input
```
| Field | Required | Description |
| --- | --- | --- |
| `type` | yes | Node type string |
| `label` | no | Display name (cosmetic only; the ID is the key) |
| config | varies | Type-specific fields (see below) |

The node ID is the YAML key. It must be a valid slug: lowercase, hyphens, no spaces. This ID is used in edge references and file paths.
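For example, a few IDs against the slug rule (node names and config here are illustrative):

```yaml
# valid: lowercase letters and hyphens
score-leads:
  type: data.filter
  expression: "score > 0"

# invalid: "Score Leads" (uppercase, contains a space) would be rejected
```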

file.source

Reads a local file and outputs it as table data. No input ports.

```yaml
read-data:
  type: file.source
  path: data/input.csv   # relative to project root
  format: csv            # ndjson | csv | json
  csvOptions:            # only for csv format
    delimiter: ","
    hasHeader: true
    skipRows: 0
    encoding: utf-8
```

Output port: `data` (table).
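Since `csvOptions` applies only to the CSV format, the other formats need just `path` and `format`. A minimal NDJSON variant (node name and file path are illustrative):

```yaml
read-events:
  type: file.source
  path: data/events.ndjson
  format: ndjson
```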

value.literal

Outputs a single constant value. No input ports.

```yaml
threshold:
  type: value.literal
  valueType: number   # string | number | boolean
  value: 0.75
```

Output port: `value` (value).
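The same shape covers the other `valueType` options. A string constant, for instance (node name and value are illustrative):

```yaml
api-base:
  type: value.literal
  valueType: string
  value: "https://api.example.com"
```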

Data operations

DuckDB-backed operations. Each has a single input port and a single output port, with the exceptions listed below.

```yaml
filter-active:
  type: data.filter
  expression: "status = 'active'"

sort-by-score:
  type: data.sort
  by: score
  order: desc

dedup-emails:
  type: data.dedup
  key: email
```

Available data ops: `data.filter`, `data.map`, `data.sort`, `data.limit`, `data.dedup`, `data.join`, `data.group`, `data.sql`, `data.concat`, `data.partition`, `data.pull`, `data.collect`.
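Most of these follow the same shape: a `type` plus one or two op-specific fields. As a sketch by analogy with the examples above (the `count` field name is an assumption, not a documented field):

```yaml
top-ten:
  type: data.limit
  count: 10   # assumed field name for the row limit
```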

Special port layouts:

| Type | Input ports | Output ports |
| --- | --- | --- |
| `data.join` | `left`, `right` | `output` |
| `data.concat` | `inputs[0..N]` | `output` |
| `data.partition` | `input` | `matching`, `not_matching` |
| `data.pull` | `input` | `value` (value type) |
| `data.collect` | `value_0`, `value_1` | `list` (value type) |
| all others | `input` | `output` |
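For instance, a `data.join` node exposes `left` and `right` input ports. Its join configuration is sketched here with assumed field names (`on` and `how` are not documented above):

```yaml
merge-contacts:
  type: data.join
  on: email   # assumed: field naming the join key
  how: left   # assumed: field naming the join type
```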

data.sql

Runs arbitrary SQL against the input data. The input table is available as `input` in the query.

```yaml
transform:
  type: data.sql
  query: |
    SELECT name, email, score * 100 AS pct
    FROM input
    WHERE score > 0.5
```

router

Sends records to different output ports based on conditions. One input port, one output port per route.

```yaml
route-by-source:
  type: router
  input: input
  routes:
    - condition: "tier = 'enterprise'"
      output: enterprise
    - condition: "tier = 'startup'"
      output: startup
    - default: true
      output: other
```

api.call

Makes HTTP requests. Supports templated URLs, auth, retries, and response parsing.

```yaml
fetch-enrichment:
  type: api.call
  method: POST
  url: "https://api.example.com/enrich"
  auth:
    type: bearer
    bearerToken: "$secrets.API_KEY"
  body:
    email: "{{ email }}"
  responseParsing:
    fields:
      - sourcePath: "data.company"
        outputField: company
        type: string
```

Input port: `input` (table). Output port: `output` (table).
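The example above exercises auth and response parsing but not retries. A sketch of what a retry block might look like (the `retry` field and its subfields are assumptions, not documented config):

```yaml
fetch-with-retries:
  type: api.call
  method: GET
  url: "https://api.example.com/enrich"
  retry:             # assumed field name
    maxAttempts: 3   # assumed
    backoffMs: 500   # assumed
```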

custom

Runs a script defined by a node-spec.yaml file. Ports come from the spec.

```yaml
my-transform:
  type: custom
  spec: nodes/my-transform/node-spec.yaml
```

Custom nodes declare their contract in a node-spec.yaml file:

```yaml
name: score-calculator
description: Calculate engagement score from interaction data
runtime: node   # node | python
inputs:
  records:
    type: table
    schema:
      email:
        type: string
        required: true
      clicks:
        type: number
      opens:
        type: number
outputs:
  scored:
    type: table
    schema:
      email:
        type: string
      score:
        type: number
      tier:
        type: string
        enum: [high, medium, low]
```

The spec defines the contract. The runtime validates data against it before and after execution.

Promoted fields

Data op config values can be promoted to input ports, letting a value from an upstream node feed a config parameter at runtime.

```yaml
filter-by-threshold:
  type: data.filter
  expression: "score >= $threshold"
  promotedFields:
    threshold:
      type: number
      default: 0.5
```

The `threshold` field becomes a value input port, which an upstream `value.literal` or `data.pull` node can feed.

parallel

Any node can declare parallel fan-out. The runtime splits the input, runs the node once per chunk, and merges the outputs. The node code stays unchanged.

```yaml
enrich:
  type: custom
  spec: nodes/enrich/node-spec.yaml
  parallel:
    over: records     # which input port to split
    chunks: auto      # auto or a positive integer
    merge: enriched   # which output port to concatenate
```

See Execution Model for details.