
ADR-0031: Redis Streams as Event Bus

Status

Accepted - 2025-01-26


Context

The TVL Platform needs a lightweight event bus for domain events (booking.created, property.updated) that supports consumer groups and message persistence.


Decision

Use Redis Streams as the event bus (rather than Kafka, RabbitMQ, or SNS/SQS).

Rationale

  1. Already Using Redis: For caching, sessions, rate limiting
  2. Persistent: Unlike Pub/Sub, messages stay in the stream until trimmed and are written to disk when Redis persistence (RDB/AOF) is enabled
  3. Consumer Groups: Multiple consumers, load balancing
  4. Simple: No separate infrastructure (Kafka, RabbitMQ)
  5. Fast: Quoted throughput of 1M+ messages/sec, far above the MVP load (<10k events/day)

Alternatives Considered

Alternative 1: Kafka

Rejected - Overkill for MVP (<10k events/day), requires a separate cluster plus a ZooKeeper or KRaft controller quorum, expensive ($100+/mo)

Alternative 2: RabbitMQ

Rejected - Additional infrastructure, complex routing, lower throughput

Alternative 3: AWS SNS/SQS

Rejected - Vendor lock-in, higher latency, poor local dev experience

Alternative 4: Redis Pub/Sub

Rejected - No persistence (messages lost if consumer offline), no consumer groups


Implementation

1. Publish Events

// src/events/eventBus.ts
import { randomUUID } from 'node:crypto';
import Redis from 'ioredis';

// Fail fast when the connection string is missing
const redisUrl = process.env.REDIS_URL;
if (!redisUrl) throw new Error('REDIS_URL is not set');
const redis = new Redis(redisUrl);

export async function publishEvent(
  eventType: string,
  payload: object,
  metadata: object = {}
) {
  // Envelope shared by all domain events
  const event = {
    id: randomUUID(),
    type: eventType,
    payload,
    metadata,
    timestamp: new Date().toISOString(),
  };

  await redis.xadd(
    `events:${eventType}`, // Stream key: events:booking.created
    '*', // Auto-generate message ID
    'data', JSON.stringify(event)
  );
}

// Usage
await publishEvent('booking.created', {
  bookingId: '123',
  guestName: 'John Doe',
});

2. Consume Events (Consumer Groups)

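// src/consumers/bookingCreatedConsumer.ts
import Redis from 'ioredis';

// Dedicated connection: a blocking XREADGROUP holds the connection while it waits.
// `logger` and `handleBookingCreated` are assumed to be defined elsewhere in the codebase.
const redisUrl = process.env.REDIS_URL;
if (!redisUrl) throw new Error('REDIS_URL is not set');
const redis = new Redis(redisUrl);

// Reply shape: [[streamKey, [[messageId, [field, value, ...]], ...]], ...]
type StreamReply = [string, [string, string[]][]][];

export async function consumeBookingCreatedEvents() {
  const groupName = 'booking-notifications';
  const consumerName = `consumer-${process.pid}`;
  const streamKey = 'events:booking.created';

  // Create consumer group; ignore only the "already exists" (BUSYGROUP) error
  try {
    await redis.xgroup('CREATE', streamKey, groupName, '0', 'MKSTREAM');
  } catch (error) {
    if (!(error instanceof Error && error.message.includes('BUSYGROUP'))) throw error;
  }

  while (true) {
    const events = (await redis.xreadgroup(
      'GROUP', groupName, consumerName,
      'COUNT', 10, // Read up to 10 messages
      'BLOCK', 5000, // Block for up to 5s
      'STREAMS', streamKey, '>' // '>' = only messages never delivered to this group
    )) as StreamReply | null;

    if (!events) continue; // BLOCK timed out with no new messages

    for (const [, messages] of events) {
      for (const [messageId, fields] of messages) {
        try {
          // Parse inside the try so malformed JSON does not crash the loop
          const event = JSON.parse(fields[1]); // fields = ['data', '{...}']
          await handleBookingCreated(event.payload);

          // Acknowledge message
          await redis.xack(streamKey, groupName, messageId);
        } catch (error) {
          logger.error({ error, messageId }, 'Failed to process event');
          // Message stays in the group's pending list until it is reclaimed (see step 3)
        }
      }
    }
  }
}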
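
The consumer above reads the payload positionally (`fields[1]`), which silently breaks if another field is ever added before `data`. As a minimal sketch, assuming the envelope written by `publishEvent`, the reply can be flattened and the `data` field looked up by name. The helper name `parseStreamReply` and the `ParsedMessage` type are hypothetical and not part of the codebase.

```typescript
// Shape of an XREADGROUP reply: [[streamKey, [[id, [field, value, ...]], ...]], ...]
type RawEntry = [string, string[]];
type RawReply = [string, RawEntry[]][] | null;

interface DomainEvent {
  id: string;
  type: string;
  payload: unknown;
  metadata: unknown;
  timestamp: string;
}

interface ParsedMessage {
  stream: string;
  messageId: string;
  event: DomainEvent | null; // null when 'data' is missing or not valid JSON
}

// Hypothetical helper: flattens the nested reply and finds 'data' by field name
export function parseStreamReply(reply: RawReply): ParsedMessage[] {
  const out: ParsedMessage[] = [];
  for (const [stream, entries] of reply ?? []) {
    for (const [messageId, fields] of entries) {
      let event: DomainEvent | null = null;
      // Fields alternate name, value, name, value, ...
      for (let i = 0; i + 1 < fields.length; i += 2) {
        const value = fields[i + 1];
        if (fields[i] === 'data' && value !== undefined) {
          try {
            event = JSON.parse(value) as DomainEvent;
          } catch {
            event = null; // malformed JSON is surfaced as null, not thrown
          }
        }
      }
      out.push({ stream, messageId, event });
    }
  }
  return out;
}
```

A caller can then decide what to do with entries whose `event` is null (for example log and acknowledge them) instead of letting one bad message stall the loop.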

3. Retry Unacknowledged Messages

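// Find pending messages (delivered but unacknowledged) and reclaim those idle >5min.
// streamKey, groupName, consumerName and handleBookingCreated are the same as in step 2.
const MIN_IDLE_MS = 300000; // 5 minutes
const pending = (await redis.xpending(streamKey, groupName, '-', '+', 10)) as [string, string, number, number][];

for (const [messageId, , idleTime] of pending) {
  if (idleTime <= MIN_IDLE_MS) continue;

  // XCLAIM only transfers ownership and returns the entries; they must be processed here
  const claimed = (await redis.xclaim(streamKey, groupName, consumerName, MIN_IDLE_MS, messageId)) as ([string, string[]] | null)[];

  for (const entry of claimed) {
    if (!entry) continue; // entry was trimmed or deleted in the meantime
    const [claimedId, fields] = entry;
    await handleBookingCreated(JSON.parse(fields[1]).payload);
    await redis.xack(streamKey, groupName, claimedId);
  }
}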
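
The extended XPENDING reply also carries a delivery count, which the retry step does not yet use. A minimal sketch of how it could cap retries, assuming a limit of 5 deliveries (a hypothetical value, not something this ADR decides) and a hypothetical helper `planRetries`:

```typescript
// One row of the extended XPENDING reply: [id, consumer, idle ms, delivery count]
type PendingEntry = [string, string, number, number];

interface RetryPlan {
  reclaim: string[];    // IDs to XCLAIM and process again
  deadLetter: string[]; // IDs that exhausted their attempts
}

// Hypothetical helper: splits idle pending messages into "retry" and "give up"
export function planRetries(
  pending: PendingEntry[],
  minIdleMs = 300_000, // 5 minutes, as in the retry step
  maxDeliveries = 5,   // assumption: tune per consumer
): RetryPlan {
  const plan: RetryPlan = { reclaim: [], deadLetter: [] };
  for (const [id, , idleMs, deliveries] of pending) {
    if (idleMs <= minIdleMs) continue; // still within its processing window
    if (deliveries >= maxDeliveries) {
      plan.deadLetter.push(id);
    } else {
      plan.reclaim.push(id);
    }
  }
  return plan;
}
```

What happens to the `deadLetter` IDs is left open here. One option would be to copy them to a separate stream and acknowledge them so they stop appearing in XPENDING.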

Stream Management

Retention Policy

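// Trim stream to roughly the last 10,000 messages, oldest first.
// '~' lets Redis trim whole internal nodes, so slightly more than 10,000 may remain.
await redis.xtrim('events:booking.created', 'MAXLEN', '~', 10000);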
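
As an alternative to a separate XTRIM call, XADD accepts the same `MAXLEN ~ N` option, so the cap can be applied on every write. The sketch below only builds the argument list. The helper name `cappedXaddArgs` is hypothetical, and the envelope is simplified compared to `publishEvent`.

```typescript
// Builds the argument list for a capped XADD:
//   XADD key MAXLEN ~ <maxLen> * data <json>
export function cappedXaddArgs(
  eventType: string,
  payload: object,
  maxLen = 10_000, // matches the 10k cap used in this ADR
): (string | number)[] {
  // Simplified envelope for illustration only
  const event = { type: eventType, payload, timestamp: new Date().toISOString() };
  return [
    `events:${eventType}`, // stream key
    'MAXLEN', '~', maxLen, // approximate trim applied on every write
    '*',                   // let Redis assign the entry ID
    'data', JSON.stringify(event),
  ];
}
```

The array is meant to be spread into the client's `xadd` call. How that spread type-checks depends on the client library's overloads, so a cast may be needed.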

Stream Monitoring

# Stream length
XLEN events:booking.created

# Pending messages per consumer
XPENDING events:booking.created booking-notifications

# Consumer group info
XINFO GROUPS events:booking.created
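
For automated monitoring, the summary form of XPENDING (the second command above) returns the total pending count, the smallest and greatest pending IDs, and a per-consumer breakdown. A minimal sketch of turning that reply into a metric, assuming a hypothetical helper `summarizePending` and an alert threshold of 100 that is purely illustrative:

```typescript
// Summary form of XPENDING: [total, smallest ID, greatest ID, [[consumer, count], ...]]
// With nothing pending, Redis returns [0, null, null, null].
type PendingSummaryReply = [
  number,
  string | null,
  string | null,
  [string, string][] | null,
];

interface PendingSummary {
  total: number;
  perConsumer: Record<string, number>;
  overThreshold: boolean;
}

// Hypothetical helper: converts the raw reply into numbers suitable for a dashboard
export function summarizePending(
  reply: PendingSummaryReply,
  alertThreshold = 100, // assumption: pick a value that fits the consumer's throughput
): PendingSummary {
  const [total, , , consumers] = reply;
  const perConsumer: Record<string, number> = {};
  for (const [name, count] of consumers ?? []) {
    perConsumer[name] = Number(count); // per-consumer counts arrive as strings
  }
  return { total, perConsumer, overThreshold: total > alertThreshold };
}
```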

Event Routing Patterns

1. Topic-Based Routing

// Stream per event type
events:booking.created
events:booking.updated
events:property.created
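
Because each event type has its own stream and Redis has no wildcard subscriptions, a consumer that wants "all booking events" has to list the stream keys explicitly. A minimal sketch of that expansion, assuming the application keeps a list of known event types. The helper names `streamKeyFor` and `expandPattern` are hypothetical.

```typescript
const STREAM_PREFIX = 'events:';

// Maps an event type to its stream key, e.g. 'booking.created' -> 'events:booking.created'
export function streamKeyFor(eventType: string): string {
  return `${STREAM_PREFIX}${eventType}`;
}

// Expands a trailing-wildcard pattern such as 'booking.*' against a known list of
// event types. Patterns without '.*' must match a known type exactly.
export function expandPattern(pattern: string, knownTypes: readonly string[]): string[] {
  if (!pattern.endsWith('.*')) {
    return knownTypes.includes(pattern) ? [streamKeyFor(pattern)] : [];
  }
  const prefix = pattern.slice(0, -1); // 'booking.*' -> 'booking.'
  return knownTypes
    .filter((eventType) => eventType.startsWith(prefix))
    .map((eventType) => streamKeyFor(eventType));
}
```

The resulting keys can be passed to a single XREADGROUP call, which accepts several streams after `STREAMS` followed by one ID (here `>`) per stream.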

2. Consumer Groups (Fan-Out)

// One consumer group per subscriber on the same stream; every group receives each event
GROUP: booking-notifications → Send email
GROUP: booking-analytics → Update metrics
GROUP: booking-sync → Sync to Hostaway
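
Fan-out only works if every group exists before events that matter are published. A minimal sketch of creating all groups for a stream at startup, treating only the BUSYGROUP ("already exists") error as success. `GroupClient` is a hypothetical narrow interface standing in for the Redis client, and `ensureGroups` and `isBusyGroupError` are hypothetical helper names.

```typescript
// Narrow stand-in for the Redis client; only the xgroup command is needed here
interface GroupClient {
  xgroup(...args: (string | number)[]): Promise<unknown>;
}

// Redis answers XGROUP CREATE on an existing group with an error starting with BUSYGROUP
export function isBusyGroupError(error: unknown): boolean {
  return error instanceof Error && error.message.includes('BUSYGROUP');
}

// Creates each group if missing and returns the names that were newly created
export async function ensureGroups(
  client: GroupClient,
  streamKey: string,
  groups: readonly string[],
): Promise<string[]> {
  const created: string[] = [];
  for (const group of groups) {
    try {
      // '0' starts the group at the beginning of the stream; MKSTREAM creates the stream if absent
      await client.xgroup('CREATE', streamKey, group, '0', 'MKSTREAM');
      created.push(group);
    } catch (error) {
      if (!isBusyGroupError(error)) throw error; // connection or auth errors must surface
    }
  }
  return created;
}
```

For the groups listed above, a call could look like `ensureGroups(redis, 'events:booking.created', ['booking-notifications', 'booking-analytics', 'booking-sync'])`, assuming the client's `xgroup` method is type-compatible with `GroupClient`.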

Consequences

Positive

  • No Extra Infrastructure: Uses existing Redis
  • Persistent: Messages survive restarts when Redis persistence (RDB/AOF) is enabled
  • Consumer Groups: Multiple consumers, load balancing
  • Fast: <1ms latency when Redis is co-located with the application

Negative

  • Redis Dependency: Single point of failure
  • Limited Retention: Streams are held in memory and must be trimmed (unlike Kafka's disk-based log retention)
  • No Complex Routing: No topic wildcards (unlike RabbitMQ)

Mitigations

  • Use Upstash Redis with replication (99.99% SLA)
  • Trim streams to 10k messages (sufficient for MVP)
  • Use multiple streams for different event types

Validation Checklist

  • Redis Streams used for all domain events
  • Consumer groups created for each subscriber
  • Unacknowledged message retry logic
  • Stream trimming (MAXLEN 10k)
  • XPENDING monitoring
