Pipeline Spec
The complete flow.yaml specification for programmatic pipeline generation.
Top-level schema
```yaml
# flow.yaml — complete schema
# ─────────────────────────────────────────────
name: lead-scoring    # string, required. Human-readable pipeline name.
                      # Lowercase, hyphens, no spaces.

version: 1            # integer, required. Increment on breaking changes.

description: >        # string, optional. What this pipeline does.
  Score inbound leads by engagement metrics
  and push qualified leads to CRM.

nodes:                # object, required. Map of node ID → node definition.
  # ... (see Node schema below)

edges:                # string[], required. List of edge strings.
  # ... (see Edge schema below)
```

Required fields
| Field | Type | Constraints |
|---|---|---|
| `name` | string | Lowercase, hyphens, no spaces. Must be unique per project. |
| `version` | integer | Positive integer. Increment on breaking changes. |
| `nodes` | object | At least one node. Keys are node IDs. |
| `edges` | string[] | Can be empty (`[]`). Each string follows the edge format. |
Optional fields
| Field | Type | Default | Description |
|---|---|---|---|
| `description` | string | `""` | Human-readable description. |
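The required and optional fields above can be checked mechanically before a pipeline is submitted. A minimal sketch in Python — the function name and error strings are illustrative, not part of the spec:

```python
import re

def check_top_level(doc: dict) -> list[str]:
    """Return a list of problems with a parsed flow.yaml document (sketch)."""
    errors = []
    # name: required; lowercase, hyphens, no spaces.
    name = doc.get("name")
    if name is None:
        errors.append("MISSING_FIELD: name")
    elif not re.fullmatch(r"[a-z0-9]+(-[a-z0-9]+)*", name):
        errors.append(f"invalid name: {name!r}")
    # version: required positive integer.
    if not isinstance(doc.get("version"), int) or doc["version"] < 1:
        errors.append("MISSING_FIELD: version")
    # nodes: required, at least one node.
    if not doc.get("nodes"):
        errors.append("MISSING_FIELD: nodes")
    # edges: required, but an empty list is allowed.
    if not isinstance(doc.get("edges"), list):
        errors.append("MISSING_FIELD: edges")
    return errors

# A minimal valid document passes; a missing version is reported.
ok = {"name": "lead-scoring", "version": 1,
      "nodes": {"greeting": {"type": "value.literal"}}, "edges": []}
```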
Node schema
Each key under `nodes` is the node ID. IDs are human-readable slugs: lowercase, hyphens, no spaces, and no dots in the ID itself.
```yaml
nodes:
  read-leads:                # string (node ID), required. Unique within the pipeline.
    type: file.source        # string, required. Node type identifier.
                             # Format: category.operation
                             # Categories: file, data, http, google, browser,
                             #             cli, value, router, custom

    # ── Type-specific config fields ──────────────────────────
    # These vary by node type. See the type reference below.
    path: data/leads.csv     # string. File path (file.source, file.write).
    format: csv              # string. File format: csv, json, ndjson.
    query: "SELECT * FROM i" # string. SQL query (data.sql).
    expression: "score > 80" # string. Filter expression (data.filter).
    url: "https://api.ex/v1" # string. URL (http.request).
    method: GET              # string. HTTP method (http.request).
    value: "hello"           # any. Literal value (value.literal).
    valueType: string        # string. Value type (value.literal).

    # ── Common optional fields ───────────────────────────────
    spec: nodes/x/node.yaml  # string, optional. Path to external node.yaml
                             # for custom nodes.
    parallel:                # object, optional. Parallel execution config.
      over: input            # string. Input port to split.
      chunks: auto           # integer | "auto". Number of parallel chunks.
      merge: output          # string. Output port to concatenate.
    csvOptions:              # object, optional. CSV parsing options.
      delimiter: ","         # string. Field delimiter.
      hasHeader: true        # boolean. First row is header.
      quote: "\""            # string. Quote character.
```

Node types reference
| Type | Category | Config fields |
|---|---|---|
| `file.source` | File I/O | `path`, `format`, `csvOptions` |
| `file.write` | File I/O | `path`, `format` |
| `data.sql` | Transform | `query` |
| `data.filter` | Transform | `expression` |
| `data.map` | Transform | `expression` |
| `data.sort` | Transform | `field`, `order` |
| `data.limit` | Transform | `count` |
| `data.dedup` | Transform | `fields` |
| `data.join` | Transform | `on`, `type` |
| `data.group` | Transform | `by`, `aggregations` |
| `http.request` | Connector | `url`, `method`, `headers`, `body` |
| `google.sheets` | Connector | `spreadsheetId`, `range`, `credentials` |
| `browser.extract` | Connector | `url`, `selector`, `waitFor` |
| `cli.run` | CLI | `command`, `env`, `sandbox` |
| `value.literal` | Value | `value`, `valueType` |
| `router` | Control | `input`, `routes` |
| `custom` | Custom | `spec` (path to node.yaml) |
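When generating pipelines programmatically, it can help to mirror this table in code and reject config fields a node type does not accept. A sketch covering only a few rows of the table; the dict, the common-field set, and the helper name are illustrative:

```python
# Subset of the node-type table above: type -> allowed config fields.
NODE_CONFIG_FIELDS = {
    "file.source": {"path", "format", "csvOptions"},
    "file.write": {"path", "format"},
    "data.filter": {"expression"},
    "data.sql": {"query"},
    "value.literal": {"value", "valueType"},
}

# Fields every node may carry (from the common optional fields above).
COMMON_FIELDS = {"type", "spec", "parallel", "csvOptions"}

def unknown_fields(node: dict) -> set[str]:
    """Fields in a node definition allowed by neither its type nor the common schema."""
    allowed = NODE_CONFIG_FIELDS.get(node["type"], set()) | COMMON_FIELDS
    return set(node) - allowed
```

A `data.filter` node with a stray `path` field, for example, would surface `{"path"}` from this check.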
Edge schema
Edges are strings in the format `sourceNode.port -> targetNode.port`.
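A sketch of how such an edge string can be parsed under this convention (split on `" -> "`, port name after the last dot on each side); the helper name is illustrative:

```python
def parse_edge(edge: str) -> tuple[str, str, str, str]:
    """Split 'sourceNode.port -> targetNode.port' into its four parts."""
    try:
        source, target = edge.split(" -> ")
        # The port name is whatever follows the last dot on each side,
        # which is why node IDs themselves must not contain dots.
        src_node, src_port = source.rsplit(".", 1)
        dst_node, dst_port = target.rsplit(".", 1)
    except ValueError:
        raise ValueError(f"INVALID_EDGE_FORMAT: {edge!r}")
    return src_node, src_port, dst_node, dst_port
```

Note that bracket-indexed ports such as `inputs[0]` stay part of the port name, since only the last dot is significant.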
```yaml
edges:
  # ── Basic edge ─────────────────────────────────────────────
  - "read-leads.data -> filter.input"
  # string, required format: "nodeId.portName -> nodeId.portName"
  # The parser splits on " -> " (space-arrow-space).
  # The port name is whatever follows the last dot on each side.

  # ── Multiple edges ─────────────────────────────────────────
  - "filter.output -> score.input"
  - "score.output -> write.records"

  # ── Indexed ports (for multi-input nodes) ──────────────────
  - "source-a.output -> merge.inputs[0]"
  - "source-b.output -> merge.inputs[1]"
  # Bracket notation for indexed input ports.

  # ── Fan-out (one output to multiple inputs) ────────────────
  - "read.data -> branch-a.input"
  - "read.data -> branch-b.input"
  # The same output port can connect to multiple input ports.
```

Validation rules
The parser enforces these rules at load time:
- `name` and `version` are required at top level.
- Every node must have a unique ID.
- Every node must have a `type` field.
- Edge source and target node IDs must exist in `nodes`.
- Edge port names must match declared ports on the node type.
- Connected ports must have compatible types (Value/Record/Table/Stream).
- Connected ports must have compatible schemas (see Type System).
- The graph must be a DAG (no cycles).
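The DAG rule is the one check that needs graph traversal. A sketch using Kahn's algorithm over node-ID pairs extracted from the edge strings; the function name is illustrative, not a provided API:

```python
from collections import defaultdict, deque

def has_cycle(node_ids: set[str], edges: list[tuple[str, str]]) -> bool:
    """Kahn's algorithm: if a topological sort cannot consume every node, there is a cycle."""
    indegree = {n: 0 for n in node_ids}
    successors = defaultdict(list)
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1
    # Start from nodes with no incoming edges and peel them off one by one.
    ready = deque(n for n, d in indegree.items() if d == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    return visited < len(node_ids)
```

In Python 3.9+, the stdlib `graphlib.TopologicalSorter` raises `CycleError` for the same condition and could be used instead.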
Error conditions
| Error | Cause | Fix |
|---|---|---|
| `MISSING_FIELD` | A required field (`name`, `version`, `type`) is absent. | Add the missing field. |
| `DUPLICATE_NODE_ID` | Two nodes share the same ID. | Rename one node. |
| `UNKNOWN_NODE_TYPE` | The `type` value doesn’t match any known node type. | Check the type reference table above. |
| `INVALID_EDGE_FORMAT` | The edge string doesn’t match the `a.b -> c.d` format. | Fix the edge syntax. |
| `NODE_NOT_FOUND` | An edge references a node ID not in `nodes`. | Add the node or fix the typo. |
| `PORT_NOT_FOUND` | An edge references a port name not on the node type. | Check the port names for the node type. |
| `TYPE_MISMATCH` | Connected ports have incompatible types. | Match the types (e.g., Table to Table). |
| `SCHEMA_MISMATCH` | The output schema is missing fields required by the input. | Add the missing fields to the source. |
| `CYCLE_DETECTED` | The edges form a cycle. | Remove the back-edge. |
Minimal valid pipeline
```yaml
name: minimal
version: 1
nodes:
  greeting:
    type: value.literal
    valueType: string
    value: "hello world"
edges: []
```

Complete example
A realistic 5-node pipeline that reads leads, filters active ones, scores by engagement, takes the top tier, and writes results:
```yaml
name: lead-scoring
version: 1
description: Read leads, filter active, score by engagement, export top tier

nodes:
  read-leads:
    type: file.source
    path: data/leads.csv
    format: csv
    csvOptions:
      delimiter: ","
      hasHeader: true

  filter-active:
    type: data.filter
    expression: "status = 'active' AND email IS NOT NULL"

  score:
    type: data.sql
    query: |
      SELECT *,
             (clicks * 0.3 + opens * 0.5 + replies * 0.2) AS score
      FROM input
      ORDER BY score DESC

  top-tier:
    type: data.filter
    expression: "score >= 80"

  write-results:
    type: file.write
    path: output/qualified-leads.ndjson
    format: ndjson

edges:
  - "read-leads.data -> filter-active.input"
  - "filter-active.output -> score.input"
  - "score.output -> top-tier.input"
  - "top-tier.output -> write-results.records"
```
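Since the spec targets programmatic generation, a pipeline like the one above can also be assembled as a plain dictionary and serialized with any YAML emitter (e.g. PyYAML's `yaml.safe_dump`). A sketch; the `make_pipeline` helper is illustrative, not a provided API:

```python
def make_pipeline(name: str, nodes: dict, edges: list[str]) -> dict:
    """Assemble a flow.yaml document as a plain dict, ready for a YAML emitter."""
    return {"name": name, "version": 1, "nodes": nodes, "edges": edges}

pipeline = make_pipeline(
    "lead-scoring",
    {
        "read-leads": {"type": "file.source",
                       "path": "data/leads.csv", "format": "csv"},
        "filter-active": {"type": "data.filter",
                          "expression": "status = 'active' AND email IS NOT NULL"},
        "write-results": {"type": "file.write",
                          "path": "output/qualified-leads.ndjson",
                          "format": "ndjson"},
    },
    [
        "read-leads.data -> filter-active.input",
        "filter-active.output -> write-results.records",
    ],
)

# Sanity check before emitting: every edge endpoint names an existing node.
for edge in pipeline["edges"]:
    source, target = edge.split(" -> ")
    assert source.rsplit(".", 1)[0] in pipeline["nodes"]
    assert target.rsplit(".", 1)[0] in pipeline["nodes"]
```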