The SDK exposes two streaming primitives over Server-Sent Events: a state stream (subscribe / events) and a per-request extraction stream (streamExtraction). Both share one transport — retries, abort propagation, and frame validation live in HttpTransport, so consumer code only sees parsed event objects.
beliefs.subscribe(handler, options?)
Push state changes into a callback. Returns a Subscription with unsubscribe() and a done promise that resolves when the stream closes.
1const sub = beliefs.subscribe(
2 (event) => {
3 if (event.type === 'belief_records_updated') {
4 renderRecords(event.beliefRecords)
5 } else if (event.type === 'belief_records_stale') {
6 requestFullRefresh(event.reason)
7 }
8 },
9 {
10 onError: (err) => console.error(err),
11 onClose: () => console.log('stream closed'),
12 },
13)
14
15// Tear down when the React effect / job ends:
16sub.unsubscribe()
17await sub.doneOptions:
| Option | Default | What it does |
|---|---|---|
onError | console.error | Called when the underlying stream errors. |
onClose | — | Called once after a clean close. |
dropHeartbeats | true | Filter heartbeat frames before invoking handler. |
signal | — | AbortSignal; aborting cancels the SSE connection and resolves done. |
The signal option pairs cleanly with React effect cleanup or React Query's abort handling — pass the same controller and you don't need a finally block.
beliefs.events(options?)
Same stream, async-iterable face. Use this when you prefer for await or RxJS-style pipelines over the callback shape.
1const ac = new AbortController()
2
3for await (const event of beliefs.events({ signal: ac.signal })) {
4 if (event.type === 'belief_records_updated') {
5 renderRecords(event.beliefRecords)
6 }
7}Aborting the signal ends the iteration cleanly.
BeliefStreamEvent
Three frame types come off the state stream:
1type BeliefStreamEvent =
2 | {
3 type: 'belief_records_updated'
4 sessionId: string
5 spaceId: string
6 viewId: string
7 beliefRecords: BeliefRecord[]
8 groupIndex: number
9 totalGroups: number
10 timestamp: string
11 }
12 | {
13 type: 'belief_records_stale'
14 sessionId: string
15 spaceId: string
16 reason: string
17 timestamp: string
18 }
19 | { type: 'heartbeat'; timestamp: string }Heartbeats are dropped by default — set dropHeartbeats: false if you need keep-alive visibility for connection health UIs.
belief_records_stale is a hint to refresh from read() or snapshot() — the engine has detected something it cannot incrementally project.
beliefs.streamExtraction(request, handler?, options?)
Per-request extraction of beliefs from a content payload. Yields BeliefExtractionStreamChunk frames as the engine extracts; ends with a complete chunk or an error chunk.
1const stream = beliefs.streamExtraction({
2 content: longTranscript,
3 surface: 'voice',
4})
5
6for await (const chunk of stream) {
7 if (chunk.type === 'belief_event') {
8 appendIncrementalBelief(chunk.event)
9 } else if (chunk.type === 'complete') {
10 finalize(chunk.eventCount)
11 } else if (chunk.type === 'error') {
12 showError(chunk.message)
13 }
14}
15
16// Cancel mid-extraction:
17stream.cancel()You can pass an optional handler callback in addition to iterating — both run on every frame. Useful when one consumer wants the iterable for control flow and another wants a fire-and-forget callback (e.g., an analytics hook).
BeliefExtractionStreamChunk
1type BeliefExtractionStreamChunk =
2 | {
3 type: 'belief_event'
4 index: number
5 event: {
6 id: string
7 baseEvent: string
8 semanticLabel: string
9 text: string
10 actor: string
11 sourceMessageIds: string[]
12 }
13 }
14 | { type: 'complete'; lastProcessedMessageId: string; eventCount: number }
15 | { type: 'error'; message: string }Stream lifecycle: zero or more belief_event chunks → exactly one complete or one error chunk → stream ends.
beliefs.drift.watch(handler, options?)
SSE stream of per-agent reliability drift events. The engine snapshots a baseline at stream start, then emits a DriftEvent per (agent, evidence type) on each polling tick, with a driftDetected boolean derived from a drift threshold scaled to the baseline's own uncertainty.
1const sub = beliefs.drift.watch(
2 (event) => {
3 if (event.type === 'drift' && event.driftDetected) {
4 alert(`${event.agentId} drift on ${event.evidenceType}: shift=${event.meanShift.toFixed(3)} > ci=${event.ciHalfWidth.toFixed(3)}`)
5 }
6 },
7 { targetAgentId: 'researcher', pollIntervalMs: 30_000 },
8)
9
10// Tear down when done:
11sub.unsubscribe()
12await sub.doneOptions:
| Option | Default | What it does |
|---|---|---|
targetAgentId | — | Stream only this agent's events. |
pollIntervalMs | 10000 | Polling interval (min 1000, max 300000). |
zThreshold | 1.645 | Drift-threshold z-score (1.645 = 95% one-sided). |
dropHeartbeats | true | Filter heartbeat frames before invoking handler. |
onError | console.error | Stream-error callback. |
onClose | — | Called once on clean close. |
signal | — | AbortSignal. |
beliefs.drift.events(options?)
Same stream, async-iterable face. Same options minus onError/onClose/handler.
1for await (const event of beliefs.drift.events()) {
2 if (event.type === 'drift' && event.driftDetected) {
3 handleDrift(event)
4 }
5}DriftStreamEvent
1type DriftStreamEvent =
2 | {
3 type: 'drift'
4 agentId: string
5 evidenceType: string
6 timestamp: string
7 /** Engine-rated reliability at baseline (0–1). */
8 baselineMean: number
9 /** Engine-rated reliability now (0–1). */
10 currentMean: number
11 /** |currentMean - baselineMean|. */
12 meanShift: number
13 /** Engine-computed 95% confidence interval half-width on the baseline. */
14 ciHalfWidth: number
15 /** Engine-internal divergence metric (opaque scalar, see note below). */
16 klDivergence: number
17 /** True when meanShift > ciHalfWidth — drift past baseline noise. */
18 driftDetected: boolean
19 observationCount: number
20 }
21 | { type: 'heartbeat'; timestamp: string }driftDetected scales the threshold to the baseline's own uncertainty, so you don't have to pick scalars yourself. Use meanShift for magnitude (most interpretable), driftDetected for routing, and klDivergence only for cross-agent comparison — it's an engine-internal scalar with no fixed unit.
Auth
Both streams require apiKey or scopeToken auth. See Auth for setup details.