---
title: Streaming
description: "SSE-based live updates for belief state and extraction pipelines."
---

The SDK exposes two streaming primitives over Server-Sent Events: a state stream (`subscribe` / `events`) and a per-request extraction stream (`streamExtraction`). Both share one transport — retries, abort propagation, and frame validation live in `HttpTransport`, so consumer code only sees parsed event objects.

## `beliefs.subscribe(handler, options?)`

Push state changes into a callback. Returns a `Subscription` with `unsubscribe()` and a `done` promise that resolves when the stream closes.

```ts
const sub = beliefs.subscribe(
  (event) => {
    if (event.type === 'belief_records_updated') {
      renderRecords(event.beliefRecords)
    } else if (event.type === 'belief_records_stale') {
      requestFullRefresh(event.reason)
    }
  },
  {
    onError: (err) => console.error(err),
    onClose: () => console.log('stream closed'),
  },
)

// Tear down when the React effect / job ends:
sub.unsubscribe()
await sub.done
```

Options:

| Option | Default | What it does |
|--------|---------|--------------|
| `onError` | `console.error` | Called when the underlying stream errors. |
| `onClose` | — | Called once after a clean close. |
| `dropHeartbeats` | `true` | Filter heartbeat frames before invoking `handler`. |
| `signal` | — | `AbortSignal`; aborting cancels the SSE connection and resolves `done`. |

The `signal` option pairs cleanly with React effect cleanup or React Query's abort handling — pass the same controller and you don't need a `finally` block.

## `beliefs.events(options?)`

Same stream, async-iterable face. Use this when you prefer `for await` or RxJS-style pipelines over the callback shape.

```ts
const ac = new AbortController()

for await (const event of beliefs.events({ signal: ac.signal })) {
  if (event.type === 'belief_records_updated') {
    renderRecords(event.beliefRecords)
  }
}
```

Aborting the signal ends the iteration cleanly.

## `BeliefStreamEvent`

Three frame types come off the state stream:

```ts
type BeliefStreamEvent =
  | {
      type: 'belief_records_updated'
      sessionId: string
      spaceId: string
      viewId: string
      beliefRecords: BeliefRecord[]
      groupIndex: number
      totalGroups: number
      timestamp: string
    }
  | {
      type: 'belief_records_stale'
      sessionId: string
      spaceId: string
      reason: string
      timestamp: string
    }
  | { type: 'heartbeat'; timestamp: string }
```

Heartbeats are dropped by default — set `dropHeartbeats: false` if you need keep-alive visibility for connection health UIs.

`belief_records_stale` is a hint to refresh from `read()` or `snapshot()` — the engine has detected something it cannot incrementally project.

## `beliefs.streamExtraction(request, handler?, options?)`

Per-request extraction of beliefs from a content payload. Yields `BeliefExtractionStreamChunk` frames as the engine extracts; ends with a `complete` chunk or an `error` chunk.

```ts
const stream = beliefs.streamExtraction({
  content: longTranscript,
  surface: 'voice',
})

for await (const chunk of stream) {
  if (chunk.type === 'belief_event') {
    appendIncrementalBelief(chunk.event)
  } else if (chunk.type === 'complete') {
    finalize(chunk.eventCount)
  } else if (chunk.type === 'error') {
    showError(chunk.message)
  }
}

// Cancel mid-extraction:
stream.cancel()
```

You can pass an optional `handler` callback in addition to iterating — both run on every frame. Useful when one consumer wants the iterable for control flow and another wants a fire-and-forget callback (e.g., an analytics hook).

## `BeliefExtractionStreamChunk`

```ts
type BeliefExtractionStreamChunk =
  | {
      type: 'belief_event'
      index: number
      event: {
        id: string
        baseEvent: string
        semanticLabel: string
        text: string
        actor: string
        sourceMessageIds: string[]
      }
    }
  | { type: 'complete'; lastProcessedMessageId: string; eventCount: number }
  | { type: 'error'; message: string }
```

Stream lifecycle: zero or more `belief_event` chunks → exactly one `complete` *or* one `error` chunk → stream ends.

## `beliefs.drift.watch(handler, options?)`

SSE stream of per-agent reliability drift events. The engine snapshots a baseline at stream start, then emits a `DriftEvent` per (agent, evidence type) on each polling tick, with a `driftDetected` boolean derived from a drift threshold scaled to the baseline's own uncertainty.

```ts
const sub = beliefs.drift.watch(
  (event) => {
    if (event.type === 'drift' && event.driftDetected) {
      alert(`${event.agentId} drift on ${event.evidenceType}: shift=${event.meanShift.toFixed(3)} > ci=${event.ciHalfWidth.toFixed(3)}`)
    }
  },
  { targetAgentId: 'researcher', pollIntervalMs: 30_000 },
)

// Tear down when done:
sub.unsubscribe()
await sub.done
```

Options:

| Option | Default | What it does |
|--------|---------|--------------|
| `targetAgentId` | — | Stream only this agent's events. |
| `pollIntervalMs` | `10000` | Polling interval (min 1000, max 300000). |
| `zThreshold` | `1.645` | Drift-threshold z-score (1.645 = 95% one-sided). |
| `dropHeartbeats` | `true` | Filter heartbeat frames before invoking `handler`. |
| `onError` | `console.error` | Stream-error callback. |
| `onClose` | — | Called once on clean close. |
| `signal` | — | `AbortSignal`. |

## `beliefs.drift.events(options?)`

Same stream, async-iterable face. Same options minus `onError`/`onClose`/`handler`.

```ts
for await (const event of beliefs.drift.events()) {
  if (event.type === 'drift' && event.driftDetected) {
    handleDrift(event)
  }
}
```

## `DriftStreamEvent`

```ts
type DriftStreamEvent =
  | {
      type: 'drift'
      agentId: string
      evidenceType: string
      timestamp: string
      /** Engine-rated reliability at baseline (0–1). */
      baselineMean: number
      /** Engine-rated reliability now (0–1). */
      currentMean: number
      /** |currentMean - baselineMean|. */
      meanShift: number
      /** Engine-computed 95% confidence interval half-width on the baseline. */
      ciHalfWidth: number
      /** Engine-internal divergence metric (opaque scalar, see note below). */
      klDivergence: number
      /** True when meanShift > ciHalfWidth — drift past baseline noise. */
      driftDetected: boolean
      observationCount: number
    }
  | { type: 'heartbeat'; timestamp: string }
```

`driftDetected` scales the threshold to the baseline's own uncertainty, so you don't have to pick scalars yourself. Use `meanShift` for magnitude (most interpretable), `driftDetected` for routing, and `klDivergence` only for cross-agent comparison — it's an engine-internal scalar with no fixed unit.

<Callout type="info" title="Auth">
Both streams require `apiKey` or `scopeToken` auth. See [Auth](/dev/sdk/auth) for setup details.
</Callout>
