🔎 Langfuse

TL;DR 🚀 Observe every agent turn safely, without blocking execution or hitting body-size limits. Langfuse traces are exported via OpenTelemetry (primary) with a REST fallback (backup). Large payloads (long prompt histories, git diffs, file reads) can add up past the 4.5 MB request limit of a self-hosted Langfuse's Next.js ingestion endpoint, which then rejects the batch with HTTP 413 "Body exceeded 4.5mb limit". We fix this with an OTel visibility buffer, payload truncation, and chunked batch flushes. 🔌


🧬 Instrumentation & Fallback Flow

flowchart TD
  subgraph Agent["🤖 Pi Coding Agent Run"]
    P[Prompt Start] -->|Turn Start| T[Turn Span]
    T -->|Call LLM| G[Generation Observation]
    T -->|Call Tool| O[Tool Span]
    T -->|Turn End| E[Turn End]
  end

  subgraph Export["📡 Trace Export & Visibility Fallback"]
    E -->|agent_end| FLUSH["forceFlush() OTel Spans"]
    FLUSH -->|Wait up to 8s| POLL{"Trace visible in DB?<br/>(POLL_INTERVAL=500ms)"}

    POLL -->|🟢 Yes| DONE[OTel Succeeded]
    POLL -->|🔴 No| REST[REST Fallback Ingestion]
  end

  subgraph Fallback["✂️ REST Fallback Processing (pi-langfuse)"]
    REST -->|Step 1| TRUNC["Truncate strings over 200k chars<br/>(recursively prune deep objects)"]
    TRUNC -->|Step 2| CHUNK["Slice batch into chunks of 15 items"]
    CHUNK -->|Step 3| SEND["POST sequentially to /api/public/ingestion<br/>(one try/catch per chunk)"]
  end

  classDef primary fill:#5865F2,stroke:#fff,stroke-width:2px,color:#fff;
  classDef muted fill:#2F3136,stroke:#7289DA,stroke-width:1px,color:#fff;
  classDef success fill:#43B581,stroke:#fff,stroke-width:1px,color:#fff;
  class E,FLUSH primary;
  class POLL,REST,TRUNC,CHUNK,SEND muted;
  class DONE success;

🚨 The HTTP 413 Body Size Limit

Self-hosted Langfuse servers run on Next.js under the hood, and the ingestion endpoint enforces a bodyParser: { sizeLimit: '4.5mb' } limit on each request.

When a long-running agent session is flushed, the REST fallback bundles all observations (system prompt, conversation history, tool calls, and final answers) into a single batch JSON array. For a long session this payload can easily reach 5 to 10 MB, so the server rejects the entire batch with statusCode: 413 and none of its observations are ingested.

/* โŒ Next.js body-parser rejection raw body */
{
  "ok": false,
  "error": {
    "reason": "non-json",
    "statusCode": 413,
    "rawBody": "Body exceeded 4.5mb limit"
  }
}
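Because the limit is enforced on the request body, the number that matters is the byte size of the serialized JSON, not the string length of its fields. The sketch below measures that and recognizes the rejection shown above. It is illustrative: `serializedBytes`, `exceedsBodyLimit`, and `isBodyLimitRejection` are hypothetical helpers, the limit is assumed to be parsed as 4.5 × 1024 × 1024 bytes (the convention of the `bytes` package), and the rejection shape is copied from the example above rather than from a documented client error type.

```typescript
// Assumption: "4.5mb" is parsed with 1 MB = 1024 * 1024 bytes, as the `bytes` package does.
const BODY_LIMIT_BYTES = 4.5 * 1024 * 1024 // 4_718_592

const utf8 = new TextEncoder()

// Byte size of the JSON body: this is what the body parser counts, not string length.
function serializedBytes(payload: unknown): number {
  return utf8.encode(JSON.stringify(payload)).length
}

function exceedsBodyLimit(payload: unknown, limit = BODY_LIMIT_BYTES): boolean {
  return serializedBytes(payload) > limit
}

// Shape copied from the 413 example above; only statusCode is relied on.
interface BodyLimitRejection {
  ok: false
  error: { statusCode: 413; [key: string]: unknown }
}

// Type guard: true only for the body-size rejection, so other failures can be handled differently.
function isBodyLimitRejection(body: unknown): body is BodyLimitRejection {
  if (typeof body !== "object" || body === null) return false
  const { ok, error } = body as { ok?: unknown; error?: unknown }
  if (ok !== false || typeof error !== "object" || error === null) return false
  return (error as { statusCode?: unknown }).statusCode === 413
}
```

For example, `exceedsBodyLimit("é".repeat(2_500_000))` is true even though that string's `length` is only 2.5 million, because each `é` takes two bytes in UTF-8.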

๐Ÿ›ก๏ธ The Three-Tier Reliability Fix

We implement three layers of defensive ingestion around pi-langfuse's REST fallback:

1๏ธโƒฃ OpenTelemetry Visibility Buffer

Self-hosted Langfuse ingests traces asynchronously (ingestion queue ➔ worker ➔ DB), so spans are rarely queryable within ~1.5 s after a flush.

  • Fix: We increase OTEL_VISIBILITY_TIMEOUT_MS to 8_000 (8 s, polling every 500 ms), so the REST fallback fires only when the OTel export has genuinely failed rather than when the worker is merely slow. This prevents the same trace from being written twice.
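A minimal sketch of that visibility poll, assuming a hypothetical `isVisible` callback that asks the Langfuse API whether the trace exists yet (how pi-langfuse actually performs the lookup is not shown here). The defaults mirror the 8 s timeout and 500 ms interval described above.

```typescript
// Resolves after `ms` milliseconds.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

/**
 * Polls `isVisible` until it reports true or `timeoutMs` elapses.
 * Returns true if the trace became queryable in time, false if the REST fallback should run.
 */
async function waitForTraceVisibility(
  isVisible: () => Promise<boolean>, // hypothetical: e.g. a trace lookup against the Langfuse API
  timeoutMs = 8_000,
  intervalMs = 500,
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs
  while (true) {
    try {
      if (await isVisible()) return true
    } catch {
      // A failed lookup (e.g. not found while the worker is still busy) counts as "not visible yet".
    }
    // Stop once the next poll would land past the deadline.
    if (Date.now() + intervalMs > deadline) return false
    await sleep(intervalMs)
  }
}
```

In an agent_end handler this would sit between `forceFlush()` and the REST fallback, roughly `if (!(await waitForTraceVisibility(() => traceExists(traceId)))) await sendRestFallback(batch)`, where `traceExists` and `sendRestFallback` are placeholders for pi-langfuse's own lookup and ingestion code.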

2๏ธโƒฃ Recursive Payload Truncation (safeValue)

No single input, output, or metadata value should be able to push a request over the limit. We recursively traverse payloads, truncate individual strings to 200_000 characters, and replace anything nested more than five levels deep with a placeholder.

// 🚀 why: caps any single string (e.g. a git diff) at `limit` UTF-16 code units, not bytes
function truncateString(str: string, limit = 200_000): string {
  if (str.length <= limit) return str
  return str.slice(0, limit) + "\n\n... [Truncated by pi-langfuse due to length limits]"
}

// 🚀 what: recursively copies a value, truncating strings and replacing anything nested
// more than 5 levels deep with a placeholder (which also stops runaway recursion on cycles)
function safeValue(val: unknown, depth = 0): unknown {
  if (val === null || val === undefined) return val
  if (typeof val === "string") return truncateString(val)
  if (typeof val === "bigint") return val.toString() // JSON.stringify throws on bigint
  if (val instanceof Date) return val.toJSON() // ISO string (null if invalid); Object.entries() would give {}

  if (Array.isArray(val)) {
    if (depth > 5) return "[Array truncated due to nesting depth]"
    return val.map((item) => safeValue(item, depth + 1))
  }
  if (typeof val === "object") {
    if (depth > 5) return "[Object truncated due to nesting depth]"
    const cleaned: Record<string, unknown> = {}
    for (const [key, child] of Object.entries(val)) {
      cleaned[key] = safeValue(child, depth + 1)
    }
    return cleaned
  }
  return val
}
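`truncateString` caps length in UTF-16 code units, while the 413 limit counts bytes, so a 200_000-character string of non-ASCII text can serialize to roughly 600 KB. If you would rather cap by encoded size, `TextEncoder.encodeInto` can find the longest prefix that fits a byte budget without splitting a character. This is an alternative sketch, not what pi-langfuse does; `truncateUtf8` is a hypothetical name and the marker text simply copies the one `truncateString` uses.

```typescript
// Marker text copied from truncateString.
const TRUNCATION_MARKER = "\n\n... [Truncated by pi-langfuse due to length limits]"
const encoder = new TextEncoder()

// Keeps the longest prefix of `str` whose UTF-8 encoding fits in `maxBytes`.
// The marker is appended after the budget, so the result can exceed maxBytes by its length.
function truncateUtf8(str: string, maxBytes = 200_000): string {
  const buffer = new Uint8Array(maxBytes)
  // encodeInto never writes a partial character; `read` counts the UTF-16 code units consumed.
  const { read = 0 } = encoder.encodeInto(str, buffer)
  if (read === str.length) return str
  return str.slice(0, read) + TRUNCATION_MARKER
}
```

For example, `truncateUtf8("héllo", 2)` keeps only `"h"` plus the marker, because `é` needs two bytes and only one is left.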

3๏ธโƒฃ Chunked REST Ingestion (chunkSize = 15)

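Instead of sending the whole execution history in one request, we split the batch into chunks of 15 events and POST them sequentially, so a rejected chunk costs at most 15 events rather than the whole trace. The two caps are heuristics, not a hard guarantee: 15 events that each carry two 200_000-character fields already serialize to more than 6 MB, so chunkSize and the truncation limit have to be tuned together.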

// 🚀 how: slice the batch and wrap each POST in its own try/catch
const chunkSize = 15
const totalChunks = Math.ceil(batch.length / chunkSize)
for (let i = 0; i < batch.length; i += chunkSize) {
  const chunkIndex = i / chunkSize // exact, since i is always a multiple of chunkSize
  const chunk = batch.slice(i, i + chunkSize)
  try {
    // awaited inside the loop, so chunks are POSTed one at a time, in order
    await rt.scoreClient.api.ingestion.batch({
      batch: chunk,
      metadata: {
        source: "pi-langfuse",
        fallback: "rest-ingestion",
        chunkIndex,
        totalChunks,
      },
    })
  } catch (e) {
    // 💡 non-blocking: a failed chunk is logged and skipped; the remaining chunks are still sent
    console.warn(`📊 Langfuse: Failed to ingest fallback batch chunk ${chunkIndex + 1}/${totalChunks}`, e)
  }
}
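A fixed chunkSize of 15 bounds the number of events per request, not the bytes. If you need a firmer bound, one alternative is to pack events greedily by their serialized UTF-8 size while keeping the 15-event cap. `chunkBySize` is a hypothetical helper, not part of pi-langfuse or the Langfuse SDK, and the byte budget you pass should sit comfortably below the server limit to leave room for the request envelope and metadata.

```typescript
const byteEncoder = new TextEncoder()

// Greedily packs events into chunks, closing a chunk when adding the next event would
// exceed `maxBytes` of serialized JSON or when it already holds `maxItems` events.
function chunkBySize<T>(items: T[], maxBytes: number, maxItems = 15): T[][] {
  const chunks: T[][] = []
  let current: T[] = []
  let currentBytes = 0
  for (const item of items) {
    // +1 approximates the comma separating array elements
    const size = byteEncoder.encode(JSON.stringify(item)).length + 1
    const wouldOverflow = current.length >= maxItems || currentBytes + size > maxBytes
    if (current.length > 0 && wouldOverflow) {
      chunks.push(current)
      current = []
      currentBytes = 0
    }
    // An event larger than maxBytes on its own still gets a chunk of its own,
    // so this complements truncation rather than replacing it.
    current.push(item)
    currentBytes += size
  }
  if (current.length > 0) chunks.push(current)
  return chunks
}
```

The loop above would then iterate over `chunkBySize(batch, 4_000_000)` instead of slicing by index, taking `chunkIndex` and `totalChunks` from the returned array; the 4_000_000-byte budget is only an example that leaves headroom below the 4.5 MB limit.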

💡 Best Practices Checklist

  • โณ Deferred Flush: Always defer the final Langfuse shutdown/flush using setTimeout(shutdownRuntime, 0) on agent_end. Never let telemetry block the main agent execution turn or increase user latency.
  • ๐Ÿ›ก๏ธ PII & Credential Masking: Never log environment arrays or raw configs. Function arguments containing secret keys must be stripped before logging inputs.
  • ๐Ÿท๏ธ Clean Trace Naming: Choose descriptive names (code-generation, tool-execution) over generic IDs to ensure UI filtering is actually usable.
  • ๐Ÿ”— Session Attribution: Bind sessionId on traces using the Discord thread ID. This groups conversational multi-turn histories in the Langfuse Sessions view.
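A minimal sketch of the credential-masking rule, assuming secrets can be recognized by key name. `redactSecrets`, the key pattern, and the `[REDACTED]` placeholder are illustrative choices, not pi-langfuse code. Key-name matching will not catch a secret pasted into free text, such as a token echoed in a tool's stdout.

```typescript
// Assumed pattern: key names that typically hold credentials. It matches on the end of the
// key, so "accessToken" or "OPENAI_API_KEY" is masked but usage fields like "promptTokens" are not.
const SECRET_KEY_PATTERN = /(api[_-]?key|secret|password|token|authorization|cookie)$/i

// Returns a copy of `value` with credential-looking keys replaced by a placeholder.
function redactSecrets(value: unknown, depth = 0): unknown {
  if (depth > 10) return "[Redaction depth exceeded]" // guards against cyclic objects
  if (Array.isArray(value)) return value.map((item) => redactSecrets(item, depth + 1))
  if (value instanceof Date) return value
  if (value !== null && typeof value === "object") {
    const masked: Record<string, unknown> = {}
    for (const [key, child] of Object.entries(value)) {
      masked[key] = SECRET_KEY_PATTERN.test(key) ? "[REDACTED]" : redactSecrets(child, depth + 1)
    }
    return masked
  }
  return value
}
```

Running tool arguments through a helper like this before they are recorded as a span's input keeps keys such as `apiKey` or `GITHUB_TOKEN` out of the trace while leaving ordinary fields intact.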