# flow.yaml

The flow.yaml file defines your entire pipeline — nodes, edges, and configuration.

A three-node pipeline that reads a CSV, filters rows, and writes results:

```yaml
name: qualified-leads
version: 1
nodes:
  read-leads:
    type: file.source
    path: data/leads.csv
    format: csv
  filter-active:
    type: data.filter
    expression: "status = 'active'"
  write-output:
    type: custom
    spec: nodes/write-output/node-spec.yaml
edges:
  - "read-leads.data -> filter-active.input"
  - "filter-active.output -> write-output.records"
```

Save this as flow.yaml in your project root and run it with rf run.
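
For example, from the project root:

```sh
rf validate   # check the pipeline without executing it
rf run        # execute the pipeline
```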

Top-level fields:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `name` | string | yes | Human-readable pipeline name (used in logs and execution results) |
| `version` | number | yes | Integer version; increment when you change the pipeline |
| `description` | string | no | What this pipeline does |
| `nodes` | object | yes | Map of node ID to node definition |
| `edges` | string[] | yes | List of edge strings (can be empty) |
| `config` | object | no | Global settings and environment variables |
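
Putting the fields together, a complete skeleton might look like this (a sketch; the single node is a placeholder):

```yaml
name: qualified-leads
version: 2
description: "Filter the raw lead list down to active leads"
nodes:
  read-leads:
    type: file.source
    path: data/leads.csv
    format: csv
edges: []
config:
  timeout: 120
```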

Each key in nodes is the node ID — a human-readable slug like read-csv or filter-active. The value defines the node type and its configuration.

```yaml
nodes:
  read-leads:
    type: file.source
    path: data/leads.csv
    format: csv
  filter-active:
    type: data.filter
    expression: "status = 'active'"
  score:
    type: data.sql
    query: |
      SELECT *, engagement_score * recency_weight AS final_score
      FROM input
      ORDER BY final_score DESC
  write-output:
    type: custom
    spec: nodes/write-output/node-spec.yaml
```

Node IDs must be valid slugs: lowercase letters, numbers, and hyphens. No spaces, no dots, no underscores. The ID is used in edge references, file paths (nodes/<slug>/), and execution logs.
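
A few illustrative IDs (the annotations are ours, not part of the format):

```yaml
# Valid node IDs:
#   read-csv, filter-v2, load-2024-data
# Invalid node IDs:
#   readCSV   (uppercase letters)
#   read_csv  (underscore)
#   read.csv  (dot)
```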

Each node type determines which config fields are valid. See Nodes for the full reference.

Fields common to every node definition:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `type` | string | yes | Node type (e.g., `data.filter`, `file.source`, `custom`) |
| `label` | string | no | Display name (cosmetic only; the ID is the key) |
| `spec` | string | no | Path to node-spec.yaml (required for the `custom` type) |
| `parallel` | object | no | Fan-out configuration |
| (config) | varies | varies | Type-specific fields like `expression`, `query`, `path` |
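
For instance, a node carrying the optional label field (a sketch based on the table above):

```yaml
nodes:
  filter-active:
    type: data.filter
    label: "Active leads only"   # cosmetic display name; edges and logs use the ID
    expression: "status = 'active'"
```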

Edges define data flow. Each edge is a string in the format sourceNode.port -> destNode.port:

```yaml
edges:
  - "read-leads.data -> filter-active.input"
  - "filter-active.output -> score.input"
  - "score.output -> write-output.records"
```

The parser splits on -> (with spaces on both sides). Port names never contain dots, so the last . separates the node ID from the port name.

Nodes with multiple input ports use indexed references:

```yaml
edges:
  - "source-a.output -> merge.inputs[0]"
  - "source-b.output -> merge.inputs[1]"
```

The type checker validates every edge at parse time. It checks:

  1. Both nodes exist in the graph.
  2. Both ports exist on their respective nodes.
  3. Port types are compatible (Table to Table, Value to Value, etc.).
  4. Field schemas satisfy the destination’s requirements.

Run rf validate to check edges without executing.

Global settings that apply to the entire pipeline:

```yaml
config:
  maxParallel: 8
  timeout: 600
  env:
    API_KEY: "$secrets.API_KEY"
    BASE_URL: "https://api.example.com"
```

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `maxParallel` | number | 4 | Max nodes running concurrently per level |
| `timeout` | number | 300 | Per-node timeout in seconds |
| `env` | object | | Environment variables available to all nodes |

Environment variables prefixed with $secrets. are resolved from the secrets store at runtime.

A single node, no edges:

```yaml
name: minimal
version: 1
nodes:
  greeting:
    type: value.literal
    valueType: string
    value: "hello world"
edges: []
```

A fan-out pipeline:

```yaml
name: fan-out
version: 1
nodes:
  load-data:
    type: file.source
    path: data/users.csv
    format: csv
  active-users:
    type: data.filter
    expression: "status = 'active'"
  inactive-users:
    type: data.filter
    expression: "status = 'inactive'"
  count-active:
    type: data.group
    by: [status]
    aggregations:
      total:
        op: count
        field: "*"
edges:
  - "load-data.data -> active-users.input"
  - "load-data.data -> inactive-users.input"
  - "active-users.output -> count-active.input"
```

One source feeds multiple downstream nodes. Each consumer gets the same data independently.

A fan-in pipeline:

```yaml
name: fan-in
version: 1
nodes:
  load-orders:
    type: file.source
    path: data/orders.csv
    format: csv
  load-customers:
    type: file.source
    path: data/customers.csv
    format: csv
  enrich:
    type: data.join
    join_type: left
    left_key: customer_id
    right_key: id
edges:
  - "load-orders.data -> enrich.left"
  - "load-customers.data -> enrich.right"
```

Two sources feed into a join node. The join waits for both inputs before executing.

Routing by condition:

```yaml
name: route-by-tier
version: 1
nodes:
  load-leads:
    type: file.source
    path: data/leads.csv
    format: csv
  classify:
    type: router
    input: input
    routes:
      - condition: "tier = 'enterprise'"
        output: enterprise
      - condition: "tier = 'startup'"
        output: startup
      - default: true
        output: other
  handle-enterprise:
    type: custom
    spec: nodes/handle-enterprise/node-spec.yaml
  handle-startup:
    type: custom
    spec: nodes/handle-startup/node-spec.yaml
edges:
  - "load-leads.data -> classify.input"
  - "classify.enterprise -> handle-enterprise.records"
  - "classify.startup -> handle-startup.records"
```

Router nodes split data into named output ports based on conditions. Each route becomes a separate output port.
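
The default route's port can be consumed like any other; a sketch wiring it to a hypothetical handle-other node:

```yaml
edges:
  - "classify.other -> handle-other.records"  # handle-other is a hypothetical third handler
```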

To run a single node in parallel over chunks of its input, add a parallel block:

```yaml
nodes:
  enrich:
    type: custom
    spec: nodes/enrich/node-spec.yaml
    parallel:
      over: records     # input port to split
      chunks: auto      # or a fixed number like 4
      merge: enriched   # output port to concatenate
```

The runtime splits the input, runs the node once per chunk, and merges the outputs. The node code does not change.
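
Since over and merge name ordinary ports, edges to a parallel node look the same as edges to any other node (a sketch assuming hypothetical load-data and write-output neighbors):

```yaml
edges:
  - "load-data.data -> enrich.records"         # input is split into chunks at runtime
  - "enrich.enriched -> write-output.records"  # chunk outputs are concatenated here
```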

The parser validates flow.yaml on load. Invalid nodes store errors but do not block parsing of the rest of the graph. Edge syntax errors are caught immediately.

```sh
rf validate
```