Production metrics pipeline for AI agents with OpenTelemetry, Kafka, ClickHouse, and Metabase

Overview
This build logs AI agent metrics in near real time, scales to millions of events per day, and targets dashboard query latency under 200 ms. It uses:

– OpenTelemetry SDKs to emit spans/metrics
– OpenTelemetry Collector to batch and forward
– Kafka for durable buffering
– ClickHouse for fast analytics
– Metabase for dashboards and alerts

Use cases
– Track request latency, token usage, cost per endpoint
– Error rates by model/provider
– Agent step timings and tool success
– Customer-level SLOs and anomaly alerts

Architecture
– Services/agents emit OTLP spans and metrics.
– Otel Collector scrubs PII, batches, retries.
– Kafka provides backpressure and replay.
– ClickHouse ingests via Kafka Engine to a MergeTree.
– Metabase connects directly to ClickHouse.

Data model
Two core tables keep it simple and fast:

– agent_events: per request/step event (span-like)
  – ts (DateTime64), trace_id, span_id, parent_span_id
  – service, env, agent, model, provider
  – route, user_id_hash, status, error_type
  – latency_ms, tokens_prompt, tokens_completion, cost_usd
  – attrs (JSON)

– agent_metrics_1m: rollups by minute
  – ts_min, service, env, agent, model, route
  – calls, p50_ms, p95_ms, error_rate, tokens_total, cost_usd

ClickHouse DDL
-- Raw Kafka topic table
CREATE TABLE kafka_agent_events (
    ts DateTime64(3),
    trace_id String,
    span_id String,
    parent_span_id String,
    service LowCardinality(String),
    env LowCardinality(String),
    agent LowCardinality(String),
    model LowCardinality(String),
    provider LowCardinality(String),
    route LowCardinality(String),
    user_id_hash FixedString(32),
    status LowCardinality(String),
    error_type LowCardinality(String),
    latency_ms UInt32,
    tokens_prompt UInt32,
    tokens_completion UInt32,
    cost_usd Decimal(12,6),
    attrs JSON
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'agent_events_v1',
    kafka_group_name = 'ch_agent_events_ingest',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4;

-- Final storage
CREATE TABLE agent_events (
    ts DateTime64(3),
    trace_id String,
    span_id String,
    parent_span_id String,
    service LowCardinality(String),
    env LowCardinality(String),
    agent LowCardinality(String),
    model LowCardinality(String),
    provider LowCardinality(String),
    route LowCardinality(String),
    user_id_hash FixedString(32),
    status LowCardinality(String),
    error_type LowCardinality(String),
    latency_ms UInt32,
    tokens_prompt UInt32,
    tokens_completion UInt32,
    cost_usd Decimal(12,6),
    attrs JSON
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(ts)
ORDER BY (ts, service, env, agent, route)
-- toDateTime keeps the TTL expression a plain DateTime, which older ClickHouse versions require
TTL toDateTime(ts) + INTERVAL 90 DAY DELETE;

-- Streaming materialization
CREATE MATERIALIZED VIEW mv_agent_events TO agent_events AS
SELECT
    ts, trace_id, span_id, parent_span_id,
    service, env, agent, model, provider, route,
    user_id_hash, status, error_type,
    latency_ms, tokens_prompt, tokens_completion, cost_usd, attrs
FROM kafka_agent_events;

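-- Rollup by minute. Percentiles and ratios are not additive, so the view stores
-- mergeable aggregate states instead of summing finished values.
CREATE MATERIALIZED VIEW mv_agent_metrics_1m
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(ts_min)
ORDER BY (ts_min, service, env, agent, model, route)
AS
SELECT
    toStartOfMinute(ts) AS ts_min,
    service, env, agent, model, route,
    countState() AS calls,
    quantilesState(0.5, 0.95)(latency_ms) AS latency_q,
    sumState(toUInt64(status != 'ok')) AS errors,
    sumState(tokens_prompt + tokens_completion) AS tokens_total,
    sumState(cost_usd) AS cost_usd
FROM agent_events
GROUP BY ts_min, service, env, agent, model, route;

-- Read with the matching -Merge functions, for example:
-- SELECT ts_min, countMerge(calls) AS calls,
--        quantilesMerge(0.5, 0.95)(latency_q) AS p50_p95_ms,
--        sumMerge(errors) / countMerge(calls) AS error_rate
-- FROM mv_agent_metrics_1m GROUP BY ts_min;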

OpenTelemetry Collector config
Receives spans/metrics over OTLP, drops PII, batches, pushes to Kafka.

receivers:
  otlp:
    protocols:
      http:
      grpc:

processors:
  attributes:
    actions:
      - key: user_email
        action: delete
      - key: user_id
        action: hash
  batch:
    timeout: 2s
    send_batch_size: 5000
  memory_limiter:
    check_interval: 5s
    limit_mib: 512
    spike_limit_mib: 256
  transform:
    error_mode: ignore
    trace_statements:
      - context: span
        statements:
          # Concat takes a list of values followed by the delimiter
          - set(name, Concat([attributes["agent"], attributes["route"]], ":")) where name == ""

exporters:
  kafka:
    brokers: ["kafka:9092"]
    topic: agent_events_v1
    # Trace payloads use an OTLP encoding. OTLP JSON is nested, so flatten it to the
    # agent_events row shape before the ClickHouse Kafka table reads it.
    encoding: otlp_json
    retry_on_failure:
      enabled: true
      max_elapsed_time: 120s

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, attributes, transform, batch]
      exporters: [kafka]
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [kafka]

Client emission example (Python)
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import BatchSpanProcessor

provider = TracerProvider(resource=Resource.create({
    "service.name": "api",
    "deployment.environment": "prod",
    "agent": "order_bot",
    "model": "gpt-4.1",
    "provider": "openai",
    "route": "POST /v1/answer",
}))
trace.set_tracer_provider(provider)
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4318/v1/traces"))
provider.add_span_processor(processor)

with trace.get_tracer("api").start_as_current_span("agent_step") as span:
    # do work…
    span.set_attribute("tokens_prompt", 512)
    span.set_attribute("tokens_completion", 128)
    span.set_attribute("cost_usd", 0.0032)
    span.set_attribute("status", "ok")
    span.set_attribute("user_id", "12345")  # will be hashed by Collector

Metabase setup
– Add ClickHouse as a database (native driver).
– Set sync/scan to hourly for new fields.
– Create questions using native SQL.

Useful queries
– Errors by model (last 24h):
SELECT model, provider, count() AS errors
FROM agent_events
WHERE ts > now() - INTERVAL 1 DAY AND status != 'ok'
GROUP BY model, provider
ORDER BY errors DESC
LIMIT 20;

– P95 latency by route (last 7d):
SELECT route, round(quantile(0.95)(latency_ms)) AS p95_ms, count() AS calls
FROM agent_events
WHERE ts > now() - INTERVAL 7 DAY
GROUP BY route
ORDER BY p95_ms DESC;

– Cost by customer (requires user_id map):
SELECT user_id_hash, sum(cost_usd) AS cost
FROM agent_events
WHERE ts > now() - INTERVAL 30 DAY
GROUP BY user_id_hash
ORDER BY cost DESC
LIMIT 50;

– SLO burn (15m windows where errors exceed 2%):
SELECT
    toStartOfInterval(ts, INTERVAL 15 minute) AS win,
    round(avg(status != 'ok') * 100, 2) AS error_pct
FROM agent_events
WHERE ts > now() - INTERVAL 1 DAY
GROUP BY win
HAVING error_pct > 2
ORDER BY win DESC;

Alerting pattern
– Use a saved question with a threshold and Metabase email/Slack alerts.
– For low-latency alerts, add a lightweight worker polling ClickHouse every minute and posting to Slack webhook if error_pct > threshold.
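
The polling worker can be sketched as below. This is a minimal outline under stated assumptions: `fetch_error_pct` is a hypothetical callable that runs the SLO query against ClickHouse and returns a percentage, and the webhook URL passed to `post_json` is whatever Slack incoming-webhook URL you configure. Neither is defined in this post.

```python
import json
import urllib.request
from typing import Callable, Optional


def build_alert(error_pct: float, threshold_pct: float) -> Optional[dict]:
    """Return a Slack-style payload when the threshold is exceeded, else None."""
    if error_pct <= threshold_pct:
        return None
    return {"text": f"agent error rate {error_pct:.2f}% exceeds {threshold_pct:.2f}%"}


def post_json(url: str, payload: dict) -> None:
    """POST a JSON payload, e.g. to a Slack incoming webhook (URL is an assumption)."""
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req, timeout=5).close()


def check_once(
    fetch_error_pct: Callable[[], float],
    threshold_pct: float,
    send: Callable[[dict], None],
) -> bool:
    """Run one poll cycle; returns True when an alert was sent."""
    payload = build_alert(fetch_error_pct(), threshold_pct)
    if payload is None:
        return False
    send(payload)
    return True
```

Run `check_once` from a once-a-minute scheduler and pass `lambda p: post_json(webhook_url, p)` as `send`. Keeping the query and the HTTP call injectable makes the threshold logic easy to test without ClickHouse or Slack.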

Performance notes
– ClickHouse tuning:
  – Use LowCardinality for string columns with few distinct values; keep ORDER BY narrow and aligned with filters.
  – Prefer DateTime64 for precise latency windows.
  – Use TTL to control storage; hot data on fast disks.
– Kafka:
  – Start with 3 partitions; increase to match ingest QPS.
  – Retain 24–48h for replay.
– Otel Collector:
  – Batch size 5k–10k; enable compression if cross-AZ.
– Cost:
  – A single NVMe-backed ClickHouse host can often sustain on the order of 50–100k events/sec, depending on row width and hardware; benchmark with your own payloads, then scale via shards + replicas.

Security and compliance
– Hash or drop user PII at the Collector.
– Restrict Metabase to read-only users.
– Use network policies or private subnets; encrypt Kafka and ClickHouse at rest and in transit.
– Rotate Kafka credentials and ClickHouse users; audit queries.
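
If you prefer to hash identifiers in the client before they ever reach the Collector, a keyed hash is one option. The sketch below is an assumption, not what the Collector's hash action does: it uses HMAC-SHA256 with a secret you manage, and truncates the hex digest to 32 characters so it fits the `user_id_hash FixedString(32)` column.

```python
import hashlib
import hmac


def hash_user_id(user_id: str, secret: bytes) -> str:
    """Keyed SHA-256 of the user id, truncated to 32 hex chars for FixedString(32)."""
    digest = hmac.new(secret, user_id.encode("utf-8"), hashlib.sha256).hexdigest()
    return digest[:32]
```

A keyed hash means someone who only sees the table cannot rebuild the mapping by hashing guessed user IDs. Rotating the secret changes every hash, so plan rotation together with any lookup table you keep.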

Deployment tips
– Docker Compose for a single-node pilot; Terraform + Kubernetes for prod.
– Health checks: Kafka consumer lag, Otel Collector queue size, ClickHouse insert delays.
– Backfill by producing historical JSON to the Kafka topic; ClickHouse will materialize.

What to build next
– Enrich events with billing account or feature flags.
– Add model drift metrics (output length, refusal rate).
– Expose a public status dashboard per-customer via Metabase embeds.

Ship a Low-Latency Metrics Pipeline for AI Agents with OpenTelemetry, ClickHouse, and Metabase

Goal
– Track agent runs, tool calls, latency, errors, tokens, and costs with second-level freshness.
– Keep ingestion cheap, queries fast, and schema stable as features evolve.

Reference architecture (text)
– Clients (agents, workers, webhooks) → OpenTelemetry SDK → OTLP HTTP → OpenTelemetry Collector → ClickHouse (native) → Metabase dashboards and alerts.
– Optional: Kafka between Collector and ClickHouse for burst smoothing.

Data model (minimally sufficient, append-only)
– events
  – event_id (UUID)
  – ts (DateTime64)
  – app (String)
  – env (Enum: prod, staging)
  – run_id (UUID)
  – type (Enum: run_started, run_finished, tool_call, error, token_usage)
  – agent (String)
  – user_id (String or UUID, nullable)
  – status (Enum: ok, error, timeout; nullable)
  – latency_ms (UInt32, nullable)
  – tokens_prompt, tokens_completion (UInt32, nullable)
  – cost_usd (Decimal(12,6), nullable)
  – meta (JSON)

ClickHouse DDL
CREATE TABLE events
(
    event_id UUID,
    ts DateTime64(3, 'UTC'),
    app LowCardinality(String),
    env Enum8('prod' = 1, 'staging' = 2),
    run_id UUID,
    type LowCardinality(String),
    agent LowCardinality(String),
    user_id String,
    status LowCardinality(String),
    latency_ms UInt32,
    tokens_prompt UInt32,
    tokens_completion UInt32,
    cost_usd Decimal(12,6),
    meta JSON
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(ts)
ORDER BY (app, env, agent, ts)
-- toDateTime keeps the TTL expression a plain DateTime, which older ClickHouse versions require
TTL toDateTime(ts) + INTERVAL 90 DAY DELETE
SETTINGS index_granularity = 8192;

Materialized views (rollups for fast dashboards)
CREATE MATERIALIZED VIEW mv_runs_hour
ENGINE = SummingMergeTree
PARTITION BY toYYYYMM(dt)
ORDER BY (app, env, agent, dt)
AS
SELECT
    app, env, agent,
    toStartOfHour(ts) AS dt,
    countIf(type = 'run_finished') AS runs,
    countIf(status = 'error') AS errors,
    -- summed latency; divide by runs for the mean, and use quantiles below for real pXX
    sumIf(latency_ms, type = 'run_finished') AS latency_ms_sum,
    sum(tokens_prompt) AS tokens_prompt,
    sum(tokens_completion) AS tokens_completion,
    sum(cost_usd) AS cost_usd
FROM events
GROUP BY app, env, agent, dt;

For real latency percentiles, use AggregatingMergeTree with quantileState/quantileMerge:
– Store quantileState(0.5), (0.9), (0.99) and merge in queries.
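
To see why a finished percentile cannot be re-aggregated the way a sum can, here is a small Python sketch. The two latency lists are made up for illustration, and a plain list of raw values stands in for the mergeable state that quantileState keeps.

```python
import statistics


def p50(values):
    """Median of a list of latencies in milliseconds."""
    return statistics.median(values)


# Illustrative data: a busy, fast minute and a quiet, slow one.
minute_a = [10, 12, 14, 16, 18, 20, 22]
minute_b = [300]

# Wrong: averaging per-minute medians ignores how many calls each minute had.
avg_of_medians = (p50(minute_a) + p50(minute_b)) / 2

# Right: merge the underlying states first, then take the quantile once.
merged_median = p50(minute_a + minute_b)

print(avg_of_medians, merged_median)  # 158.0 17.0
```

The averaged figure is dominated by a single slow call, while the merged median reflects what most requests experienced. That gap is the reason to store states in the rollup and call quantileMerge at query time.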

Ingestion via OpenTelemetry Collector
otel-collector.yaml (core)
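receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
exporters:
  clickhouse:
    endpoint: tcp://clickhouse:9000
    database: default
    logs_table_name: events
    ttl: 2160h  # 90 days; duration strings have no day unit
processors:
  batch:
    timeout: 1s
    send_batch_size: 4096
service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [clickhouse]

Note: We treat all telemetry as "logs" with structured bodies to keep one table. The exporter writes its own log-record columns (timestamp, body, attributes), so check that the target table matches that layout, or populate events from the exporter's table with a materialized view that parses the body. If you need traces, add a traces pipeline and derive events via processors.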

Python instrumentation (FastAPI worker)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
import logging, time, uuid, os, json

OTLP_ENDPOINT = os.getenv("OTLP_HTTP", "http://otel-collector:4318")

# Logs as events
lp = LoggerProvider(resource=Resource.create({"service.name": "agent-service", "env": "prod"}))
log_exporter = OTLPLogExporter(endpoint=f"{OTLP_ENDPOINT}/v1/logs")
# Log records need the log processor; BatchSpanProcessor only handles spans.
lp.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
logger = logging.getLogger("ai-agent")
logger.setLevel(logging.INFO)
logger.addHandler(LoggingHandler(level=logging.INFO, logger_provider=lp))

def emit_event(payload: dict):
    # Each event travels as a JSON string in the log body.
    logger.info(json.dumps(payload))

def run_agent(agent, user_id, prompt_tokens, completion_tokens, cost):
    run_id = str(uuid.uuid4())
    t0 = time.time()
    emit_event({
        "type": "run_started", "run_id": run_id, "agent": agent,
        "app": "myapp", "env": "prod", "ts": int(time.time() * 1000),
    })
    try:
        # … do work …
        latency = int((time.time() - t0) * 1000)
        emit_event({
            "type": "run_finished", "run_id": run_id, "agent": agent,
            "status": "ok", "latency_ms": latency,
            "tokens_prompt": prompt_tokens,
            "tokens_completion": completion_tokens,
            "cost_usd": float(cost),
            "app": "myapp", "env": "prod", "ts": int(time.time() * 1000),
        })
    except Exception as e:
        emit_event({
            "type": "error", "run_id": run_id, "agent": agent,
            "status": "error", "error_msg": str(e),
            "app": "myapp", "env": "prod", "ts": int(time.time() * 1000),
        })

Node.js instrumentation (workers or Next.js routes)
import { logs, SeverityNumber } from '@opentelemetry/api-logs'
import { LoggerProvider, BatchLogRecordProcessor } from '@opentelemetry/sdk-logs'
import { OTLPLogExporter } from '@opentelemetry/exporter-logs-otlp-http'

const provider = new LoggerProvider()
provider.addLogRecordProcessor(new BatchLogRecordProcessor(
  new OTLPLogExporter({ url: process.env.OTLP_HTTP + '/v1/logs' })
))
logs.setGlobalLoggerProvider(provider)
const logger = logs.getLogger('ai-agent')

// Emit through the OpenTelemetry logger so records reach the Collector;
// a bare pino logger only writes to stdout unless it is bridged separately.
function emit(payload) {
  logger.emit({ severityNumber: SeverityNumber.INFO, severityText: 'INFO', body: JSON.stringify(payload) })
}

emit({ type: 'tool_call', agent: 'router', run_id: '…', latency_ms: 32, app: 'myapp', env: 'prod', ts: Date.now() })

Metabase setup
– Connect to ClickHouse via the official driver.
– Mark events.ts as Time field. Create model views for:
– Runs by agent, environment
– Error rate by hour (countIf/status)
– P50/P90/P99 latency using quantile functions
– Token and cost per run, per user, per agent
– Parameterize app, env, agent to reuse dashboards across services.

Example SQL (Metabase card: error rate, last 24h)
SELECT
    toStartOfMinute(ts) AS minute,
    countIf(type = 'run_finished') AS runs,
    countIf(status = 'error') AS errors,
    (errors / nullIf(runs, 0)) AS error_rate
FROM events
WHERE app = {{app}} AND env = {{env}} AND ts >= now() - INTERVAL 24 HOUR
GROUP BY minute
ORDER BY minute;

Alerts (Metabase → webhook/Slack)
– Threshold: error_rate > 0.05 for 5+ minutes
– Latency SLO breach: quantile(0.9)(latency_ms) > 2000 over last 15 minutes
– Cost guardrail: sum(cost_usd) > {{budget}} per day per agent
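
The "for 5+ minutes" condition is what keeps a single noisy minute from paging anyone. A minimal sketch of that check, assuming you already have one error_rate value per minute in chronological order (the function name is hypothetical):

```python
def breached_for(rates, threshold, min_consecutive):
    """True when the most recent `min_consecutive` values all exceed `threshold`."""
    if min_consecutive <= 0 or len(rates) < min_consecutive:
        return False
    return all(rate > threshold for rate in rates[-min_consecutive:])
```

For example, `breached_for(per_minute_rates, 0.05, 5)` fires only when each of the last five minutes was above 5%.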

Backfill and idempotency
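– If upstream replays, dedupe on event_id. ReplacingMergeTree collapses rows that share the same sorting key, so event_id has to be part of ORDER BY (the engine's optional argument is a version column, not the dedupe key):
CREATE TABLE events_dedup AS events
ENGINE = ReplacingMergeTree
PARTITION BY toYYYYMM(ts)
ORDER BY (app, env, agent, ts, event_id);
…then insert into events_dedup and read from a view that uses FINAL, because merges run asynchronously.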
– For batch imports, use clickhouse-client with CSV/JSONEachRow. Preserve event_id.
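
For batch imports it can also help to drop replays before the file reaches ClickHouse. The sketch below is an illustration with hypothetical helper names: it keeps the first occurrence of each event_id and writes newline-delimited JSON, the shape JSONEachRow expects.

```python
import json


def dedupe_events(events):
    """Keep the first occurrence of each event_id, preserving input order."""
    seen = set()
    unique = []
    for ev in events:
        if ev["event_id"] in seen:
            continue
        seen.add(ev["event_id"])
        unique.append(ev)
    return unique


def to_json_each_row(events):
    """Serialize events as one compact JSON object per line."""
    return "\n".join(json.dumps(ev, separators=(",", ":")) for ev in events)
```

This only removes duplicates inside one batch; duplicates across batches still need the table-level approach above.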

Performance notes
– Aim for ≤1s collector batch timeout, 4–8k batch size.
– Keep meta JSON small and shallow. Move hot keys to top-level columns.
– Partition monthly, order by (app, env, agent, ts) for common filters.
– Prefer materialized aggregates for PXX latency and daily costs to keep dashboard queries <200ms.

Cost control
– TTL 90d raw, 365d for hourly aggregates.
– Compression: the default codec is fine to start; move cold or wide columns to ZSTD if storage matters. Numeric columns usually compress well.
– Use Metabase cache 5–15 minutes on heavy cards.

Security and compliance
– Do not log PII. If needed, hash user_id and store mapping in a separate service.
– Enforce network policies: Collector and ClickHouse behind private network; Metabase via SSO.
– Rotate ClickHouse users; use readonly for BI.

Deployment tips
– Docker Compose with at least three services: clickhouse, otel-collector, metabase.
– Health checks: Collector /healthz, ClickHouse system.metrics, Metabase /api/health.
– Sizing starter: 2 vCPU, 8GB RAM ClickHouse; 1 vCPU, 1GB Collector; 2 vCPU, 4GB Metabase. Scale storage IOPS first.

What to track next
– Tool-level success rate and latency
– Vendor breakdown (OpenAI, Anthropic, local) for spend and tokens
– Guardrails: rejection reasons, safe-output hits
– Queue metrics (depth, lag) for throughput planning

Summary
This stack ships real-time, production-grade observability for AI agents with minimal overhead. Start with OTLP → Collector → ClickHouse → Metabase, add materialized rollups for speed, and wire alerts to keep latency, reliability, and cost within SLOs.

Ship a Production-Ready AI Metrics Pipeline: FastAPI + Kafka (or Redis) + Postgres/Timescale + dbt + Metabase

This post shows how to deploy a lean, production-ready metrics pipeline for AI agents and automation workflows. The goal: near real-time visibility into latency, token/cost, error rates, tool call outcomes, and user/business impact.

Stack
– Ingestion API: FastAPI
– Queue: Kafka (preferred) or Redis Streams
– Storage: Postgres with TimescaleDB (hypertables) or vanilla Postgres with partitioning
– Transform: dbt (metrics, aggregations, SLOs)
– Dashboard: Metabase (fast to stand up), optional Grafana for alerts
– Orchestration: Docker Compose or Kubernetes
– Auth: API key with HMAC signature
– Optional: Celery/Prefect for scheduled jobs

Event model (core)
Capture minimal, consistent events to avoid schema drift.

Event types:
– agent_run_started
– agent_run_finished
– tool_call
– llm_call
– error
– business_event (e.g., qualified_lead_created)

Shared envelope:
– event_id (uuid)
– event_type (text)
– occurred_at (timestamptz)
– workspace_id (text)
– session_id (text) // user/session/job
– actor_id (text) // user_id, service_id
– source (text) // web, worker, cron
– context (jsonb) // free-form

LLM fields (on llm_call, agent_run_finished):
– model (text)
– input_tokens (int)
– output_tokens (int)
– prompt_hash (text)
– latency_ms (int)
– success (bool)
– error_type (text null)

Tool fields (on tool_call):
– tool_name (text)
– status (text) // success, failure, timeout
– latency_ms (int)

Cost fields (optional):
– provider (text)
– unit_cost_input (numeric)
– unit_cost_output (numeric)

Postgres schema
Use TimescaleDB if available. Otherwise, native partitions by day.

SQL (core events):
CREATE TABLE IF NOT EXISTS ai_events (
event_id uuid PRIMARY KEY,
event_type text NOT NULL,
occurred_at timestamptz NOT NULL,
workspace_id text NOT NULL,
session_id text,
actor_id text,
source text,
model text,
input_tokens int,
output_tokens int,
prompt_hash text,
latency_ms int,
success boolean,
error_type text,
tool_name text,
status text,
provider text,
unit_cost_input numeric(10,6),
unit_cost_output numeric(10,6),
context jsonb
);

Indexes:
– CREATE INDEX ON ai_events (occurred_at DESC);
– CREATE INDEX ON ai_events (workspace_id, occurred_at DESC);
– CREATE INDEX ON ai_events (event_type, occurred_at DESC);
– CREATE INDEX ON ai_events (model);
– CREATE INDEX ON ai_events USING GIN (context);

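Timescale hypertable (unique indexes on a hypertable must include the partitioning column, so declare PRIMARY KEY (event_id, occurred_at) rather than event_id alone before converting, and target the same pair in any ON CONFLICT clause):
SELECT create_hypertable('ai_events', by_range('occurred_at'), if_not_exists => true);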

Partitioning fallback:
– Daily partitions via trigger or pg_partman.
– Retention policy: keep raw 30-90 days; aggregate forever.

Ingestion API (FastAPI)
– Accept batched events (up to 100).
– Verify HMAC signature.
– Validate via Pydantic.
– Push to Kafka topic ai_events or Redis stream ai_events.

Example (simplified):

from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel, Field, ValidationError
import hmac, hashlib, os
from datetime import datetime
from aiokafka import AIOKafkaProducer

SECRET = os.getenv("INGEST_SECRET")
KAFKA_BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP")

class Event(BaseModel):
    event_id: str
    event_type: str
    occurred_at: datetime
    workspace_id: str
    session_id: str | None = None
    actor_id: str | None = None
    source: str | None = None
    model: str | None = None
    input_tokens: int | None = None
    output_tokens: int | None = None
    prompt_hash: str | None = None
    latency_ms: int | None = None
    success: bool | None = None
    error_type: str | None = None
    tool_name: str | None = None
    status: str | None = None
    provider: str | None = None
    unit_cost_input: float | None = None
    unit_cost_output: float | None = None
    context: dict | None = None

class Batch(BaseModel):
    # Pydantic v2 (needed for model_validate_json below) uses min_length/max_length on lists.
    events: list[Event] = Field(..., min_length=1, max_length=100)

app = FastAPI()
producer = None

@app.on_event("startup")
async def start():
    global producer
    producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)
    await producer.start()

@app.on_event("shutdown")
async def stop():
    await producer.stop()

def verify_sig(raw_body: bytes, sig: str) -> bool:
    # Recompute the HMAC over the raw bytes and compare in constant time.
    mac = hmac.new(SECRET.encode(), raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(mac, sig)

@app.post("/ingest")
async def ingest(request: Request):
    raw = await request.body()
    sig = request.headers.get("X-Signature") or ""
    if not verify_sig(raw, sig):
        raise HTTPException(401, "invalid signature")
    try:
        payload = Batch.model_validate_json(raw)
    except ValidationError as exc:
        raise HTTPException(422, str(exc))
    for e in payload.events:
        # model_dump_json serializes datetimes, which json.dumps(model_dump()) cannot.
        await producer.send_and_wait("ai_events", e.model_dump_json().encode())
    return {"ok": True, "count": len(payload.events)}
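
On the sending side, the client has to sign exactly the bytes it puts on the wire. The sketch below shows one way to do that, assuming the same shared secret and the X-Signature header used by the endpoint; `build_request` is a hypothetical helper, not part of any library.

```python
import hashlib
import hmac
import json


def sign_body(secret: str, raw_body: bytes) -> str:
    """Hex HMAC-SHA256 of the raw request body."""
    return hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()


def build_request(secret: str, events: list) -> tuple:
    """Serialize the batch once, then sign those exact bytes."""
    raw = json.dumps({"events": events}, separators=(",", ":")).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "X-Signature": sign_body(secret, raw),
    }
    return raw, headers
```

Send `raw` unchanged as the request body. Re-serializing the payload after signing (for example by passing a dict to an HTTP client's json argument) can change whitespace or key order and invalidate the signature.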

Consumer (Kafka -> Postgres)
– Use a single writer per partition.
– Idempotent upsert on event_id.

Pseudo:

INSERT INTO ai_events (…) VALUES (…)
ON CONFLICT (event_id) DO NOTHING;
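
The effect of the idempotent insert is easy to check locally. The sketch below uses Python's built-in sqlite3 as a stand-in for Postgres (an assumption made only so the example runs without a server; SQLite 3.24+ accepts the same ON CONFLICT ... DO NOTHING clause) and a two-column table rather than the full schema.

```python
import sqlite3


def insert_events(conn, events):
    """Insert rows, skipping event_ids that already exist; returns rows written."""
    before = conn.total_changes
    conn.executemany(
        "INSERT INTO ai_events (event_id, event_type) VALUES (?, ?) "
        "ON CONFLICT (event_id) DO NOTHING",
        [(e["event_id"], e["event_type"]) for e in events],
    )
    conn.commit()
    return conn.total_changes - before


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ai_events (event_id TEXT PRIMARY KEY, event_type TEXT NOT NULL)"
)
```

Replaying a batch after a consumer restart then writes only the rows that were not committed the first time, which is what makes at-least-once delivery from Kafka safe here.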

dbt models
Create clean aggregates for dashboards and SLOs.

models/marts/metrics/llm_daily.sql:
-- no trailing semicolon: dbt wraps the model in its own statement
select
    date_trunc('day', occurred_at) as day,
    workspace_id,
    model,
    count(*) filter (where event_type = 'llm_call') as calls,
    percentile_cont(0.5) within group (order by latency_ms) as p50_latency_ms,
    percentile_cont(0.9) within group (order by latency_ms) as p90_latency_ms,
    sum(coalesce(input_tokens, 0)) as input_tokens,
    sum(coalesce(output_tokens, 0)) as output_tokens,
    sum(
        coalesce(input_tokens, 0) * coalesce(unit_cost_input, 0)
        + coalesce(output_tokens, 0) * coalesce(unit_cost_output, 0)
    ) as cost_usd,
    1.0 * sum(case when success then 1 else 0 end) / nullif(count(*), 0) as success_rate
from {{ ref('stg_ai_events') }}
where event_type in ('llm_call', 'agent_run_finished')
group by 1, 2, 3

models/marts/metrics/tool_reliability.sql:
-- no trailing semicolon: dbt wraps the model in its own statement
select
    date_trunc('hour', occurred_at) as hour,
    workspace_id,
    tool_name,
    count(*) as calls,
    1.0 * sum(case when status = 'success' then 1 else 0 end) / nullif(count(*), 0) as success_rate,
    avg(latency_ms) as avg_latency_ms
from {{ ref('stg_ai_events') }}
where event_type = 'tool_call'
group by 1, 2, 3

SLOs and alerts
– Error budget: 99% success_rate per day on agent_run_finished.
– Latency SLO: p90_latency_ms < target per model.
– Cost guardrail: cost_usd per workspace per day threshold.
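
An error budget turns the 99% target into a number of failures you are allowed to spend. A minimal sketch of that arithmetic, with a hypothetical function name and the SLO passed as a fraction:

```python
def error_budget_remaining(total_runs, failed_runs, slo=0.99):
    """Fraction of the period's error budget still unspent (negative when overspent)."""
    allowed_failures = total_runs * (1 - slo)
    if allowed_failures == 0:
        return 0.0
    return 1 - failed_runs / allowed_failures
```

With 10,000 runs in a day, a 99% SLO allows about 100 failures: 50 failures leaves roughly half the budget, and 150 overspends it by half.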

Metabase setup
Dashboards (suggested cards):
– LLM Overview
  – Calls by model (daily)
  – p50/p90 latency by model
  – Token in/out trend
  – Cost by model/provider
  – Success rate over time
– Tool Reliability
  – Success rate by tool (hourly)
  – Failures by error_type
  – Timeouts trend
– Agent Health
  – Agent run success rate
  – Average steps per run (from context.step_count)
  – Top failing prompts (prompt_hash)
– Workspace Billing
  – Daily cost per workspace
  – Anomalies (z-score on daily spend)
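
The z-score card compares today's spend with the spread of recent days. A small sketch of the calculation, assuming a trailing window of daily totals per workspace (the function name and the choice of population standard deviation are assumptions):

```python
import statistics


def spend_zscore(history, today):
    """Z-score of today's spend against trailing daily totals."""
    mean = statistics.fmean(history)
    stdev = statistics.pstdev(history)
    if stdev == 0:
        return 0.0  # flat history: no meaningful deviation to report
    return (today - mean) / stdev
```

A common starting rule is to flag days with a z-score above 3, then tune the cutoff once you have seen a few weeks of real spend.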

Metabase tips
– Use SQL-native queries on dbt models.
– Add filters for workspace_id, model, date range.
– Cache at 5–15 minutes for near real-time.
– Create pulses (email/Slack) for SLO breaches.

Performance considerations
– Prefer Kafka with 3 partitions for steady ingestion; scale consumers horizontally.
– Batch inserts of 500–5k rows with COPY when backfilling.
– For Timescale: compress chunks older than 3 days; set retention to 90 days on raw.
– For vanilla Postgres: daily partitions + BRIN index on occurred_at.
– Keep jsonb context small; move hot keys to top-level columns when used in WHERE/JOIN.
– Use prompt_hash, not raw prompts, to avoid PII and save space.

Security and compliance
– HMAC signatures on ingest; rotate keys.
– Encrypt at rest (cloud-managed).
– Avoid logging raw user content; hash or redact.
– Separate writer and reader roles; Metabase gets read-only.

Deployment notes
– Docker Compose: kafka, zookeeper, postgres+timescale, fastapi, consumer, metabase.
– Health checks for producer/consumer.
– Use DLQ (dead-letter) topic for schema errors.
– Add OpenTelemetry tracing IDs to correlate events with app logs.

Quick wins
– If you don't need Kafka, use Redis Streams; swap aiokafka for the asyncio client in redis-py (redis.asyncio, which superseded aioredis) and run a single consumer.
– If you need only dashboards, push events directly to Postgres via an async queue in the API and scale DB vertically.

What you get
– Real-time visibility into agent/LLM performance and cost.
– Traceable tool failures with impact by workspace.
– Guardrails with alerts to prevent runaway spend.
– A path to scale with minimal rework.

Real-time Product Analytics Pipeline with FastAPI, Redpanda, ClickHouse, and Metabase

This post walks through a practical, low-ops real-time analytics pipeline for SaaS product metrics. It’s built for high write throughput, sub-second query speed, and simple ops:

– Ingest: FastAPI event endpoint
– Stream: Redpanda (Kafka-compatible)
– Storage/OLAP: ClickHouse
– Transform: dbt (on ClickHouse)
– Dashboards/alerts: Metabase
– Observability: OpenTelemetry + Prometheus + Grafana (pipeline health)

Target outcomes: DAU/WAU/MAU, activation funnels, retention cohorts, feature adoption, latency/error rates, and per-tenant views.

1) Event schema and contracts
Define a minimal, versioned schema so producers and consumers stay stable.

– Topic: product_events (partition by user_id or tenant_id)
– JSON payload:
{
  "event": "button_clicked",
  "user_id": "u_123",
  "tenant_id": "t_9",
  "session_id": "s_abc",
  "ts": "2026-03-14T18:25:43.511Z",
  "properties": {"button_id": "save", "plan": "pro", "screen": "editor"},
  "context": {"app_version": "2.4.1", "os": "macOS", "locale": "en-US"},
  "event_version": 1
}

Rules:
– ISO-8601 UTC timestamps
– event_version increments on breaking changes
– Limit properties/context to primitive types
– Drop PII or hash it client-side
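
Two of these rules are easy to enforce in code at the edge. The sketch below is one possible check, with a hypothetical `validate_event` helper: it requires an ISO-8601 timestamp with a UTC offset and rejects nested values in properties and context.

```python
from datetime import datetime, timedelta

PRIMITIVES = (str, int, float, bool, type(None))


def validate_event(ev: dict) -> list:
    """Return a list of rule violations (empty when the event is acceptable)."""
    problems = []
    try:
        # fromisoformat on older Pythons rejects a trailing "Z", so normalize it.
        ts = datetime.fromisoformat(ev["ts"].replace("Z", "+00:00"))
        if ts.utcoffset() != timedelta(0):
            problems.append("ts must be UTC")
    except (KeyError, ValueError, AttributeError):
        problems.append("ts must be an ISO-8601 string")
    for section in ("properties", "context"):
        for key, value in ev.get(section, {}).items():
            if not isinstance(value, PRIMITIVES):
                problems.append(f"{section}.{key} must be a primitive")
    return problems
```

Returning a list of problems rather than raising on the first one makes it simple to log every violation for a producer that is drifting from the contract.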

2) Ingestion API (FastAPI)
Expose a POST /events that validates payloads and writes to Redpanda.

– Use pydantic for validation
– Rate-limit per IP/tenant
– Buffer and batch send to Redpanda using librdkafka

Python snippet (core idea):
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from confluent_kafka import Producer
import json

p = Producer({"bootstrap.servers": "redpanda:9092", "queue.buffering.max.messages": 100000})

class Event(BaseModel):
    event: str
    user_id: str
    tenant_id: str
    session_id: str
    ts: str
    properties: dict = Field(default_factory=dict)
    context: dict = Field(default_factory=dict)
    event_version: int = 1

app = FastAPI()

@app.post("/events")
async def ingest(ev: Event):
    try:
        p.produce("product_events", json.dumps(ev.dict()).encode("utf-8"), key=ev.tenant_id)
        p.poll(0)  # serve delivery callbacks without blocking
        return {"ok": True}
    except BufferError:
        # produce() raises BufferError when the local producer queue is full
        raise HTTPException(status_code=503, detail="Backpressure")

3) Streaming into ClickHouse
ClickHouse consumes directly from Kafka-compatible topics via the Kafka engine and writes to a MergeTree table via a materialized view.

– Kafka table (ts stays a String here so the materialized view can parse the ISO-8601 value):
CREATE TABLE kafka_product_events (
    event String,
    user_id String,
    tenant_id String,
    session_id String,
    ts String,
    properties JSON,
    context JSON,
    event_version UInt8
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'redpanda:9092',
         kafka_topic_list = 'product_events',
         kafka_group_name = 'ch_consumer_v1',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 3;

– Target table:
CREATE TABLE product_events (
    event LowCardinality(String),
    user_id String,
    tenant_id String,
    session_id String,
    ts DateTime64(3, 'UTC'),
    properties JSON,
    context JSON,
    event_version UInt8
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(ts)
ORDER BY (tenant_id, ts, event, user_id)
-- toDateTime keeps the TTL expression a plain DateTime, which older ClickHouse versions require
TTL toDateTime(ts) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192;

– Materialized view:
CREATE MATERIALIZED VIEW mv_kafka_to_events
TO product_events AS
SELECT
    event, user_id, tenant_id, session_id,
    -- unparseable values become the epoch instead of NULL, because the target column is not Nullable
    parseDateTime64BestEffortOrZero(ts, 3, 'UTC') AS ts,
    properties, context, event_version
FROM kafka_product_events;

Notes:
– Use LowCardinality for string dims
– Partition by month for prune speed and cost
– TTL for retention

4) Aggregations for speed and cost
Pre-aggregate common metrics into rollup tables updated continuously.

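– Daily user activity (create the target table first; the view stores mergeable uniq states, because distinct counts computed per insert block cannot be combined afterwards):
CREATE TABLE daily_user_activity (
    tenant_id String,
    d Date,
    event LowCardinality(String),
    uu AggregateFunction(uniqExact, String)
) ENGINE = AggregatingMergeTree()
ORDER BY (tenant_id, d, event);

CREATE MATERIALIZED VIEW mv_events_to_daily
TO daily_user_activity AS
SELECT
    tenant_id,
    toDate(ts) AS d,
    event,
    uniqExactState(user_id) AS uu
FROM product_events
GROUP BY tenant_id, d, event;

-- Read with: SELECT tenant_id, d, event, uniqExactMerge(uu) AS uu
--            FROM daily_user_activity GROUP BY tenant_id, d, event;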

– Sessionized funnels (optional): build session_start/session_end materialized views using window functions or handle in dbt.

5) Transformations with dbt (ClickHouse)
Use dbt-clickhouse for semantic models, derived metrics, and SCD dimensions.

– Models:
  – stg_product_events: cast types, filter bad rows, standardize event names
  – fct_event_counts_daily: DAU/WAU/MAU per tenant
  – fct_funnels: configurable funnel steps by event and time window
  – fct_retention: N-day retention by cohort_start = first_seen_date
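
As a reference for what fct_retention should compute, here is N-day retention in plain Python. It is a sketch under one specific definition (a user counts as retained if they are active exactly N days after their first_seen_date); the function name and input shape are assumptions.

```python
from datetime import date, timedelta


def n_day_retention(activity: dict, n: int) -> float:
    """activity maps user_id to the set of dates the user was active.

    Returns the share of users active exactly n days after their first seen date.
    """
    if not activity:
        return 0.0
    retained = sum(
        1 for days in activity.values() if min(days) + timedelta(days=n) in days
    )
    return retained / len(activity)
```

Checking a dbt model against a tiny hand-built dataset like this is a cheap way to catch off-by-one errors in the date arithmetic.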

Example dbt SQL idea:
select
    tenant_id,
    toDate(ts) as d,
    countIf(event = 'signup') as signups,
    uniqExactIf(user_id, event = 'login') as dau
from {{ ref('stg_product_events') }}
group by tenant_id, d

6) Dashboards in Metabase
Connect Metabase directly to ClickHouse.

– Collections:
  – Product KPIs: DAU/WAU/MAU, activation rate, feature adoption
  – Growth: signups, conversion by channel (if captured)
  – Reliability: event ingestion lag, API error rates
– Segmentation:
  – Filters: tenant_id, plan, app_version, region
  – Drill-through: user-level trails (guard with RBAC)
– Alerts:
  – Pulses to Slack/Email when DAU drops > X% day-over-day
  – Alert if ingestion lag > N minutes (query kafka table offsets vs. latest ts)
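
The day-over-day DAU rule reduces to a percentage drop compared against a threshold. A minimal sketch with hypothetical function names:

```python
def dau_drop_pct(yesterday: int, today: int) -> float:
    """Percentage drop from yesterday to today (0.0 when DAU grew or there is no baseline)."""
    if yesterday <= 0:
        return 0.0
    return max(0.0, (yesterday - today) / yesterday * 100)


def should_alert(yesterday: int, today: int, max_drop_pct: float) -> bool:
    """True when the drop exceeds the configured X%."""
    return dau_drop_pct(yesterday, today) > max_drop_pct
```

Comparing against the same weekday a week earlier, rather than yesterday, is a common refinement for products with strong weekend dips.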

7) Health monitoring and SLIs
Track pipeline reliability separately from business metrics.

– Ingestion SLIs:
  – HTTP 2xx rate and p95 latency for POST /events (Prometheus + FastAPI metrics)
  – Backpressure count (producer queue size)
– Stream SLIs:
  – Kafka consumer lag per partition (Redpanda metrics)
– Storage SLIs:
  – ClickHouse insert errors, merges backlog, disk usage, parts count
– Dashboards:
  – Grafana board combining FastAPI, Redpanda, ClickHouse exporters
– Alerts:
  – Pager on consumer lag > threshold
  – Disk usage > 80%
  – Insert failures > 0.1% 5-min rate

8) Security and governance
– API: JWT per tenant, HMAC request signing, rate limits
– Data: avoid PII or hash client-side; encrypt at rest and in transit
– ClickHouse RBAC: read-only roles for BI, tenant row-level security using row policies (CREATE ROW POLICY)
– Retention: enforce TTL; archive cold data to S3 via S3 table engine if needed

9) Deployment blueprint (Docker Compose)
Services:
– fastapi, redpanda, clickhouse, metabase, grafana, prometheus
– Optional: dbt runner (scheduled via cron/k8s Job)

Compose tips:
– Pin versions; mount persistent volumes for Redpanda/ClickHouse
– Separate networks for public API vs. internal data plane
– Set ClickHouse max_memory_usage, max_concurrent_queries
– Metabase env for ClickHouse driver; increase MB_JETTY_MAXTHREADS for load

10) Cost and performance notes
– Redpanda + ClickHouse can run on a single VM for MVP; scale to 3 nodes each later
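– With proper batching, 50k–200k events/sec on modest hardware is a reasonable target; actual throughput depends on payload size and disk, so benchmark before committing to a node count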
– Keep event payloads < 2 KB; move large blobs to object storage
– Use materialized rollups for dashboards; avoid heavy ad-hoc scans during peak

11) Validation checklist
– Backfill: load historical CSV/Parquet to product_events via clickhouse-client
– Data quality: dbt tests for not_null, accepted_values, freshness
– Latency: end-to-end under 3s for most events
– Access: tenant-level filters enforced in Metabase

What you get
– Real-time dashboards that stay fast as volume grows
– Clear SLIs and actionable alerts
– A straightforward upgrade path from single node to HA

If you want a reference repo or a Compose file with sane defaults, reach out—we use this pattern in production for product analytics, ops telemetry, and feature adoption tracking.

Real-time Product Analytics with Postgres → ClickHouse → dbt → Grafana

If you run your app on Postgres and need sub-second analytics without crushing OLTP, this stack works reliably in production:
– Ingest: Postgres → ClickHouse (MaterializedPostgreSQL or Kafka + Debezium)
– Transform: dbt (dbt-clickhouse)
– Serve: Grafana dashboards + alerts

Use cases
– Funnel and retention analysis
– Feature usage and cohort metrics
– Operational dashboards (latency, errors, throughput)
– Finance-lite rollups (orders, MRR, refunds)

Reference architecture
– Source DB: Postgres 14+ on managed cloud (RDS/Cloud SQL)
– Analytics store: ClickHouse Cloud or self-managed cluster
– Replication: ClickHouse MaterializedPostgreSQL (for simplicity) or Kafka (for multi-sink / heavy transforms)
– Transform: dbt runner (GitHub Actions, Dagster, or Airflow)
– Viz: Grafana with ClickHouse plugin
– Storage/lineage: S3/GCS parquet exports (optional but recommended)
– Secrets: environment or Vault; network via PrivateLink/VPC peering

Table design in Postgres
– Prefer narrow event table + dimension tables
– Use UUID or bigint IDs; avoid wide JSONB for hot paths
– For events:
– event_id (UUID, PK)
– user_id
– occurred_at (timestamptz)
– event_name (text)
– properties (jsonb) — sparse, normalized over time
– For orders/subscriptions:
– immutable facts + status changes as events
– avoid in-place updates on hot rows

ClickHouse schema (denormalized for reads)
– Use MergeTree with time + shard key
– Example:

CREATE TABLE analytics.events
(
event_id UUID,
user_id String,
event_name LowCardinality(String),
occurred_at DateTime64(3, 'UTC'),
properties_json JSON,
ingest_ts DateTime DEFAULT now()
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(occurred_at)
ORDER BY (event_name, occurred_at, user_id)
SETTINGS index_granularity = 8192;

– Add projections or materialized views for frequent queries (daily rollups, funnels)

Option A: Direct Postgres → ClickHouse via MaterializedPostgreSQL
Good when you control Postgres and schema churn is moderate.

Steps
1) Enable logical replication in Postgres:
– rds.logical_replication = 1 (RDS) or wal_level = logical
– Create a replication user with REPLICATION
2) In ClickHouse:

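SET allow_experimental_database_materialized_postgresql = 1;

CREATE DATABASE pg_repl ENGINE = MaterializedPostgreSQL(
'postgres-host:5432', 'app_db', 'replicator_user', 'REDACTED'
)
SETTINGS materialized_postgresql_schema = 'public'; -- schema to replicate

-- Replicated tables appear under pg_repl.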

3) Hydrate analytics tables from the replicated ones:
CREATE TABLE analytics.events
ENGINE = MergeTree
PARTITION BY toYYYYMM(occurred_at)
ORDER BY (event_name, occurred_at, user_id)
AS
SELECT
event_id,
user_id::String AS user_id,
event_name,
occurred_at,
properties AS properties_json
FROM pg_repl.events;

Notes
– MaterializedPostgreSQL tails WAL and applies row-level changes
– For deletes/updates, ensure primary keys exist in Postgres
– For high write volume, separate heavy tables into their own ClickHouse databases to parallelize apply

Option B: Postgres → Kafka (Debezium) → ClickHouse
Use when you need multi-sink, schema registry, or custom buses.

– Debezium captures CDC to Kafka with Avro/JSON schema
– ClickHouse reads via Kafka engine + materialized views
– Handle upserts with ReplacingMergeTree or CollapsingMergeTree keyed by primary_id + version/flag
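
To make the upsert handling concrete, here is a small Python model of what a ReplacingMergeTree with a version column converges to: one row per key, the highest version winning. It is a sketch of the semantics only; the column names primary_id and version are taken from the bullet above, and the sample rows are invented. In ClickHouse the collapse happens during background merges, so reads may still see older versions until a merge runs (or the query uses FINAL).

```python
def replacing_merge(rows, key="primary_id", version="version"):
    """Keep one row per key: the highest version, later rows winning ties."""
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or row[version] >= current[version]:
            latest[row[key]] = row
    return list(latest.values())


if __name__ == "__main__":
    # Hypothetical CDC stream: order 1 is updated twice, out of order.
    cdc_rows = [
        {"primary_id": 1, "version": 1, "status": "created"},
        {"primary_id": 1, "version": 3, "status": "refunded"},
        {"primary_id": 2, "version": 1, "status": "created"},
        {"primary_id": 1, "version": 2, "status": "paid"},
    ]
    for row in replacing_merge(cdc_rows):
        print(row)
```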

dbt transforms (dbt-clickhouse)
– Create semantic models and rollups; keep them small and incremental
– Example model: sessions_daily.sql

{{
config(
materialized='incremental',
unique_key='session_day_user',
incremental_strategy='delete+insert'
)
}}

WITH sessions AS (
SELECT
user_id,
toStartOfDay(occurred_at) AS session_day,
countIf(event_name = 'session_start') AS starts,
countIf(event_name = 'session_end') AS ends
FROM {{ source('analytics', 'events') }}
{% if is_incremental() %}
WHERE occurred_at >= date_sub(day, 3, today())
{% endif %}
GROUP BY user_id, session_day
)
SELECT
concat(user_id, '_', toString(session_day)) AS session_day_user,
user_id,
session_day,
starts,
ends
FROM sessions

– Schedule:
– Small models every 5 minutes
– Heavy rollups hourly
– Backfills in off-peak

Grafana dashboards
– Install ClickHouse data source
– Panels:
– Events per minute with 1m/5m rollups
– DAU/WAU/MAU with asof joins
– Feature adoption by cohort week
– p50/p95 latency by endpoint
– Error rate by service/version
– Add alert rules on thresholds (error_rate > 2%, DAU drop > 20% d/d)
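
The two example thresholds above (error_rate > 2%, DAU drop > 20% day over day) reduce to simple ratio checks. The sketch below shows that arithmetic in Python, assuming the counts come from the rollup queries; the function names are illustrative and this is not how Grafana evaluates rules internally.

```python
def error_rate_alert(errors: int, total: int, threshold: float = 0.02) -> bool:
    """Fire when the error share exceeds the threshold; no traffic means no alert."""
    if total == 0:
        return False
    return errors / total > threshold


def dau_drop_alert(today: int, yesterday: int, max_drop: float = 0.20) -> bool:
    """Fire when DAU fell by more than max_drop relative to the previous day."""
    if yesterday == 0:
        return False
    return (yesterday - today) / yesterday > max_drop


if __name__ == "__main__":
    print(error_rate_alert(errors=30, total=1000))    # 3% error rate
    print(dau_drop_alert(today=750, yesterday=1000))  # 25% drop
```

Guarding the zero-denominator case matters in practice: a quiet period should surface through a separate "no data" rule rather than a division error.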

Performance and cost tips
– ClickHouse:
– Partition by month, order by (event_name, occurred_at, user_id)
– Use LowCardinality for strings
– Keep JSON for long tail props, but extract top N to typed columns for filters
– Set max_threads per query; use quotas for noisy tenants
– dbt:
– Favor incremental + small windows
– Stage raw to clean, then marts; limit cross-joins
– Postgres:
– Keep CDC slot monitored; alert on replication lag
– Index primary keys; avoid mass vacuum stalls
– Storage:
– TTL old raw events to S3 via ClickHouse S3 TTL or periodic exports
– Costs:
– Use ClickHouse Cloud autoscaling with sane concurrency caps
– Compress JSON and avoid SELECT *

Reliability and ops
– Health checks:
– Replication delay (Postgres WAL LSN vs ClickHouse apply lag)
– Kafka consumer lag (if using Debezium)
– dbt run success rates and duration SLAs
– Schema changes:
– Additive first (new columns), then backfill, then drop
– Use dbt contracts/tests and column-level lineage
– Dedupe:
– For CDC upserts, use ReplacingMergeTree with a version column
– For at-least-once ingestion, dedupe on event_id in dbt staging
– Backfills:
– Snapshot Postgres to S3 (pg_dump or logical snapshot), bulk copy into ClickHouse, then reattach WAL

Security
– Restrict replication role to specific DB and tables
– Private networking between services
– Rotate secrets; limit Grafana viewer vs editor roles
– PII handling: hash/email_tokenize in ingestion; store raw PII in a separate restricted table

Quick start checklist
– Stand up ClickHouse Cloud and Grafana
– Enable Postgres logical replication
– Create MaterializedPostgreSQL database in ClickHouse
– Define analytics.events table and initial projections
– Configure dbt-clickhouse; build staging and marts
– Publish Grafana dashboards and alerts
– Add monitors for lag, error rates, and costs

What this delivers
– Sub-second reads on billions of events
– Minimal load on Postgres OLTP
– Clear, versioned transforms
– Actionable dashboards and alerts for product and ops

Real-Time Product Analytics with ClickHouse, Airbyte, dbt, and Superset

Overview
This is a production-ready analytics stack designed for product teams that need fast event queries and reliable, explainable metrics:
– Storage/engine: ClickHouse
– Ingestion: Event collector (HTTP) to ClickHouse; Airbyte for SaaS enrichment
– Transformations: dbt (dbt-clickhouse)
– Dashboards: Apache Superset
– Orchestration: Cron or Airflow (optional)
– Monitoring: ClickHouse system tables + alerting

Why this stack
– ClickHouse gives millisecond queries on billions of rows with low storage overhead.
– Airbyte reliably syncs SaaS sources (Stripe, HubSpot, PostHog export, S3 logs) into ClickHouse.
– dbt standardizes transformations, testing, and CI.
– Superset is fast, self-hostable, and permissionable.

Reference architecture (flow)
Client apps/web → Event Collector (NGINX + small Python/Go handler) → ClickHouse (raw_events) → dbt models/materialized views → Superset dashboards
Plus: Airbyte → ClickHouse (dim_*, stg_* tables) for enrichment joins

Event collection
– Send JSON over HTTPS from SDKs to your collector. Keep the collector stateless and append-only.
– Validate required keys, add server timestamps, and write line-delimited JSON batches to ClickHouse via HTTP insert.
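
A minimal sketch of the collector's validate-and-batch step follows. The required keys and the function names are assumptions for illustration; the output is line-delimited JSON of the kind ClickHouse accepts with the JSONEachRow format.

```python
import json
from datetime import datetime, timezone

# Assumed event contract; replace with your own required keys.
REQUIRED_KEYS = ("event_id", "event_name", "user_id", "event_time")


def missing_keys(event: dict) -> list:
    """Return the required keys that are absent or empty."""
    return [k for k in REQUIRED_KEYS if event.get(k) in (None, "")]


def to_ndjson(events, now=None):
    """Stamp valid events with a server timestamp and join them as NDJSON."""
    now = now or datetime.now(timezone.utc)
    received_at = now.strftime("%Y-%m-%d %H:%M:%S.") + f"{now.microsecond // 1000:03d}"
    lines, rejected = [], []
    for event in events:
        if missing_keys(event):
            rejected.append(event)
            continue
        stamped = {**event, "received_at": received_at}
        lines.append(json.dumps(stamped, separators=(",", ":")))
    return "\n".join(lines), rejected


if __name__ == "__main__":
    body, rejected = to_ndjson([
        {"event_id": "e1", "event_name": "page_view", "user_id": "u1",
         "event_time": "2024-01-01 00:00:00.000"},
        {"event_id": "e2", "event_name": "page_view"},  # missing user_id, event_time
    ])
    print(body)
    print(f"rejected: {len(rejected)}")
```

The resulting body could then be sent to the ClickHouse HTTP interface with a query such as INSERT INTO analytics.raw_events FORMAT JSONEachRow; the HTTP call is left out here to keep the sketch dependency-free.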

ClickHouse: core tables
Use ReplacingMergeTree for safe deduplication and TTL for hot/cold retention.

Example table (compact):
CREATE TABLE analytics.raw_events
(
event_time DateTime64(3, 'UTC'),
event_date Date MATERIALIZED toDate(event_time),
event_name LowCardinality(String),
event_id UUID,
user_id String,
session_id String,
device LowCardinality(String),
os LowCardinality(String),
country LowCardinality(FixedString(2)),
properties JSON,
received_at DateTime64(3, 'UTC') DEFAULT now64(3),
_ingest_version UInt32 DEFAULT 1
)
ENGINE = ReplacingMergeTree(_ingest_version)
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, user_id, event_name, event_id)
TTL event_date + INTERVAL 180 DAY
SETTINGS index_granularity = 8192;

Notes
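– ReplacingMergeTree dedupes rows that share the same sorting key (the ORDER BY tuple), keeping the row with the highest _ingest_version; bump that value when reprocessing. Deduplication happens during background merges, so query with FINAL or aggregate by event_id when you need exact results.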
– For very high volume, switch to ReplicatedReplacingMergeTree and add Kafka ingestion or buffering via S3.

Sessionization materialized view
CREATE MATERIALIZED VIEW analytics.mv_sessions
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, user_id, session_id)
AS
SELECT
-- Group by a plain (non-aggregated) date so it can serve as a sorting key;
-- a session that crosses midnight produces two rows that the session query merges.
toDate(event_time) AS event_date,
user_id,
session_id,
minState(event_time) AS session_start_state,
maxState(event_time) AS session_end_state,
anyState(country) AS country_state,
countState() AS events_state
FROM analytics.raw_events
GROUP BY event_date, user_id, session_id;

Session query:
SELECT
user_id,
session_id,
minMerge(session_start_state) AS session_start,
maxMerge(session_end_state) AS session_end,
anyMerge(country_state) AS country,
countMerge(events_state) AS events
FROM analytics.mv_sessions
WHERE event_date >= today() - 7
GROUP BY user_id, session_id;
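
The -State / -Merge pairing used above can be pictured as partial results that are combined later. The Python sketch below models that idea for one session: each insert block contributes a partial state, and merging states gives the final start, end, and event count. The SessionState class is a teaching aid, not ClickHouse's internal representation.

```python
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class SessionState:
    """Partial aggregate for one (user_id, session_id): min, max, and count."""
    start: int   # earliest event time seen (epoch seconds)
    end: int     # latest event time seen (epoch seconds)
    events: int  # number of events folded into this state

    def merge(self, other: "SessionState") -> "SessionState":
        return SessionState(
            start=min(self.start, other.start),
            end=max(self.end, other.end),
            events=self.events + other.events,
        )


def merge_states(states):
    """Equivalent in spirit to minMerge / maxMerge / countMerge over stored states."""
    return reduce(SessionState.merge, states)


if __name__ == "__main__":
    # Two insert blocks each produced a partial state for the same session.
    parts = [SessionState(100, 200, 3), SessionState(50, 150, 2)]
    print(merge_states(parts))
```

Because merging is associative, it does not matter how many parts the data was inserted in or in which order they are combined.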

Daily rollups (cost saver)
– Create a daily_agg_events table with SummingMergeTree for event counts by event_name, country, user_id segment, etc.
– Use a materialized view to fill it from raw_events.
– Point common dashboards to rollups; leave ad-hoc drill-downs on raw_events.

Airbyte: enrichment data
– Destination: ClickHouse (native destination).
– Sources: Stripe, HubSpot, CRM, billing exports, S3 logs.
– Namespace to analytics_ext.*
– Keep staging (stg_*) tables raw; build dim_* models in dbt for clean joins.

dbt on ClickHouse
– Adapter: dbt-clickhouse.
– Store models in analytics.* schemas; configure incremental where possible.

Example dbt model (event → signup funnel):
{{ config(materialized='table') }}
SELECT
e.user_id,
minIf(e.event_time, e.event_name = 'page_view' AND JSONExtractString(e.properties, 'path') = '/signup') AS first_signup_page_at,
minIf(e.event_time, e.event_name = 'signup_submitted') AS signup_submitted_at,
minIf(e.event_time, e.event_name = 'signup_completed') AS signup_completed_at,
-- minIf returns the type's default value (not NULL) when nothing matches, so count instead of testing IS NOT NULL
countIf(e.event_name = 'signup_completed') > 0 AS completed
FROM {{ source('analytics', 'raw_events') }} e
WHERE e.event_date >= addDays(today(), -30)
GROUP BY e.user_id

Data quality
– dbt tests: not_null on event_time, event_id; unique on event_id within a day; accepted_values on event_name.
– ClickHouse constraints: use CHECKs for schema sanity (e.g., length(country)=2).

Superset dashboards
– Connection: SQLAlchemy driver for ClickHouse.
– Datasets: expose models and rollups; hide raw tables from non-admins.
– Common charts:
– DAU/WAU/MAU from rollups
– Activation funnel (dbt model)
– Session length distribution (mv_sessions)
– L7 retention cohort
– Country/device breakdown
– Caching: enable datasource and chart-level caching; warm critical charts via a small cron job hitting chart APIs.

Monitoring and ops
– Lag/health:
– SELECT count() FROM system.mutations WHERE is_done = 0;
– SELECT * FROM system.merges ORDER BY elapsed DESC; (long-running merges)
– SELECT * FROM system.disks; (space)
– Alert on:
– Insert failures from collector
– Mutations backlog
– Disk > 75%
– Queries > p95 threshold
– Backfill strategy:
– Insert to raw_events with higher _ingest_version to overwrite.
– Pause dependent materialized views or rebuild rollups after batch backfill.
– Backpressure:
– Use async inserts (async_insert=1, wait_for_async_insert=1) so ClickHouse batches many small writes server-side; send large loads as a few big batches rather than row by row.

Security
– TLS for ClickHouse HTTP/native.
– Users/roles: read-only for BI; writer for collector; admin for dbt/ops.
– Row-level security: create row policies for tenants or business units.
– Mask PII: store hashed user_id; keep mapping in a restricted dim_user table if needed.

Performance tips
– Keep properties JSON compact; extract hot keys into dedicated columns.
– LowCardinality on strings with medium/low cardinality (event_name, device).
– ORDER BY: lead with the columns you filter on most, ordered from lower to higher cardinality (event_date, then user_id).
– Use SAMPLE on exploratory queries; LIMIT 1 BY for distinct lists.
– Partition by month for most workloads; day if >1B/day.

Cost control
– TTL to move old partitions to cheaper storage or delete after 180 days.
– Rollups for dashboards; raw only for deep dives.
– Prefer ClickHouse Cloud autoscaling or size 2–3 medium nodes before sharding.

Minimal docker-compose (dev)
version: "3.8"
services:
  clickhouse:
    image: clickhouse/clickhouse-server:24.1
    ports: ["8123:8123", "9000:9000"]
    ulimits: { nofile: { soft: 262144, hard: 262144 } }
  superset:
    image: apache/superset:latest
    ports: ["8088:8088"]
    environment:
      - SUPERSET_SECRET_KEY=dev
  airbyte:
    image: airbyte/airbyte:latest
    ports: ["8000:8000"] # use official deployment for prod
  collector:
    build: ./collector # small Python/Go service that validates and inserts to ClickHouse

Rollout checklist
– Define event contract and versioning.
– Provision ClickHouse and create raw_events with TTL and dedupe.
– Stand up collector; load test inserts.
– Configure Airbyte syncs for enrichment.
– Add dbt models and tests; set up CI.
– Publish Superset datasets and dashboards with caching.
– Add monitoring, alerts, and a backfill playbook.

Shipping a Real-Time Telemetry Pipeline for AI Agents with OpenTelemetry, Kafka, ClickHouse, and Grafana

This build logs every AI agent run, tool call, token usage, latency, and error into a low-latency analytics stack you can operate. It is optimized for high write throughput, cheap retention, and fast aggregations.

Architecture
– Ingest: OpenTelemetry OTLP HTTP (collector) receives spans/events from agents, services, and workers.
– Queue: Kafka buffers bursts and decouples ingestion from storage.
– Storage: ClickHouse stores normalized spans and derived rollups via materialized views.
– Visualization: Grafana queries ClickHouse for real-time dashboards and SLOs.
– Optional: S3 object storage for cheap parquet archives via ClickHouse TTL + MOVES.

Data model
– Trace-level: trace_id, span_id, parent_span_id, service_name, operation, status_code, start_ts, end_ts, duration_ms, attributes (map), tenant_id, env.
– Event-level: event_ts, event_type, trace_id, payload_json, tenant_id, cost_usd, tokens_prompt, tokens_completion, model, tool_name.
– Derived rollups: per-minute and per-tenant aggregates for latency, error_rate, cost, token_count.
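
As a rough picture of what the derived rollups contain, the sketch below aggregates span-like records into per-minute, per-tenant buckets in plain Python. Field names follow the data model above; start_ts is assumed to be epoch seconds, and the function name is illustrative. In the real pipeline this work is done by ClickHouse materialized views.

```python
from collections import defaultdict


def rollup_per_minute(spans):
    """Aggregate spans into (tenant_id, minute) buckets with counts and totals."""
    buckets = defaultdict(lambda: {"calls": 0, "errors": 0, "tokens": 0, "cost_usd": 0.0})
    for span in spans:
        minute = span["start_ts"] - span["start_ts"] % 60  # floor to the minute
        bucket = buckets[(span["tenant_id"], minute)]
        bucket["calls"] += 1
        bucket["errors"] += 0 if span["status_code"] == "OK" else 1
        bucket["tokens"] += span["tokens_prompt"] + span["tokens_completion"]
        bucket["cost_usd"] += span["cost_usd"]
    return {
        key: {**b, "error_rate": b["errors"] / b["calls"]}
        for key, b in buckets.items()
    }


if __name__ == "__main__":
    spans = [
        {"tenant_id": "t1", "start_ts": 120, "status_code": "OK",
         "tokens_prompt": 10, "tokens_completion": 20, "cost_usd": 0.25},
        {"tenant_id": "t1", "start_ts": 130, "status_code": "ERROR",
         "tokens_prompt": 5, "tokens_completion": 0, "cost_usd": 0.5},
    ]
    for key, row in rollup_per_minute(spans).items():
        print(key, row)
```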

Docker Compose (minimal dev)
– Services: otel-collector, kafka, zookeeper, clickhouse-server, clickhouse-init (one-shot DDL), grafana.
– Expose:
– OTLP HTTP: 4318
– ClickHouse HTTP: 8123, Native: 9000
– Grafana: 3000
– Kafka: 9092 (internal), 29092 (host)

otel-collector.yaml (key points)
– receivers:
– otlp: protocols http
– processors:
– batch
– attributes (add env, tenant_id if available)
– exporters:
– kafka (topic: otel-spans, encoding: otlp_json)
– service:
– pipelines:
– traces: otlp -> batch -> kafka

Kafka topics
– otel-spans (partitions: 6+ for throughput)
– otel-events (optional for custom events)
– Use compression lz4 or zstd. Replication 3 in prod.

ClickHouse schema (core)
– Table: spans_raw (MergeTree)
– Columns: as above, plus attributes_json JSON
– Order by: (tenant_id, toDate(start_ts), service_name, operation, start_ts)
– TTL: start_ts + INTERVAL 30 DAY TO VOLUME 'slow', start_ts + INTERVAL 180 DAY DELETE
– Table: events_raw (MergeTree)
– Order by: (tenant_id, toDate(event_ts), event_type, event_ts)
– Kafka engines:
– spans_kafka ENGINE = Kafka reading topic otel-spans, format JSONEachRow
– Materialized view mv_spans_raw inserts into spans_raw with JSON extraction and type casting
– Rollups:
– mv_span_minute aggregates to spans_minute (AggregatingMergeTree) with quantiles and counts
– mv_cost_minute aggregates token and cost fields

Example ClickHouse DDL (abridged)
– Create database telemetry;
– Create table telemetry.spans_raw (…)
ENGINE = MergeTree
ORDER BY (tenant_id, toDate(start_ts), service_name, operation, start_ts)
TTL start_ts + INTERVAL 90 DAY DELETE;
– Create table telemetry.spans_kafka (…)
ENGINE = Kafka
SETTINGS kafka_broker_list='kafka:9092', kafka_topic_list='otel-spans', kafka_group_name='ch-spans', kafka_format='JSONEachRow';
– Create materialized view telemetry.mv_spans_raw TO telemetry.spans_raw AS
SELECT
JSON_VALUE(_raw, '$.traceId') AS trace_id,
JSON_VALUE(_raw, '$.spanId') AS span_id,

FROM telemetry.spans_kafka;
– Create table telemetry.spans_minute (…)
ENGINE = AggregatingMergeTree
ORDER BY (tenant_id, service_name, operation, ts_min);
– Create materialized view telemetry.mv_spans_minute TO telemetry.spans_minute AS
SELECT
tenant_id, service_name, operation, toStartOfMinute(start_ts) AS ts_min,
countState() AS c,
quantilesTimingState(0.5,0.9,0.99)(duration_ms) AS q,
sumState(if(status_code != 'OK', 1, 0)) AS errors
FROM telemetry.spans_raw
GROUP BY tenant_id, service_name, operation, ts_min;

Ingest from Python agents (OTLP)
– Install: pip install opentelemetry-sdk opentelemetry-exporter-otlp
– Configure:
– OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318
– Resource attributes: service.name, deployment.environment, tenant.id
– Emit spans per agent run and tool call. Add attributes:
– model, tokens.prompt, tokens.completion, cost.usd, user_id, workflow_id, tool_name.

Example Grafana panels
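– Latency: SELECT toStartOfMinute(start_ts) AS t, quantilesExact(0.5,0.9,0.99)(duration_ms) FROM spans_raw WHERE tenant_id=$tenant AND $__timeFilter(start_ts) GROUP BY t ORDER BY t
– Error rate: sumMerge(errors) / countMerge(c) from spans_minute (the columns hold aggregate states, so use the -Merge combinators)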
– Cost $: sum(cost_usd) from events_raw or spans_raw attributes over time and by model
– Throughput: countDistinct(trace_id) per minute
– SLOs:
– Availability: 1 - (errors / c)
– Latency objective: P95(duration_ms) < target
– Alerts:
– Error rate > 2% for 5m
– P95 latency > target for 10m
– Cost per minute spike vs 7d baseline
– No data (ingestion stalled) for 5m
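
The SLO arithmetic above is small enough to show directly. This sketch computes availability as 1 - errors / calls and checks a P95 latency objective using the nearest-rank percentile, which is one common convention; ClickHouse's quantile functions may interpolate or approximate differently, so expect small differences. The function names and the sample target are illustrative.

```python
import math


def availability(errors: int, calls: int) -> float:
    """Share of successful calls; treat an empty window as fully available."""
    return 1.0 if calls == 0 else 1 - errors / calls


def percentile(values, p: int):
    """Nearest-rank percentile for an integer p in 1..100."""
    if not values:
        raise ValueError("percentile of an empty window is undefined")
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


def latency_slo_met(durations_ms, target_ms: float, p: int = 95) -> bool:
    """True when the p-th percentile latency is within the target."""
    return percentile(durations_ms, p) <= target_ms


if __name__ == "__main__":
    durations = [120, 180, 95, 400, 210, 150, 175, 160, 140, 2500]
    print(f"availability: {availability(errors=5, calls=1000):.3f}")
    print(f"p95 within 500 ms: {latency_slo_met(durations, target_ms=500)}")
```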

What this enables
– Real-time visibility into agent reliability, latency, token spend, and tool performance.
– Fast drilldowns by tenant, model, workflow, and tool to debug regressions.
– Predictable cost with cheap storage and controllable retention.

Repo starter checklist
– docker-compose.yml with all services and volumes
– otel-collector.yaml
– clickhouse-init.sql for tables, topics, views, and users
– grafana-provisioning dashboards and datasources
– Makefile targets: up, down, logs, migrate, backfill

Production-grade LLM Ops Metrics Pipeline with OpenTelemetry, ClickHouse, and Grafana (Docker Compose)

This post ships a real, minimal LLM ops pipeline for metrics, traces, and logs:
– OpenTelemetry SDK (your services) -> OTLP -> OpenTelemetry Collector
– Collector -> ClickHouse (fast, columnar, cheap)
– Grafana -> ClickHouse for dashboards and alerting

Why this stack:
– OTLP is a standard you can use across Python, Node, Go, and edge functions.
– ClickHouse handles high-cardinality metrics and traces at low cost.
– Grafana reads ClickHouse directly with a mature plugin.

What you get
– Docker Compose to run everything locally or on a small VM.
– OpenTelemetry Collector config with ClickHouse exporter.
– Python example for emitting traces, metrics, and logs.
– ClickHouse retention/partitioning for predictable costs.
– Example Grafana queries to visualize agent quality and latency.

Prerequisites
– Docker + Docker Compose
– A domain or IP (optional, for remote access)
– Grafana ClickHouse data source plugin (ID: vertamedia-clickhouse-datasource)

1) Docker Compose
Create docker-compose.yml:
version: "3.8"
services:
  clickhouse:
    image: clickhouse/clickhouse-server:24.1
    container_name: clickhouse
    ports:
      - "8123:8123" # HTTP
      - "9000:9000" # Native
    volumes:
      - ch-data:/var/lib/clickhouse
      - ./clickhouse/config.d:/etc/clickhouse-server/config.d
      - ./clickhouse/users.d:/etc/clickhouse-server/users.d
    ulimits:
      nofile:
        soft: 262144
        hard: 262144

  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.100.0
    container_name: otel-collector
    command: ["--config=/etc/otelcol/config.yaml"]
    volumes:
      - ./otel/config.yaml:/etc/otelcol/config.yaml
    ports:
      - "4317:4317" # OTLP gRPC
      - "4318:4318" # OTLP HTTP
    depends_on:
      - clickhouse

  grafana:
    image: grafana/grafana:10.4.3
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      - GF_INSTALL_PLUGINS=vertamedia-clickhouse-datasource
    volumes:
      - grafana-data:/var/lib/grafana
    depends_on:
      - clickhouse

volumes:
  ch-data:
  grafana-data:

2) ClickHouse minimal config (auth + profiles)
Create clickhouse/users.d/users.xml:

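<clickhouse>
  <users>
    <default>
      <networks>
        <ip>::/0</ip>
      </networks>
      <profile>default</profile>
      <quota>default</quota>
    </default>
    <otel>
      <password>otel_password</password>
      <networks>
        <ip>::/0</ip>
      </networks>
      <profile>default</profile>
      <quota>default</quota>
    </otel>
  </users>
</clickhouse>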

Optional hardening (recommended for internet-facing):
– Bind to private network only and proxy via VPN or Tailscale.
– Create dedicated DB and user with limited privileges.

3) OpenTelemetry Collector config
Create otel/config.yaml:
receivers:
  otlp:
    protocols:
      grpc:
      http:

processors:
  batch:
    send_batch_size: 8192
    timeout: 5s
  memory_limiter:
    check_interval: 5s
    limit_mib: 512
    spike_limit_mib: 256
  attributes:
    actions:
      - key: service.environment
        action: insert
        value: prod

exporters:
  clickhouse:
    endpoint: tcp://clickhouse:9000?secure=false
    database: otel
    ttl: 168h # 7 days default retention; we’ll add TTLs too
    username: otel
    password: otel_password
    create_schema: true
    logs_table_name: otel_logs
    traces_table_name: otel_traces
    metrics_table_name: otel_metrics
    timeout: 10s

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [clickhouse]
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [clickhouse]
    logs:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [clickhouse]

Notes:
– clickhouse exporter auto-creates schema when create_schema: true.
– You can split pipelines by environment or service using routing processors.

4) Start the stack
docker compose up -d
– ClickHouse UI (HTTP): http://localhost:8123
– Grafana: http://localhost:3000 (admin/admin by default)
– OTLP endpoint: grpc http://localhost:4317, http http://localhost:4318

5) Emit data from Python (traces, metrics, logs)
Install:
pip install opentelemetry-sdk opentelemetry-exporter-otlp opentelemetry-instrumentation-requests opentelemetry-api

Sample app (app.py):
import logging
import random
import time

from opentelemetry import metrics, trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

resource = Resource.create({
    "service.name": "agent-orchestrator",
    "service.version": "1.2.3",
    "service.environment": "prod",
    "deployment.region": "us-west-2",
})

# Traces
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)
span_exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(span_exporter))

# Metrics
metric_exporter = OTLPMetricExporter(endpoint="http://localhost:4317", insecure=True)
reader = PeriodicExportingMetricReader(metric_exporter, export_interval_millis=5000)
metrics.set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))
meter = metrics.get_meter("agent-metrics")
latency_hist = meter.create_histogram("agent.response_latency_ms")
tokens_counter = meter.create_counter("agent.output_tokens")
success_counter = meter.create_counter("agent.success")

# Logs: bridge the standard logging module to OTLP through LoggingHandler
logger_provider = LoggerProvider(resource=resource)
set_logger_provider(logger_provider)
log_exporter = OTLPLogExporter(endpoint="http://localhost:4317", insecure=True)
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
logger = logging.getLogger("agent-logger")
logger.setLevel(logging.INFO)
logger.addHandler(LoggingHandler(level=logging.INFO, logger_provider=logger_provider))


def call_model(prompt):
    attributes = {"model": "gpt-4o-mini", "route": "answer", "customer_tier": "pro"}
    # Open the span before the work so its duration covers the call
    with tracer.start_as_current_span("llm.call", attributes=attributes) as span:
        start = time.time()
        time.sleep(random.uniform(0.05, 0.4))  # fake work
        tokens = random.randint(50, 400)
        ok = random.random() > 0.1
        duration_ms = (time.time() - start) * 1000
        span.set_attribute("llm.prompt.len", len(prompt))
        span.set_attribute("llm.tokens", tokens)
        if not ok:
            span.set_attribute("error", True)
            # Keys passed via extra become log record attributes
            logger.error("agent failure", extra=attributes)
        else:
            logger.info("agent success", extra=attributes)
    latency_hist.record(duration_ms, attributes)
    tokens_counter.add(tokens, attributes)
    success_counter.add(1 if ok else 0, attributes)
    return ok, tokens, duration_ms


if __name__ == "__main__":
    while True:
        call_model("hello")
        time.sleep(1)

Run:
python app.py

6) Verify data landed
In ClickHouse:
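– Show DBs: SHOW DATABASES;
– List tables: SHOW TABLES FROM otel;
– Data lives in database otel. Traces land in otel_traces and logs in otel_logs; the exporter splits metrics into per-type tables prefixed with metrics_table_name (for example otel_metrics_sum, otel_metrics_gauge, otel_metrics_histogram) and names its columns in PascalCase (Timestamp, ServiceName, MetricName). Adapt the table and column names in the queries in sections 7 and 9 to what SHOW TABLES and DESCRIBE TABLE report for your exporter version.
– Basic checks:
SELECT count() FROM otel.otel_traces;
SELECT count() FROM otel.otel_metrics_sum;
SELECT count() FROM otel.otel_logs;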

7) Retention, partitioning, and compression
For cost control, add TTL and partitioning. If you let the exporter create schema, alter tables:
ALTER TABLE otel.otel_traces
MODIFY TTL toDateTime(timestamp) + INTERVAL 7 DAY;

ALTER TABLE otel.otel_metrics
MODIFY TTL toDateTime(timestamp) + INTERVAL 30 DAY;

ALTER TABLE otel.otel_logs
MODIFY TTL toDateTime(timestamp) + INTERVAL 14 DAY;

Optional: create your own tables with partitions by toYYYYMMDD(timestamp) and codecs (ZSTD(6)) for lower storage.

8) Grafana data source
– Login to Grafana -> Connections -> Data sources -> Add data source -> ClickHouse.
– URL: http://clickhouse:8123
– Auth: username otel, password otel_password
– Default database: otel
– Confirm connection.

9) Example Grafana panels (SQL)
– Agent P50/P95 latency (ms) by model, 15m
SELECT
model,
quantile(0.5)(value) AS p50,
quantile(0.95)(value) AS p95,
toStartOfInterval(timestamp, INTERVAL 15 minute) AS ts
FROM otel_metrics
WHERE name = 'agent.response_latency_ms'
AND timestamp >= now() - INTERVAL 24 HOUR
GROUP BY model, ts
ORDER BY ts ASC;

– Success rate by route (rolling 1h)
WITH
sumIf(value, name = 'agent.success') AS succ,
countIf(name = 'agent.success') AS total
SELECT
route,
toStartOfInterval(timestamp, INTERVAL 1 hour) AS ts,
if(total = 0, 0, succ / total) AS success_rate
FROM otel_metrics
WHERE timestamp >= now() - INTERVAL 24 HOUR
GROUP BY route, ts
ORDER BY ts;

– Tokens per minute by customer_tier
SELECT
customer_tier,
toStartOfMinute(timestamp) AS ts,
sumIf(value, name = 'agent.output_tokens') AS tokens
FROM otel_metrics
WHERE timestamp >= now() - INTERVAL 6 HOUR
GROUP BY customer_tier, ts
ORDER BY ts;

– Error logs (last 1h)
SELECT
timestamp,
severity_text,
body,
attributes['error'] AS err,
attributes['model'] AS model,
attributes['route'] AS route
FROM otel_logs
WHERE timestamp >= now() - INTERVAL 1 HOUR
AND (severity_number >= 17 OR attributes['error'] = 'true')
ORDER BY timestamp DESC
LIMIT 200;

– Trace sample count by service
SELECT
service_name,
count() AS spans
FROM otel_traces
WHERE timestamp >= now() - INTERVAL 1 DAY
GROUP BY service_name
ORDER BY spans DESC;

10) Production notes
– Separate environments: run separate DBs or add service.environment in resource and filter in Grafana.
– Cardinality guardrails: cap dynamic attributes (e.g., customer_id) or hash/map to tiers. High-cardinality tags can blow up storage.
– Backpressure: tune batch processor send_batch_size and timeouts. Enable the exporter's sending_queue and retry_on_failure settings if you expect spikes.
– Ingestion SLOs: keep ClickHouse inserts under 50–100 MB per batch for stable performance on small VMs.
– Storage: start with 2–4 vCPU, 8–16 GB RAM, NVMe. Enable ZSTD compression and TTLs.
– Security: do not expose ClickHouse or Grafana admin to the internet. Use VPN, SSO, or OAuth proxy.
– Backups: S3-compatible backup via clickhouse-backup or object storage disks.
– Cost: This stack runs comfortably on a $20–40/month VPS for moderate load (tens of thousands of spans/min and metrics).
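
One way to apply the cardinality guardrail above is to replace a raw customer_id with a stable bucket label before it becomes a metric attribute. The sketch below is an assumption-laden example: the bucket count of 64 and the function name are arbitrary, and hashing this way trades per-customer detail for a bounded number of series.

```python
import hashlib


def bucket_for(customer_id: str, buckets: int = 64) -> str:
    """Map an unbounded ID onto a fixed set of labels, stable across processes."""
    digest = hashlib.sha256(customer_id.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % buckets
    return f"bucket_{index:02d}"


if __name__ == "__main__":
    for cid in ("cust-1001", "cust-1002", "cust-1001"):
        print(cid, "->", bucket_for(cid))
```

A cryptographic hash is used instead of Python's built-in hash() because the latter is randomized per process for strings, which would scatter the same customer across buckets between restarts.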

Extending the pipeline
– Add dbt for derived metrics (e.g., session-level aggregation).
– Add alerting in Grafana: p95 latency > threshold, success_rate < X, tokens/min anomaly.
– Add router-level tracing to attribute latency to providers and prompts.

This is a deployable baseline that turns your AI agent traffic into actionable, queryable telemetry with low operational overhead.

Data Pipelines & Dashboards Guide

Data pipelines and dashboards are the backbone of modern analytics. In a world where every interaction can generate useful information, the ability to collect, organize and visualize data quickly makes the difference between informed decisions and guesswork. A pipeline is a sequence of automated steps that moves data from where it is created—think web forms, CRM systems, e‑commerce transactions, or IoT sensors—to a centralized repository where it can be cleaned and transformed. Dashboards use this curated data to present key metrics in an easy‑to‑digest format so you and your team can monitor performance at a glance.

Building an effective pipeline starts with defining what data you need. For a service business, that might include leads captured through a contact form, bookings made through a scheduling tool, support tickets submitted via chat, and payments processed through a store. Each of these systems—WordPress, Google Sheets, email, CRMs, calendar services—stores information differently. The goal of a pipeline is to extract relevant fields from each source and normalize them into a consistent structure. An automation tool or custom script can listen for new submissions via webhooks, parse the payloads, then append the results to a spreadsheet or database. Where possible, enrich your records by adding context such as time stamps, source campaign or geographic location.
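
As a small illustration of normalizing payloads from different sources into one structure, here is a Python sketch. The source names and field mappings are hypothetical; real form plugins and booking tools use their own field names, so treat FIELD_MAPS as a placeholder to fill in.

```python
from datetime import datetime, timezone

# Hypothetical per-source field names mapped onto one common schema.
FIELD_MAPS = {
    "contact_form": {"email": "your-email", "name": "your-name"},
    "booking_tool": {"email": "invitee_email", "name": "invitee_name"},
}


def normalize_submission(source: str, payload: dict, received_at=None) -> dict:
    """Extract the mapped fields and add context (source, timestamp)."""
    mapping = FIELD_MAPS[source]
    received_at = received_at or datetime.now(timezone.utc)
    record = {field: payload.get(raw_key, "") for field, raw_key in mapping.items()}
    record["source"] = source
    record["received_at"] = received_at.isoformat()
    return record


if __name__ == "__main__":
    print(normalize_submission("contact_form", {"your-email": "ann@example.com", "your-name": "Ann"}))
    print(normalize_submission("booking_tool", {"invitee_email": "bob@example.com", "invitee_name": "Bob"}))
```

Each normalized record can then be appended as one row to the spreadsheet or database of your choice.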

Transformation is the next critical step. Raw data often contains inconsistencies such as duplicate entries, missing values or inconsistent capitalization. Automated routines can remove duplicates based on email address or phone number, standardize text fields to a common format, validate addresses and flag incomplete submissions for follow‑up. You might also use AI models to classify leads by industry, detect sentiment in feedback messages or summarize long comments into tags. By cleaning and enriching your data at this stage, you ensure that downstream dashboards reflect accurate and actionable information.
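
The cleaning steps described above can be sketched as follows: standardize text fields, drop duplicates by email address, and set aside incomplete submissions for follow-up. The field names and the choice to keep the first occurrence of each email are assumptions made for the example.

```python
def normalize_lead(lead: dict) -> dict:
    """Trim and lowercase the email; collapse whitespace and title-case the name."""
    return {
        **lead,
        "email": lead.get("email", "").strip().lower(),
        "name": " ".join(lead.get("name", "").split()).title(),
    }


def clean_leads(leads):
    """Return (unique leads keyed by email, incomplete leads flagged for follow-up)."""
    seen, unique, incomplete = set(), [], []
    for lead in map(normalize_lead, leads):
        if not lead["email"]:
            incomplete.append(lead)
        elif lead["email"] not in seen:
            seen.add(lead["email"])
            unique.append(lead)
    return unique, incomplete


if __name__ == "__main__":
    raw = [
        {"email": " Ann@Example.com ", "name": "ann  lee"},
        {"email": "ann@example.com", "name": "Ann Lee"},
        {"email": "", "name": "bob"},
    ]
    unique, incomplete = clean_leads(raw)
    print(unique)
    print(incomplete)
```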

Once your data is in good shape, you need a storage solution that supports easy querying. For small projects, a shared Google Sheet or Airtable base may suffice. Larger operations might prefer a relational database like MySQL or PostgreSQL or a cloud data warehouse. The key is to choose a platform that integrates smoothly with your data sources and reporting tools. When working with WordPress sites, it’s common to store form submissions in the database and then replicate them to a spreadsheet for analysis. API bridges can keep multiple systems synchronized so you always have a single source of truth.

Dashboards are the window into your pipeline. A well‑designed dashboard should answer the most important questions about your business without overwhelming the viewer. If you run a membership site, you might track new sign‑ups, cancellations, churn rate and lifetime value. An e‑commerce store would monitor sales revenue, average order value, cart abandonment and top products. A service agency would keep an eye on leads generated, consultations booked, conversion rates, and project profitability. Tools like Looker Studio (formerly Google Data Studio), Tableau, Power BI and Notion support rich visualizations such as bar charts, line graphs and funnel diagrams. Many allow you to embed dashboards into your WordPress admin panel or client portals so everyone sees the same information.

When designing dashboards, clarity is paramount. Group related metrics together and use consistent colors and scales to make comparisons easy. Include filters that let stakeholders drill down by date range, marketing channel or product category. Consider building multiple dashboards for different audiences: management might need high‑level KPIs, while marketing teams benefit from detailed campaign analytics. Scheduling automated email or Slack reports keeps the data top‑of‑mind; for instance, a daily summary could include new leads captured, meetings scheduled and revenue generated, while a weekly report might highlight trends and anomalies.

Security and privacy should be integral to your pipeline architecture. Always handle personal data in accordance with applicable laws such as GDPR or California’s CPRA. Limit access to the database or spreadsheets to those who need it, and use secure authentication for API connections. When sending automated reports, avoid including sensitive information in plain text. Instead, provide links to secure dashboards where users must log in. Regularly audit your integrations to ensure tokens haven’t expired and that revocations are respected.

Implementing a pipeline and dashboard system is an iterative process. Start with the most critical data points and add additional sources and metrics over time. Begin by documenting where your data originates, how often it changes and who needs to see it. Then choose automation tools or write scripts to handle extraction and loading. Create transformations that clean and enrich the data, and test the results with a small sample before scaling up. Design a dashboard that surfaces the metrics you care about, gather feedback from users and refine the visualizations. As your business evolves, revisit your pipeline to incorporate new systems, retire unused sources and adjust KPIs.

In summary, data pipelines and dashboards give you the infrastructure to run a data‑driven business. By automating the flow of information from your website and applications into a clean repository and presenting insights through intuitive visualizations, you empower your team to make decisions based on facts rather than hunches. Whether you’re tracking marketing performance, customer satisfaction, financial health or operational efficiency, investing time in a robust data pipeline will pay dividends in clearer insights and faster growth.

When you have reliable pipelines feeding your dashboards, decision making becomes easier. You can schedule summaries to be delivered to Slack or email and give team members access to the specific metrics they need.