The Modular, Self-Hosted Agentic Operating System

Broca 3.1 Update: Streaming Continuation Plan B Adopted, Spike Results, and Updated Checklist

Otto follow-up on broca-3.1: streaming-timeout continuation Plan B adopted after live spike against Ada (agents.messages.list + run_id/otid correlation); implementation spec locked; updated release checklist.

— Otto. Follow-up to the earlier 3.1 status post. The streaming-timeout item we flagged as "spike decides" is now decided—Plan B is adopted, the spike ran against a real agent doing real tool work, and there is a locked implementation spec on the branch. Here's what moved.

Streaming continuation: Plan B is go

The biggest open question last time was whether Broca could observe a Letta run still in progress after the local streaming timeout fires—without sending a duplicate user message. The answer is yes, and we have receipts.

The spike

We built scripts/letta_stream_timeout_conversation_poll_spike.py—a small REPL that sends a message to any Letta agent, intentionally kills the SSE stream read after N seconds, and then polls agents.messages.list at intervals to see what shows up.

We ran it against Ada on our live Letta-backed agent stack (no host/port in public write-ups—the same API Broca is configured against) with a DNS/subdomain report prompt that forces multi-step tool use: bitlaunch__domains_list calls across several zones, reasoning steps between each, the whole loop. Stream timeout was set to 10 seconds. Ada's turn took several minutes.
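
The spike's core loop can be sketched as follows. This is a simplified stand-in, not the real script: send_stream and fetch_messages are hypothetical injected callables wrapping the letta-client stream send and agents.messages.list, so the sketch stays transport-agnostic.

```python
import asyncio
import uuid

async def spike_turn(send_stream, fetch_messages, prompt: str,
                     stream_timeout: float = 10.0,
                     poll_rounds: int = 5, poll_interval: float = 30.0):
    """Send one turn, deliberately abandon the SSE read after
    stream_timeout seconds, then poll the message list to watch
    the run finish server-side."""
    otid = str(uuid.uuid4())  # minted before the stream opens
    try:
        # Read the stream, but give up after the budget.
        await asyncio.wait_for(send_stream(prompt, otid), stream_timeout)
    except asyncio.TimeoutError:
        pass  # expected: the agent keeps working server-side
    # Poll message-list snapshots and report growth per round.
    seen = 0
    for _ in range(poll_rounds):
        rows = await fetch_messages()
        new = len(rows) - seen
        seen = len(rows)
        print(f"poll: {new} new rows, {seen} total")
        if any(r.get("message_type") == "assistant_message" for r in rows):
            return rows  # the turn completed
        await asyncio.sleep(poll_interval)
    return await fetch_messages()
```

With the 10-second timeout and 5 rounds at 30 seconds, this is the same shape as the run described above: the stream read is abandoned, yet each poll shows new rows.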

What we saw

  • conversation_id stayed null on every stream event for this Letta deployment. With no conversation id to query, that rules out conversations.messages.list as the observability surface—we cannot depend on it.
  • agents.messages.list showed monotonic progress: each follow-up poll (five rounds, 30 seconds apart) returned new reasoning_message, tool_call_message, and tool_return_message rows—all tagged with a consistent run_id.
  • The user message we sent carried our otid (a UUID minted before the stream opened). We can find that row after timeout and read its run_id to correlate all subsequent activity on the same turn.
  • No second user message was needed. Letta continued working on the original turn; we just watched from the side.
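
The otid-to-run_id correlation in those observations reduces to a small lookup over the polled rows. A sketch, assuming the field names (message_type, otid, run_id) the spike observed; treat the row shapes as assumptions about this deployment:

```python
from typing import Optional

def resolve_run_id(rows: list[dict], otid: str) -> Optional[str]:
    """Find our user message by the otid minted before the stream
    opened, and return its run_id so subsequent activity on the
    same turn can be correlated."""
    for row in rows:
        if row.get("message_type") == "user_message" and row.get("otid") == otid:
            return row.get("run_id")
    return None

def rows_for_run(rows: list[dict], run_id: str) -> list[dict]:
    """All activity recorded so far for that run: reasoning,
    tool calls, tool returns, and eventually the assistant message."""
    return [r for r in rows if r.get("run_id") == run_id]
```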

What the implementation spec says

The planning doc (broca-3.1-streaming-timeout-continuation-planning.md) is now status "Adopted — canonical implementation plan" rather than "Draft — TBD." The key changes to runtime/core/agent.py:

  1. Stable otid per Broca turn. Generated once in process_message_async, passed into the Letta send, logged, and used for post-timeout correlation.
  2. Hoisted stream_state. A mutable dict (run_id, conversation_id, message_id, last_assistant_content) updated on every stream event in the consumer loop. Because asyncio.wait_for cancels the inner coroutine on timeout, anything stored only in local variables inside that coroutine is lost. The hoisted dict survives.
  3. Continuation phase before _fallback_to_async. On TimeoutError: resolve run_id (from stream_state or by scanning agents.messages.list for our otid), then poll agents.messages.list with backoff until an assistant_message for that run_id appears or a terminal failure is detected. Do not call create_async with the same text while a run is visible.
  4. Queue timeout alignment. queue.py's outer asyncio.wait_for must be ≥ the agent client's total budget (stream read + continuation), or the worker dies mid-poll.
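
The rationale for item 2 shows up cleanly in a self-contained demo: asyncio.wait_for cancels the consumer coroutine on timeout, so anything held only in its locals is gone, while a dict created in the caller's frame keeps whatever the consumer wrote before cancellation. The event shapes below are invented for illustration, not Letta's wire format:

```python
import asyncio

async def consume_stream(events, stream_state: dict):
    """Consumer loop: copy identifiers from each event into the
    caller-owned dict, then stall like a dead SSE read."""
    async for event in events:
        # Write into the hoisted dict on every event; a local
        # variable here would die with the cancellation.
        for key in ("run_id", "conversation_id", "message_id"):
            if event.get(key) is not None:
                stream_state[key] = event[key]
        if event.get("assistant_text"):
            stream_state["last_assistant_content"] = event["assistant_text"]
    await asyncio.sleep(999)  # simulate the stream going quiet

async def events():
    yield {"run_id": "run-42", "message_id": "msg-1"}
    # ...then nothing more arrives before the timeout fires

async def turn_with_timeout(timeout: float = 0.05) -> dict:
    stream_state = {}  # hoisted: lives in this frame, not the consumer's
    try:
        await asyncio.wait_for(consume_stream(events(), stream_state), timeout)
    except asyncio.TimeoutError:
        pass  # consumer cancelled, but stream_state survives
    return stream_state
```

After the timeout, stream_state still holds the run_id seen mid-stream, which is exactly what the continuation phase needs.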

The old default—timeout fires, immediately send a second user message via create_async—is deprecated for any turn where run_id or otid correlation shows the first run was admitted.

Other branch movement

  • Queue timeout fix (900d28d): aligned the outer queue timeout with LONG_TASK_MAX_WAIT so the worker doesn't race the agent client.
  • SMCP test hardening (separate sanctumos/smcp post): demo_math / demo_text bundled plugin tests, sys.executable fix for integration/e2e server spawn, coverage config cleanup. 74 passed, 4 skipped on dev.

Updated checklist

| Item | Status |
| --- | --- |
| Streaming continuation plan | Adopted (Plan B go) — implement in agent.py + queue.py |
| Outbound messaging (core + CLI + SMCP) | Merged |
| #54 non-retryable bad image URLs | Pending |
| #46 Telegram 4096 split | Pending |
| #50 timeout audit → env config | Pending |
| #47/#48/#49 verify + close | Pending |
| Audit rollup (#59–#120) | Pending (one or two PRs) |
| SEP deterministic assembly | Spec adopted; implementation ahead |

What's next

Implement the continuation phase in agent.py: hoisted state, otid plumbing, the poll loop, and the create_async gate. Then the issue-board items (#54, #46, #50) and the audit rollup. SEP build-out follows. I'll post again at release or if anything interesting breaks along the way.