Streaming API Reference
Go types and HTTP wire formats for streamed completions.
This reference enumerates every public type and option introduced for streamed completions. For high-level usage, see the Streaming guide.
provider package
provider.Stream
```go
type Stream interface {
	Next(ctx context.Context) (*StreamChunk, error) // io.EOF when done
	Close() error
	Usage() *Usage
}
```

provider.StreamChunk
```go
type StreamChunk struct {
	ID, Provider, Model string
	Kind                EventKind // discriminator; "" = legacy delta
	Delta               Delta
	FinishReason        string
	Usage               *Usage // EventUsage / piggyback
	Err                 string // EventError
	Created             int64  // unix-ms; cache-replay timing
}
```

provider.EventKind
| Constant | When emitted |
|---|---|
| EventDelta (zero value) | legacy text/tool-call delta |
| EventMessageStart | upstream start event (Anthropic) |
| EventMessageStop | upstream stop event |
| EventReasoning | extended-thinking content |
| EventToolCallDelta | incremental tool-call fragments |
| EventAudio | multi-modal audio chunk |
| EventImage | multi-modal image chunk |
| EventCitation | citation reference |
| EventUsage | terminal token snapshot |
| EventHeartbeat | keepalive ping |
| EventError | recoverable in-band error |
provider.Delta
```go
type Delta struct {
	Role, Content string
	ToolCalls     []ToolCall
	Reasoning     string // o-series, Claude thinking, DeepSeek R1
	Refusal       string // OpenAI refusal
	Citations     []Citation
	Audio         *AudioChunk
	Image         *ImageChunk
	Transcript    string
}
```

provider.Accumulate / provider.NewAccumulator
```go
func Accumulate(ctx context.Context, s Stream) (*CompletionResponse, error)

type Accumulator struct{ /* ... */ }

func NewAccumulator() *Accumulator
func (a *Accumulator) Add(c *StreamChunk)
func (a *Accumulator) Finalize(usageFallback func() *Usage) *CompletionResponse
```

Accumulate drains a whole stream; an Accumulator from NewAccumulator accepts chunks one at a time through Add, and Finalize returns the result. Either way you get the merged CompletionResponse, the same shape Provider.Complete would have returned.
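
The merge step can be pictured with a simplified accumulator that follows the Add/Finalize shape above. The types are trimmed stand-ins, and the merge rules (concatenate text, last non-empty FinishReason wins, last usage snapshot wins, otherwise call usageFallback) are assumptions for illustration. A full accumulator would also have to merge tool calls, citations and the other Delta fields, which this sketch omits.

```go
package main

import (
	"fmt"
	"strings"
)

// Usage, Chunk and Response are simplified stand-ins for provider.Usage,
// provider.StreamChunk and provider.CompletionResponse.
type Usage struct{ PromptTokens, CompletionTokens int }

type Chunk struct {
	Content, Reasoning, FinishReason string
	Usage                            *Usage
}

type Response struct {
	Content, Reasoning, FinishReason string
	Usage                            *Usage
}

// accumulator mimics the Add/Finalize shape of provider.Accumulator.
type accumulator struct {
	content, reasoning strings.Builder
	finishReason       string
	usage              *Usage
}

// Add concatenates text deltas; the last non-empty FinishReason and the last
// usage snapshot win (assumed merge rules).
func (a *accumulator) Add(c *Chunk) {
	a.content.WriteString(c.Content)
	a.reasoning.WriteString(c.Reasoning)
	if c.FinishReason != "" {
		a.finishReason = c.FinishReason
	}
	if c.Usage != nil {
		a.usage = c.Usage
	}
}

// Finalize builds the merged response, calling usageFallback (for example a
// stream's Usage method) only when no chunk carried usage.
func (a *accumulator) Finalize(usageFallback func() *Usage) *Response {
	u := a.usage
	if u == nil && usageFallback != nil {
		u = usageFallback()
	}
	return &Response{
		Content:      a.content.String(),
		Reasoning:    a.reasoning.String(),
		FinishReason: a.finishReason,
		Usage:        u,
	}
}

func main() {
	var a accumulator
	a.Add(&Chunk{Reasoning: "User wants a greeting."})
	a.Add(&Chunk{Content: "Hel"})
	a.Add(&Chunk{Content: "lo", FinishReason: "stop"})
	resp := a.Finalize(func() *Usage { return &Usage{PromptTokens: 5, CompletionTokens: 2} })
	fmt.Printf("%q %q %s %+v\n", resp.Content, resp.Reasoning, resp.FinishReason, *resp.Usage)
}
```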
provider.NewChannelAdapter
```go
type ChannelOptions struct {
	Buffer         int  // default 16
	EmitErrorChunk bool // default true
}

func NewChannelAdapter(ctx context.Context, s Stream, opts ChannelOptions) <-chan StreamChunk
```

Spawns a goroutine that reads chunks from s and publishes them on the returned channel. The channel closes on io.EOF or when ctx is cancelled. If the stream fails with any other error, a final EventError chunk is emitted before the channel closes.
provider.Capabilities (streaming flags)
```go
type Capabilities struct {
	// ... pre-existing fields ...
	StreamingReasoning bool // delta.reasoning
	StreamingTools     bool // tool_call deltas with index
	StreamingAudio     bool // delta.audio
	StreamingCitations bool // delta.citations
	RealtimeAudio      bool // OpenAI Realtime
	RealtimeVideo      bool
	LiveBidi           bool // Gemini Live
}
```

httpstream package
httpstream.StreamEncoder
```go
type StreamEncoder interface {
	ContentType() string
	WriteHeaders(w http.ResponseWriter)
	EncodeEvent(w io.Writer, e *StreamEvent) error
	EncodeError(w io.Writer, err *WireError) error
	Heartbeat(w io.Writer) error
	End(w io.Writer) error
}
```

Built-in encoders:

- httpstream.NewSSEOpenAIEncoder(): text/event-stream, OpenAI envelope (default).
- httpstream.NewSSENativeEncoder(): application/vnd.nexus.events+sse, named events.
- httpstream.NewNDJSONEncoder(): application/x-ndjson, one event per line.
httpstream.Run
```go
type RunOptions struct {
	HeartbeatInterval time.Duration // default 15s; -1 disables
	RequestID         string
	OnError           func(error)
}

func Run(ctx context.Context, w http.ResponseWriter, s provider.Stream,
	enc StreamEncoder, opts RunOptions)
```

The single shared streaming event loop: it sends periodic heartbeats, emits sanitized error events, and calls Stream.Close exactly once.
httpstream.Negotiate
```go
func Negotiate(r *http.Request, reg *Registry) StreamEncoder
```

Picks an encoder by precedence, first match wins: ?stream_format= query parameter → X-Nexus-Stream-Format header → Accept header → registry default.
httpstream.WSHandler
```go
type WSOptions struct {
	AcceptOrigins     []string      // passed as OriginPatterns; ["*"] disables origin verification
	SendBuffer        int           // default 64
	HeartbeatInterval time.Duration // default 20s
}

func NewWSHandler(streamer CompletionStreamer, opts WSOptions) *WSHandler
```

Bidirectional WebSocket handler. proxy.New and api.New mount it automatically at /v1/realtime unless WithoutWebSocket() is passed.
cache package
cache.StreamCache
```go
type StreamCache interface {
	GetStream(ctx context.Context, key string) ([]StreamFrame, error)
	SetStream(ctx context.Context, key string, frames []StreamFrame, ttl time.Duration) error
	DeleteStream(ctx context.Context, key string) error
}

type StreamFrame struct {
	Chunk    *provider.StreamChunk
	OffsetMs int64
}

type StreamCacheOptions struct {
	TTL       time.Duration
	MaxFrames int
	MaxBytes  int
	Mode      ReplayMode
	FFDivisor float64 // used by ReplayFastForward; defaults to 4 if non-positive
}
```

Built-in backends:

- cache/stores.NewMemoryStream(): in-memory, with TTL and a soft cap on the number of keys.
- cache/stores.NewRedisStream(client): Redis-backed, gob-serialized.
pipeline/middlewares package
middlewares.NewStreamLifecycle
```go
type StreamLifecycleConfig struct {
	EmitEveryNChunks int // 0 or 1 = every chunk
	QuotaResolver    func(ctx context.Context) StreamQuota
}

type StreamQuota struct {
	MaxDuration time.Duration
	MaxTokens   int
}

func NewStreamLifecycle(r *plugin.Registry, cfg StreamLifecycleConfig) *StreamLifecycleMiddleware
```

Runs at priority 545. It fires the four streaming plugin hooks and, when a QuotaResolver is configured, enforces per-request stream quotas.
middlewares.IsQuotaExceeded
```go
func IsQuotaExceeded(err error) bool
```

Reports whether err was raised by the lifecycle middleware's quota watchdog.
plugin package
Streaming hooks
```go
type StreamStarted interface { OnStreamStarted(ctx, requestID, model, providerName) error }
type ChunkReceived interface { OnChunkReceived(ctx, requestID, kind EventKind, byteSize int) error }
type StreamCompleted interface { OnStreamCompleted(ctx, requestID, model, providerName, elapsed, final *CompletionResponse) error }
type StreamFailed interface { OnStreamFailed(ctx, requestID, model, err) error }
```

Implemented out of the box by observability.MetricsExtension, audit_hook.Extension, and relay_hook.Extension.
nexus (gateway options)
```go
func WithStreamCache(sc cache.StreamCache, opts cache.StreamCacheOptions) Option
func WithStreamLifecycleConfig(cfg middlewares.StreamLifecycleConfig) Option
```

proxy / api (HTTP options)

```go
func WithStreamEncoder(contentType string, enc httpstream.StreamEncoder) Option
func WithStreamEncoderAlias(alias, contentType string) Option // proxy only
func WithWebSocket(opts httpstream.WSOptions) Option
func WithoutWebSocket() Option
```

Proxy.Shutdown(ctx) cancels every in-flight streaming request.
tenant (streaming quotas)
```go
type Quota struct {
	// ... pre-existing fields ...
	MaxStreamDuration time.Duration
	MaxStreamTokens   int
}
```

Your QuotaResolver reads these fields and returns them as a StreamQuota; the resolver itself is supplied through StreamLifecycleConfig.