# Pipeline Engine

Event-driven pipeline automation with retry, checkpointing, and restart recovery.

Source: `pkg/pipeline/`
## Overview

The pipeline engine executes multi-step workflows in response to `DataEvent` messages published via Redis Stream. Each pipeline is defined in YAML configuration and consists of a trigger event plus an ordered sequence of steps that invoke capability operations.
```
DataEvent (MySQL data_events + Redis Stream)
      │
      ▼
Pipeline Engine (pkg/pipeline/engine.go)
      │
      ├── Idempotency check (event_consumptions)
      ├── Create pipeline_run record
      ├── For each step:
      │     ├── Save checkpoint (if resumable)
      │     ├── Render template params
      │     ├── Create step_run record
      │     ├── ability.Invoke (with retry)
      │     └── Update step_run result
      └── Update pipeline_run status
```
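For orientation, here is a minimal sketch of the event shape the engine consumes. Only the event type (matched against `trigger.event`) and the event ID (used by the idempotency guard) are documented on this page; the field names below are illustrative assumptions, not the real declaration.

```go
// Sketch only: the actual DataEvent type lives elsewhere in the codebase.
type DataEvent struct {
	ID        string         `json:"id"`         // used as event_id in event_consumptions (assumed name)
	EventType string         `json:"event_type"` // matched against trigger.event
	Payload   map[string]any `json:"payload"`    // exposed to templates as {{event.*}} (assumed name)
}
```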
## YAML Schema

Pipelines are defined under the `pipelines` key in `flowbot.yaml`:
```yaml
pipelines:
  - name: rss_fetch_and_notify      # unique name, used as consumer_name
    description: "Fetch RSS feeds and send notification"
    enabled: true                   # false to skip loading
    resumable: true                 # enable checkpoint + restart recovery
    trigger:
      event: rss.fetch.requested    # DataEvent.EventType to match
    steps:
      - name: fetch_feeds
        capability: rss             # capability type
        operation: fetch            # operation name
        params:                     # template-rendered input
          url: "{{event.url}}"
          max_items: 10
        retry:                      # step-level retry (optional)
          max_attempts: 3
          delay: 1s
          backoff: exponential      # fixed | linear | exponential
          max_delay: 60s
          jitter: true
          retry_on:                 # filter which errors to retry
            - timeout
            - rate_limited
      - name: send_notification
        capability: notify
        operation: send
        params:
          channel: slack
          message: 'New feeds: {{step "fetch_feeds" "count"}}'
```
## Retry Strategy

### Configuration

Each pipeline step can specify an optional `retry` block. If omitted, the step runs exactly once. One plausible Go shape for this block is sketched after the table.
| Field | Type | Default | Description |
|---|---|---|---|
| `max_attempts` | int | `0` | Maximum retry attempts. `0` disables retry. |
| `delay` | duration | `0s` | Initial delay before the first retry. |
| `backoff` | string | `"exponential"` | `fixed` (constant delay), `linear` (multiplier 1.0), `exponential` (multiplier 2.0). |
| `max_delay` | duration | `0s` | Caps the delay between retries. |
| `jitter` | bool | `false` | Adds +/-50% randomization to the delay. |
| `retry_on` | []string | (all errors) | Filter: only retry errors matching these codes or with `Retryable=true`. |
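As a reference point, the table maps naturally onto a Go struct like the one below. This is a hedged sketch of the shape, not the actual declaration in `pkg/pipeline`; note that duration strings such as `1s` usually require a custom YAML unmarshaler.

```go
import "time"

// RetryConfig: one plausible Go shape for the retry block above.
// Field and tag names are assumptions for illustration.
type RetryConfig struct {
	MaxAttempts int           `yaml:"max_attempts"` // 0 disables retry
	Delay       time.Duration `yaml:"delay"`        // initial delay before first retry
	Backoff     string        `yaml:"backoff"`      // fixed | linear | exponential
	MaxDelay    time.Duration `yaml:"max_delay"`    // cap between retries
	Jitter      bool          `yaml:"jitter"`       // +/-50% randomization
	RetryOn     []string      `yaml:"retry_on"`     // error codes to retry; empty = all errors
}
```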
### Behavior

- On first failure, the engine checks whether `max_attempts > 0`.
- If `retry_on` is set, the error is checked against the filter (sketched below):
  - `types.Error.Retryable == true` always qualifies for retry.
  - `types.Error.Code` is matched against `retry_on` entries.
  - If no filter is configured, all errors are retried.
- The engine waits for the computed delay, then retries.
- Retries continue until success, until `max_attempts` is exhausted, or until the context is cancelled.
- Each attempt is recorded (the `attempt` column on `pipeline_step_runs`).
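A minimal sketch of that eligibility check, assuming the `RetryConfig` shape above. It takes the error's code and retryable flag directly (in the engine these come from `types.Error`); the function name and signature are illustrative, not the real API.

```go
// shouldRetry reports whether a failed attempt qualifies for another try,
// following the rules listed above. Sketch only; names are assumptions.
func shouldRetry(cfg RetryConfig, attempt int, code string, retryable bool) bool {
	if cfg.MaxAttempts <= 0 || attempt >= cfg.MaxAttempts {
		return false // retry disabled or attempts exhausted
	}
	if retryable {
		return true // types.Error.Retryable == true always qualifies
	}
	if len(cfg.RetryOn) == 0 {
		return true // no filter configured: all errors are retried
	}
	for _, c := range cfg.RetryOn {
		if c == code { // types.Error.Code matched against retry_on entries
			return true
		}
	}
	return false
}
```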
### Backoff Calculation

- Fixed (`backoff: fixed`): every retry waits the same delay.
- Linear (`backoff: linear`): `delay = delay * attempt_number`, capped at `max_delay`.
- Exponential (`backoff: exponential`): `delay = delay * 2^(attempt-1)`, capped at `max_delay`.

Jitter is only applied to the linear and exponential modes (built on `ExponentialBackOff`).
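The formulas translate directly into a delay function. This sketch matches the rules above; the engine's implementation builds on `ExponentialBackOff` and may differ in detail (for example, in how jitter interacts with the cap).

```go
import (
	"math/rand"
	"time"
)

// nextDelay computes the wait before retry attempt n (1-based),
// per the formulas above. Sketch, not the engine's implementation.
func nextDelay(cfg RetryConfig, attempt int) time.Duration {
	d := cfg.Delay
	switch cfg.Backoff {
	case "linear":
		d = cfg.Delay * time.Duration(attempt) // delay * attempt_number
	case "exponential":
		d = cfg.Delay * time.Duration(1<<uint(attempt-1)) // delay * 2^(attempt-1)
	}
	if cfg.MaxDelay > 0 && d > cfg.MaxDelay {
		d = cfg.MaxDelay // cap between retries
	}
	if cfg.Jitter && cfg.Backoff != "fixed" {
		// +/-50% jitter: scale by a random factor in [0.5, 1.5)
		d = time.Duration(float64(d) * (0.5 + rand.Float64()))
	}
	return d
}
```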
### Database Recording

The `pipeline_step_runs` table tracks retries:

| Column | Description |
|---|---|
| `attempt` | Count of attempts including the first (1-based) |
| `retry_config` | JSON snapshot of the retry configuration used |
## Checkpointing

### Enabling

Set `resumable: true` on the pipeline definition. Without this flag, no checkpoints are saved.
### Save Mechanics

Before each step executes, the engine serializes a `CheckpointData` JSON blob and persists it to `pipeline_runs.checkpoint_data`:
```json
{
  "step_index": 2,
  "step_results": {
    "fetch_feeds": {
      "name": "fetch_feeds",
      "capability": "rss",
      "operation": "fetch",
      "output": {"count": 42, "items": [...]},
      "completed_at": "2026-05-03T10:00:00Z"
    }
  },
  "event": { /* the triggering DataEvent */ },
  "heartbeat_at": "2026-05-03T10:00:05Z"
}
```
This captures enough state to reconstruct the `RenderContext` and resume from the checkpointed step.
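The JSON implies a Go shape roughly like the following. Field names mirror the JSON keys, but this is a sketch, not the declaration from `pkg/pipeline`.

```go
import "time"

// CheckpointData: shape implied by the JSON blob above (sketch).
type CheckpointData struct {
	StepIndex   int                   `json:"step_index"`   // next step to execute
	StepResults map[string]StepResult `json:"step_results"` // keyed by step name
	Event       *DataEvent            `json:"event"`        // the triggering event
	HeartbeatAt time.Time             `json:"heartbeat_at"`
}

// StepResult: one completed step's record inside the checkpoint (sketch).
type StepResult struct {
	Name        string         `json:"name"`
	Capability  string         `json:"capability"`
	Operation   string         `json:"operation"`
	Output      map[string]any `json:"output"`
	CompletedAt time.Time      `json:"completed_at"`
}
```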
### Heartbeat

When `resumable: true`, each step starts a background goroutine that writes `last_heartbeat` to `pipeline_runs` every 30 seconds. This lets the Recovery Manager distinguish genuinely in-progress runs from orphaned ones.

Heartbeats stop automatically when the step completes (the context is cancelled via `defer hbCancel()`), as in the sketch below.
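A sketch of that loop as it might appear inside step execution. The `store.UpdateHeartbeat` call is a hypothetical name standing in for whatever persists `pipeline_runs.last_heartbeat`; the cancellation pattern follows the `defer hbCancel()` mentioned above.

```go
// Inside step execution (sketch; store.UpdateHeartbeat is a hypothetical name).
hbCtx, hbCancel := context.WithCancel(ctx)
defer hbCancel() // stops the heartbeat when the step returns

go func() {
	t := time.NewTicker(30 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-hbCtx.Done():
			return // step finished or run cancelled
		case <-t.C:
			// Writes pipeline_runs.last_heartbeat for this run.
			_ = store.UpdateHeartbeat(hbCtx, runID)
		}
	}
}()
```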
### Resume

`Engine.ResumePipeline(ctx, runID)` restores execution from the last checkpoint (skeleton after this list):

1. Loads `pipeline_runs` to get the pipeline name.
2. Loads `checkpoint_data` to get the step index, step results, and the original event.
3. Matches the pipeline definition by name.
4. Reconstructs `RenderContext` from the saved step results.
5. Continues executing from `step_index`, saving new checkpoints along the way.
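A skeleton of the resume path following those five steps. The store accessors, `NewRenderContext`, and `runSteps` are assumed helper names for illustration, not the engine's actual internals.

```go
import (
	"context"
	"encoding/json"
	"fmt"
)

// ResumePipeline restores a run from its last checkpoint (sketch).
func (e *Engine) ResumePipeline(ctx context.Context, runID int64) error {
	run, err := e.store.GetPipelineRun(ctx, runID) // 1. load run, get pipeline name
	if err != nil {
		return err
	}
	var cp CheckpointData // 2. step index, step results, original event
	if err := json.Unmarshal(run.CheckpointData, &cp); err != nil {
		return err
	}
	def, ok := e.definitions[run.PipelineName] // 3. match definition by name
	if !ok {
		return fmt.Errorf("unknown pipeline %q", run.PipelineName)
	}
	rc := NewRenderContext(cp.Event, cp.StepResults) // 4. rebuild render context
	// 5. continue from the checkpointed step, saving new checkpoints as it goes
	return e.runSteps(ctx, runID, def, rc, cp.StepIndex)
}
```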
## Execution States

| State | Value | Meaning |
|---|---|---|
| `PipelineStateUnknown` | 0 | Default |
| `PipelineStart` | 1 | Run in progress |
| `PipelineDone` | 2 | All steps succeeded |
| `PipelineCancel` | 3 | Step failed or run cancelled |
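In Go these states map naturally onto an `iota` enum. The sketch below is consistent with the table but is not the verbatim declaration from `pkg/pipeline`.

```go
// PipelineState values per the table above (sketch, not the real declaration).
type PipelineState int

const (
	PipelineStateUnknown PipelineState = iota // 0: default
	PipelineStart                             // 1: run in progress
	PipelineDone                              // 2: all steps succeeded
	PipelineCancel                            // 3: step failed or run cancelled
)
```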
## Database Tables

| Table | Purpose |
|---|---|
| `pipeline_definitions` | Persisted YAML definitions (upserted on startup) |
| `pipeline_runs` | One row per pipeline execution: status, error, checkpoint, heartbeat |
| `pipeline_step_runs` | Per-step execution: params, result, attempt, status, error |
| `event_consumptions` | Idempotency guard: `(consumer_name, event_id)` unique |
## Event Flow

1. `ability.Invoke` returns an `InvokeResult` whose `Events` field lists business events (`bookmark.created`, `rss.item.fetched`, etc.).
2. The event emitter (registered in `initPipeline`) creates a `DataEvent` and persists it to both the `data_events` table (durable store) and the `event_outbox` table (transactional outbox pattern); see the sketch after this list.
3. The event is published to the Redis Stream topic `pipeline:data_event`.
4. A Watermill consumer deserializes the event and calls `engine.Handler()`.
5. The engine runs `FindByEvent` to match pipeline definitions and executes them sequentially.
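A sketch of the transactional-outbox write from step 2, using `database/sql` directly. The column lists and the `persistEvent` helper are assumptions for illustration; only the table names and the topic come from this document.

```go
import (
	"context"
	"database/sql"
	"encoding/json"
)

// persistEvent writes the durable event row and the outbox row in ONE
// transaction, so a relay can publish to Redis Stream later without
// risking lost events. Sketch; columns are assumed.
func persistEvent(ctx context.Context, db *sql.DB, ev *DataEvent) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds

	payload, err := json.Marshal(ev)
	if err != nil {
		return err
	}
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO data_events (event_id, event_type, payload) VALUES (?, ?, ?)`,
		ev.ID, ev.EventType, payload); err != nil {
		return err
	}
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO event_outbox (event_id, topic, payload) VALUES (?, ?, ?)`,
		ev.ID, "pipeline:data_event", payload); err != nil {
		return err
	}
	return tx.Commit()
}
```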
## Idempotency

Each pipeline run is gated by `event_consumptions`, which has a unique composite index on `(consumer_name, event_id)`. Before execution:

1. `HasConsumed(pipelineName, eventID)` checks whether this pipeline has already processed this event.
2. If consumed, the event is skipped (logged, no error).
3. Otherwise, `RecordConsumption` inserts a row, then the pipeline executes.

This guarantees at-most-once processing per (pipeline, event) pair; the gate is sketched below.
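A sketch of that gate around pipeline execution. `HasConsumed` and `RecordConsumption` are named by this document, but their signatures, plus `e.store`, `e.log`, and `e.execute`, are illustrative assumptions.

```go
import "context"

// runOnce executes a pipeline for an event at most once (sketch).
func (e *Engine) runOnce(ctx context.Context, p *Pipeline, ev *DataEvent) error {
	consumed, err := e.store.HasConsumed(ctx, p.Name, ev.ID)
	if err != nil {
		return err
	}
	if consumed {
		// Skipped: logged, no error returned.
		e.log.Info("event already consumed", "pipeline", p.Name, "event_id", ev.ID)
		return nil
	}
	// The unique index on (consumer_name, event_id) makes a concurrent
	// duplicate fail this insert rather than double-executing the pipeline.
	if err := e.store.RecordConsumption(ctx, p.Name, ev.ID); err != nil {
		return err
	}
	return e.execute(ctx, p, ev)
}
```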
## Template Rendering

Step params are rendered through the template engine before each invocation. See Pipeline Template Engine for a syntax reference.
## Testing

```bash
go test ./pkg/pipeline/...            # unit tests
go test ./pkg/pipeline/template/...   # template engine tests
```
## Recovery

See Recovery Manager for restart recovery of incomplete pipeline runs.