Skip to content

MCP Server

Radhflow exposes an MCP server so AI agents can create, modify, and run pipelines programmatically.

The Model Context Protocol (MCP) is a standard for AI agents to interact with external tools. It provides typed tool definitions, structured inputs, and machine-readable responses. Instead of parsing CLI output, an agent calls a tool with structured arguments and gets back JSON.

Terminal window
rf mcp serve

Add to your MCP client config (Claude Code .claude/mcp.json, Cursor, or any MCP-compatible client):

{
"mcpServers": {
"radhflow": {
"command": "npx",
"args": ["@radh/flow-mcp"],
"env": { "RF_PROJECT_PATH": "/path/to/your/project" }
}
}
}

The server speaks standard MCP over stdio.

Creates a new project with a flow.yaml.

Parameters:

ParameterTypeRequiredDescription
namestringyesPipeline name. Lowercase, hyphens, no spaces.
yamlstringyesComplete flow.yaml content as a YAML string.

Example request:

{
"tool": "create_pipeline",
"arguments": {
"name": "lead-scoring",
"yaml": "name: lead-scoring\nversion: 1\nnodes:\n read-leads:\n type: file.source\n path: leads.csv\n format: csv\n score:\n type: data.sql\n query: \"SELECT *, clicks * 0.3 + opens * 0.5 AS score FROM input\"\nedges:\n - \"read-leads.data -> score.input\""
}
}

Example response:

{
"status": "created",
"pipeline": "lead-scoring",
"nodes": 2,
"edges": 1,
"path": "/rf/project/lead-scoring/flow.yaml"
}

Adds a node to an existing pipeline.

Parameters:

ParameterTypeRequiredDescription
pipelinestringyesPipeline name.
node_idstringyesNode ID. Lowercase, hyphens.
typestringyesNode type (e.g., data.sql, file.source).
configobjectyesType-specific configuration fields.

Example request:

{
"tool": "add_node",
"arguments": {
"pipeline": "lead-scoring",
"node_id": "filter-active",
"type": "data.filter",
"config": {
"expression": "status = 'active' AND email IS NOT NULL"
}
}
}

Example response:

{
"status": "added",
"pipeline": "lead-scoring",
"node_id": "filter-active",
"total_nodes": 3
}

Creates an edge between two ports.

Parameters:

ParameterTypeRequiredDescription
pipelinestringyesPipeline name.
sourcestringyesSource in nodeId.portName format.
targetstringyesTarget in nodeId.portName format.

Example request:

{
"tool": "connect_nodes",
"arguments": {
"pipeline": "lead-scoring",
"source": "read-leads.data",
"target": "filter-active.input"
}
}

Example response:

{
"status": "connected",
"edge": "read-leads.data -> filter-active.input",
"total_edges": 2
}

Executes the pipeline and returns results.

Parameters:

ParameterTypeRequiredDescription
pipelinestringyesPipeline name.
dry_runbooleannoIf true, validate without executing. Default: false.

Example request:

{
"tool": "run_pipeline",
"arguments": {
"pipeline": "lead-scoring",
"dry_run": false
}
}

Example response:

{
"status": "completed",
"duration_ms": 1240,
"nodes_executed": 3,
"results": {
"score": {
"output_rows": 847,
"artifact": "nodes/score/artifacts/output.ndjson"
}
}
}

Returns pipeline state and results.

Parameters:

ParameterTypeRequiredDescription
pipelinestringyesPipeline name.
run_idstringnoSpecific run ID. Default: latest run.

Example request:

{
"tool": "inspect_pipeline",
"arguments": {
"pipeline": "lead-scoring"
}
}

Example response:

{
"pipeline": "lead-scoring",
"last_run": {
"id": "run-20260304-001",
"status": "completed",
"duration_ms": 1240,
"nodes": {
"read-leads": { "status": "success", "output_rows": 1204 },
"filter-active": { "status": "success", "output_rows": 962 },
"score": { "status": "success", "output_rows": 962 }
}
}
}

Checks for errors without executing.

Parameters:

ParameterTypeRequiredDescription
pipelinestringyesPipeline name.
strictbooleannoEnable strict mode (flags security issues). Default: false.

Example request:

{
"tool": "validate_pipeline",
"arguments": {
"pipeline": "lead-scoring"
}
}

Response (valid):

{
"valid": true,
"nodes": 3,
"edges": 2,
"warnings": []
}

Response (invalid):

{
"valid": false,
"errors": [
{
"type": "schema_mismatch",
"edge": "read-leads.data -> score.input",
"message": "Field 'clicks' required by score.input but not in read-leads.data schema"
}
]
}

All tools return errors in a consistent format:

{
"error": true,
"code": "PIPELINE_NOT_FOUND",
"message": "Pipeline 'lead-scoring' does not exist"
}

Error codes:

CodeDescription
PIPELINE_NOT_FOUNDNamed pipeline does not exist.
PIPELINE_EXISTSPipeline already exists (on create).
INVALID_YAMLYAML syntax error in pipeline definition.
VALIDATION_FAILEDSchema or type validation failed. See errors array.
NODE_NOT_FOUNDReferenced node ID does not exist.
PORT_NOT_FOUNDReferenced port name does not exist on the node.
CYCLE_DETECTEDEdge creates a cycle in the graph.
EXECUTION_FAILEDPipeline execution failed. See errors array.

A typical AI agent workflow:

  1. Call validate_pipeline or inspect_pipeline to understand current state.
  2. Read the Pipeline Spec and Node Spec for schema details.
  3. Call create_pipeline with generated YAML (or add_node / connect_nodes to modify).
  4. Call validate_pipeline to check for errors.
  5. If errors, fix the YAML and re-validate.
  6. Call run_pipeline to execute.
  7. Call inspect_pipeline to check results and iterate if needed.

Every generated file is committed to Git for review.