Publisher Helpers
Publisher helpers provide a unified way to publish and subscribe to events across different storage backends in oRPC applications. They support both static and dynamic event names, along with optional replay of missed events for subscribers.
Installation
npm install @orpc/publisher@betayarn add @orpc/publisher@betapnpm add @orpc/publisher@betabun add @orpc/publisher@betadeno add npm:@orpc/publisher@betaBasic Usage
The core concept is the Publisher interface, which defines a standard way to publish events and subscribe to them. You can create your own publisher or use one of the provided adapters for popular storage backends. The publish method accepts an event name and payload, while subscribe lets you listen to specific events using either callback or iterator styles.
const publisher = new MemoryPublisher<{
'something-updated': {
id: string
}
}>()
const live = os
.handler(async function* ({ input, signal, lastEventId }) {
const iterator = publisher.subscribe('something-updated', { signal, lastEventId })
for await (const payload of iterator) {
// Handle payload here or yield directly to client
yield payload
}
})
const publish = os
.input(z.object({ id: z.string() }))
.handler(async ({ input }) => {
await publisher.publish('something-updated', { id: input.id })
})TIP
The publisher supports both static and dynamic event names.
const publisher = new MemoryPublisher<Record<string, { message: string }>>()Adapters
| Name | Replay Support | Adapter for |
|---|---|---|
MemoryPublisher | ✅ | In-memory storage |
RedisPublisher | ✅ | Redis |
UpstashPublisher | ✅ | Upstash Redis |
import { MemoryPublisher } from '@orpc/publisher/memory'import { createClient } from 'redis'
import { RedisPublisher } from '@orpc/publisher/redis'
const client = createClient({ url: 'redis://localhost:6379' })
// RedisRateLimiter lazily connects to Redis when needed.
// You can still call `client.connect()` manually, but it is optional.
await client.connect()
const publisher = new RedisPublisher(client, {
subscriber: client.duplicate(), // Redis client for subscribing to pub/sub (default: client.duplicate())
prefix: 'orpc:', // Optional Redis key prefix
serializer: undefined, // Optional custom serializer
})import { Redis } from '@upstash/redis'
import { UpstashPublisher } from '@orpc/publisher/upstash'
const redis = Redis.fromEnv()
const publisher = new UpstashPublisher(redis, {
prefix: 'orpc:', // Optional Redis key prefix
serializer: undefined, // Optional custom serializer
})Replay Missing Events
Some adapters can replay events missed while a subscriber is offline. This feature is usually disabled by default, but you can enable it when creating the publisher. When enabled, the publisher automatically manages event ids and attempts to replay events since the last event id provided by the subscriber.
const publisher = new MemoryPublisher({
replay: {
enabled: true, // Enable replaying missed events
seconds: 60 * 5, // TTL in seconds
}
})
const iterator = publisher.subscribe('something-updated', {
signal,
lastEventId, // The publisher will attempt to replay missed events since this event id
})WARNING
When replay is enabled, the publisher manages event ids automatically. This means:
- Any event id provided during publishing is ignored
- When subscribing, you must preserve and forward the event id when yielding custom payloads
import { getEventMeta, withEventMeta } from '@orpc/server'
const live = os
.handler(async function* ({ input, signal, lastEventId }) {
const iterator = publisher.subscribe('something-updated', { signal, lastEventId })
for await (const payload of iterator) {
// Preserve event id when yielding custom payloads
const id = getEventMeta(payload)?.id
yield withEventMeta({ custom: 'value' }, { id })
}
})
const publish = os
.input(z.object({ id: z.string() }))
.handler(async ({ input }) => {
// The event id 'this-will-be-ignored' will be replaced by the publisher
await publisher.publish(
'something-updated',
withEventMeta({ id: input.id }, { id: 'this-will-be-ignored' })
)
})Client Reconnection
On the client, you can use the Retry Plugin, which automatically controls and passes lastEventId to the server when reconnecting. Alternatively, you can manage lastEventId manually:
import { getEventMeta } from '@orpc/client'
let lastEventId: string | undefined
while (true) {
try {
const iterator = await client.live('input', { lastEventId })
for await (const payload of iterator) {
lastEventId = getEventMeta(payload)?.id // Update lastEventId
console.log(payload)
}
}
catch {
await new Promise(resolve => setTimeout(resolve, 1000)) // Wait 1 second before retrying
}
}
