Notification Gateway
Unified notification gateway with template-based message rendering, platform-specific overrides, Redis-backed rate limiting, time-window aggregation, and mute/DND rules to prevent notification fatigue.
Source: pkg/notify/, pkg/notify/template/, pkg/notify/rules/, pkg/ability/notify/
Overview
Homelab monitoring produces high-frequency events: disk alerts, download completions, RSS updates, agent status changes. Passively forwarding every event to Slack, Telegram, or ntfy causes notification fatigue – users mute channels and miss critical alerts.
The Notification Gateway inserts a processing layer between event producers and notification providers to:
- Separate data from presentation – templates define message formatting, callers provide structured payloads
- Rate-limit repetitive alerts – prevent a buggy script from sending 1000 messages in one minute
- Aggregate batched events – collapse 20 RSS fetch events into a single digest every 15 minutes
- Honor DND windows – silence all notifications during night hours
[Pipeline / Cron / Webhook / Agent] │ ▼ ┌─────────────────────────────────────────────┐ │ Notification Gateway │ │ │ │ ┌───────────────────────────────────────┐ │ │ │ Rule Engine (pkg/notify/rules/) │ │ │ │ ┌──────────┐ ┌──────────┐ ┌────────┐ │ │ │ │ │ Mute/DND │ │ Throttle │ │Aggregate│ │ │ │ │ └──────────┘ └──────────┘ └────────┘ │ │ │ └─────────────────┬─────────────────────┘ │ │ ▼ │ │ ┌───────────────────────────────────────┐ │ │ │ Template Engine (pkg/notify/template/)│ │ │ │ Sprig functions + per-channel overrides│ │ │ └─────────────────┬─────────────────────┘ │ │ ▼ │ │ ┌───────────────────────────────────────┐ │ │ │ Channel Router (pkg/notify/) │ │ │ │ Existing Notifyer registry │ │ │ └───────────────────────────────────────┘ │ └─────────────────────────────────────────────┘ │ ▼ [Slack Webhook] [ntfy] [Pushover] [Message Pusher]
Architecture
Data Flow
Caller (module, cron, pipeline)
│ notify.GatewaySend(ctx, uid, templateID, channels, payload)
▼
┌──────────────────────────────────────────────────────────┐
│ GatewaySend │
│ 1. Resolve template by ID (template.Engine) │
│ 2. For each channel: │
│ a. Evaluate rules (rules.Engine) │
│ - Drop → skip │
│ - Mute → skip │
│ - Throttle → check Redis counter, skip if limited │
│ - Aggregate → push to Redis List, set timer │
│ b. Render template for channel (template.Engine) │
│ c. Look up user channel config from store │
│ d. Send via existing notify.Send() │
│ 3. Background Worker: │
│ - Scans expired aggregate timers every 60s │
│ - Flushes buffered items, renders digest template │
│ - Sends single aggregated message │
└──────────────────────────────────────────────────────────┘
Pipeline Integration
The gateway can be invoked from pipeline steps via the notify capability:
pipelines:
- name: bookmark-notify
enabled: true
trigger:
event: bookmark.created
steps:
- name: send-notification
capability: notify
operation: send
params:
template_id: "bookmark.created"
channels:
- slack
- ntfy
payload: "{{ .Event.data }}"
This uses the Pipeline Template Engine to pass event data through to the notification template.
Direct Invocation
Non-pipeline code (cron jobs, webhook handlers, agent actions) calls GatewaySend directly:
import "github.com/flowline-io/flowbot/pkg/notify"
err := notify.GatewaySend(ctx.Context(), ctx.AsUser, "server.offline", []string{"slack", "ntfy"}, map[string]any{
"hostname": item.Hostname,
"hostid": item.Hostid,
})
Template Engine
The template engine renders notification messages using Go text/template with the Sprig function library. Sprig provides 70+ template functions for string manipulation, date formatting, math, and type conversion.
Template Schema
Templates are defined in flowbot.yaml under the notify.templates key:
notify:
templates:
- id: bookmark.created
name: "New Bookmark Notification"
description: "Triggered when a bookmark is successfully created"
default_format: markdown
default_template: |
**New Bookmark Saved**
**URL:** {{ .url | default "N/A" }}
{{ if .title }}**Title:** {{ .title }}{{ end }}
overrides:
- channel: telegram
format: html
template: |
<b>New Bookmark Saved</b>
<b>URL:</b> <a href="{{ .url }}">{{ .url }}</a>
Field Reference
| Field | Type | Required | Description |
| —————— | —— | ——– | —————————————————– | ————— |
| id | string | yes | Unique template identifier (e.g., bookmark.created) |
| name | string | yes | Human-readable display name |
| description | string | no | Text describing when this template is used |
| default_format | string | yes | Output format: markdown or html |
| default_template | string | yes | Sprig template body (YAML | block scalar) |
| overrides | array | no | Per-channel template overrides |
Override Fields
| Field | Type | Required | Description |
|---|---|---|---|
channel |
string | yes | Channel name: slack, telegram, email |
format |
string | yes | Output format for this channel |
template |
string | yes | Channel-specific template body |
Template Data Context
Template payload data is accessed via {{ .key }} dot-notation. The payload is a map[string]any passed by the caller:
{{ .title }} -- string field
{{ .url | default "N/A" }} -- with fallback
{{ .tags | join ", " }} -- join a string slice
{{ .count | default 0 }} -- integer with default
{{ .name | upper }} -- uppercase transform
{{ shorten .text 80 }} -- truncate with "..."
{{ if .urgent }}URGENT: {{ end }}{{ .title }} -- conditional
Available Sprig Functions
All Sprig string functions, date functions, math functions, and list functions are available. Commonly used:
| Function | Description | Example |
|---|---|---|
upper str |
Uppercase | {{ .name \| upper }} |
lower str |
Lowercase | {{ .category \| lower }} |
default val default |
Default for nil/empty | {{ .title \| default "Untitled" }} |
join sep elems |
Join slice into string | {{ .tags \| join ", " }} |
date format time |
Format a time value | {{ now \| date "2006-01-02" }} |
now |
Current time | {{ now \| date "15:04" }} |
trunc n str |
Truncate to length | {{ .body \| trunc 100 }} |
contains str substr |
Check substring | {{ if contains .body "ERROR" }} |
replace old new str |
Replace substring | {{ .url \| replace "http:" "https:" }} |
quote str |
Wrap in double quotes | {{ .title \| quote }} |
toJson val |
Marshal to JSON | {{ .meta \| toJson }} |
indent n str |
Indent each line | {{ .body \| indent 2 }} |
Custom Functions
| Function | Description |
|---|---|
shorten str maxLen |
Truncate and append "..." (min output length 4) |
Template Rendering Logic
- Gateway passes
templateIDandchannelto the engine - Engine looks up the channel-specific override first
- If no override exists for the channel, uses the default template
- Template receives the payload as
.and renders viatext/template.Execute() - Output includes
Title(first line, stripped of markdown formatting),Body(full rendered output), andFormat
Rule Engine
Rules are evaluated before any notification is sent. They are defined in flowbot.yaml under notify.rules and processed in priority order (higher priority first).
Rule Schema
notify:
rules:
- id: "night_mute"
action: mute
match:
event: "*"
channel: "*"
condition: "time.hour >= 23 || time.hour < 8"
priority: 100
Rule Fields
| Field | Type | Required | Description |
|---|---|---|---|
id |
string | yes | Unique rule identifier |
action |
string | yes | mute, throttle, aggregate, or drop |
match |
object | yes | Event and channel matching criteria |
condition |
string | no | Time-based expression for conditional rules |
priority |
int | yes | Evaluation order (higher = first) |
params |
object | no | Action-specific parameters (see below) |
Match Fields
| Field | Type | Description |
|---|---|---|
event |
string | Event type pattern: exact match, * for all, prefix.* for prefix, *.suffix for suffix |
channel |
string | Channel pattern: same glob syntax as event match |
Match Examples
| Pattern | Matches |
|---|---|
* |
Everything |
bookmark.created |
Exact event type only |
infra.* |
infra.host.down, infra.host.up, etc. |
*.created |
bookmark.created, kanban.task.created |
server.* |
server.offline, server.online |
Rule Actions
Mute (DND)
Suppresses all matching notifications when the time condition is met. Useful for night-time silence.
- id: "night_mute"
action: mute
match:
event: "*"
channel: "*"
condition: "time.hour >= 23 || time.hour < 8"
priority: 100
Condition syntax: time.hour >= N, time.hour < N, time.hour == N connected with || (OR) and && (AND).
Throttle
Limits how many notifications of a specific type are sent within a time window. Uses Redis INCR with TTL for atomic counting.
- id: "infra_throttle"
action: throttle
match:
event: "infra.*"
channel: "*"
priority: 50
params:
window: "5m"
limit: 1
Throttle parameters:
| Field | Type | Required | Description |
|---|---|---|---|
window |
string | yes | Time window (Go duration format) |
limit |
int | yes | Max messages in window |
Redis key pattern: notify:throttle:{ruleID}:{eventType}:{channel}
Aggregate
Buffers individual events into a Redis List and flushes them as a single digest message when the window expires. A background worker scans for expired timers every 60 seconds.
- id: "download_batch"
action: aggregate
match:
event: "download.completed"
channel: "telegram"
priority: 40
params:
window: "15m"
digest_template_id: "download.digest"
Aggregate parameters:
| Field | Type | Required | Description |
|---|---|---|---|
window |
string | yes | Aggregation window |
digest_template_id |
string | no | Template for the digest message |
Redis key pattern: notify:agg:{ruleID}:{eventType}:{channel} (List), notify:agg:timer:{ruleID}:{eventType}:{channel} (timer key with TTL)
The digest template receives a .items field containing all aggregated payloads:
**Digest: {{ len .items }} items in the last 15 minutes**
{{ range .items }}
- {{ .title }} ({{ .size }})
{{ end }}
Drop
Silently discards matching notifications. Useful for suppressing known noise.
- id: "drop_test_events"
action: drop
match:
event: "test.*"
channel: "*"
priority: 10
Rule Evaluation Order
Rules are sorted by priority descending. The first matching rule wins. If a higher-priority mute rule matches, lower-priority throttle or aggregate rules are never evaluated.
Example with two rules:
- id: "night_mute"
action: mute
match: { event: "*", channel: "*" }
condition: "time.hour >= 23 || time.hour < 8"
priority: 100 # Evaluated first
- id: "infra_throttle"
action: throttle
match: { event: "infra.*", channel: "*" }
params: { window: "5m", limit: 1 }
priority: 50 # Only evaluated if mute doesn't match
At 2 PM: night_mute condition is false, so infra_throttle applies to infra.host.down events.
At 1 AM: night_mute condition is true, all notifications are silenced regardless of other rules.
Configuration
Minimum Setup
- Add templates to
flowbot.yaml:
notify:
templates:
- id: bookmark.created
name: "Bookmark Created"
default_format: markdown
default_template: |
**New Bookmark**
{{ .url }}
- Add optional rules:
rules: [] # or add rules as needed
- Configure at least one notification channel per user via the
notify configchat command:/notify config name: slack template: slack://tokenA/tokenB/tokenC
Full Configuration Reference
See config/notify.yaml for a complete template and rule configuration example.
Predefined Templates
The following templates are built into the reference configuration. Add them to your flowbot.yaml to enable notifications for common events.
| Template ID | Trigger | Key Payload Fields |
|---|---|---|
bookmark.created |
Bookmark created | url, title |
bookmark.archived |
Bookmark archived | id, title |
archive.item.added |
ArchiveBox item added | url, title |
kanban.task.created |
Kanban task created | title, project_id, description |
reader.news.summary |
Daily RSS news summary | body |
server.offline |
Server offline detection | hostname, hostid |
finance.transaction |
Finance webhook received | amount, currency, category, payee, account, date |
github.deployment |
GitHub deployment triggered | user, repo, build, drone_url |
agent.status |
Agent online/offline/message | hostid, hostname, status, message |
cron.output |
Generic cron job output | body, cron_job |
Usage Patterns
From Pipeline Steps
pipelines:
- name: notify-on-bookmark
enabled: true
trigger:
event: bookmark.created
steps:
- name: send-notify
capability: notify
operation: send
params:
template_id: "bookmark.created"
channels: ["slack", "ntfy"]
payload: "{{ .Event.data }}"
From Cron Jobs
func(ctx types.Context) []types.MsgPayload {
// ... fetch data ...
err := notify.GatewaySend(ctx.Context(), ctx.AsUser, "reader.news.summary",
[]string{"slack", "ntfy"}, map[string]any{
"body": summaryText,
"entry_count": count,
})
if err != nil {
flog.Error(err)
}
return nil
}
From Webhook Handlers
err := notify.GatewaySend(ctx.Context(), ctx.AsUser, "finance.transaction",
[]string{"slack", "ntfy"}, map[string]any{
"amount": payload.Amount,
"currency": payload.Currency,
"category": payload.Category,
})
From Agent Actions
err := notify.GatewaySend(ctx.Context(), uid, "agent.status",
[]string{"slack", "ntfy"}, map[string]any{
"hostid": hostid,
"hostname": hostname,
"status": "online",
})
Integration Points
Call Sites Migrated from event.SendMessage
The following 13 code locations formerly used event.SendMessage() and now route through notify.GatewaySend():
| Module | File | Template ID |
|---|---|---|
| bookmark | event.go:26 |
bookmark.archived |
| bookmark | event.go:46 |
bookmark.created |
| bookmark | event.go:66 |
archive.item.added |
| kanban | event.go:45 |
kanban.task.created |
| reader | cron.go:116 |
reader.news.summary |
| server | cron.go:165 |
server.offline |
| finance | webhook.go:76 |
finance.transaction |
| github | utils.go:33 |
github.deployment |
| server (internal) | func.go:328 |
agent.status |
| server (internal) | func.go:418 |
agent.status |
| server (internal) | func.go:447 |
agent.status |
| server (internal) | func.go:457 |
agent.status |
| cron ruleset | cron.go:209 |
cron.output |
Interactive chat messages (command responses, form interactions) continue to use event.SendMessage() directly.
Redis Usage
The rule engine uses Redis for three state-tracking patterns:
| Pattern | Data Structure | Key Format | TTL |
|---|---|---|---|
| Throttle | String (counter) | notify:throttle:{rule}:{event}:{channel} |
Rule window |
| Aggregate | List (buffer) | notify:agg:{rule}:{event}:{channel} |
Manual del |
| Timers | String (sentinel) | notify:agg:timer:{rule}:{event}:{channel} |
Rule window |
Throttle counters use atomic INCR with EXPIRE on first increment, avoiding TOCTOU race conditions. Aggregate lists are flushed by a background worker that scans for expired timer keys.
Error Handling
- Missing template: returns
types.ErrNotFound, gateway logs warning and skips - Template parse error: caught at startup during
notifytmpl.Init(), server fails to start - Render failure: logged per-channel, other channels continue
- Channel not configured: logged and skipped, no error returned
- Redis unavailable: throttle/aggregate operations fail-open (notifications allowed through)
- Invalid rule window: logged and rule is skipped (fails-open)
Testing
# Template engine tests (11 test cases)
go test ./pkg/notify/template/...
# Rule engine tests (5 test cases)
go test ./pkg/notify/rules/...
# Existing notify tests (13 test cases)
go test ./pkg/notify/...
# Full suite
go tool task test