# Pipeline YAML
Every Radhflow pipeline is a single pipeline.rf.yaml file. It declares metadata, nodes, and edges. The graph parser reads this file, validates it, and produces a typed execution plan.
## Top-level structure

```yaml
name: lead-scoring
version: 1
description: Score leads by engagement and push top tier to CRM

nodes:
  # ... node definitions

edges:
  # ... edge definitions
```

| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | yes | Human-readable pipeline name |
| `version` | number | yes | Integer version, increment on change |
| `description` | string | no | What this pipeline does |
| `nodes` | object | yes | Map of node ID to node definition |
| `edges` | string[] | yes | List of edge strings |
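As an illustration of these constraints, the field table can be mirrored as a small structural check. The `PipelineFile` interface and `isPipelineFile` function below are hypothetical sketches, not part of Radhflow:

```typescript
// Hypothetical shape of the top-level file, mirroring the field table above.
interface PipelineFile {
  name: string;
  version: number;
  description?: string;
  nodes: Record<string, unknown>;
  edges: string[];
}

// Minimal structural check; the real parser's validation would be stricter.
function isPipelineFile(doc: any): doc is PipelineFile {
  return (
    typeof doc?.name === "string" &&
    Number.isInteger(doc?.version) &&
    (doc.description === undefined || typeof doc.description === "string") &&
    typeof doc?.nodes === "object" && doc.nodes !== null &&
    Array.isArray(doc?.edges) &&
    doc.edges.every((e: unknown) => typeof e === "string")
  );
}
```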
## Nodes section

Each key in `nodes` is the node ID: a human-readable slug like `read-csv` or `filter-active`. The value defines the node type and its configuration.

```yaml
nodes:
  read-leads:
    type: file.source
    path: data/leads.csv
    format: csv

  filter-active:
    type: data.filter
    expression: "status = 'active'"

  score:
    type: data.sql
    query: |
      SELECT *, engagement_score * recency_weight AS final_score
      FROM input
      ORDER BY final_score DESC

  write-output:
    type: custom
    spec: nodes/write-output/node-spec.yaml
```

Node types determine which config fields are valid. See Nodes for the full list.
## Edges section

Edges are strings in the format `sourceNode.port -> destNode.port`. They define data flow between nodes.

```yaml
edges:
  - "read-leads.data -> filter-active.input"
  - "filter-active.output -> score.input"
  - "score.output -> write-output.records"
```

The edge parser splits on `->` (with surrounding spaces). Port names never contain dots, so the last `.` separates the node ID from the port name. A node ID like `file.source` still parses correctly because the parser uses `lastIndexOf('.')`.
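A hedged sketch of that parsing rule, for illustration (the `Endpoint`, `parseEndpoint`, and `parseEdge` names here are hypothetical, not the Radhflow API):

```typescript
// Split an edge on " -> ", then take the LAST dot in each endpoint,
// so node IDs may themselves contain dots.
interface Endpoint {
  node: string;
  port: string;
}

function parseEndpoint(s: string): Endpoint {
  const i = s.lastIndexOf(".");
  if (i < 0) throw new Error(`endpoint has no port: ${s}`);
  return { node: s.slice(0, i), port: s.slice(i + 1) };
}

function parseEdge(edge: string): { from: Endpoint; to: Endpoint } {
  const parts = edge.split(" -> ");
  if (parts.length !== 2) throw new Error(`malformed edge: ${edge}`);
  return { from: parseEndpoint(parts[0]), to: parseEndpoint(parts[1]) };
}
```

For example, `parseEdge("read-leads.data -> filter-active.input")` yields node `read-leads` and port `data` on the source side.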
Indexed inputs for nodes with multiple input ports:
```yaml
edges:
  - "source-a.output -> merge.inputs[0]"
  - "source-b.output -> merge.inputs[1]"
```
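The bracketed index can be peeled off after the same last-dot split. A sketch under that assumption (`parseIndexedEndpoint` is a hypothetical name, not part of Radhflow):

```typescript
// Hypothetical helper: an endpoint like "merge.inputs[1]" splits into
// node "merge", port "inputs", index 1; plain ports get no index.
function parseIndexedEndpoint(
  s: string
): { node: string; port: string; index?: number } {
  const dot = s.lastIndexOf(".");
  const node = s.slice(0, dot);
  const rest = s.slice(dot + 1); // e.g. "inputs[1]" or "output"
  const m = rest.match(/^(\w+)\[(\d+)\]$/);
  return m
    ? { node, port: m[1], index: Number(m[2]) }
    : { node, port: rest };
}
```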
## Minimal example

The smallest valid pipeline: a single source node with no edges.

```yaml
name: minimal
version: 1

nodes:
  greeting:
    type: value.literal
    valueType: string
    value: "hello world"

edges: []
```
## Full example

A pipeline that reads a CSV, filters rows, enriches with a SQL transform, and writes output:

```yaml
name: lead-scoring
version: 1
description: Read leads, filter active ones, score by engagement, write top tier

nodes:
  read-leads:
    type: file.source
    path: data/leads.csv
    format: csv
    csvOptions:
      delimiter: ","
      hasHeader: true

  filter-active:
    type: data.filter
    expression: "status = 'active' AND email IS NOT NULL"

  score-leads:
    type: data.sql
    query: |
      SELECT *, (clicks * 0.3 + opens * 0.5 + replies * 0.2) AS score
      FROM input
      ORDER BY score DESC

  top-tier:
    type: data.filter
    expression: "score >= 80"

  write-results:
    type: custom
    spec: nodes/write-results/node-spec.yaml

edges:
  - "read-leads.data -> filter-active.input"
  - "filter-active.output -> score-leads.input"
  - "score-leads.output -> top-tier.input"
  - "top-tier.output -> write-results.records"
```
## Fan-out with parallel

Nodes can declare parallel execution. The runtime splits the input, runs the node once per chunk, and merges the outputs. The node code does not change.

```yaml
nodes:
  enrich:
    type: custom
    spec: nodes/enrich/node-spec.yaml
    parallel:
      over: records    # input port to split
      chunks: auto     # or a fixed number like 4
      merge: enriched  # output port to concatenate
```
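The split/run/merge behavior can be sketched as plain code. This illustrates the semantics only; `runChunked` is a hypothetical name, and the real runtime would run chunks concurrently rather than in a loop:

```typescript
// Illustrative fan-out semantics: split the input port's records into
// chunks, run the node body once per chunk, concatenate the outputs.
function runChunked<T, R>(
  records: T[],
  chunkCount: number,
  nodeBody: (chunk: T[]) => R[]
): R[] {
  const size = Math.ceil(records.length / chunkCount);
  const out: R[] = [];
  for (let i = 0; i < records.length; i += size) {
    out.push(...nodeBody(records.slice(i, i + size)));
  }
  return out;
}
```

Because the merged output is a simple concatenation in chunk order, the node body sees each chunk as if it were the whole input.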
## Router nodes

Route records to different outputs based on conditions:

```yaml
nodes:
  classify:
    type: router
    input: input
    routes:
      - condition: "source = 'reddit'"
        output: reddit
      - condition: "source = 'twitter'"
        output: twitter
      - default: true
        output: other
```
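Assuming the condition strings have already been compiled to predicates (how Radhflow evaluates them is not shown here), the routing semantics might look like this sketch, where a route without a predicate acts as the default:

```typescript
// Illustrative routing: each record goes to the first matching route's
// output; a route with no predicate catches everything (the default).
interface Route<T> {
  match?: (r: T) => boolean;
  output: string;
}

function routeRecords<T>(records: T[], routes: Route<T>[]): Map<string, T[]> {
  const out = new Map<string, T[]>();
  for (const rec of records) {
    const route = routes.find((r) => r.match === undefined || r.match(rec));
    if (!route) continue; // no match and no default: record is dropped
    const bucket = out.get(route.output) ?? [];
    bucket.push(rec);
    out.set(route.output, bucket);
  }
  return out;
}
```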
## Validation

The parser validates the YAML on load. Invalid nodes store errors but do not block parsing of the rest of the graph; edge syntax errors are caught immediately. Run `rf validate` to check a pipeline before execution.
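The collect-don't-block behavior can be illustrated with a sketch (hypothetical names; not the actual parser, which checks far more than the `type` field):

```typescript
// Illustrative: validate each node independently, recording errors per
// node ID instead of aborting on the first failure.
function collectNodeErrors(
  nodes: Record<string, { type?: unknown }>
): Record<string, string[]> {
  const errors: Record<string, string[]> = {};
  for (const [id, def] of Object.entries(nodes)) {
    const errs: string[] = [];
    if (typeof def.type !== "string") errs.push("missing or non-string 'type'");
    if (errs.length > 0) errors[id] = errs;
  }
  return errors;
}
```

Nodes with entries in the error map are skipped at execution time, while the rest of the graph still parses into a plan.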