# Key Concepts
You built a pipeline in the Quick Start that reads leads, filters them, and writes JSON. This page explains the concepts behind that pipeline so you can build your own.
Every example on this page uses the same lead-scoring scenario.
## Pipelines

A pipeline is a `flow.yaml` file. It defines a directed graph of nodes connected by edges. Data flows from sources through transforms to outputs.
```yaml
# A pipeline with three nodes
nodes:
  read-leads:
    type: source
    # ...
  filter-top:
    type: deterministic
    # ...
  write-output:
    type: deterministic
    # ...
```

Pipelines are versioned in Git. They execute deterministically: no LLM runs at execution time.
## Nodes

A node is a unit of work. It has typed inputs, typed outputs, and an operation that transforms one into the other. Every node gets a human-readable slug as its ID.
```yaml
score-leads:
  type: deterministic
  op: sql.query
  params:
    query: "SELECT *, score * weight AS final FROM leads"
  inputs:
    leads: { type: Table, from: ref(read-leads.leads) }
  outputs:
    scored: { type: Table }
```

Node types:
| Type | Purpose | Example |
|---|---|---|
| `source` | Ingests data from an external source | Read a CSV, call an API |
| `deterministic` | Transforms data with no side effects | SQL query, filter, map |
| `llm` | AI-generated at creation time, deterministic at runtime | Custom business logic |
| `conditional` | Routes data based on a condition | Branch on a threshold |
| `service` | Calls an external API | Push to CRM, send email |
| `manual` | Requires human input before proceeding | Approval step |
## Ports

Ports are the typed connection points on a node. Each input port declares what data type it accepts. Each output port declares what it produces. Types are checked before execution starts.
```yaml
inputs:
  orders: { type: Table, from: ref(fetch-orders.orders) }
  threshold: { type: Value, from: ref(config.min-amount) }
outputs:
  filtered: { type: Table }
```

A port mismatch, such as connecting a `Value` output to a `Table` input, is caught before any code runs.
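The pre-execution type check can be sketched in a few lines of Python. Everything here (the dict layout, the port names) is a hypothetical stand-in, not Radhflow's internals:

```python
import re

# Hypothetical registry of output ports produced elsewhere in the graph:
# (node-id, port-name) -> declared type.
node_outputs = {
    ("fetch-orders", "orders"): "Table",
    ("config", "min-amount"): "Value",
}

# The two input ports from the example above, in dict form.
inputs = {
    "orders": {"type": "Table", "from": "ref(fetch-orders.orders)"},
    "threshold": {"type": "Value", "from": "ref(config.min-amount)"},
}

def check_ports(inputs, node_outputs):
    """Return a list of type-mismatch errors; an empty list means the ports line up."""
    errors = []
    for name, decl in inputs.items():
        node, port = re.match(r"ref\(([\w-]+)\.([\w-]+)\)", decl["from"]).groups()
        produced = node_outputs[(node, port)]
        if produced != decl["type"]:
            errors.append(f"{name}: expected {decl['type']}, got {produced}")
    return errors

print(check_ports(inputs, node_outputs))  # [] -- both connections type-check
```

The same function reports a mismatch if, say, `orders` were declared as `Value` while `fetch-orders.orders` produces a `Table`.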
## Edges

Edges connect one node's output port to another node's input port. You declare them implicitly through `from` references on input ports.
```yaml
# This input declaration creates an edge:
# clean-data.customers --> enrich.customers
enrich:
  inputs:
    customers: { type: Table, from: ref(clean-data.customers) }
```

Edges enforce type contracts: the output type of the source port must match the input type of the target port. The graph executor resolves all edges, validates types, and determines execution order automatically.
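Deriving edges and an execution order from `from:` references can be sketched with Python's standard library. The pipeline dict and its port names are hypothetical stand-ins for a parsed `flow.yaml`:

```python
import re
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# A plain dict standing in for a parsed flow.yaml (hypothetical pipeline).
nodes = {
    "read-leads": {"inputs": {}},
    "filter-top": {"inputs": {"leads": {"from": "ref(read-leads.leads)"}}},
    "write-output": {"inputs": {"rows": {"from": "ref(filter-top.rows)"}}},
}

def execution_order(nodes):
    """Collect each node's upstream dependencies from its `from:` refs,
    then topologically sort the resulting graph."""
    deps = {
        node_id: {
            re.match(r"ref\(([\w-]+)\.", port["from"]).group(1)
            for port in spec["inputs"].values()
        }
        for node_id, spec in nodes.items()
    }
    return list(TopologicalSorter(deps).static_order())

print(execution_order(nodes))  # ['read-leads', 'filter-top', 'write-output']
```

A topological sort also rejects cycles, which is why an edge graph declared purely through input references still yields a single well-defined run order.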
## Data types

Radhflow has four primitive data types. Every port uses one of them.
| Type | What it is | Example |
|---|---|---|
| `Value` | A single scalar: string, number, or boolean | An API key, a threshold, a file path |
| `Record` | A single JSON object with named fields | One user profile, one config block |
| `Table` | An ordered collection of records (rows) | A CSV import, a query result, a report |
| `Stream` | An unbounded sequence of records | A webhook feed, a log tail, a queue |
Tables are the most common type. In the lead-scoring pipeline, every connection carries a Table — rows of leads flowing from one node to the next.
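As a rough mental model, the four types map onto familiar Python shapes. This classification is illustrative only, not how Radhflow represents data internally:

```python
from collections.abc import Iterator

def classify(value):
    """Illustrative mapping from Python values to the four primitive types."""
    if isinstance(value, Iterator):
        return "Stream"   # unbounded sequence of records (e.g. a generator)
    if isinstance(value, list):
        return "Table"    # ordered rows
    if isinstance(value, dict):
        return "Record"   # one JSON object with named fields
    return "Value"        # a single scalar

print(classify(0.8))                      # Value
print(classify({"name": "Alice"}))        # Record
print(classify([{"name": "Alice"}]))      # Table
```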
## Schemas

Every `Table` and `Record` port has a schema. Schemas define the fields and their types using JSON Schema. They are stored as `.schema.json` files alongside the data.
```
nodes/read-leads/
  schemas/
    leads.schema.json   # field definitions
  artifacts/
    leads.ndjson        # the data
```

A schema file:
```json
{
  "fields": {
    "name": { "type": "string" },
    "email": { "type": "string" },
    "score": { "type": "number" }
  }
}
```

Schemas are contracts. If `filter-top` expects a `score` field of type `number` and `read-leads` produces a `score` field of type `string`, validation fails before any node executes.
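A minimal validator for the schema shape shown above might look like the following. It is a sketch of the idea (field presence plus type checking), not Radhflow's actual validation:

```python
# Map schema type names to the Python types that satisfy them (assumption:
# only string/number/boolean appear in these examples).
TYPE_MAP = {"string": str, "number": (int, float), "boolean": bool}

schema = {
    "fields": {
        "name": {"type": "string"},
        "email": {"type": "string"},
        "score": {"type": "number"},
    }
}

def validate(row, schema):
    """Return a list of violations for one row; empty means the row conforms."""
    errors = []
    for field, spec in schema["fields"].items():
        if field not in row:
            errors.append(f"missing field: {field}")
        elif not isinstance(row[field], TYPE_MAP[spec["type"]]):
            errors.append(f"{field}: expected {spec['type']}")
    return errors

# A string where a number is expected fails, exactly the contract violation
# described above.
print(validate({"name": "Alice", "email": "a@example.com", "score": "92"}, schema))
# ['score: expected number']
```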
## NDJSON

NDJSON (Newline-Delimited JSON) is the interchange format for Tables. One JSON object per line. Human-readable. Diffable in Git. Parseable by every language.
```
{"name":"Alice","email":"alice@example.com","score":92}
{"name":"Bob","email":"bob@example.com","score":45}
{"name":"Carol","email":"carol@example.com","score":88}
```

Every NDJSON file has a companion `.schema.json` that defines its structure. This pairing of data plus schema is how Radhflow enforces type safety across node boundaries.
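Reading NDJSON needs nothing beyond a JSON parser applied line by line, for example in Python:

```python
import io
import json

# The first two rows from the example above, as they would sit on disk.
ndjson = (
    '{"name":"Alice","email":"alice@example.com","score":92}\n'
    '{"name":"Bob","email":"bob@example.com","score":45}\n'
)

# One json.loads per non-empty line turns the file into a list of records.
rows = [json.loads(line) for line in io.StringIO(ndjson) if line.strip()]

print(len(rows), rows[0]["name"], rows[0]["score"])  # 2 Alice 92
```

Because each row is its own line, appending a record or diffing two versions in Git touches only the lines that actually changed.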
## ref()

`ref()` is how you wire nodes together. It references an output port on another node using the pattern `ref(node-id.port-name)`.
```yaml
transform:
  inputs:
    raw: { type: Table, from: ref(fetch-data.rows) }
    config: { type: Record, from: ref(load-config.settings) }
```

`ref(fetch-data.rows)` means: take the `rows` output from the `fetch-data` node and feed it into this input. The graph executor resolves these references, validates types, and determines execution order automatically.
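Splitting a `ref()` expression into its two parts is a small exercise. A sketch follows; the regex and error handling are assumptions, not Radhflow's actual parser:

```python
import re

# ref(node-id.port-name): slugs made of word characters and hyphens (assumed).
REF = re.compile(r"^ref\(([\w-]+)\.([\w-]+)\)$")

def parse_ref(expr):
    """Split 'ref(node-id.port-name)' into (node_id, port_name)."""
    m = REF.match(expr)
    if not m:
        raise ValueError(f"not a ref expression: {expr}")
    return m.groups()

print(parse_ref("ref(fetch-data.rows)"))  # ('fetch-data', 'rows')
```

Everything the executor needs to build the graph, which producing node and which output port, is recoverable from that one pattern.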