# Pipeline YAML

Every Radhflow pipeline is a single pipeline.rf.yaml file. It declares metadata, nodes, and edges. The graph parser reads this file, validates it, and produces a typed execution plan.

```yaml
name: lead-scoring
version: 1
description: Score leads by engagement and push top tier to CRM
nodes:
  # ... node definitions
edges:
  # ... edge definitions
```
| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | yes | Human-readable pipeline name |
| `version` | number | yes | Integer version; increment on change |
| `description` | string | no | What this pipeline does |
| `nodes` | object | yes | Map of node ID to node definition |
| `edges` | string[] | yes | List of edge strings |
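
The required-field rules in the table can be sketched as a small validator. This is an illustration only — `validate_top_level` and its error strings are hypothetical, not part of Radhflow:

```python
# Sketch of the top-level schema check, assuming the YAML has already
# been parsed into a Python dict. Field names follow the table above.
REQUIRED = {"name": str, "version": int, "nodes": dict, "edges": list}

def validate_top_level(doc: dict) -> list[str]:
    """Return a list of error messages; an empty list means the document passes."""
    errors = []
    for field, expected in REQUIRED.items():
        if field not in doc:
            errors.append(f"missing required field: {field}")
        elif not isinstance(doc[field], expected):
            errors.append(f"{field}: expected {expected.__name__}")
    if isinstance(doc.get("version"), int) and doc["version"] < 1:
        errors.append("version must be a positive integer")
    return errors

pipeline = {"name": "lead-scoring", "version": 1, "nodes": {}, "edges": []}
print(validate_top_level(pipeline))  # → []
```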

Each key in nodes is the node ID — a human-readable slug like read-csv or filter-active. The value defines the node type and its configuration.

```yaml
nodes:
  read-leads:
    type: file.source
    path: data/leads.csv
    format: csv
  filter-active:
    type: data.filter
    expression: "status = 'active'"
  score:
    type: data.sql
    query: |
      SELECT *, engagement_score * recency_weight AS final_score
      FROM input
      ORDER BY final_score DESC
  write-output:
    type: custom
    spec: nodes/write-output/node-spec.yaml
```

Node types determine which config fields are valid. See Nodes for the full list.

Edges are strings in the format `sourceNode.port -> destNode.port`. They define data flow between nodes.

```yaml
edges:
  - "read-leads.data -> filter-active.input"
  - "filter-active.output -> score.input"
  - "score.output -> write-output.records"
```

The edge parser splits on ` -> ` (with surrounding spaces). Port names never contain dots, so the last `.` separates the node ID from the port name; node IDs that themselves contain dots (such as `file.source`) still parse correctly because the parser uses `lastIndexOf('.')`.
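
A minimal sketch of that split, using Python's `str.rpartition` as the analogue of `lastIndexOf('.')` (the function name `parse_edge` is hypothetical):

```python
def parse_edge(edge: str) -> tuple[str, str, str, str]:
    """Split 'source.port -> dest.port' on the last dot of each endpoint,
    so node IDs containing dots still parse correctly."""
    src, dst = edge.split(" -> ")
    src_node, _, src_port = src.rpartition(".")  # splits on the LAST '.'
    dst_node, _, dst_port = dst.rpartition(".")
    return src_node, src_port, dst_node, dst_port

print(parse_edge("read-leads.data -> filter-active.input"))
# → ('read-leads', 'data', 'filter-active', 'input')
```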

Indexed inputs for nodes with multiple input ports:

```yaml
edges:
  - "source-a.output -> merge.inputs[0]"
  - "source-b.output -> merge.inputs[1]"
```

The smallest valid pipeline — a single source node with no edges:

```yaml
name: minimal
version: 1
nodes:
  greeting:
    type: value.literal
    valueType: string
    value: "hello world"
edges: []
```

A pipeline that reads a CSV, filters rows, enriches with a SQL transform, and writes output:

```yaml
name: lead-scoring
version: 1
description: Read leads, filter active ones, score by engagement, write top tier
nodes:
  read-leads:
    type: file.source
    path: data/leads.csv
    format: csv
    csvOptions:
      delimiter: ","
      hasHeader: true
  filter-active:
    type: data.filter
    expression: "status = 'active' AND email IS NOT NULL"
  score-leads:
    type: data.sql
    query: |
      SELECT *,
        (clicks * 0.3 + opens * 0.5 + replies * 0.2) AS score
      FROM input
      ORDER BY score DESC
  top-tier:
    type: data.filter
    expression: "score >= 80"
  write-results:
    type: custom
    spec: nodes/write-results/node-spec.yaml
edges:
  - "read-leads.data -> filter-active.input"
  - "filter-active.output -> score-leads.input"
  - "score-leads.output -> top-tier.input"
  - "top-tier.output -> write-results.records"
```

Nodes can declare parallel execution. The runtime splits the input, runs the node once per chunk, and merges the outputs. The node code does not change.

```yaml
nodes:
  enrich:
    type: custom
    spec: nodes/enrich/node-spec.yaml
    parallel:
      over: records    # input port to split
      chunks: auto     # or a fixed number like 4
      merge: enriched  # output port to concatenate
```
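
The split-run-merge behavior can be sketched as follows; `run_parallel` and `node_fn` are hypothetical stand-ins for the runtime, which would execute the chunks concurrently rather than in a loop:

```python
def run_parallel(records: list, node_fn, chunks: int = 4) -> list:
    """Split the input into chunks, run node_fn once per chunk,
    and concatenate the outputs — mirroring the parallel block above."""
    size = max(1, -(-len(records) // chunks))  # ceiling division
    parts = [records[i:i + size] for i in range(0, len(records), size)]
    merged = []
    for part in parts:  # a real runtime would run these concurrently
        merged.extend(node_fn(part))
    return merged

doubled = run_parallel([1, 2, 3, 4, 5], lambda chunk: [x * 2 for x in chunk], chunks=2)
print(doubled)  # → [2, 4, 6, 8, 10]
```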

Route records to different outputs based on conditions:

```yaml
nodes:
  classify:
    type: router
    input: input
    routes:
      - condition: "source = 'reddit'"
        output: reddit
      - condition: "source = 'twitter'"
        output: twitter
      - default: true
        output: other
```
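
Route evaluation can be sketched as an in-order scan where the default entry matches unconditionally (so it belongs last). `route` is a hypothetical helper, and the condition strings are replaced here by a simple field comparison:

```python
def route(record: dict, routes: list[dict]) -> str:
    """Return the output port for a record; routes are checked in order."""
    for r in routes:
        if r.get("default"):
            return r["output"]
        if record.get("source") == r["source"]:
            return r["output"]
    raise ValueError("no matching route and no default")

routes = [
    {"source": "reddit", "output": "reddit"},
    {"source": "twitter", "output": "twitter"},
    {"default": True, "output": "other"},
]

print(route({"source": "reddit"}, routes))  # → reddit
print(route({"source": "rss"}, routes))     # → other
```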

The parser validates the YAML on load. Invalid node definitions are recorded as errors but do not block parsing of the rest of the graph; malformed edge strings fail immediately. Run `rf validate` to check a pipeline before execution.