Nodes

A node is a unit of work with typed inputs and outputs. Each node has an ID, a type, zero or more input ports, and one or more output ports. The type determines what the node does and which config fields it accepts.

score-leads:                       # node ID (human-readable slug)
  type: data.sql                   # determines behavior
  label: "Score by engagement"     # optional display name
  query: |                         # type-specific config
    SELECT *, clicks * 0.3 + opens * 0.5 AS score
    FROM input

Field     Required  Description
type      yes       Node type string (see below)
label     no        Display name (cosmetic only; the ID is the key)
(config)  varies    Type-specific fields determined by the node type

The node ID is the YAML key. It must be a valid slug: lowercase, hyphens, no spaces. This ID is used in edge references, file paths (nodes/<slug>/), and logs.
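
The same slug is what edges and logs refer to. A minimal sketch, reusing the score-leads node above and a hypothetical downstream node named write-output:

edges:
  - "score-leads.output -> write-output.input"   # the node ID prefixes the port name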

file.source

Reads a local file and outputs it as table data. No input ports.

read-data:
  type: file.source
  path: data/input.csv    # relative to project root
  format: csv             # ndjson | csv | json
  csvOptions:             # only for csv format
    delimiter: ","
    hasHeader: true
    skipRows: 0
    encoding: utf-8

When to use: Loading CSV, NDJSON, or JSON data files at the start of a pipeline.

Output port: data (Table)

value.literal

Outputs a single constant. No input ports.

threshold:
  type: value.literal
  valueType: number    # string | number | boolean
  value: 0.75

When to use: Thresholds, feature flags, and configuration parameters that feed into downstream nodes via promoted fields.

Output port: value (Value)

Data operations (data.*)

DuckDB-backed operations for filtering, sorting, joining, grouping, and more. Most take a single input port and produce a single output port.

filter-active:
  type: data.filter
  expression: "status = 'active'"

sort-by-score:
  type: data.sort
  by: score
  order: desc

dedup-emails:
  type: data.dedup
  key: email

When to use: Standard data transformations that map to SQL operations. No code to write — configure with YAML.

Available operations: data.filter, data.map, data.sort, data.limit, data.dedup, data.join, data.group, data.sql, data.concat, data.partition, data.pull, data.collect. See Data Operations for details on each.

Type            Input ports       Output ports
data.join       left, right       output
data.concat     inputs[0..N]      output
data.partition  input             matching, not_matching
data.pull       input             value (Value type)
data.collect    value_0, value_1  list (Value type)
all others      input             output
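
For the multi-port ops, edges name the specific port. A sketch of wiring a data.join from two upstream file.source nodes (node names are illustrative; the join's own config fields are covered in Data Operations):

merge-customers-orders:
  type: data.join
  # join-specific config (e.g. the join key) is described in Data Operations

edges:
  - "read-customers.data -> merge-customers-orders.left"    # feeds the join's left port
  - "read-orders.data -> merge-customers-orders.right"      # feeds the join's right port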

data.sql

Runs arbitrary SQL against input data. The input table is available as input in the query.

transform:
  type: data.sql
  query: |
    SELECT name, email, score * 100 AS pct
    FROM input
    WHERE score > 0.5

When to use: Transformations that combine multiple SQL operations in a single step, or use features like window functions and CTEs.
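
For example, a CTE combined with a window function in a single node; a sketch with illustrative column names:

top-per-segment:
  type: data.sql
  query: |
    WITH ranked AS (
      SELECT *, ROW_NUMBER() OVER (PARTITION BY segment ORDER BY score DESC) AS rnk
      FROM input
    )
    SELECT * FROM ranked
    WHERE rnk <= 3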

router

Sends records to different output ports based on conditions. One input port, one output port per route.

route-by-source:
  type: router
  input: input
  routes:
    - condition: "tier = 'enterprise'"
      output: enterprise
    - condition: "tier = 'startup'"
      output: startup
    - default: true
      output: other

When to use: Splitting data into separate branches for different downstream processing.
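
Each route's output name becomes an output port that edges can target; a sketch, with illustrative downstream node names:

edges:
  - "route-by-source.enterprise -> enterprise-flow.input"
  - "route-by-source.startup -> startup-flow.input"
  - "route-by-source.other -> archive.input"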

api.call

Makes HTTP requests. Supports templated URLs, auth, retries, and response parsing.

fetch-enrichment:
  type: api.call
  method: POST
  url: "https://api.example.com/enrich"
  auth:
    type: bearer
    bearerToken: "$secrets.API_KEY"
  body:
    email: "{{ email }}"
  responseParsing:
    fields:
      - sourcePath: "data.company"
        outputField: company
        type: string

When to use: Calling external APIs for enrichment, webhooks, or data submission.

Input port: input (Table). Output port: output (Table).

custom

Runs a script defined by a node-spec.yaml. Ports come from the spec.

my-transform:
  type: custom
  spec: nodes/my-transform/node-spec.yaml

When to use: Logic that requires external libraries, API calls with complex handling, ML inference, or domain-specific transformations that cannot be expressed as SQL.

Custom nodes declare their contract in a node-spec.yaml file:

name: score-calculator
description: Calculate engagement score from interaction data
runtime: node    # node | python
inputs:
  records:
    type: table
    schema:
      email:
        type: string
        required: true
      clicks:
        type: number
      opens:
        type: number
outputs:
  scored:
    type: table
    schema:
      email:
        type: string
      score:
        type: number
      tier:
        type: string
        enum: [high, medium, low]

The spec defines the contract. The runtime validates data against it before and after execution. See Schemas for schema details.
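
Edges then address the ports declared in the spec. A sketch, assuming the spec above is registered for a node with the ID score, and using illustrative upstream and downstream node names:

score:
  type: custom
  spec: nodes/score/node-spec.yaml    # the spec shown above

edges:
  - "read-interactions.data -> score.records"    # 'records' input port from the spec
  - "score.scored -> write-results.input"        # 'scored' output port from the spec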

Node configuration fields

Field           Type    Required  Description
type            string  yes       Node type
label           string  no        Display name
spec            string  no        Path to node-spec.yaml (required for custom)
parallel        object  no        Fan-out configuration
promotedFields  object  no        Config values that become input ports
(type config)   varies  varies    Fields specific to the node type

Promoted fields

Data op config values can be promoted to input ports. This lets a value from an upstream node feed into a config parameter at runtime.

filter-by-threshold:
  type: data.filter
  expression: "score >= $threshold"
  promotedFields:
    threshold:
      type: number
      default: 0.5

The threshold field becomes a Value input port. An upstream value.literal or data.pull node can feed it.
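
The promoted port can then be wired from the threshold value.literal node shown earlier; a sketch of the edge, assuming the port takes the promoted field's name:

edges:
  - "threshold.value -> filter-by-threshold.threshold"   # Value output feeds the promoted port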

Parallel fan-out

Any node can declare parallel fan-out. The runtime splits the input, runs the node per chunk, and merges outputs. The node code stays unchanged.

enrich:
  type: custom
  spec: nodes/enrich/node-spec.yaml
  parallel:
    over: records    # which input port to split
    chunks: auto     # auto or a positive integer
    merge: enriched  # which output port to concatenate

See Execution Model for details on how parallel execution works.

Lifecycle

Every node goes through the same lifecycle during pipeline execution:

idle --> running --> success
                 --> error

Status   Meaning
idle     Not yet executed
running  Currently executing
success  Completed, outputs written
error    Failed (timeout, runtime error, validation)

When a node enters error, all of its downstream dependents are skipped.

Edges

Edges connect output ports to input ports. A node’s output port can feed multiple downstream nodes (fan-out). A node’s input port receives data from exactly one upstream output port.

edges:
  - "read-csv.data -> filter.input"      # read-csv's data port feeds filter's input
  - "read-csv.data -> summarize.input"   # same output feeds a second consumer
  - "filter.output -> write.records"     # filter's output feeds write's records port

Port names are defined by the node type. Built-in ops use standardized names (input, output, left, right). Custom nodes define their own port names in node-spec.yaml.