
Pipeline Spec

The complete flow.yaml specification for programmatic pipeline generation.

# flow.yaml — complete schema
# ─────────────────────────────────────────────
name: lead-scoring    # string, required. Human-readable pipeline name.
                      # Lowercase, hyphens, no spaces.
version: 1            # integer, required. Increment on breaking changes.
description: >        # string, optional. What this pipeline does.
  Score inbound leads by engagement metrics
  and push qualified leads to CRM.
nodes:                # object, required. Map of node ID → node definition.
  # ... (see Node schema below)
edges:                # string[], required. List of edge strings.
  # ... (see Edge schema below)
| Field | Type | Constraints |
|---|---|---|
| `name` | string | Lowercase, hyphens, no spaces. Must be unique per project. |
| `version` | integer | Positive integer. Increment on breaking changes. |
| `nodes` | object | At least one node. Keys are node IDs. |
| `edges` | string[] | Can be empty (`[]`). Each string follows the edge format. |

| Field | Type | Default | Description |
|---|---|---|---|
| `description` | string | `""` | Human-readable description. |
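The constraints in these tables translate directly into load-time checks. As a rough illustration, here is a sketch of a top-level validator; the function name and error strings are invented for this example and are not part of any published parser API:

```python
import re

def check_top_level(doc: dict) -> list[str]:
    """Return human-readable violations of the top-level flow.yaml constraints."""
    errors = []
    name = doc.get("name")
    # name: required; lowercase with hyphens, no spaces
    if not isinstance(name, str) or not re.fullmatch(r"[a-z0-9]+(-[a-z0-9]+)*", name):
        errors.append("name: required; lowercase, hyphens, no spaces")
    version = doc.get("version")
    # version: required positive integer (bool is excluded; it subclasses int in Python)
    if not isinstance(version, int) or isinstance(version, bool) or version < 1:
        errors.append("version: required positive integer")
    nodes = doc.get("nodes")
    if not isinstance(nodes, dict) or not nodes:
        errors.append("nodes: required; at least one node")
    if not isinstance(doc.get("edges"), list):
        errors.append("edges: required list (may be empty)")
    return errors
```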

Each key under nodes is the node ID. IDs are human-readable slugs: lowercase, hyphens, no spaces, no dots in the ID itself.

nodes:
  read-leads:                   # string (node ID), required. Unique within the pipeline.
    type: file.source           # string, required. Node type identifier.
                                # Format: category.operation
                                # Categories: file, data, http, google, browser,
                                #             cli, value, router, custom
    # ── Type-specific config fields ──────────────────────────
    # These vary by node type. See the type reference below.
    path: data/leads.csv        # string. File path (file.source, file.write).
    format: csv                 # string. File format: csv, json, ndjson.
    query: "SELECT * FROM i"    # string. SQL query (data.sql).
    expression: "score > 80"    # string. Filter expression (data.filter).
    url: "https://api.ex/v1"    # string. URL (http.request).
    method: GET                 # string. HTTP method (http.request).
    value: "hello"              # any. Literal value (value.literal).
    valueType: string           # string. Value type (value.literal).
    # ── Common optional fields ───────────────────────────────
    spec: nodes/x/node.yaml     # string, optional. Path to external node.yaml
                                # for custom nodes.
    parallel:                   # object, optional. Parallel execution config.
      over: input               # string. Input port to split.
      chunks: auto              # integer | "auto". Number of parallel chunks.
      merge: output             # string. Output port to concatenate.
    csvOptions:                 # object, optional. CSV parsing options.
      delimiter: ","            # string. Field delimiter.
      hasHeader: true           # boolean. First row is header.
      quote: "\""               # string. Quote character.
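The `parallel` block implies a split/run/merge pattern: slice the `over` port's input into chunks, run the node on each chunk, then concatenate results on the `merge` port. A sketch of the slicing arithmetic; note the `"auto"` heuristic shown here is an assumption, since the spec does not define how the engine picks a chunk count:

```python
def split_chunks(rows: list, chunks) -> list[list]:
    """Split rows into roughly equal slices (the `parallel.over` side)."""
    if not rows:
        return []
    if chunks == "auto":
        chunks = min(4, len(rows))   # assumed heuristic, not defined by the spec
    size = -(-len(rows) // chunks)   # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]

def merge_chunks(parts: list[list]) -> list:
    """Concatenate chunk outputs back into one list (the `parallel.merge` side)."""
    return [row for part in parts for row in part]
```

Round-tripping any list through split and merge preserves row order, which is what a concatenating `merge: output` requires.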
| Type | Category | Config fields |
|---|---|---|
| `file.source` | File I/O | path, format, csvOptions |
| `file.write` | File I/O | path, format |
| `data.sql` | Transform | query |
| `data.filter` | Transform | expression |
| `data.map` | Transform | expression |
| `data.sort` | Transform | field, order |
| `data.limit` | Transform | count |
| `data.dedup` | Transform | fields |
| `data.join` | Transform | on, type |
| `data.group` | Transform | by, aggregations |
| `http.request` | Connector | url, method, headers, body |
| `google.sheets` | Connector | spreadsheetId, range, credentials |
| `browser.extract` | Connector | url, selector, waitFor |
| `cli.run` | CLI | command, env, sandbox |
| `value.literal` | Value | value, valueType |
| `router` | Control | input, routes |
| `custom` | Custom | spec (path to node.yaml) |
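Because the table above is a closed set, the `UNKNOWN_NODE_TYPE` check reduces to a membership test. A sketch: the set literal mirrors the table, but the helper itself is illustrative rather than the parser's actual code:

```python
KNOWN_NODE_TYPES = frozenset({
    "file.source", "file.write",
    "data.sql", "data.filter", "data.map", "data.sort",
    "data.limit", "data.dedup", "data.join", "data.group",
    "http.request", "google.sheets", "browser.extract",
    "cli.run", "value.literal", "router", "custom",
})

def check_node_types(nodes: dict) -> list[str]:
    """Flag nodes whose `type` field is missing or not in the reference table."""
    errors = []
    for node_id, node in nodes.items():
        node_type = node.get("type")
        if node_type is None:
            errors.append(f"{node_id}: MISSING_FIELD (type)")
        elif node_type not in KNOWN_NODE_TYPES:
            errors.append(f"{node_id}: UNKNOWN_NODE_TYPE ({node_type})")
    return errors
```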

Edges are strings in the format sourceNode.port -> targetNode.port.

edges:
  # ── Basic edge ─────────────────────────────────────────────
  - "read-leads.data -> filter.input"
  # string, required format: "nodeId.portName -> nodeId.portName"
  # The parser splits on " -> " (space-arrow-space).
  # Port name is after the last dot in each side.
  # ── Multiple edges ─────────────────────────────────────────
  - "filter.output -> score.input"
  - "score.output -> write.records"
  # ── Indexed ports (for multi-input nodes) ──────────────────
  - "source-a.output -> merge.inputs[0]"
  - "source-b.output -> merge.inputs[1]"
  # Bracket notation for indexed input ports.
  # ── Fan-out (one output to multiple inputs) ────────────────
  - "read.data -> branch-a.input"
  - "read.data -> branch-b.input"
  # Same output port can connect to multiple input ports.
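Putting the parsing comments above together (split on " -> ", port name after the last dot, bracket notation for indices), an edge parser might look like this sketch; the helper names are invented, not the real parser's:

```python
import re

def parse_endpoint(endpoint: str) -> tuple:
    """Split 'node.port' or 'node.ports[0]' into (node_id, port, index)."""
    node_id, _, port = endpoint.rpartition(".")    # port name is after the last dot
    match = re.fullmatch(r"(\w+)\[(\d+)\]", port)  # bracket notation for indexed ports
    if match:
        return node_id, match.group(1), int(match.group(2))
    return node_id, port, None

def parse_edge(edge: str) -> tuple:
    """Parse 'source.port -> target.port' into two endpoint tuples."""
    parts = edge.split(" -> ")   # space-arrow-space, exactly once
    if len(parts) != 2 or not all("." in p for p in parts):
        raise ValueError(f"INVALID_EDGE_FORMAT: {edge!r}")
    return parse_endpoint(parts[0]), parse_endpoint(parts[1])
```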

The parser enforces these rules at load time:

  1. name and version are required at top level.
  2. Every node must have a unique ID.
  3. Every node must have a type field.
  4. Edge source and target node IDs must exist in nodes.
  5. Edge port names must match declared ports on the node type.
  6. Connected ports must have compatible types (Value/Record/Table/Stream).
  7. Connected ports must have compatible schemas (see Type System).
  8. The graph must be a DAG — no cycles.
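Rule 8 is a standard acyclicity check; a Kahn-style topological sort detects cycles in one pass. A sketch over (source, target) node-ID pairs, not the parser's actual code:

```python
from collections import defaultdict, deque

def assert_dag(node_ids, edges):
    """Raise ValueError("CYCLE_DETECTED") if the edges contain a cycle."""
    indegree = {n: 0 for n in node_ids}
    successors = defaultdict(list)
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1
    # Repeatedly remove nodes with no remaining incoming edges.
    queue = deque(n for n in node_ids if indegree[n] == 0)
    visited = 0
    while queue:
        node = queue.popleft()
        visited += 1
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    if visited != len(indegree):   # leftover nodes all sit on a cycle
        raise ValueError("CYCLE_DETECTED")
```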
| Error | Cause | Fix |
|---|---|---|
| `MISSING_FIELD` | Required field (`name`, `version`, `type`) is absent. | Add the missing field. |
| `DUPLICATE_NODE_ID` | Two nodes share the same ID. | Rename one node. |
| `UNKNOWN_NODE_TYPE` | `type` value doesn't match any known node type. | Check the type reference table above. |
| `INVALID_EDGE_FORMAT` | Edge string doesn't match the `a.b -> c.d` format. | Fix the edge syntax. |
| `NODE_NOT_FOUND` | Edge references a node ID not in `nodes`. | Add the node or fix the typo. |
| `PORT_NOT_FOUND` | Edge references a port name not on the node type. | Check port names for the node type. |
| `TYPE_MISMATCH` | Connected ports have incompatible types. | Match types (e.g., Table to Table). |
| `SCHEMA_MISMATCH` | Output schema is missing fields required by the input. | Add the missing fields to the source. |
| `CYCLE_DETECTED` | Edges form a cycle. | Remove the back-edge. |
A minimal valid pipeline:

name: minimal
version: 1
nodes:
  greeting:
    type: value.literal
    valueType: string
    value: "hello world"
edges: []

A realistic 5-node pipeline that reads leads, filters active ones, scores by engagement, takes the top tier, and writes results:

name: lead-scoring
version: 1
description: Read leads, filter active, score by engagement, export top tier
nodes:
  read-leads:
    type: file.source
    path: data/leads.csv
    format: csv
    csvOptions:
      delimiter: ","
      hasHeader: true
  filter-active:
    type: data.filter
    expression: "status = 'active' AND email IS NOT NULL"
  score:
    type: data.sql
    query: |
      SELECT *,
             (clicks * 0.3 + opens * 0.5 + replies * 0.2) AS score
      FROM input
      ORDER BY score DESC
  top-tier:
    type: data.filter
    expression: "score >= 80"
  write-results:
    type: file.write
    path: output/qualified-leads.ndjson
    format: ndjson
edges:
  - "read-leads.data -> filter-active.input"
  - "filter-active.output -> score.input"
  - "score.output -> top-tier.input"
  - "top-tier.output -> write-results.records"