SQL Transforms

transform.sql lets you write raw DuckDB SQL when built-in operations are not enough. You get the full power of DuckDB — window functions, CTEs, JSON extraction, regex, date arithmetic — while staying connected to the pipeline graph through ref() references.

Use SQL transforms for queries that combine multiple operations in a single step. For straightforward filter/sort/group tasks, prefer the dedicated data operations.
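
A single-condition filter is a case where the dedicated operation wins. The transform.sql version below works, but it is more verbose than the equivalent data.filter node (the node ID here is illustrative):

```yaml
nodes:
  # Works, but overkill: a lone WHERE clause is exactly what
  # data.filter is for — prefer that operation instead.
  active-rows:
    type: transform.sql
    config:
      query: |
        SELECT * FROM ref('clean-data') WHERE status = 'active'
```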

Input: one or more Tables (via ref())
Output: Table

nodes:
  top-customers:
    type: transform.sql
    config:
      query: |
        SELECT
          customer_id,
          sum(amount) AS total_spend,
          count(*) AS order_count
        FROM ref('load-orders')
        GROUP BY customer_id
        HAVING sum(amount) > 1000
        ORDER BY total_spend DESC

ref('node_id') references the output of an upstream node. Radhflow resolves it to the node’s NDJSON output file at execution time. Edges are created automatically from ref() calls — you do not need to declare them manually.

-- Single source
SELECT * FROM ref('clean-data') WHERE status = 'active'
-- Multiple sources
SELECT a.*, b.category_name
FROM ref('orders') a
JOIN ref('categories') b ON a.category_id = b.id

Node IDs in ref() must match existing node IDs in the pipeline. The type checker validates these references at parse time.
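
For example, a query that references a node ID absent from the pipeline — say, a typo such as 'load-oders' — is rejected at parse time rather than failing mid-run (node IDs here are illustrative):

```yaml
nodes:
  broken-ref:
    type: transform.sql
    config:
      query: |
        -- 'load-oders' matches no node in the pipeline,
        -- so the type checker flags this before execution
        SELECT * FROM ref('load-oders')
```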

Window functions

nodes:
  ranked-sales:
    type: transform.sql
    config:
      query: |
        SELECT
          *,
          ROW_NUMBER() OVER (
            PARTITION BY region
            ORDER BY revenue DESC
          ) AS rank_in_region,
          revenue / SUM(revenue) OVER (
            PARTITION BY region
          ) AS pct_of_region
        FROM ref('monthly-sales')


CTEs

nodes:
  cohort-analysis:
    type: transform.sql
    config:
      query: |
        WITH first_purchase AS (
          SELECT
            customer_id,
            MIN(order_date) AS cohort_date
          FROM ref('orders')
          GROUP BY customer_id
        ),
        labeled AS (
          SELECT
            o.*,
            fp.cohort_date,
            DATE_DIFF('month', fp.cohort_date, o.order_date) AS months_since
          FROM ref('orders') o
          JOIN first_purchase fp ON o.customer_id = fp.customer_id
        )
        SELECT
          cohort_date,
          months_since,
          COUNT(DISTINCT customer_id) AS customers,
          SUM(amount) AS revenue
        FROM labeled
        GROUP BY cohort_date, months_since
        ORDER BY cohort_date, months_since


JSON extraction

nodes:
  parse-metadata:
    type: transform.sql
    config:
      query: |
        SELECT
          id,
          metadata->>'$.name' AS name,
          CAST(metadata->>'$.score' AS DOUBLE) AS score,
          json_array_length(metadata->'$.tags') AS tag_count
        FROM ref('raw-events')


Regex

nodes:
  extract-domains:
    type: transform.sql
    config:
      query: |
        SELECT
          email,
          regexp_extract(email, '@(.+)$', 1) AS domain,
          CASE
            WHEN email LIKE '%@gmail.com' THEN 'personal'
            WHEN email LIKE '%@company.com' THEN 'internal'
            ELSE 'other'
          END AS email_type
        FROM ref('contacts')


Date arithmetic

nodes:
  daily-metrics:
    type: transform.sql
    config:
      query: |
        SELECT
          DATE_TRUNC('day', created_at) AS day,
          COUNT(*) AS events,
          COUNT(DISTINCT user_id) AS unique_users,
          DATE_DIFF('hour', MIN(created_at), MAX(created_at)) AS active_hours
        FROM ref('event-log')
        GROUP BY DATE_TRUNC('day', created_at)
        ORDER BY day


Multi-table joins

nodes:
  full-report:
    type: transform.sql
    config:
      query: |
        SELECT
          o.id AS order_id,
          c.name AS customer_name,
          p.name AS product_name,
          o.quantity,
          o.quantity * p.unit_price AS line_total
        FROM ref('orders') o
        LEFT JOIN ref('customers') c ON o.customer_id = c.id
        LEFT JOIN ref('products') p ON o.product_id = p.id
        WHERE o.status != 'cancelled'
        ORDER BY o.created_at DESC


When to use transform.sql

Scenario                                    Recommended approach
Filter rows by a condition                  data.filter
Sort and take top N                         data.sort + data.limit
Join two tables                             data.join
Group and aggregate                         data.group
Window ranking within groups                transform.sql
CTE-based multi-step logic                  transform.sql
Multi-table join with conditions            transform.sql
Regex extraction + conditional logic        transform.sql
Anything with HAVING, QUALIFY, or LATERAL   transform.sql
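
QUALIFY, mentioned in the last row, is DuckDB's clause for filtering on window-function results without a wrapping subquery. A sketch, reusing the monthly-sales node from the window-function example:

```yaml
nodes:
  top-3-per-region:
    type: transform.sql
    config:
      query: |
        -- QUALIFY filters rows by a window function directly,
        -- avoiding a nested SELECT around ROW_NUMBER()
        SELECT *
        FROM ref('monthly-sales')
        QUALIFY ROW_NUMBER() OVER (
          PARTITION BY region
          ORDER BY revenue DESC
        ) <= 3
```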