# flow.yaml
The flow.yaml file defines your entire pipeline — nodes, edges, and configuration.
## Minimal complete example

A three-node pipeline that reads a CSV, filters rows, and writes results:
```yaml
name: qualified-leads
version: 1

nodes:
  read-leads:
    type: file.source
    path: data/leads.csv
    format: csv

  filter-active:
    type: data.filter
    expression: "status = 'active'"

  write-output:
    type: custom
    spec: nodes/write-output/node-spec.yaml

edges:
  - "read-leads.data -> filter-active.input"
  - "filter-active.output -> write-output.records"
```

Save this as `flow.yaml` in your project root and run it with `rf run`.
## Top-level keys

| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | yes | Human-readable pipeline name (used in logs and execution results) |
| `version` | number | yes | Integer version — increment when you change the pipeline |
| `description` | string | no | What this pipeline does |
| `nodes` | object | yes | Map of node ID to node definition |
| `edges` | string[] | yes | List of edge strings (can be empty) |
| `config` | object | no | Global settings and environment variables |
Each key in `nodes` is the node ID — a human-readable slug like `read-csv` or `filter-active`. The value defines the node type and its configuration.
```yaml
nodes:
  read-leads:
    type: file.source
    path: data/leads.csv
    format: csv

  filter-active:
    type: data.filter
    expression: "status = 'active'"

  score:
    type: data.sql
    query: |
      SELECT *, engagement_score * recency_weight AS final_score
      FROM input
      ORDER BY final_score DESC

  write-output:
    type: custom
    spec: nodes/write-output/node-spec.yaml
```

Node IDs must be valid slugs: lowercase letters, numbers, and hyphens. No spaces, no dots, no underscores. The ID is used in edge references, file paths (`nodes/<slug>/`), and execution logs.
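The slug rules above can be checked with a short regular expression. This is a sketch based on the rules stated here, not the tool's actual validator; whether leading or trailing hyphens are allowed is an assumption:

```python
import re

# Lowercase letters, digits, and hyphens, with hyphens only between
# alphanumeric runs -- an assumption derived from the slug rules above.
SLUG_RE = re.compile(r"^[a-z0-9]+(-[a-z0-9]+)*$")

def is_valid_node_id(node_id: str) -> bool:
    """Return True if node_id is a valid slug per the rules above."""
    return bool(SLUG_RE.match(node_id))
```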
Each node type determines which config fields are valid. See Nodes for the full reference.
## Node definition fields

| Field | Type | Required | Description |
|---|---|---|---|
| `type` | string | yes | Node type (e.g., `data.filter`, `file.source`, `custom`) |
| `label` | string | no | Display name (cosmetic only — the ID is the key) |
| `spec` | string | no | Path to `node-spec.yaml` (required for `custom` type) |
| `parallel` | object | no | Fan-out configuration |
| *(config)* | varies | varies | Type-specific fields like `expression`, `query`, `path` |
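Putting these fields together, a custom node using every optional field might look like the following. This is an illustrative combination, not an example from the tool's own docs; the `parallel` values mirror the fan-out fields shown later in this page:

```yaml
nodes:
  enrich:
    type: custom
    label: "Enrich leads"               # cosmetic display name
    spec: nodes/enrich/node-spec.yaml   # required because type is custom
    parallel:
      over: records
      chunks: 4
      merge: enriched
```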
Edges define data flow. Each edge is a string in the format `sourceNode.port -> destNode.port`:

```yaml
edges:
  - "read-leads.data -> filter-active.input"
  - "filter-active.output -> score.input"
  - "score.output -> write-output.records"
```

The parser splits on `->` (with spaces on both sides). Port names never contain dots, so the last `.` separates the node ID from the port name.
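The split rule above can be sketched in a few lines of Python. This is a hypothetical helper illustrating the stated format, not the tool's actual parser:

```python
def parse_edge(edge: str) -> tuple[str, str, str, str]:
    """Split 'source.port -> dest.port' into its four parts.

    Port names never contain dots, so splitting on the last '.'
    cleanly separates the node ID from the port name. Indexed
    ports like 'inputs[0]' contain no dot and pass through whole.
    """
    source, dest = edge.split(" -> ")           # spaces required on both sides
    src_node, src_port = source.rsplit(".", 1)  # last '.' is the separator
    dst_node, dst_port = dest.rsplit(".", 1)
    return src_node, src_port, dst_node, dst_port
```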
## Indexed inputs

Nodes with multiple input ports use indexed references:

```yaml
edges:
  - "source-a.output -> merge.inputs[0]"
  - "source-b.output -> merge.inputs[1]"
```
## Edge validation

The type checker validates every edge at parse time. It checks:
- Both nodes exist in the graph.
- Both ports exist on their respective nodes.
- Port types are compatible (`Table` to `Table`, `Value` to `Value`, etc.).
- Field schemas satisfy the destination’s requirements.
Run `rf validate` to check edges without executing.
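The first two checks can be sketched against a parsed graph. The data shapes here are hypothetical simplifications; the real checker also handles port types and field schemas:

```python
def check_edge(edge: str, node_ports: dict[str, set[str]]) -> list[str]:
    """Return error messages for one edge; an empty list means it passed.

    node_ports maps each node ID to the set of port names it exposes
    (inputs and outputs together) -- a simplified stand-in for the
    real graph model.
    """
    errors = []
    source, dest = edge.split(" -> ")
    for ref in (source, dest):
        node, port = ref.rsplit(".", 1)
        if node not in node_ports:
            errors.append(f"unknown node: {node}")
        elif port not in node_ports[node]:
            errors.append(f"node {node} has no port {port}")
    return errors
```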
## Config

Global settings that apply to the entire pipeline:

```yaml
config:
  maxParallel: 8
  timeout: 600
  env:
    API_KEY: "$secrets.API_KEY"
    BASE_URL: "https://api.example.com"
```

| Field | Type | Default | Description |
|---|---|---|---|
| `maxParallel` | number | 4 | Max nodes running concurrently per level |
| `timeout` | number | 300 | Per-node timeout in seconds |
| `env` | object | — | Environment variables available to all nodes |
Environment variables prefixed with `$secrets.` are resolved from the secrets store at runtime.
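The resolution step can be sketched as a simple substitution. This is a hypothetical helper assuming the secrets store behaves like a flat key-value map; the error-on-missing behavior is an assumption, not documented here:

```python
def resolve_env(env: dict[str, str], secrets: dict[str, str]) -> dict[str, str]:
    """Replace '$secrets.<NAME>' values with entries from the secrets store.

    Values without the prefix pass through unchanged. A missing secret
    raises rather than silently injecting the placeholder string.
    """
    resolved = {}
    for key, value in env.items():
        if value.startswith("$secrets."):
            name = value[len("$secrets."):]
            if name not in secrets:
                raise KeyError(f"secret not found: {name}")
            resolved[key] = secrets[name]
        else:
            resolved[key] = value
    return resolved
```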
## Progressive examples

### The smallest valid pipeline

A single node, no edges:

```yaml
name: minimal
version: 1

nodes:
  greeting:
    type: value.literal
    valueType: string
    value: "hello world"

edges: []
```
### Fan-out: one source, multiple consumers

```yaml
name: fan-out
version: 1

nodes:
  load-data:
    type: file.source
    path: data/users.csv
    format: csv

  active-users:
    type: data.filter
    expression: "status = 'active'"

  inactive-users:
    type: data.filter
    expression: "status = 'inactive'"

  count-active:
    type: data.group
    by: [status]
    aggregations:
      total:
        op: count
        field: "*"

edges:
  - "load-data.data -> active-users.input"
  - "load-data.data -> inactive-users.input"
  - "active-users.output -> count-active.input"
```

One source feeds multiple downstream nodes. Each consumer gets the same data independently.
### Fan-in: multiple sources, one consumer

```yaml
name: fan-in
version: 1

nodes:
  load-orders:
    type: file.source
    path: data/orders.csv
    format: csv

  load-customers:
    type: file.source
    path: data/customers.csv
    format: csv

  enrich:
    type: data.join
    join_type: left
    left_key: customer_id
    right_key: id

edges:
  - "load-orders.data -> enrich.left"
  - "load-customers.data -> enrich.right"
```

Two sources feed into a join node. The join waits for both inputs before executing.
### Conditional branching

```yaml
name: route-by-tier
version: 1

nodes:
  load-leads:
    type: file.source
    path: data/leads.csv
    format: csv

  classify:
    type: router
    input: input
    routes:
      - condition: "tier = 'enterprise'"
        output: enterprise
      - condition: "tier = 'startup'"
        output: startup
      - default: true
        output: other

  handle-enterprise:
    type: custom
    spec: nodes/handle-enterprise/node-spec.yaml

  handle-startup:
    type: custom
    spec: nodes/handle-startup/node-spec.yaml

edges:
  - "load-leads.data -> classify.input"
  - "classify.enterprise -> handle-enterprise.records"
  - "classify.startup -> handle-startup.records"
```

Router nodes split data into named output ports based on conditions. Each route becomes a separate output port.
### Parallel fan-out on a single node

```yaml
nodes:
  enrich:
    type: custom
    spec: nodes/enrich/node-spec.yaml
    parallel:
      over: records   # input port to split
      chunks: auto    # or a fixed number like 4
      merge: enriched # output port to concatenate
```

The runtime splits the input, runs the node once per chunk, and merges the outputs. The node code does not change.
## Validation

The parser validates `flow.yaml` on load. Invalid nodes store errors but do not block parsing of the rest of the graph. Edge syntax errors are caught immediately.

```sh
rf validate
```