Nexus

Streaming Responses

SSE, NDJSON, WebSocket, and the Go channel API for streamed completions.

Nexus supports four wire formats for streamed completions, plus an idiomatic Go consumer API. The same gateway, the same Engine.CompleteStream, the same middleware pipeline — only the encoder at the HTTP boundary changes.

Wire formats

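The proxy selects an encoder from the ?stream_format= query parameter, the X-Nexus-Stream-Format header, or the request's Accept header, in that order of precedence (query → header → Accept → default). When none of them names a registered encoder, it falls back to the default, OpenAI-compatible SSE. The default registry includes: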

Encoder | Content type | Aliases | When to use
SSE (OpenAI envelope) | text/event-stream | sse, openai | Drop-in compatibility with OpenAI SDKs (default).
SSE (nexus-native) | application/vnd.nexus.events+sse | nexus, nexus-sse | Named events for fine-grained client routing.
NDJSON | application/x-ndjson | ndjson, jsonl | Line-delimited JSON; friendly to curl -N and Go HTTP clients.
WebSocket | n/a (endpoint: /v1/realtime) | n/a | Bidirectional; required for multi-modal realtime.

Register additional encoders with proxy.WithStreamEncoder("custom/type", &myEncoder{}).

OpenAI-compatible SSE (default)

The proxy emits data: frames using the OpenAI chat.completion.chunk envelope. Tool-call deltas are forwarded with index, function.name, and function.arguments preserved. Reasoning content is surfaced as delta.reasoning_content (an Anthropic-on-OpenAI extension also used by DeepSeek and other gateways), so OpenAI clients silently ignore it while nexus-aware clients can read it. The final chunk carries usage and is followed by data: [DONE]. A mid-stream error emits a typed event: error frame, also followed by data: [DONE], so clients can recover cleanly.

data: {"id":"…","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}

data: {"id":"…","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"…","choices":[],"usage":{"prompt_tokens":5,"completion_tokens":3,"total_tokens":8}}

data: [DONE]

NDJSON

Each line carries one nexus-native StreamEvent, terminated by \n and discriminated by its type field. Heartbeats are {"type":"heartbeat"} lines, and the stream ends with {"type":"done"}. There is no [DONE] sentinel, because the discriminated event is already self-describing.

{"type":"delta","model":"gpt-4o","delta":{"content":"Hello"}}
{"type":"reasoning","delta":{"reasoning":"Let me think..."}}
{"type":"tool_call","delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"q"}}]}}
{"type":"usage","usage":{"prompt_tokens":5,"completion_tokens":3,"total_tokens":8}}
{"type":"done"}

Native SSE (named events)

Same payload shape as NDJSON, but using SSE named events so EventSource listeners can route on event: without parsing the payload.

event: delta
data: {"type":"delta","delta":{"content":"Hello"}}

event: reasoning
data: {"type":"reasoning","delta":{"reasoning":"Let me think..."}}

event: usage
data: {"type":"usage","usage":{"total_tokens":8}}

event: done
data: {"type":"done"}

WebSocket

GET /v1/realtime upgrades to a WebSocket connection. The client sends a start envelope and then receives nexus-native StreamEvent frames as JSON text messages. To cancel mid-stream, send an abort envelope. The multi-modal input envelopes (audio_chunk, image) are reserved for bidirectional providers such as OpenAI Realtime; text deltas already flow through today.

client → server:
  {"type":"start","request":{"model":"gpt-4o","messages":[…]}}
  {"type":"abort"}                       (cancel mid-stream)
  {"type":"audio_chunk","format":"pcm16","b64":"…"}     (multi-modal in)
  {"type":"image","mime_type":"image/png","b64":"…"}    (multi-modal in)

server → client:
  {"type":"delta","delta":{"content":"Hello"}}
  {"type":"reasoning","delta":{"reasoning":"…"}}
  {"type":"usage","usage":{…}}
  {"type":"done"}                        (graceful close)
  {"type":"error","error":{…}}           (then close with StatusInternalError, code 1011)

Event types

Type | When emitted | Payload
delta | Every text, role, refusal, or tool-call increment | delta.content, delta.role, delta.refusal, delta.tool_calls
reasoning | Extended-thinking models (Claude 3.7+, OpenAI o-series, DeepSeek R1) | delta.reasoning
tool_call | Incremental tool-call argument fragments | delta.tool_calls[i].function.arguments
audio | Multi-modal audio chunks (Realtime/Live providers) | audio.data, audio.format, audio.transcript
image | Multi-modal image chunks | image.data, image.url, image.mime_type
citation | Anthropic citation deltas | citation.url, citation.title, citation.quoted
usage | Token totals (typically the last frame before done) | usage.prompt_tokens, usage.completion_tokens, usage.total_tokens
error | Mid-stream upstream failure | error.type, error.message, error.retryable
heartbeat | Idle keepalive (15 s by default) | none (SSE comment, NDJSON line, or WS ping)
done | Stream complete | none

Go consumer API

Engine.CompleteStream returns a provider.Stream with three consumption styles: an iterator, a channel adapter, and an accumulator. Pick whichever fits your call site.

Iterator

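stream, err := engine.CompleteStream(ctx, req)
if err != nil {
    return err
}
defer stream.Close() // close on every path, including early returns

for {
    // Next blocks until the next chunk arrives; io.EOF marks a clean end of stream.
    chunk, err := stream.Next(ctx)
    if errors.Is(err, io.EOF) {
        break
    }
    if err != nil {
        return err // upstream failure or cancelled ctx
    }
    fmt.Print(chunk.Delta.Content)
}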

Channel

events := provider.NewChannelAdapter(ctx, stream, provider.ChannelOptions{Buffer: 32})

for ev := range events {
    switch ev.Kind {
    case provider.EventReasoning:
        fmt.Printf("[think] %s", ev.Delta.Reasoning)
    case provider.EventDelta:
        fmt.Print(ev.Delta.Content)
    case provider.EventError:
        // %w keeps the upstream error inspectable with errors.Is / errors.As.
        return fmt.Errorf("stream error: %w", ev.Err)
    }
}

Accumulator

For callers that want the merged final response (the same shape Engine.Complete would have returned):

resp, err := provider.Accumulate(ctx, stream)
if err != nil {
    return err
}
// resp.Choices[0].Message.Content   — concatenated text
// resp.ThinkingContent              — concatenated reasoning
// resp.Choices[0].Message.ToolCalls — merged tool calls
// resp.Usage                        — final usage totals

Stream cache (record-and-replay)

Streaming responses can be recorded into a cache.StreamCache and replayed on subsequent identical requests. Useful for demos, integration tests, and deterministic latency benchmarks.

engine := nexus.NewEngine(
    nexus.WithProvider(openai.New(apiKey)),
    nexus.WithStreamCache(stores.NewMemoryStream(), cache.StreamCacheOptions{
        TTL:       10 * time.Minute,
        MaxFrames: 4096,         // abandon recording past this
        MaxBytes:  4 * 1024 * 1024,
        Mode:      cache.ReplayPaced, // preserve original cadence
    }),
)

Replay modes:

  • ReplayBurst: emit all frames immediately (the default).
  • ReplayPaced: sleep between frames to reproduce the original upstream timing.
  • ReplayFastForward: divide each recorded gap by FFDivisor (default 4, i.e. replay at roughly 4× speed).

Plugin lifecycle hooks

Streaming responses fire four discrete hooks that are not emitted for non-streaming completions:

type StreamStarted   interface { OnStreamStarted(ctx, requestID, model, providerName) error }
type ChunkReceived   interface { OnChunkReceived(ctx, requestID, kind, byteSize) error }
type StreamCompleted interface { OnStreamCompleted(ctx, requestID, model, providerName, elapsed, final) error }
type StreamFailed    interface { OnStreamFailed(ctx, requestID, model, err) error }

StreamCompleted receives the merged *CompletionResponse produced by provider.Accumulate: the same shape Engine.Complete would have returned, with content concatenated, tool calls merged by index/id, and usage taken from the final chunk. Use WithStreamLifecycleConfig to throttle OnChunkReceived (default: fire on every chunk; recommended in production: every 16th chunk).

Cancellation and shutdown

The runner closes provider streams when:

  • The HTTP client disconnects (r.Context() cancels).
  • The proxy's Shutdown(ctx) is called (cancels every in-flight stream).
  • The provider returns an error (typed error event then [DONE]).
  • The consumer abandons the WebSocket connection.

Each provider stream registers a context.AfterFunc that closes the upstream HTTP body the moment ctx is cancelled. As a result, even partway through a long (say, 30-second) stream, cancellation tears down the upstream socket within about one TCP round trip rather than at the next chunk boundary.
