Langfuse
TL;DR: Observe every agent turn without blocking execution or tripping request body size limits. Langfuse traces are exported via OpenTelemetry, with REST ingestion as a fallback. Large payloads (long prompt histories, git diffs, file reads) can add up to more than the 4.5 MB body limit that self-hosted Langfuse's Next.js server enforces on batch ingestion, which fails with HTTP 413 "Body exceeded 4.5mb limit". We address this with a longer OTel visibility window, payload truncation, and chunked batch flushes.
Instrumentation & Fallback Flow
```mermaid
flowchart TD
    subgraph Agent["Pi Coding Agent Run"]
        P[Prompt Start] -->|Turn Start| T[Turn Span]
        T -->|Call LLM| G[Generation Observation]
        T -->|Call Tool| O[Tool Span]
        T -->|Turn End| E[Turn End]
    end

    subgraph Export["Trace Export & Visibility Fallback"]
        E -->|agent_end| FLUSH["forceFlush() OTel Spans"]
        FLUSH -->|Wait up to 8s| POLL{"Trace visible in DB?<br/>(POLL_INTERVAL=500ms)"}
        POLL -->|Yes| DONE[OTel Succeeded]
        POLL -->|No| REST[REST Fallback Ingestion]
    end

    subgraph Fallback["REST Fallback Processing (pi-langfuse)"]
        REST -->|Tier 1| TRUNC["Truncate strings over 200,000 chars<br/>(recursively prune deep objects)"]
        TRUNC -->|Tier 2| CHUNK["Slice batch into chunks of 15 items"]
        CHUNK -->|Tier 3| SEND["POST sequentially to /api/public/ingestion<br/>(individual try/catch per chunk)"]
    end

    classDef primary fill:#5865F2,stroke:#fff,stroke-width:2px,color:#fff;
    classDef muted fill:#2F3136,stroke:#7289DA,stroke-width:1px,color:#fff;
    classDef success fill:#43B581,stroke:#fff,stroke-width:1px,color:#fff;
    class E,FLUSH primary;
    class POLL,REST,TRUNC,CHUNK,SEND muted;
    class DONE success;
```
The HTTP 413 Body Size Limit
Self-hosted Langfuse runs on Next.js under the hood, and its ingestion endpoint enforces a request body limit of `bodyParser: { sizeLimit: '4.5mb' }`.
When a long-running agent session is flushed, the REST fallback bundles every observation (system prompt, conversation history, tool calls, and final answers) into a single JSON batch array. For long sessions this payload can easily reach 5 to 10 MB, and the server then rejects the entire batch with `statusCode: 413`.
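To check ahead of time whether a batch will trip the limit, measure its serialized size in UTF-8 bytes rather than its JavaScript string length. The following is a minimal sketch. `serializedBytes`, `wouldExceedLimit`, and the conversion of `'4.5mb'` to 4,718,592 bytes (1 MB = 1024 × 1024 bytes) are assumptions, not pi-langfuse code. The `{ batch }` envelope stands in for the real request body, which may also carry `metadata`.

```typescript
// Hypothetical sketch: estimate a batch's request size before POSTing it.
// Assumption: '4.5mb' is parsed with 1 MB = 1024 * 1024 bytes; treat the
// exact threshold as approximate and leave headroom below it.
const BODY_LIMIT_BYTES = Math.floor(4.5 * 1024 * 1024) // 4,718,592

function serializedBytes(body: unknown): number {
  // The wire size is UTF-8 bytes, not JS string length (UTF-16 code units),
  // so non-ASCII text such as emoji or CJK costs more than one byte per character.
  return new TextEncoder().encode(JSON.stringify(body)).length
}

function wouldExceedLimit(batch: unknown[], limit = BODY_LIMIT_BYTES): boolean {
  return serializedBytes({ batch }) > limit
}
```

Running this kind of check before sending shows when a batch needs splitting, instead of learning it from a 413.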
Next.js body-parser rejection, as reported for the failed batch request (the server's plain-text error appears in `rawBody`):

```json
{
  "ok": false,
  "error": {
    "reason": "non-json",
    "statusCode": 413,
    "rawBody": "Body exceeded 4.5mb limit"
  }
}
```

The Three-Tier Reliability Fix
We add three layers of defense around pi-langfuse's REST fallback:
1. OpenTelemetry Visibility Buffer
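Self-hosted Langfuse ingests traces asynchronously (ingestion queue → worker → database), so spans are rarely queryable within ~1.5 s of a flush. If the fallback polls for only that long, it treats slow but successful OTel exports as failures, fires the REST fallback, and writes the same trace twice.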
- Fix: We increase `OTEL_VISIBILITY_TIMEOUT_MS` to `8_000` ms (polling every `500` ms), so the REST fallback fires only on genuine OTel exporter failures, which prevents duplicate trace writes.
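The polling step can be sketched as a simple deadline loop. This sketch assumes the runtime has some way to ask Langfuse whether a trace ID is queryable yet. `waitForTraceVisible` and `fetchTrace` are hypothetical names, not pi-langfuse's actual functions. The two constants mirror the values above.

```typescript
// Hypothetical sketch: poll until a trace becomes queryable or the window expires.
// `fetchTrace` is an assumed lookup (e.g. a GET against Langfuse's public traces API)
// that resolves to true once the trace is visible.
const OTEL_VISIBILITY_TIMEOUT_MS = 8_000
const POLL_INTERVAL_MS = 500

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function waitForTraceVisible(
  traceId: string,
  fetchTrace: (id: string) => Promise<boolean>,
  timeoutMs = OTEL_VISIBILITY_TIMEOUT_MS,
  intervalMs = POLL_INTERVAL_MS,
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs
  while (true) {
    try {
      if (await fetchTrace(traceId)) return true
    } catch {
      // A failed lookup (404, network blip) is treated as "not visible yet".
    }
    // Stop once the next poll would land past the deadline.
    if (Date.now() + intervalMs > deadline) return false
    await sleep(intervalMs)
  }
}
```

A caller would run the REST fallback only when this resolves to `false`. Treating lookup errors as "not yet visible" keeps a transient API hiccup from triggering a duplicate write on its own.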
2. Recursive Payload Truncation (`safeValue`)
No single trace input, output, or metadata value should be able to push a request over the limit, so we recursively traverse payloads and truncate each string to at most 200,000 characters.
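```typescript
// why: prevents massive string values (like git diffs) from exceeding limits
const MAX_DEPTH = 5

// Note: `limit` counts UTF-16 code units (JS string length), not bytes.
function truncateString(str: string, limit = 200_000): string {
  if (str.length > limit) {
    return str.slice(0, limit) + "\n\n... [Truncated by pi-langfuse due to length limits]"
  }
  return str
}

// what: recursively prunes objects and caps nesting depth, which avoids stack
// overflows and also stops runaway recursion on circular references
function safeValue(val: unknown, depth = 0): unknown {
  if (val === null || val === undefined) return val
  if (typeof val === "string") return truncateString(val)
  if (typeof val === "bigint") return val.toString() // JSON.stringify throws on bigint
  if (val instanceof Date) return val // serialized later by Date.prototype.toJSON
  if (val instanceof Error) {
    // Error fields are non-enumerable, so Object.entries() would yield {}
    return { name: val.name, message: truncateString(val.message) }
  }
  if (Array.isArray(val)) {
    if (depth > MAX_DEPTH) return "[Array truncated due to nesting depth]"
    return val.map((item) => safeValue(item, depth + 1))
  }
  if (typeof val === "object") {
    if (depth > MAX_DEPTH) return "[Object truncated due to nesting depth]"
    const cleaned: Record<string, unknown> = {}
    for (const [key, inner] of Object.entries(val as Record<string, unknown>)) {
      cleaned[key] = safeValue(inner, depth + 1)
    }
    return cleaned
  }
  return val
}
```

3. Chunked REST Ingestion (`chunkSize = 15`)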
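Instead of sending the whole execution history in one request, we split the batch into chunks of 15 events and POST them sequentially. This bounds the number of events per request, not its byte size. Fifteen events that each carry a 200,000-character input and output already total about 6 MB of ASCII text, so truncation and chunking reduce the risk of a 413 rather than ruling it out.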
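```typescript
// how: slice the batch array and wrap each POST in an independent try/catch block
const chunkSize = 15
const totalChunks = Math.ceil(batch.length / chunkSize)

for (let i = 0; i < batch.length; i += chunkSize) {
  const chunk = batch.slice(i, i + chunkSize)
  const chunkIndex = i / chunkSize // always an integer: i advances in steps of chunkSize
  try {
    const res = await rt.scoreClient.api.ingestion.batch({
      batch: chunk,
      metadata: { source: "pi-langfuse", fallback: "rest-ingestion", chunkIndex, totalChunks },
    })
    // The ingestion endpoint reports per-event failures in the response body
    // (207 Multi-Status) rather than throwing, so check them explicitly.
    if (res.errors?.length) {
      console.warn(`Langfuse: ${res.errors.length} event(s) rejected in chunk ${chunkIndex + 1}/${totalChunks}`, res.errors)
    }
  } catch (e) {
    // non-blocking: one chunk failing doesn't abort the remaining chunk flushes
    console.warn(`Langfuse: failed to ingest fallback batch chunk ${chunkIndex + 1}/${totalChunks}`, e)
  }
}
```

Best Practices Checklist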
- Deferred Flush: Always defer the final Langfuse shutdown/flush with `setTimeout(shutdownRuntime, 0)` on `agent_end`. Never let telemetry block the main agent turn or add user-facing latency.
- PII & Credential Masking: Never log the process environment or raw configs. Strip secret keys from function arguments before logging them as inputs.
- Clean Trace Naming: Choose descriptive names (`code-generation`, `tool-execution`) over generic IDs so that filtering in the UI is actually usable.
- Session Attribution: Set `sessionId` on traces to the Discord thread ID. This groups multi-turn conversations in the Langfuse Sessions view.
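Key-based redaction is one way to handle the masking item. The following is a minimal sketch that runs over tool arguments before they become trace inputs. `redactSecrets` and the key pattern are hypothetical. Key matching only catches secrets stored under recognizable names, so it complements, rather than replaces, keeping environment and config objects out of traces entirely.

```typescript
// Hypothetical sketch: mask values whose keys look like credentials before logging inputs.
// The pattern is an assumption; tune it to the secrets your tools actually handle.
const SECRET_KEY_PATTERN = /(api[_-]?key|token|secret|password|authorization|cookie)/i

function redactSecrets(value: unknown, depth = 0): unknown {
  // Past the depth cap, drop the subtree rather than risk leaking nested secrets.
  if (depth > 5) return "[redacted: nesting too deep]"
  if (Array.isArray(value)) return value.map((item) => redactSecrets(item, depth + 1))
  if (value !== null && typeof value === "object") {
    const out: Record<string, unknown> = {}
    for (const [key, inner] of Object.entries(value as Record<string, unknown>)) {
      out[key] = SECRET_KEY_PATTERN.test(key) ? "[REDACTED]" : redactSecrets(inner, depth + 1)
    }
    return out
  }
  return value
}
```

The regex has no `g` flag, so repeated `.test()` calls carry no `lastIndex` state between keys.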