Pipeline Spec

This is the structured reference for `pipeline.rf.yaml`. AI agents parse this page to generate valid pipeline definitions. Every field is documented with its type, constraints, and purpose.

```yaml
# pipeline.rf.yaml — complete schema
# ─────────────────────────────────────────────
name: lead-scoring          # string, required. Human-readable pipeline name.
                            # Lowercase, hyphens, no spaces.
version: 1                  # integer, required. Increment on breaking changes.
description: >              # string, optional. What this pipeline does.
  Score inbound leads by engagement metrics
  and push qualified leads to CRM.
nodes:                      # object, required. Map of node ID → node definition.
  # ... (see Node schema below)
edges:                      # string[], required. List of edge strings.
  # ... (see Edge schema below)
```
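The required/optional annotations above can be turned into a loader-side check. The sketch below assumes the document has already been parsed into a Python dict; the function name, error strings, and the slug regex for `name` are illustrative, not part of the spec:

```python
import re

# The four required top-level fields, per the schema comments above.
REQUIRED_TOP_LEVEL = ("name", "version", "nodes", "edges")

def check_top_level(doc: dict) -> list:
    """Return a list of problems found at the top level of a parsed pipeline doc."""
    problems = []
    for field in REQUIRED_TOP_LEVEL:
        if field not in doc:
            problems.append(f"missing required field: {field}")
    # version: integer, required
    if "version" in doc and not isinstance(doc["version"], int):
        problems.append("version must be an integer")
    # name: lowercase, hyphens, no spaces (regex is an assumed reading of that rule)
    if "name" in doc and not re.fullmatch(r"[a-z0-9]+(-[a-z0-9]+)*", str(doc["name"])):
        problems.append("name must be lowercase with hyphens, no spaces")
    return problems
```

An empty return value means the top level passed; each string describes one violation.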

Each key under `nodes` is the node ID. Node IDs are human-readable slugs: lowercase, hyphens, no spaces, and no dots in the ID itself.

```yaml
nodes:
  read-leads:                   # string (node ID), required. Unique within the pipeline.
    type: file.source           # string, required. Node type identifier.
                                # Format: category.operation
                                # Categories: file, data, http, google, browser,
                                #             cli, value, router, custom
    # ── Type-specific config fields ──────────────────────────
    # These vary by node type. Examples below.
    path: data/leads.csv        # string. File path (file.source, file.write).
    format: csv                 # string. File format: csv, json, ndjson.
    query: "SELECT * FROM i"    # string. SQL query (data.sql).
    expression: "score > 80"    # string. Filter expression (data.filter).
    url: "https://api.ex/v1"    # string. URL (http.request).
    method: GET                 # string. HTTP method (http.request).
    value: "hello"              # any. Literal value (value.literal).
    valueType: string           # string. Value type (value.literal).
    # ── Common optional fields ───────────────────────────────
    spec: nodes/x/spec.yaml     # string, optional. Path to external node-spec.yaml
                                # for custom nodes.
    parallel:                   # object, optional. Parallel execution config.
      over: input               # string. Input port to split.
      chunks: auto              # integer | "auto". Number of parallel chunks.
      merge: output             # string. Output port to concatenate.
    csvOptions:                 # object, optional. CSV parsing options.
      delimiter: ","            # string. Field delimiter.
      hasHeader: true           # boolean. First row is header.
      quote: "\""               # string. Quote character.
```
| Type | Category | Config fields |
|------|----------|---------------|
| `file.source` | File I/O | `path`, `format`, `csvOptions` |
| `file.write` | File I/O | `path`, `format` |
| `data.sql` | Transform | `query` |
| `data.filter` | Transform | `expression` |
| `data.map` | Transform | `expression` |
| `data.sort` | Transform | `field`, `order` |
| `data.limit` | Transform | `count` |
| `data.dedup` | Transform | `fields` |
| `data.join` | Transform | `on`, `type` |
| `data.group` | Transform | `by`, `aggregations` |
| `http.request` | Connector | `url`, `method`, `headers`, `body` |
| `google.sheets` | Connector | `spreadsheetId`, `range`, `credentials` |
| `browser.extract` | Connector | `url`, `selector`, `waitFor` |
| `cli.run` | CLI | `command`, `env`, `sandbox` |
| `value.literal` | Value | `value`, `valueType` |
| `router` | Control | `input`, `routes` |
| `custom` | Custom | `spec` (path to node-spec.yaml) |
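One way to use this table programmatically is a type → config-key map for flagging unrecognized keys. Treating the table as the complete set of recognized keys per type (plus the common optional fields) is an assumption of this sketch, and `unknown_config_keys` is a hypothetical helper name:

```python
# Config fields per node type, mirroring the table above.
NODE_CONFIG_FIELDS = {
    "file.source": {"path", "format", "csvOptions"},
    "file.write": {"path", "format"},
    "data.sql": {"query"},
    "data.filter": {"expression"},
    "data.map": {"expression"},
    "data.sort": {"field", "order"},
    "data.limit": {"count"},
    "data.dedup": {"fields"},
    "data.join": {"on", "type"},
    "data.group": {"by", "aggregations"},
    "http.request": {"url", "method", "headers", "body"},
    "google.sheets": {"spreadsheetId", "range", "credentials"},
    "browser.extract": {"url", "selector", "waitFor"},
    "cli.run": {"command", "env", "sandbox"},
    "value.literal": {"value", "valueType"},
    "router": {"input", "routes"},
    "custom": {"spec"},
}

# Fields every node may carry regardless of type (see "Common optional fields").
COMMON_FIELDS = {"type", "spec", "parallel", "csvOptions"}

def unknown_config_keys(node: dict) -> set:
    """Return keys on a node that neither its type nor the common fields declare."""
    allowed = NODE_CONFIG_FIELDS.get(node.get("type"), set()) | COMMON_FIELDS
    return set(node) - allowed
```

An empty result means every key on the node is accounted for; anything returned is a candidate typo or misplaced field.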

Edges are strings in the format `sourceNode.port -> targetNode.port`.

```yaml
edges:
  # ── Basic edge ─────────────────────────────────────────────
  - "read-leads.data -> filter.input"
  # string, required format: "nodeId.portName -> nodeId.portName"
  # The parser splits on " -> " (space-arrow-space).
  # Port name is after the last dot in each side.
  # ── Multiple edges ─────────────────────────────────────────
  - "filter.output -> score.input"
  - "score.output -> write.records"
  # ── Indexed ports (for multi-input nodes) ──────────────────
  - "source-a.output -> merge.inputs[0]"
  - "source-b.output -> merge.inputs[1]"
  # Bracket notation for indexed input ports.
  # ── Fan-out (one output to multiple inputs) ────────────────
  - "read.data -> branch-a.input"
  - "read.data -> branch-b.input"
  # Same output port can connect to multiple input ports.
```
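The splitting rules above (split on `" -> "`, port name after the last dot on each side) can be sketched as a small parser. `parse_edge` is a hypothetical helper name, not part of the reference implementation:

```python
def parse_edge(edge: str) -> tuple:
    """Split an edge string into (source node, source port, target node, target port).

    Splits on " -> " first, then takes the port name after the last dot on
    each side. Because node IDs contain no dots, rsplit is unambiguous, and
    indexed ports like inputs[0] survive intact as the port name.
    """
    source, target = edge.split(" -> ")
    src_node, src_port = source.rsplit(".", 1)
    dst_node, dst_port = target.rsplit(".", 1)
    return src_node, src_port, dst_node, dst_port
```

For example, `parse_edge("source-a.output -> merge.inputs[0]")` keeps the bracketed index as part of the port name rather than treating it as a separate field.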

The parser enforces these rules at load time:

  1. Every node must have a unique ID.
  2. Every node must have a type field.
  3. Edge source and target node IDs must exist in nodes.
  4. Edge port names must match declared ports on the node type.
  5. Connected ports must have compatible types (Value/Record/Table/Stream).
  6. Connected ports must have compatible schemas (see Type System).
  7. The graph must be a DAG — no cycles.
  8. name and version are required at top level.
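Rule 7, the DAG constraint, is commonly enforced with a depth-first search for back edges. A sketch, assuming rules 1–3 already hold so every edge endpoint is a known node ID (the function name and three-color scheme are illustrative, not the reference implementation):

```python
from collections import defaultdict

def has_cycle(node_ids, edges) -> bool:
    """Return True if the directed graph (node IDs plus (src, dst) pairs) has a cycle.

    Assumes every endpoint in `edges` appears in `node_ids` (rule 3).
    """
    adj = defaultdict(list)
    for src, dst in edges:
        adj[src].append(dst)
    WHITE, GRAY, BLACK = 0, 1, 2          # unvisited / on current path / finished
    color = {n: WHITE for n in node_ids}

    def visit(n):
        color[n] = GRAY
        for m in adj[n]:
            if color[m] == GRAY:          # back edge onto the current path: cycle
                return True
            if color[m] == WHITE and visit(m):
                return True
        color[n] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in node_ids)
```

A linear pipeline passes; any edge that points back to an ancestor on the current DFS path fails the check.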
A minimal valid pipeline:

```yaml
name: minimal
version: 1
nodes:
  greeting:
    type: value.literal
    valueType: string
    value: "hello world"
edges: []
```
A complete example:

```yaml
name: lead-scoring
version: 1
description: Read leads, filter active, score by engagement, export top tier
nodes:
  read-leads:
    type: file.source
    path: data/leads.csv
    format: csv
    csvOptions:
      delimiter: ","
      hasHeader: true
  filter-active:
    type: data.filter
    expression: "status = 'active' AND email IS NOT NULL"
  score:
    type: data.sql
    query: |
      SELECT *,
             (clicks * 0.3 + opens * 0.5 + replies * 0.2) AS score
      FROM input
      ORDER BY score DESC
  top-tier:
    type: data.filter
    expression: "score >= 80"
  write-results:
    type: file.write
    path: output/qualified-leads.ndjson
    format: ndjson
edges:
  - "read-leads.data -> filter-active.input"
  - "filter-active.output -> score.input"
  - "score.output -> top-tier.input"
  - "top-tier.output -> write-results.records"
```
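As a sanity check, rule 3 (edge endpoints must name declared nodes) can be verified against this example using only the splitting rules from the edge section. This standalone sketch hard-codes the node IDs and edges from the YAML above:

```python
# Node IDs and edges transcribed from the lead-scoring example above.
nodes = {"read-leads", "filter-active", "score", "top-tier", "write-results"}
edges = [
    "read-leads.data -> filter-active.input",
    "filter-active.output -> score.input",
    "score.output -> top-tier.input",
    "top-tier.output -> write-results.records",
]

# Rule 3: every endpoint's node ID (the part before the last dot) must exist.
for edge in edges:
    source, target = edge.split(" -> ")
    assert source.rsplit(".", 1)[0] in nodes, f"unknown source node in {edge!r}"
    assert target.rsplit(".", 1)[0] in nodes, f"unknown target node in {edge!r}"
```

The loop passes silently for this example; a typo in any node ID would trip the corresponding assertion.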