
Key Concepts

You built a pipeline in the Quick Start that reads leads, filters them, and writes JSON. This page explains the concepts behind that pipeline so you can build your own.

Every example on this page uses the same lead-scoring scenario.

A pipeline is a flow.yaml file. It defines a directed graph of nodes connected by edges. Data flows from sources through transforms to outputs.

flow.yaml
```yaml
# A pipeline with three nodes
nodes:
  read-leads:
    type: source
    # ...
  filter-top:
    type: deterministic
    # ...
  write-output:
    type: deterministic
    # ...
```

Pipelines are versioned in Git. They execute deterministically — no LLM runs at execution time.

A node is a unit of work. It has typed inputs, typed outputs, and an operation that transforms one into the other. Every node gets a human-readable slug as its ID.

flow.yaml
```yaml
score-leads:
  type: deterministic
  op: sql.query
  params:
    query: "SELECT *, score * weight AS final FROM leads"
  inputs:
    leads: { type: Table, from: ref(read-leads.leads) }
  outputs:
    scored: { type: Table }
```

Node types:

| Type | Purpose | Example |
| --- | --- | --- |
| source | Ingests data from an external source | Read a CSV, call an API |
| deterministic | Transforms data with no side effects | SQL query, filter, map |
| llm | AI-generated at creation time, deterministic at runtime | Custom business logic |
| conditional | Routes data based on a condition | Branch on a threshold |
| service | Calls an external API | Push to CRM, send email |
| manual | Requires human input before proceeding | Approval step |

Ports are the typed connection points on a node. Each input port declares what data type it accepts. Each output port declares what it produces. Types are checked before execution starts.

```yaml
inputs:
  orders: { type: Table, from: ref(fetch-orders.orders) }
  threshold: { type: Value, from: ref(config.min-amount) }
outputs:
  filtered: { type: Table }
```

A port mismatch — connecting a Value output to a Table input — is caught before any code runs.
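That pre-execution check can be sketched in a few lines. The `check_edge` helper and the dict-shaped ports below are hypothetical, illustrating the idea rather than Radhflow's actual API:

```python
# Hypothetical sketch of the pre-execution port check described above.
# The function name and port representation are illustrative, not Radhflow's API.

def check_edge(source_port: dict, target_port: dict) -> None:
    """Raise before execution if an output type doesn't match an input type."""
    if source_port["type"] != target_port["type"]:
        raise TypeError(
            f"port mismatch: {source_port['type']} output "
            f"cannot feed a {target_port['type']} input"
        )

# A Value output wired to a Table input fails fast, before any node runs:
try:
    check_edge({"type": "Value"}, {"type": "Table"})
except TypeError as e:
    print(e)  # port mismatch: Value output cannot feed a Table input
```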

Edges connect one node’s output port to another node’s input port. You declare them implicitly through from references on input ports.

flow.yaml
```yaml
# This input declaration creates an edge:
# clean-data.customers --> enrich.customers
enrich:
  inputs:
    customers: { type: Table, from: ref(clean-data.customers) }
```

Edges enforce type contracts. The output type of the source port must match the input type of the target port. The graph executor resolves all edges, validates types, and determines execution order automatically.
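Execution-order resolution of this kind can be sketched with Python's standard `graphlib`. The three-node dependency map mirrors the Quick Start pipeline, but the code is illustrative, not Radhflow internals:

```python
# Illustrative sketch: deriving execution order from the edges implied
# by `from: ref(...)` declarations. Not Radhflow's actual executor.
from graphlib import TopologicalSorter

# node -> set of nodes it depends on (its ref() targets)
deps = {
    "read-leads": set(),
    "filter-top": {"read-leads"},
    "write-output": {"filter-top"},
}

# Topological order: every node runs after all of its dependencies.
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['read-leads', 'filter-top', 'write-output']
```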

Radhflow has four primitive data types. Every port uses one of them.

| Type | What it is | Example |
| --- | --- | --- |
| Value | A single scalar (string, number, boolean) | An API key, a threshold, a file path |
| Record | A single JSON object with named fields | One user profile, one config block |
| Table | An ordered collection of records (rows) | A CSV import, a query result, a report |
| Stream | An unbounded sequence of records | A webhook feed, a log tail, a queue |

Tables are the most common type. In the lead-scoring pipeline, every connection carries a Table — rows of leads flowing from one node to the next.

Every Table and Record port has a schema. Schemas define the fields and their types using JSON Schema. They are stored as .schema.json files alongside the data.

```
nodes/read-leads/
  schemas/
    leads.schema.json   # field definitions
  artifacts/
    leads.ndjson        # the data
```

A schema file:

leads.schema.json
```json
{
  "fields": {
    "name": { "type": "string" },
    "email": { "type": "string" },
    "score": { "type": "number" }
  }
}
```

Schemas are contracts. If filter-top expects a score field of type number and read-leads produces a score field of type string, validation fails before any node executes.
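A rough sketch of that contract check, using the simple `fields` schema shape from the example above. `validate_row` and `JSON_TYPES` are illustrative names, not the real validator:

```python
# Sketch of the schema-contract check described above. The "fields" schema
# shape mirrors the example; the validator itself is hypothetical.

SCHEMA = {"fields": {"name": {"type": "string"}, "score": {"type": "number"}}}
JSON_TYPES = {"string": str, "number": (int, float)}

def validate_row(row: dict, schema: dict) -> list:
    """Return a list of contract violations for one row (empty = valid)."""
    errors = []
    for field, spec in schema["fields"].items():
        if field not in row:
            errors.append(f"missing field: {field}")
        elif not isinstance(row[field], JSON_TYPES[spec["type"]]):
            errors.append(f"{field}: expected {spec['type']}")
    return errors

# A string score violates the number contract and is caught up front:
print(validate_row({"name": "Alice", "score": "92"}, SCHEMA))
# ['score: expected number']
```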

NDJSON (Newline-Delimited JSON) is the interchange format for Tables. One JSON object per line. Human-readable. Diffable in Git. Parseable by every language.

leads.ndjson
```
{"name":"Alice","email":"alice@example.com","score":92}
{"name":"Bob","email":"bob@example.com","score":45}
{"name":"Carol","email":"carol@example.com","score":88}
```

Every NDJSON file has a companion .schema.json that defines its structure. This pairing — data plus schema — is how Radhflow enforces type safety across node boundaries.
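Reading and writing NDJSON needs nothing beyond a JSON parser. A minimal Python round-trip using the leads rows from the example:

```python
# Minimal NDJSON round-trip: one compact JSON object per line, as shown above.
import io
import json

rows = [
    {"name": "Alice", "email": "alice@example.com", "score": 92},
    {"name": "Bob", "email": "bob@example.com", "score": 45},
]

# Write: one object per line, no extra whitespace.
buf = io.StringIO()
for row in rows:
    buf.write(json.dumps(row, separators=(",", ":")) + "\n")

# Read: parse line by line; any language with a JSON parser can do this.
parsed = [json.loads(line) for line in buf.getvalue().splitlines()]
assert parsed == rows
```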

ref() is how you wire nodes together. It references an output port on another node using the pattern ref(node-id.port-name).

flow.yaml
```yaml
transform:
  inputs:
    raw: { type: Table, from: ref(fetch-data.rows) }
    config: { type: Record, from: ref(load-config.settings) }
```

ref(fetch-data.rows) means: take the rows output from the fetch-data node and feed it into this input. Every ref in the file becomes an edge in the graph.
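A reference in this pattern can be split apart with a small regex. `parse_ref` and its pattern below are a hypothetical sketch, not Radhflow's resolver:

```python
# Hypothetical parser for the ref(node-id.port-name) pattern; the regex
# and return shape are illustrative, not Radhflow's implementation.
import re

REF = re.compile(r"^ref\(([\w-]+)\.([\w-]+)\)$")

def parse_ref(value: str) -> tuple:
    """Split a ref() string into (node-id, port-name)."""
    m = REF.match(value)
    if not m:
        raise ValueError(f"not a ref: {value!r}")
    return (m.group(1), m.group(2))

print(parse_ref("ref(fetch-data.rows)"))  # ('fetch-data', 'rows')
```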