message-queues
Implement message queues, event-driven architecture, and asynchronous processing with RabbitMQ, Kafka, Redis Streams, and other message brokers.
DIRECTIVE_TEXTUELLE
Message Queues Skill
Build scalable event-driven systems with message queues and asynchronous processing.
When to Use
Use this skill when the user wants to:
- Implement message queues for asynchronous processing
- Build event-driven architectures
- Decouple services with message brokers
- Handle background jobs and task queues
- Implement pub/sub patterns
- Process streams of data
- Build reliable distributed systems
- Handle high-throughput data processing
- Implement CQRS and event sourcing patterns
Message Queue Technologies
RabbitMQ
- Protocol: AMQP (Advanced Message Queuing Protocol)
- Use Cases: Task queues, pub/sub, RPC, routing
- Features: Message acknowledgment, dead letter queues, priorities
- Patterns: Work queues, publish/subscribe, routing, topics, RPC
Apache Kafka
- Protocol: Custom binary protocol over TCP
- Use Cases: Event streaming, log aggregation, real-time analytics
- Features: High throughput, distributed, fault-tolerant, replay capability
- Patterns: Event sourcing, CQRS, stream processing
Redis Streams
- Protocol: Redis protocol (RESP)
- Use Cases: Lightweight messaging, real-time data streams
- Features: Consumer groups, message acknowledgment, persistence
- Patterns: Pub/sub, streams, lists as queues
AWS SQS/SNS
- Type: Managed cloud service
- Use Cases: Serverless architectures, cloud-native apps
- Features: Fully managed, scalable, integrated with AWS services
- Patterns: Queue-based load leveling, fan-out
Other Options
- NATS: High-performance messaging
- Apache Pulsar: Unified messaging and streaming
- ActiveMQ: Java-based message broker
- ZeroMQ: Lightweight messaging library
Common Patterns
1. Work Queue (Task Queue)
Distribute tasks among workers for parallel processing.
# Producer (RabbitMQ)
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(
exchange='',
routing_key='task_queue',
body='Task data',
properties=pika.BasicProperties(
delivery_mode=2, # Make message persistent
)
)
# Consumer (Worker)
def callback(ch, method, properties, body):
print(f"Processing task: {body}")
# Process task
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
2. Publish/Subscribe
Broadcast messages to multiple consumers.
# Publisher
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body='Log message')
# Subscriber
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
3. Topic-Based Routing
Route messages based on patterns.
# Publisher with routing key
channel.exchange_declare(exchange='topics', exchange_type='topic')
channel.basic_publish(
exchange='topics',
routing_key='user.created',
body='User data'
)
# Consumer with pattern
channel.queue_bind(exchange='topics', queue=queue_name, routing_key='user.*')
4. Event Sourcing with Kafka
Store all changes as events.
from kafka import KafkaProducer, KafkaConsumer
import json
# Producer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
event = {
'event_type': 'OrderCreated',
'order_id': '12345',
'timestamp': '2024-01-01T00:00:00Z',
'data': {'customer_id': '67890', 'total': 99.99}
}
producer.send('order-events', value=event)
# Consumer
consumer = KafkaConsumer(
'order-events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
group_id='order-processor'
)
for message in consumer:
event = message.value
# Process event
print(f"Processing {event['event_type']}: {event['order_id']}")
5. Dead Letter Queue
Handle failed messages.
# Declare DLQ
channel.exchange_declare(exchange='dlx', exchange_type='fanout')
channel.queue_declare(queue='dead_letter_queue')
channel.queue_bind(exchange='dlx', queue='dead_letter_queue')
# Main queue with DLQ
args = {
'x-dead-letter-exchange': 'dlx',
'x-message-ttl': 60000 # 60 seconds
}
channel.queue_declare(queue='main_queue', arguments=args)
Best Practices
Message Design
- Idempotent consumers: Handle duplicate messages gracefully
- Small messages: Keep message payloads small, reference large data
- Schema versioning: Version your message schemas
- Include metadata: Timestamps, correlation IDs, message IDs
Reliability
- Acknowledgments: Use manual acknowledgment for critical messages
- Persistence: Enable message persistence for durability
- Retries: Implement exponential backoff for retries
- Dead letter queues: Handle poison messages
- Monitoring: Monitor queue depth, consumer lag, throughput
Performance
- Batching: Batch messages when possible
- Prefetch: Configure consumer prefetch count
- Partitioning: Use partitions for parallel processing (Kafka)
- Connection pooling: Reuse connections
- Async processing: Use async consumers when appropriate
Security
- Authentication: Use credentials, SASL, or certificates
- Encryption: Enable TLS for data in transit
- Authorization: Control access to queues and topics
- Network isolation: Use VPCs or private networks
Architecture Patterns
Microservices Communication
Service A → Queue → Service B
↘ Queue → Service C
Event-Driven Architecture
Event Producer → Event Bus → Event Consumers
↘ Event Store
CQRS (Command Query Responsibility Segregation)
Command → Command Bus → Write Model → Events
Events → Event Bus → Read Model (Projections)
Saga Pattern
Distributed transactions across services using events.
# Order Saga Orchestrator
def order_saga(order_data):
# Step 1: Reserve inventory
publish_event('inventory.reserve', order_data)
# Step 2: Process payment
publish_event('payment.process', order_data)
# Step 3: Ship order
publish_event('shipping.create', order_data)
# Compensation on failure
def compensate_order(order_id, failed_step):
if failed_step >= 2:
publish_event('payment.refund', {'order_id': order_id})
if failed_step >= 1:
publish_event('inventory.release', {'order_id': order_id})
Error Handling
Retry Strategies
import time
from functools import wraps
def retry_with_backoff(max_retries=3, base_delay=1):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt)
print(f"Retry {attempt + 1}/{max_retries} after {delay}s")
time.sleep(delay)
return wrapper
return decorator
@retry_with_backoff(max_retries=3)
def process_message(message):
# Process message
pass
Circuit Breaker
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.timeout = timeout
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
def call(self, func, *args, **kwargs):
if self.state == 'OPEN':
if time.time() - self.last_failure_time > self.timeout:
self.state = 'HALF_OPEN'
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise
def on_success(self):
self.failure_count = 0
self.state = 'CLOSED'
def on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
Monitoring & Observability
Key Metrics
- Queue depth: Number of messages in queue
- Consumer lag: Time/count behind latest message
- Throughput: Messages per second
- Error rate: Failed message percentage
- Processing time: Message processing duration
- DLQ size: Dead letter queue depth
Health Checks
def check_queue_health():
metrics = {
'queue_depth': get_queue_depth('task_queue'),
'consumer_count': get_consumer_count('task_queue'),
'message_rate': get_message_rate('task_queue'),
}
if metrics['queue_depth'] > 10000:
alert("High queue depth", metrics)
if metrics['consumer_count'] == 0:
alert("No active consumers", metrics)
return metrics
Quick Start Examples
RabbitMQ with Python (Pika)
# Install
pip install pika
# Run RabbitMQ with Docker
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
Kafka with Python (kafka-python)
# Install
pip install kafka-python
# Run Kafka with Docker
docker-compose up -d # Using kafka docker-compose.yml
Redis Streams with Python (redis-py)
# Install
pip install redis
# Run Redis with Docker
docker run -d --name redis -p 6379:6379 redis
Testing
Unit Testing
import pytest
from unittest.mock import Mock, patch
def test_message_processing():
mock_channel = Mock()
message = b'{"task": "process_data"}'
process_message(mock_channel, None, None, message)
mock_channel.basic_ack.assert_called_once()
Integration Testing
@pytest.fixture
def rabbitmq_container():
# Use testcontainers or docker-compose for testing
container = start_rabbitmq_container()
yield container
container.stop()
def test_message_flow(rabbitmq_container):
# Publish message
publish('test_queue', 'test message')
# Consume message
message = consume('test_queue', timeout=5)
assert message == 'test message'
Common Use Cases
- Background Job Processing: Email sending, image processing, report generation
- Microservices Communication: Async service-to-service messaging
- Event-Driven Systems: React to business events across services
- Log Aggregation: Collect and process logs from multiple sources
- Real-Time Analytics: Stream processing and aggregation
- Order Processing: E-commerce order workflows
- Notification Systems: Push notifications, SMS, email queues
- Data Pipeline: ETL processes, data transformation
Deliverables
- Message queue setup and configuration
- Producer and consumer implementations
- Error handling and retry logic
- Dead letter queue handling
- Monitoring and alerting setup
- Documentation and runbooks
- Testing suite
Quality Checklist
- Messages are processed idempotently
- Dead letter queues are configured
- Monitoring and alerting is in place
- Error handling includes retries
- Consumers acknowledge messages properly
- Message persistence is configured
- Security (TLS, auth) is enabled
- Documentation includes failure scenarios
- Load testing has been performed
- Graceful shutdown is implemented