ADR-0031: Redis Streams as Event Bus
Status
Accepted - 2025-01-26
Context
The TVL Platform needs a lightweight event bus for domain events (e.g. booking.created, property.updated), with support for consumer groups and persistence.
Decision
Use Redis Streams as the event bus (rather than Kafka, RabbitMQ, or SNS/SQS).
Rationale
- Already Using Redis: The platform already runs Redis for caching, sessions, and rate limiting
- Persistent: Unlike Redis Pub/Sub, stream entries are retained until trimmed and survive restarts via Redis persistence (RDB/AOF)
- Consumer Groups: Multiple consumers can share a stream, with load balancing and per-message acknowledgement
- Simple: No separate broker to deploy and operate (unlike Kafka or RabbitMQ)
- Fast: A single Redis instance handles on the order of hundreds of thousands to 1M+ operations/sec, far beyond the MVP's event volume
Alternatives Considered
Alternative 1: Kafka
Rejected - Overkill for the MVP (<10k events/day), operationally heavy (ZooKeeper or KRaft cluster to run), and expensive ($100+/mo for managed hosting)
Alternative 2: RabbitMQ
Rejected - Additional infrastructure to run, more complex routing model, lower throughput than Redis Streams for this workload
Alternative 3: AWS SNS/SQS
Rejected - Vendor lock-in, higher latency, poor local dev experience
Alternative 4: Redis Pub/Sub
Rejected - No persistence (messages are lost if a consumer is offline) and no consumer groups
Implementation
1. Publish Events
// src/events/eventBus.ts
import crypto from 'node:crypto'; // for crypto.randomUUID()
import Redis from 'ioredis';
const redis = new Redis(process.env.REDIS_URL);
export async function publishEvent(
  eventType: string,
  payload: object,
  metadata: object = {}
) {
  const event = {
    id: crypto.randomUUID(),
    type: eventType,
    payload,
    metadata,
    timestamp: new Date().toISOString(),
  };
  await redis.xadd(
    `events:${eventType}`, // Stream key: events:booking.created
    '*', // Auto-generate message ID
    'data', JSON.stringify(event)
  );
}
// Usage
await publishEvent('booking.created', {
  bookingId: '123',
  guestName: 'John Doe',
});
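For type safety at call sites, publishEvent can be wrapped in per-event helpers. The sketch below is illustrative (the BookingCreatedPayload shape and helper name are not defined in this ADR) and assumes it lives alongside publishEvent in eventBus.ts:
// Illustrative typed wrapper; BookingCreatedPayload is an assumed shape
interface BookingCreatedPayload {
  bookingId: string;
  guestName: string;
}
export function publishBookingCreated(payload: BookingCreatedPayload, metadata: object = {}) {
  return publishEvent('booking.created', payload, metadata);
}
// Usage
await publishBookingCreated({ bookingId: '123', guestName: 'John Doe' });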
2. Consume Events (Consumer Groups)
// src/consumers/bookingCreatedConsumer.ts
import Redis from 'ioredis';
import { logger } from '../lib/logger'; // adjust to the shared logger's actual path
import { handleBookingCreated } from './handlers/bookingCreated'; // adjust to the handler's actual path
// Dedicated connection: XREADGROUP with BLOCK ties up the connection,
// so don't share it with the cache/session client
const redis = new Redis(process.env.REDIS_URL);
export async function consumeBookingCreatedEvents() {
  const groupName = 'booking-notifications';
  const consumerName = `consumer-${process.pid}`;
  const streamKey = 'events:booking.created';
  // Create consumer group (idempotent)
  try {
    await redis.xgroup('CREATE', streamKey, groupName, '0', 'MKSTREAM');
  } catch (error) {
    // BUSYGROUP means the group already exists; anything else is a real failure
    if (!(error as Error).message.includes('BUSYGROUP')) throw error;
  }
  while (true) {
    const events = await redis.xreadgroup(
      'GROUP', groupName, consumerName,
      'BLOCK', 5000, // Block for up to 5s waiting for new messages
      'COUNT', 10, // Read at most 10 messages per call
      'STREAMS', streamKey, '>' // '>' = only messages never delivered to this group
    );
    if (!events) continue; // BLOCK timed out, poll again
    for (const [stream, messages] of events) {
      for (const [messageId, fields] of messages) {
        const event = JSON.parse(fields[1]); // fields = ['data', '{...}']
        try {
          await handleBookingCreated(event.payload);
          // Acknowledge message
          await redis.xack(streamKey, groupName, messageId);
        } catch (error) {
          logger.error({ error, messageId }, 'Failed to process event');
          // Message stays in the pending list and can be reclaimed (see retry below)
        }
      }
    }
  }
}
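To make the loop concrete, here is a minimal sketch of the handler module and a worker entrypoint. The file paths, handler body, and restart strategy are illustrative, not prescribed by this ADR:
// src/consumers/handlers/bookingCreated.ts (illustrative)
export async function handleBookingCreated(payload: { bookingId: string; guestName: string }) {
  // Real implementation: send the confirmation email, update read models, etc.
  console.log(`Booking ${payload.bookingId} created for ${payload.guestName}`);
}
// src/consumers/index.ts (illustrative worker entrypoint)
import { consumeBookingCreatedEvents } from './bookingCreatedConsumer';
// One blocking read loop per worker process; exit on fatal errors so the
// process manager (PM2, ECS, etc.) restarts the consumer
consumeBookingCreatedEvents().catch((error) => {
  console.error('booking.created consumer crashed', error);
  process.exit(1);
});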
3. Retry Unacknowledged Messages
// Check for messages that have been pending (delivered but unacknowledged) for >5 minutes
const pending = await redis.xpending(streamKey, groupName, '-', '+', 10);
for (const [messageId, consumer, idleTime, deliveryCount] of pending) {
  if (idleTime > 300000) { // Idle for more than 5 minutes
    // Claim the message for this consumer; XCLAIM returns the claimed entry,
    // which can then be reprocessed and acknowledged (see the sketch below)
    await redis.xclaim(streamKey, groupName, consumerName, 300000, messageId);
  }
}
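Tying this together, a periodic reclaim worker can claim stale deliveries, reprocess them, and acknowledge. This is a sketch under assumptions (file path, 1-minute interval, reuse of the handler and logger above), not a finished implementation:
// src/consumers/reclaimStale.ts (illustrative sketch)
import Redis from 'ioredis';
import { handleBookingCreated } from './handlers/bookingCreated'; // assumed path
import { logger } from '../lib/logger'; // assumed path
const redis = new Redis(process.env.REDIS_URL);
const STALE_MS = 300_000; // treat deliveries idle for >5 minutes as stale
export async function reclaimStaleMessages(
  streamKey: string,
  groupName: string,
  consumerName: string
) {
  // Up to 10 delivered-but-unacknowledged entries for this group
  const pending = await redis.xpending(streamKey, groupName, '-', '+', 10);
  for (const [messageId, , idleTime] of pending) {
    if (Number(idleTime) < STALE_MS) continue;
    // Take ownership of the stale entry; XCLAIM returns the claimed entries
    const claimed = await redis.xclaim(streamKey, groupName, consumerName, STALE_MS, messageId);
    for (const [id, fields] of claimed) {
      if (!fields) continue; // entry was trimmed/deleted since delivery
      const event = JSON.parse(fields[1]); // fields = ['data', '{...}']
      await handleBookingCreated(event.payload);
      await redis.xack(streamKey, groupName, id);
    }
  }
}
// Run alongside the main consumer, e.g. once per minute
setInterval(() => {
  reclaimStaleMessages('events:booking.created', 'booking-notifications', `reclaimer-${process.pid}`)
    .catch((error) => logger.error({ error }, 'Reclaim pass failed'));
}, 60_000);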
Stream Management
Retention Policy
// Keep roughly the most recent 10,000 entries; '~' trims approximately, which is cheaper than exact MAXLEN
await redis.xtrim('events:booking.created', 'MAXLEN', '~', 10000);
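Trimming needs to run somewhere; a minimal sketch of a scheduled job is below. The stream list, interval, and file location are assumptions, not part of this decision:
// Illustrative scheduled trim job
import Redis from 'ioredis';
const redis = new Redis(process.env.REDIS_URL);
const EVENT_STREAMS = [
  'events:booking.created',
  'events:booking.updated',
  'events:property.created',
];
export async function trimEventStreams(maxLen = 10_000) {
  for (const stream of EVENT_STREAMS) {
    // '~' trims in whole macro-nodes, much cheaper than an exact MAXLEN
    await redis.xtrim(stream, 'MAXLEN', '~', maxLen);
  }
}
// e.g. run hourly from the worker process
setInterval(() => {
  trimEventStreams().catch((error) => console.error('Stream trim failed', error));
}, 60 * 60 * 1000);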
Stream Monitoring
# Stream length
XLEN events:booking.created
# Pending (unacknowledged) messages for the consumer group, broken down per consumer
XPENDING events:booking.created booking-notifications
# Consumer group info
XINFO GROUPS events:booking.created
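The same checks can be scripted for dashboards and alerts; a minimal sketch using ioredis (the threshold and function name are illustrative):
// Illustrative health check for a stream and its consumer group
import Redis from 'ioredis';
const redis = new Redis(process.env.REDIS_URL);
export async function getStreamHealth(streamKey: string, groupName: string) {
  const length = await redis.xlen(streamKey);
  // Summary form of XPENDING: [count, smallestId, greatestId, [[consumer, count], ...]]
  const summary = await redis.xpending(streamKey, groupName);
  const pendingCount = Number(summary[0]);
  return { streamKey, groupName, length, pendingCount };
}
// Usage: alert if the unacknowledged backlog keeps growing
const health = await getStreamHealth('events:booking.created', 'booking-notifications');
if (health.pendingCount > 1000) {
  console.warn('Consumer group is falling behind', health);
}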
Event Routing Patterns
1. Topic-Based Routing
// Stream per event type
events:booking.created
events:booking.updated
events:property.created
2. Consumer Groups (Fan-Out)
// Multiple consumer groups on the same stream; each group receives every message
GROUP: booking-notifications → Send email
GROUP: booking-analytics → Update metrics
GROUP: booking-sync → Sync to Hostaway
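Because every consumer group receives its own copy of each message, fan-out is a matter of registering one group per downstream concern on the same stream. A minimal sketch (group names follow the list above; each group then runs its own read loop like the booking consumer):
// Illustrative fan-out setup: one stream, one consumer group per concern
import Redis from 'ioredis';
const redis = new Redis(process.env.REDIS_URL);
const STREAM = 'events:booking.created';
const GROUPS = ['booking-notifications', 'booking-analytics', 'booking-sync'];
export async function ensureConsumerGroups() {
  for (const group of GROUPS) {
    try {
      // '$' starts the group at the stream tip (new messages only); use '0' to replay history
      await redis.xgroup('CREATE', STREAM, group, '$', 'MKSTREAM');
    } catch (error) {
      if (!(error as Error).message.includes('BUSYGROUP')) throw error; // group already exists
    }
  }
}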
Consequences
Positive
- ✅ No Extra Infrastructure: Uses existing Redis
- ✅ Persistent: Messages survive restarts
- ✅ Consumer Groups: Multiple consumers, load balancing
- ✅ Fast: <1ms latency
Negative
- ❌ Redis Dependency: Single point of failure
- ❌ Limited Retention: Streams are memory-bound and must be trimmed (no long, disk-backed retention like Kafka)
- ❌ No Complex Routing: No topic wildcards (unlike RabbitMQ)
Mitigations
- Use Upstash Redis with replication (99.99% SLA)
- Trim streams to 10k messages (sufficient for MVP)
- Use multiple streams for different event types
Validation Checklist
- Redis Streams used for all domain events
- Consumer groups created for each subscriber
- Unacknowledged message retry logic
- Stream trimming (MAXLEN 10k)
- XPENDING monitoring