Streaming Responses
SSE, NDJSON, WebSocket, and the Go channel API for streamed completions.
Nexus supports four wire formats for streamed completions, plus an idiomatic
Go consumer API. The same gateway, the same Engine.CompleteStream, the
same middleware pipeline — only the encoder at the HTTP boundary changes.
Wire formats
The proxy negotiates encoders from the request's Accept header,
?stream_format= query parameter, or X-Nexus-Stream-Format header
(precedence: query → header → Accept → default). The default registry
includes:
| Encoder | Content type | Aliases | When to use |
|---|---|---|---|
| SSE (OpenAI envelope) | text/event-stream | sse, openai | Drop-in compatibility with OpenAI SDKs (default). |
| SSE (nexus-native) | application/vnd.nexus.events+sse | nexus, nexus-sse | Named events for fine-grained client routing. |
| NDJSON | application/x-ndjson | ndjson, jsonl | Line-delimited JSON. Friendly for curl -N and Go HTTP clients. |
| WebSocket | n/a (upgrade via GET /v1/realtime) | n/a | Bidirectional. Required for multi-modal realtime. |
Register additional encoders with proxy.WithStreamEncoder("custom/type", &myEncoder{}).
OpenAI-compatible SSE (default)
The proxy emits data: frames using the OpenAI chat.completion.chunk
envelope. Tool-call deltas are forwarded with index/function.name/
function.arguments preserved. Reasoning content is surfaced as
delta.reasoning_content (an Anthropic-on-OpenAI extension also used by
DeepSeek and other gateways) so OpenAI clients silently ignore it while
nexus-aware clients can read it. The final chunk carries usage and is
followed by data: [DONE]. Mid-stream errors emit a typed event: error frame,
also followed by data: [DONE], so clients can recover cleanly.
```
data: {"id":"…","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}

data: {"id":"…","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"…","choices":[],"usage":{"prompt_tokens":5,"completion_tokens":3,"total_tokens":8}}

data: [DONE]
```
NDJSON
One nexus-native StreamEvent per line, each terminated by \n and
discriminated by its type field. Heartbeats are {"type":"heartbeat"} lines,
and the stream ends with {"type":"done"}. There is no [DONE] sentinel
because the discriminated event is already self-describing.
```
{"type":"delta","model":"gpt-4o","delta":{"content":"Hello"}}
{"type":"reasoning","delta":{"reasoning":"Let me think..."}}
{"type":"tool_call","delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"q"}}]}}
{"type":"usage","usage":{"prompt_tokens":5,"completion_tokens":3,"total_tokens":8}}
{"type":"done"}
```
Native SSE (named events)
Same payload shape as NDJSON, but delivered as SSE named events, so
EventSource listeners can route on the event: field without parsing the payload.
```
event: delta
data: {"type":"delta","delta":{"content":"Hello"}}

event: reasoning
data: {"type":"reasoning","delta":{"reasoning":"Let me think..."}}

event: usage
data: {"type":"usage","usage":{"total_tokens":8}}

event: done
data: {"type":"done"}
```
WebSocket
GET /v1/realtime upgrades the connection to a WebSocket. The client sends a
start envelope, then receives nexus-native StreamEvent frames as JSON text
messages. To cancel mid-stream, the client sends an abort envelope.
Multi-modal input envelopes (audio_chunk, image) are reserved for
bidirectional providers such as OpenAI Realtime; text deltas already flow
through today.
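Every envelope carries a type discriminator, so a server can read the type first and then decode the message into a matching struct. The sketch below uses only encoding/json and omits the WebSocket library entirely. The struct and function names are hypothetical, and the sketch assumes that b64 holds standard base64.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical Go shapes for the client → server envelopes. JSON keys follow
// the protocol listing; the Go type names are illustrative only.
type startEnvelope struct {
	Request json.RawMessage `json:"request"` // kept raw and handed to the engine
}

type abortEnvelope struct{}

type audioChunkEnvelope struct {
	Format string `json:"format"`
	Data   []byte `json:"b64"` // encoding/json base64-decodes strings into []byte
}

type imageEnvelope struct {
	MimeType string `json:"mime_type"`
	Data     []byte `json:"b64"`
}

// decodeClientMessage peeks at "type", then decodes the full envelope into
// the matching struct. Each WebSocket text message holds one envelope.
func decodeClientMessage(msg []byte) (any, error) {
	var head struct {
		Type string `json:"type"`
	}
	if err := json.Unmarshal(msg, &head); err != nil {
		return nil, err
	}
	var env any
	switch head.Type {
	case "start":
		env = &startEnvelope{}
	case "abort":
		return &abortEnvelope{}, nil
	case "audio_chunk":
		env = &audioChunkEnvelope{}
	case "image":
		env = &imageEnvelope{}
	default:
		return nil, fmt.Errorf("unsupported envelope type %q", head.Type)
	}
	if err := json.Unmarshal(msg, env); err != nil {
		return nil, err
	}
	return env, nil
}

func main() {
	env, err := decodeClientMessage([]byte(`{"type":"image","mime_type":"image/png","b64":"iVBORw=="}`))
	fmt.Printf("%T %v\n", env, err) // *main.imageEnvelope <nil>
}
```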
```
client → server:
{"type":"start","request":{"model":"gpt-4o","messages":[…]}}
{"type":"abort"} (cancel mid-stream)
{"type":"audio_chunk","format":"pcm16","b64":"…"} (multi-modal in)
{"type":"image","mime_type":"image/png","b64":"…"} (multi-modal in)

server → client:
{"type":"delta","delta":{"content":"Hello"}}
{"type":"reasoning","delta":{"reasoning":"…"}}
{"type":"usage","usage":{…}}
{"type":"done"} (graceful close)
{"type":"error","error":{…}} (then close with StatusInternalError)
```
Event types
| Type | When emitted | Payload |
|---|---|---|
| delta | Every text/role/refusal/tool-call increment | delta.content, delta.role, delta.refusal, delta.tool_calls |
| reasoning | Extended-thinking models (Claude 3.7+, OpenAI o-series, DeepSeek R1) | delta.reasoning |
| tool_call | Incremental tool-call argument fragments | delta.tool_calls[i].function.arguments |
| audio | Multi-modal audio chunks (Realtime/Live providers) | audio.data, audio.format, audio.transcript |
| image | Multi-modal image chunks | image.data, image.url, image.mime_type |
| citation | Anthropic citation deltas | citation.url, citation.title, citation.quoted |
| usage | Token totals (typically the last frame before done) | usage.prompt_tokens, usage.completion_tokens, usage.total_tokens |
| error | Mid-stream upstream failure | error.type, error.message, error.retryable |
| heartbeat | Idle keepalive (15s default) | none (SSE comment / NDJSON line / WS ping) |
| done | Stream complete | none |
Go consumer API
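Engine.CompleteStream returns a provider.Stream that supports two
incremental consumption styles, an iterator and a channel, plus an
accumulator for callers that only need the final response. Pick whichever
fits your call site.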
Iterator
```go
stream, err := engine.CompleteStream(ctx, req)
if err != nil {
	return err
}
defer stream.Close()

for {
	chunk, err := stream.Next(ctx)
	if errors.Is(err, io.EOF) {
		break
	}
	if err != nil {
		return err
	}
	fmt.Print(chunk.Delta.Content)
}
```
Channel
```go
events := provider.NewChannelAdapter(ctx, stream, provider.ChannelOptions{Buffer: 32})
for ev := range events {
	switch ev.Kind {
	case provider.EventReasoning:
		fmt.Printf("[think] %s", ev.Delta.Reasoning)
	case provider.EventDelta:
		fmt.Print(ev.Delta.Content)
	case provider.EventError:
		return fmt.Errorf("stream error: %s", ev.Err)
	}
}
```
Accumulator
For callers that want the merged final response (the same shape
Engine.Complete would have returned):
```go
resp, err := provider.Accumulate(ctx, stream)
if err != nil {
	return err
}
// resp.Choices[0].Message.Content: concatenated text
// resp.ThinkingContent: concatenated reasoning
// resp.Choices[0].Message.ToolCalls: merged tool calls
// resp.Usage: final usage totals
```
Stream cache (record-and-replay)
Streaming responses can be recorded into a cache.StreamCache and replayed
on subsequent identical requests. Useful for demos, integration tests, and
deterministic latency benchmarks.
```go
engine := nexus.NewEngine(
	nexus.WithProvider(openai.New(apiKey)),
	nexus.WithStreamCache(stores.NewMemoryStream(), cache.StreamCacheOptions{
		TTL:       10 * time.Minute,
		MaxFrames: 4096, // abandon recording past this
		MaxBytes:  4 * 1024 * 1024,
		Mode:      cache.ReplayPaced, // preserve original cadence
	}),
)
```
Replay modes:
- ReplayBurst: emit all frames immediately (default).
- ReplayPaced: sleep between frames to reproduce upstream timing.
- ReplayFastForward: divide the recorded gaps by FFDivisor (default 4×).
Plugin lifecycle hooks
Streaming responses fire four discrete hooks that are not emitted for non-streaming completions:
```go
type StreamStarted interface { OnStreamStarted(ctx, requestID, model, providerName) error }
type ChunkReceived interface { OnChunkReceived(ctx, requestID, kind, byteSize) error }
type StreamCompleted interface { OnStreamCompleted(ctx, requestID, model, providerName, elapsed, final) error }
type StreamFailed interface { OnStreamFailed(ctx, requestID, model, err) error }
```
StreamCompleted receives the merged *CompletionResponse produced by
provider.Accumulate: the same shape Engine.Complete would have returned,
with content concatenated, tool calls merged by index/id, and usage taken
from the final chunk. Use WithStreamLifecycleConfig to throttle
OnChunkReceived (default: every chunk; recommended in production: every
16th chunk).
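The merge rules above can be expressed as a fold over chunks. This version uses trimmed-down, hypothetical chunk and tool-call types. It merges tool calls by index only, whereas the real accumulator also matches on id. It illustrates the rules and is not provider.Accumulate.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Hypothetical, trimmed-down stream types; the real provider types differ.
type toolCallDelta struct {
	Index     int
	ID        string // typically present only on the first fragment
	Name      string
	Arguments string // a fragment of the JSON-encoded arguments
}

type usage struct{ Prompt, Completion, Total int }

type chunk struct {
	Content, Reasoning string
	ToolCalls          []toolCallDelta
	Usage              *usage
}

type toolCall struct{ ID, Name, Arguments string }

type merged struct {
	Content, Reasoning string
	ToolCalls          []toolCall
	Usage              *usage
}

// accumulate concatenates text and reasoning, merges tool-call fragments by
// index, and keeps the last usage it sees (normally the final chunk's).
func accumulate(chunks []chunk) merged {
	var content, reasoning strings.Builder
	calls := map[int]*toolCall{}
	var m merged
	for _, c := range chunks {
		content.WriteString(c.Content)
		reasoning.WriteString(c.Reasoning)
		for _, d := range c.ToolCalls {
			tc, ok := calls[d.Index]
			if !ok {
				tc = &toolCall{}
				calls[d.Index] = tc
			}
			if d.ID != "" {
				tc.ID = d.ID
			}
			if d.Name != "" {
				tc.Name = d.Name
			}
			tc.Arguments += d.Arguments
		}
		if c.Usage != nil {
			m.Usage = c.Usage
		}
	}
	indexes := make([]int, 0, len(calls))
	for i := range calls {
		indexes = append(indexes, i)
	}
	sort.Ints(indexes) // map iteration order is random; restore index order
	for _, i := range indexes {
		m.ToolCalls = append(m.ToolCalls, *calls[i])
	}
	m.Content, m.Reasoning = content.String(), reasoning.String()
	return m
}

func main() {
	m := accumulate([]chunk{
		{Content: "Let me check"},
		{ToolCalls: []toolCallDelta{{Index: 0, ID: "call_1", Name: "search", Arguments: `{"q`}}},
		{ToolCalls: []toolCallDelta{{Index: 0, Arguments: `":"go"}`}}},
		{Usage: &usage{Prompt: 5, Completion: 3, Total: 8}},
	})
	tc := m.ToolCalls[0]
	fmt.Printf("%s | %s(%s) | total=%d\n", m.Content, tc.Name, tc.Arguments, m.Usage.Total)
	// Let me check | search({"q":"go"}) | total=8
}
```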
Cancellation and shutdown
The runner closes provider streams when:
- The HTTP client disconnects (r.Context() is cancelled).
- The proxy's Shutdown(ctx) is called (cancels every in-flight stream).
- The provider returns an error (a typed error event, then [DONE]).
- The consumer abandons the WebSocket connection.
Each provider stream registers a context.AfterFunc that closes the
upstream HTTP body the moment ctx is cancelled. A cancellation therefore
tears down the upstream socket within one TCP RTT rather than at the next
chunk boundary, which matters when a slow upstream leaves long gaps (say,
30 seconds) between chunks.
See also
- _examples/streaming: channel API + accumulator demo.
- _examples/proxy: full proxy with stream cache, WebSocket, and metrics.