Nexus

Streaming API Reference

Go types and HTTP wire formats for streamed completions.

This reference enumerates every public type and option introduced for streamed completions. For high-level usage see the Streaming guide.

provider package

provider.Stream

type Stream interface {
    Next(ctx context.Context) (*StreamChunk, error) // io.EOF when done
    Close() error
    Usage() *Usage
}
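
The usual consumption pattern for this interface is a read loop that stops on io.EOF. The sketch below assumes trimmed local stand-ins for Stream, StreamChunk, Delta, and Usage, plus a hypothetical in-memory sliceStream; none of these are the Nexus implementations, and drain and drainParts are illustrative names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// Local stand-ins mirroring the documented shapes, trimmed to what the loop needs.
type Usage struct{}

type Delta struct{ Content string }

type StreamChunk struct{ Delta Delta }

type Stream interface {
	Next(ctx context.Context) (*StreamChunk, error) // io.EOF when done
	Close() error
	Usage() *Usage
}

// sliceStream is a hypothetical in-memory Stream used only for this sketch.
type sliceStream struct {
	parts []string
	pos   int
}

func (s *sliceStream) Next(ctx context.Context) (*StreamChunk, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // honor cancellation before yielding more data
	}
	if s.pos >= len(s.parts) {
		return nil, io.EOF
	}
	c := &StreamChunk{Delta: Delta{Content: s.parts[s.pos]}}
	s.pos++
	return c, nil
}

func (s *sliceStream) Close() error  { return nil }
func (s *sliceStream) Usage() *Usage { return nil }

// drain reads until io.EOF, concatenating text deltas, and always closes the stream.
func drain(ctx context.Context, s Stream) (string, error) {
	defer s.Close()
	var out string
	for {
		c, err := s.Next(ctx)
		if errors.Is(err, io.EOF) {
			return out, nil // normal end of stream
		}
		if err != nil {
			return out, err
		}
		out += c.Delta.Content
	}
}

// drainParts is a small helper that runs drain over the in-memory stream.
func drainParts(parts ...string) (string, error) {
	return drain(context.Background(), &sliceStream{parts: parts})
}

func main() {
	text, err := drainParts("Hel", "lo")
	fmt.Println(text, err)
}
```

Treating io.EOF as the normal terminator and deferring Close keeps the loop safe on both the success and the error path.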

provider.StreamChunk

type StreamChunk struct {
    ID, Provider, Model string
    Kind         EventKind  // discriminator; "" = legacy delta
    Delta        Delta
    FinishReason string
    Usage        *Usage  // EventUsage / piggyback
    Err          string  // EventError
    Created      int64   // unix-ms; cache-replay timing
}

provider.EventKind

Constant                  When emitted
EventDelta (zero value)   legacy text/tool-call delta
EventMessageStart         upstream start event (Anthropic)
EventMessageStop          upstream stop event
EventReasoning            extended-thinking content
EventToolCallDelta        incremental tool-call fragments
EventAudio                multi-modal audio chunk
EventImage                multi-modal image chunk
EventCitation             citation reference
EventUsage                terminal token snapshot
EventHeartbeat            keepalive ping
EventError                recoverable in-band error

provider.Delta

type Delta struct {
    Role, Content string
    ToolCalls     []ToolCall
    Reasoning     string      // o-series, Claude thinking, DeepSeek R1
    Refusal       string      // OpenAI refusal
    Citations     []Citation
    Audio         *AudioChunk
    Image         *ImageChunk
    Transcript    string
}

provider.Accumulate / provider.NewAccumulator

func Accumulate(ctx context.Context, s Stream) (*CompletionResponse, error)

type Accumulator struct{ /* ... */ }
func NewAccumulator() *Accumulator
func (a *Accumulator) Add(c *StreamChunk)
func (a *Accumulator) Finalize(usageFallback func() *Usage) *CompletionResponse

Accumulate drains a stream to completion; an Accumulator accepts chunks incrementally through Add. Both produce the merged CompletionResponse, the same shape Provider.Complete would have returned.
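
To illustrate the merge step, here is a minimal sketch of an accumulator. It assumes simplified stand-in types: the flattened CompletionResponse and the Usage.TotalTokens field are hypothetical, and the merge rules (concatenate content, keep the last non-empty finish reason and usage) are a guess at reasonable behavior rather than the documented algorithm.

```go
package main

import (
	"fmt"
	"strings"
)

// Stand-in types; field sets are trimmed and partly hypothetical.
type Usage struct{ TotalTokens int }

type Delta struct{ Role, Content string }

type StreamChunk struct {
	Delta        Delta
	FinishReason string
	Usage        *Usage
}

// CompletionResponse is a flattened, hypothetical stand-in for the real type.
type CompletionResponse struct {
	Role, Content, FinishReason string
	Usage                       *Usage
}

type Accumulator struct {
	role, finish string
	content      strings.Builder
	usage        *Usage
}

func NewAccumulator() *Accumulator { return &Accumulator{} }

// Add folds one chunk into the running state.
func (a *Accumulator) Add(c *StreamChunk) {
	if c == nil {
		return
	}
	if c.Delta.Role != "" {
		a.role = c.Delta.Role
	}
	a.content.WriteString(c.Delta.Content)
	if c.FinishReason != "" {
		a.finish = c.FinishReason
	}
	if c.Usage != nil {
		a.usage = c.Usage // piggybacked or terminal usage wins
	}
}

// Finalize builds the merged response, consulting usageFallback only
// when no chunk carried usage.
func (a *Accumulator) Finalize(usageFallback func() *Usage) *CompletionResponse {
	u := a.usage
	if u == nil && usageFallback != nil {
		u = usageFallback()
	}
	return &CompletionResponse{
		Role:         a.role,
		Content:      a.content.String(),
		FinishReason: a.finish,
		Usage:        u,
	}
}

// mergeDemo feeds three chunks and relies on the fallback for usage.
func mergeDemo() *CompletionResponse {
	acc := NewAccumulator()
	acc.Add(&StreamChunk{Delta: Delta{Role: "assistant", Content: "Hel"}})
	acc.Add(&StreamChunk{Delta: Delta{Content: "lo"}})
	acc.Add(&StreamChunk{FinishReason: "stop"})
	return acc.Finalize(func() *Usage { return &Usage{TotalTokens: 7} })
}

func main() {
	r := mergeDemo()
	fmt.Println(r.Role, r.Content, r.FinishReason, r.Usage.TotalTokens)
}
```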

provider.NewChannelAdapter

type ChannelOptions struct {
    Buffer         int  // default 16
    EmitErrorChunk bool // default true
}

func NewChannelAdapter(ctx context.Context, s Stream, opts ChannelOptions) <-chan StreamChunk

Spawns a goroutine that reads chunks from the stream and publishes them on the returned channel. The channel closes on io.EOF or when ctx is cancelled. On any other error, the adapter emits a final EventError chunk before closing (see EmitErrorChunk).
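
The behavior described above can be sketched roughly as follows. This is not the Nexus implementation: Stream is trimmed to Next, the string value of EventError is assumed, and adapt, fakeStream, and collect are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

type EventKind string

const EventError EventKind = "error" // assumed value, for illustration only

type Delta struct{ Content string }

type StreamChunk struct {
	Kind  EventKind
	Delta Delta
	Err   string
}

// Stream is trimmed to the one method the adapter needs.
type Stream interface {
	Next(ctx context.Context) (*StreamChunk, error)
}

// adapt pumps chunks into a buffered channel until io.EOF, ctx cancel, or error.
func adapt(ctx context.Context, s Stream, buffer int) <-chan StreamChunk {
	out := make(chan StreamChunk, buffer)
	go func() {
		defer close(out)
		for {
			c, err := s.Next(ctx)
			if errors.Is(err, io.EOF) || ctx.Err() != nil {
				return
			}
			if err != nil {
				select { // final in-band error, unless the consumer is gone
				case out <- StreamChunk{Kind: EventError, Err: err.Error()}:
				case <-ctx.Done():
				}
				return
			}
			select {
			case out <- *c:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// fakeStream yields parts, then fail (or io.EOF when fail is nil).
type fakeStream struct {
	parts []string
	fail  error
	pos   int
}

func (f *fakeStream) Next(ctx context.Context) (*StreamChunk, error) {
	if f.pos < len(f.parts) {
		c := &StreamChunk{Delta: Delta{Content: f.parts[f.pos]}}
		f.pos++
		return c, nil
	}
	if f.fail != nil {
		return nil, f.fail
	}
	return nil, io.EOF
}

// collect ranges over the channel and separates text from the error chunk.
func collect(parts []string, failMsg string) (text, lastErr string) {
	var fail error
	if failMsg != "" {
		fail = errors.New(failMsg)
	}
	for c := range adapt(context.Background(), &fakeStream{parts: parts, fail: fail}, 16) {
		if c.Kind == EventError {
			lastErr = c.Err
			continue
		}
		text += c.Delta.Content
	}
	return text, lastErr
}

func main() {
	fmt.Println(collect([]string{"a", "b"}, "upstream reset"))
}
```

Selecting on ctx.Done() for every send keeps the goroutine from leaking when the consumer stops reading.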

provider.Capabilities (streaming flags)

type Capabilities struct {
    // ... pre-existing fields ...
    StreamingReasoning bool // delta.reasoning
    StreamingTools     bool // tool_call deltas with index
    StreamingAudio     bool // delta.audio
    StreamingCitations bool // delta.citations
    RealtimeAudio      bool // OpenAI Realtime
    RealtimeVideo      bool
    LiveBidi           bool // Gemini Live
}

httpstream package

httpstream.StreamEncoder

type StreamEncoder interface {
    ContentType() string
    WriteHeaders(w http.ResponseWriter)
    EncodeEvent(w io.Writer, e *StreamEvent) error
    EncodeError(w io.Writer, err *WireError) error
    Heartbeat(w io.Writer) error
    End(w io.Writer) error
}

Built-in encoders:

  • httpstream.NewSSEOpenAIEncoder(): text/event-stream, OpenAI envelope (default).
  • httpstream.NewSSENativeEncoder(): application/vnd.nexus.events+sse, named events.
  • httpstream.NewNDJSONEncoder(): application/x-ndjson, one event per line.
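
The difference between the SSE and NDJSON wire formats comes down to framing. The sketch below shows the two framings on a hypothetical event payload; the real StreamEvent shape and the exact envelopes written by the built-in encoders are not shown in this reference, so the event type and field names here are assumptions.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// event is a hypothetical payload used only to demonstrate framing.
type event struct {
	Kind    string `json:"kind"`
	Content string `json:"content"`
}

// writeSSE frames the JSON payload as one server-sent event:
// a "data:" line followed by a blank line.
func writeSSE(w io.Writer, e event) error {
	b, err := json.Marshal(e)
	if err != nil {
		return err
	}
	_, err = fmt.Fprintf(w, "data: %s\n\n", b)
	return err
}

// writeNDJSON frames the JSON payload as a single newline-terminated line.
func writeNDJSON(w io.Writer, e event) error {
	b, err := json.Marshal(e)
	if err != nil {
		return err
	}
	_, err = fmt.Fprintf(w, "%s\n", b)
	return err
}

// frame renders one event in the requested format ("sse" or anything else for NDJSON).
func frame(format string, e event) string {
	var buf bytes.Buffer
	if format == "sse" {
		_ = writeSSE(&buf, e)
	} else {
		_ = writeNDJSON(&buf, e)
	}
	return buf.String()
}

func main() {
	e := event{Kind: "delta", Content: "hi"}
	fmt.Printf("%q\n", frame("sse", e))
	fmt.Printf("%q\n", frame("ndjson", e))
}
```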

httpstream.Run

type RunOptions struct {
    HeartbeatInterval time.Duration // default 15s; -1 disables
    RequestID         string
    OnError           func(error)
}

func Run(ctx context.Context, w http.ResponseWriter, s provider.Stream,
         enc StreamEncoder, opts RunOptions)

Runs the single shared streaming event loop: it writes heartbeats, emits sanitized error events, and calls Stream.Close exactly once.

httpstream.Negotiate

func Negotiate(r *http.Request, reg *Registry) StreamEncoder

Picks an encoder by precedence: ?stream_format= query → X-Nexus-Stream-Format header → Accept → registry default.
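
The precedence chain can be sketched as a series of lookups that fall through to a default. This sketch returns a content type rather than an encoder, and the alias names ("sse", "native", "ndjson"), the request path, and the simplistic Accept parsing (no q-value ordering) are assumptions for illustration; the real Registry may resolve names differently.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strings"
)

// known maps hypothetical aliases and content types to a content type.
var known = map[string]string{
	"sse":                              "text/event-stream",
	"native":                           "application/vnd.nexus.events+sse",
	"ndjson":                           "application/x-ndjson",
	"text/event-stream":                "text/event-stream",
	"application/vnd.nexus.events+sse": "application/vnd.nexus.events+sse",
	"application/x-ndjson":             "application/x-ndjson",
}

const fallback = "text/event-stream" // stands in for the registry default

// negotiate applies: query parameter, then header, then Accept, then default.
func negotiate(r *http.Request) string {
	if ct, ok := known[r.URL.Query().Get("stream_format")]; ok {
		return ct
	}
	if ct, ok := known[r.Header.Get("X-Nexus-Stream-Format")]; ok {
		return ct
	}
	for _, part := range strings.Split(r.Header.Get("Accept"), ",") {
		// Drop parameters such as ";q=0.9" and match the bare media type.
		mediaType := strings.TrimSpace(strings.SplitN(part, ";", 2)[0])
		if ct, ok := known[mediaType]; ok {
			return ct
		}
	}
	return fallback
}

// pick builds a request from the three inputs and negotiates a format.
func pick(query, header, accept string) string {
	u := url.URL{Path: "/v1/chat/completions"} // hypothetical path
	if query != "" {
		u.RawQuery = url.Values{"stream_format": {query}}.Encode()
	}
	r := &http.Request{Method: http.MethodGet, URL: &u, Header: http.Header{}}
	if header != "" {
		r.Header.Set("X-Nexus-Stream-Format", header)
	}
	if accept != "" {
		r.Header.Set("Accept", accept)
	}
	return negotiate(r)
}

func main() {
	fmt.Println(pick("ndjson", "sse", "text/event-stream"))
	fmt.Println(pick("", "", "text/html, application/x-ndjson;q=0.9"))
}
```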

httpstream.WSHandler

type WSOptions struct {
    AcceptOrigins     []string      // OriginPatterns; ["*"] disables verify
    SendBuffer        int           // default 64
    HeartbeatInterval time.Duration // default 20s
}

func NewWSHandler(streamer CompletionStreamer, opts WSOptions) *WSHandler

Bidirectional WebSocket handler. Mounted automatically at /v1/realtime on proxy.New and api.New unless WithoutWebSocket() is passed.

cache package

cache.StreamCache

type StreamCache interface {
    GetStream(ctx context.Context, key string) ([]StreamFrame, error)
    SetStream(ctx context.Context, key string, frames []StreamFrame, ttl time.Duration) error
    DeleteStream(ctx context.Context, key string) error
}

type StreamFrame struct {
    Chunk    *provider.StreamChunk
    OffsetMs int64
}

type StreamCacheOptions struct {
    TTL       time.Duration
    MaxFrames int
    MaxBytes  int
    Mode      ReplayMode
    FFDivisor float64 // ReplayFastForward — defaults to 4 if non-positive
}

Built-in backends:

  • cache/stores.NewMemoryStream() — in-memory with TTL + soft max-keys cap.
  • cache/stores.NewRedisStream(client) — Redis-backed, gob-serialized.
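
As a rough illustration of how OffsetMs and FFDivisor could interact during replay, the sketch below computes per-frame delays. It assumes that fast-forward replay divides the recorded gap between frames by FFDivisor; the reference only states the default of 4, so that timing model, the trimmed StreamFrame, and the function names are assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// StreamFrame is trimmed to the timing field; the real type also carries the chunk.
type StreamFrame struct {
	OffsetMs int64
}

// effectiveDivisor applies the documented default of 4 for non-positive values.
func effectiveDivisor(d float64) float64 {
	if d <= 0 {
		return 4
	}
	return d
}

// replayDelays returns, for each frame, how long to wait after the previous
// frame, with the recorded gap shortened by the divisor (assumed model).
func replayDelays(frames []StreamFrame, ffDivisor float64) []time.Duration {
	div := effectiveDivisor(ffDivisor)
	delays := make([]time.Duration, len(frames))
	var prev int64
	for i, f := range frames {
		gap := f.OffsetMs - prev
		if gap < 0 {
			gap = 0 // tolerate out-of-order offsets
		}
		delays[i] = time.Duration(float64(gap) / div * float64(time.Millisecond))
		prev = f.OffsetMs
	}
	return delays
}

func main() {
	frames := []StreamFrame{{OffsetMs: 0}, {OffsetMs: 400}, {OffsetMs: 1000}}
	fmt.Println(replayDelays(frames, 0)) // non-positive divisor falls back to 4
}
```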

pipeline/middlewares package

middlewares.NewStreamLifecycle

type StreamLifecycleConfig struct {
    EmitEveryNChunks int                              // 0 or 1 = every chunk
    QuotaResolver    func(ctx context.Context) StreamQuota
}

type StreamQuota struct {
    MaxDuration time.Duration
    MaxTokens   int
}

func NewStreamLifecycle(r *plugin.Registry, cfg StreamLifecycleConfig) *StreamLifecycleMiddleware

Priority 545. Fires the four streaming plugin hooks and (when configured) enforces per-request stream quotas.

middlewares.IsQuotaExceeded

func IsQuotaExceeded(err error) bool

Detects errors raised by the lifecycle middleware's quota watchdog.
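
A detector like this is commonly built on a sentinel error and errors.Is. The sketch below shows that pattern under stated assumptions: errQuotaExceeded and checkQuota are hypothetical, the treatment of zero limits as "unlimited" is a guess, and the real middleware may use a different error type.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type StreamQuota struct {
	MaxDuration time.Duration
	MaxTokens   int
}

// errQuotaExceeded is a hypothetical sentinel for this sketch.
var errQuotaExceeded = errors.New("stream quota exceeded")

// checkQuota returns a wrapped sentinel when a limit is crossed.
// Zero limits are treated as unlimited (an assumption).
func checkQuota(q StreamQuota, elapsed time.Duration, tokens int) error {
	if q.MaxDuration > 0 && elapsed > q.MaxDuration {
		return fmt.Errorf("%w: ran %s, limit %s", errQuotaExceeded, elapsed, q.MaxDuration)
	}
	if q.MaxTokens > 0 && tokens > q.MaxTokens {
		return fmt.Errorf("%w: %d tokens, limit %d", errQuotaExceeded, tokens, q.MaxTokens)
	}
	return nil
}

// IsQuotaExceeded reports whether err wraps the quota sentinel.
func IsQuotaExceeded(err error) bool {
	return errors.Is(err, errQuotaExceeded)
}

func main() {
	q := StreamQuota{MaxDuration: 30 * time.Second, MaxTokens: 10}
	err := checkQuota(q, 45*time.Second, 3)
	fmt.Println(IsQuotaExceeded(err), err)
}
```

Callers can then branch on IsQuotaExceeded to distinguish a quota cutoff from an upstream failure, for example when choosing an HTTP status or a finish reason.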

plugin package

Streaming hooks

type StreamStarted   interface { OnStreamStarted(ctx, requestID, model, providerName) error }
type ChunkReceived   interface { OnChunkReceived(ctx, requestID, kind EventKind, byteSize int) error }
type StreamCompleted interface { OnStreamCompleted(ctx, requestID, model, providerName, elapsed, final *CompletionResponse) error }
type StreamFailed    interface { OnStreamFailed(ctx, requestID, model, err) error }

Implemented by observability.MetricsExtension, audit_hook.Extension, and relay_hook.Extension out of the box.
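
A custom extension opts into a hook by implementing its interface. The sketch below implements ChunkReceived with a per-request byte counter; the parameter types (context.Context, string) are assumed, since the signatures above omit them, and byteCounter and countDemo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type EventKind string

// ChunkReceived mirrors the documented hook with assumed parameter types.
type ChunkReceived interface {
	OnChunkReceived(ctx context.Context, requestID string, kind EventKind, byteSize int) error
}

// byteCounter tallies streamed bytes per request; safe for concurrent use.
type byteCounter struct {
	mu    sync.Mutex
	bytes map[string]int
}

func newByteCounter() *byteCounter {
	return &byteCounter{bytes: make(map[string]int)}
}

func (b *byteCounter) OnChunkReceived(_ context.Context, requestID string, _ EventKind, byteSize int) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.bytes[requestID] += byteSize
	return nil
}

// total returns the bytes seen so far for one request.
func (b *byteCounter) total(requestID string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.bytes[requestID]
}

// Compile-time check that byteCounter satisfies the hook interface.
var _ ChunkReceived = (*byteCounter)(nil)

// countDemo simulates a stream of chunks with the given sizes.
func countDemo(sizes ...int) int {
	c := newByteCounter()
	for _, n := range sizes {
		_ = c.OnChunkReceived(context.Background(), "req-1", "", n)
	}
	return c.total("req-1")
}

func main() {
	fmt.Println(countDemo(3, 4, 5))
}
```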

nexus (gateway options)

func WithStreamCache(sc cache.StreamCache, opts cache.StreamCacheOptions) Option
func WithStreamLifecycleConfig(cfg middlewares.StreamLifecycleConfig) Option

proxy / api (HTTP options)

func WithStreamEncoder(contentType string, enc httpstream.StreamEncoder) Option
func WithStreamEncoderAlias(alias, contentType string) Option // proxy only
func WithWebSocket(opts httpstream.WSOptions) Option
func WithoutWebSocket() Option

Proxy.Shutdown(ctx) cancels every in-flight streaming request.

tenant (streaming quotas)

type Quota struct {
    // ... pre-existing fields ...
    MaxStreamDuration time.Duration
    MaxStreamTokens   int
}

Read by your QuotaResolver and surfaced via StreamLifecycleConfig.
