Caching Skill
Implement high-performance caching solutions with Redis and Memcached for scalable applications.
When to Use
Use this skill when the user wants to:
- Implement Redis or Memcached caching
- Reduce database load and improve performance
- Implement cache-aside, write-through, or write-behind patterns
- Set up distributed caching for microservices
- Implement session storage and management
- Cache API responses and database queries
- Implement rate limiting with Redis
- Set up cache invalidation strategies
- Implement real-time features with Redis pub/sub
- Create leaderboards and ranking systems
- Implement distributed locks and semaphores
- Set up cache warming and preloading
Technology Stack
Core Technologies
Redis
- In-memory data structure store
- Strings, hashes, lists, sets, sorted sets
- Pub/sub messaging
- Streams for event sourcing
- Geospatial indexes
- HyperLogLog for approximate cardinality counting
- Bitmap operations
Memcached
- Simple, high-performance caching
- Key-value storage
- LRU eviction
- Distributed architecture
- Multi-threaded performance
Client Libraries
Python
- redis-py: Official Redis client
- aioredis: Async Redis client (since merged into redis-py as redis.asyncio)
- python-memcached: Memcached client
- pymemcache: High-performance Memcached client
- redis-om: Object mapper for Redis
Node.js
- ioredis: Feature-rich Redis client
- node-redis: Official Redis client
- memjs: Memcached client
Java
- Jedis: Redis Java client
- Lettuce: Advanced Redis client
- Redisson: Redis framework
Supporting Tools
- Redis Sentinel: High availability
- Redis Cluster: Horizontal scaling
- RedisInsight: GUI management tool
- redis-cli: Command-line interface
- Twemproxy: Proxy for Redis/Memcached
Cache Strategies
1. Cache-Aside (Lazy Loading)
Pattern:
- Application checks cache first
- On cache miss, load from database
- Store result in cache for future requests
Use Cases:
- Read-heavy workloads
- Data that doesn’t change frequently
- Queries with variable parameters
Python Example:
import json
from typing import Optional

import redis


class CacheAside:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.default_ttl = 3600  # 1 hour

    def get_user(self, user_id: int) -> Optional[dict]:
        """Get user with cache-aside pattern."""
        cache_key = f"user:{user_id}"

        # Try cache first
        cached_data = self.redis.get(cache_key)
        if cached_data:
            return json.loads(cached_data)

        # Cache miss - load from database
        user = self._load_user_from_db(user_id)
        if user:
            # Store in cache
            self.redis.setex(
                cache_key,
                self.default_ttl,
                json.dumps(user)
            )
        return user

    def _load_user_from_db(self, user_id: int) -> Optional[dict]:
        """Load user from database."""
        # Database query implementation
        pass
2. Write-Through Cache
Pattern:
- Write to the database and the cache in the same operation
- Cache stays in sync with the database as long as both writes succeed
- Slower writes, faster reads
Use Cases:
- Data consistency is critical
- Read-heavy workloads with some writes
- Data that must always be available
Python Example:
import json

import redis


class WriteThrough:
    def __init__(self, redis_client: redis.Redis, db_connection):
        self.redis = redis_client
        self.db = db_connection

    def update_user(self, user_id: int, data: dict) -> bool:
        """Update user with write-through pattern."""
        cache_key = f"user:{user_id}"
        try:
            # Write to database first
            self.db.update_user(user_id, data)

            # Update cache
            self.redis.setex(
                cache_key,
                3600,
                json.dumps(data)
            )
            return True
        except Exception:
            # Roll back the database write here if needed, then re-raise
            raise
3. Write-Behind (Write-Back)
Pattern:
- Write to cache immediately
- Asynchronously write to database
- Fastest writes, eventual consistency
Use Cases:
- High write throughput required
- Temporary inconsistency or loss of recent writes is acceptable
- Analytics and metrics
Python Example:
import asyncio
from collections import deque

import redis


class WriteBehind:
    def __init__(self, redis_client: redis.Redis, db_connection):
        self.redis = redis_client
        self.db = db_connection
        self.write_queue = deque()
        self.batch_size = 100
        self.flush_interval = 5  # seconds

    async def update_metric(self, metric_id: str, value: float):
        """Update metric with write-behind pattern."""
        cache_key = f"metric:{metric_id}"

        # Write to cache immediately
        self.redis.incrbyfloat(cache_key, value)

        # Queue for database write
        self.write_queue.append((metric_id, value))

        # Flush if batch size reached
        if len(self.write_queue) >= self.batch_size:
            await self._flush_to_db()

    async def _flush_to_db(self):
        """Flush queued writes to database."""
        if not self.write_queue:
            return
        batch = []
        while self.write_queue and len(batch) < self.batch_size:
            batch.append(self.write_queue.popleft())

        # Batch write to database
        await self.db.batch_update_metrics(batch)

    async def run_periodic_flush(self):
        """Flush on a timer so a partial batch is not left queued indefinitely."""
        while True:
            await asyncio.sleep(self.flush_interval)
            await self._flush_to_db()
4. Read-Through Cache
Pattern:
- Cache handles database loading
- Application only interacts with cache
- Cache library manages data loading
Python Example:
import json
from typing import Callable

import redis


class ReadThrough:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def get_with_loader(self, key: str, loader_func: Callable, ttl: int = 3600):
        """Get data with automatic loading."""
        # Try cache
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)

        # Load using provided function
        data = loader_func()

        # Cache the result
        if data:
            self.redis.setex(key, ttl, json.dumps(data))
        return data
Redis Implementation Patterns
1. Connection Management
Python with Connection Pooling:
import redis
from redis.sentinel import Sentinel


class RedisManager:
    def __init__(self, config: dict):
        self.config = config
        self._pool = None
        self._client = None

    def get_client(self) -> redis.Redis:
        """Get Redis client with connection pooling."""
        if self._client is None:
            self._pool = redis.ConnectionPool(
                host=self.config.get('host', 'localhost'),
                port=self.config.get('port', 6379),
                db=self.config.get('db', 0),
                password=self.config.get('password'),
                max_connections=self.config.get('max_connections', 50),
                socket_timeout=self.config.get('timeout', 5),
                socket_connect_timeout=self.config.get('connect_timeout', 5),
                decode_responses=True
            )
            self._client = redis.Redis(connection_pool=self._pool)
        return self._client

    def get_sentinel_client(self, sentinel_hosts: list,
                            master_name: str) -> redis.Redis:
        """Get Redis client via Sentinel for high availability."""
        sentinel = Sentinel(
            sentinel_hosts,
            socket_timeout=0.5,
            sentinel_kwargs={'password': self.config.get('password')}
        )
        # Get master connection
        master = sentinel.master_for(
            master_name,
            socket_timeout=5,
            password=self.config.get('password')
        )
        return master

    def close(self):
        """Close connection pool."""
        if self._pool:
            self._pool.disconnect()
2. Cache Invalidation
Invalidation Strategies:
from typing import Optional

import redis


class CacheInvalidation:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def invalidate_by_key(self, key: str):
        """Delete specific cache key."""
        self.redis.delete(key)

    def invalidate_by_pattern(self, pattern: str):
        """Delete keys matching pattern."""
        # SCAN iterates incrementally, so it does not block the server like KEYS
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(cursor, match=pattern, count=100)
            if keys:
                self.redis.delete(*keys)
            if cursor == 0:
                break

    def invalidate_by_tags(self, tag: str):
        """Invalidate all keys with specific tag."""
        tag_key = f"tag:{tag}"
        keys = self.redis.smembers(tag_key)
        if keys:
            # Delete tagged keys
            self.redis.delete(*keys)
        # Delete tag set
        self.redis.delete(tag_key)

    def add_tag(self, key: str, tag: str, ttl: Optional[int] = None):
        """Tag a cache key for group invalidation."""
        tag_key = f"tag:{tag}"
        self.redis.sadd(tag_key, key)
        if ttl:
            self.redis.expire(tag_key, ttl)
3. Distributed Locking
Redis Distributed Lock:
import time
import uuid
from typing import Optional

import redis


class RedisLock:
    def __init__(self, redis_client: redis.Redis,
                 lock_name: Optional[str] = None):
        self.redis = redis_client
        # lock_name is only needed when the instance is used as a context manager
        self.lock_name = lock_name
        self.identifier = None

    def acquire_lock(self, lock_name: str, timeout: int = 10,
                     ttl: int = 10) -> Optional[str]:
        """Acquire distributed lock."""
        identifier = str(uuid.uuid4())
        lock_key = f"lock:{lock_name}"
        end = time.time() + timeout
        while time.time() < end:
            # Try to acquire lock
            if self.redis.set(lock_key, identifier, nx=True, ex=ttl):
                return identifier
            # Wait before retry
            time.sleep(0.001)
        return None

    def release_lock(self, lock_name: str, identifier: str) -> bool:
        """Release distributed lock safely."""
        lock_key = f"lock:{lock_name}"
        # Use Lua script for atomic release: delete only if we still own the lock
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return bool(self.redis.eval(lua_script, 1, lock_key, identifier))

    def __enter__(self):
        """Context manager support."""
        if self.lock_name is None:
            raise ValueError("lock_name is required for context manager use")
        self.identifier = self.acquire_lock(self.lock_name)
        if not self.identifier:
            raise Exception("Could not acquire lock")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release lock on exit."""
        self.release_lock(self.lock_name, self.identifier)
4. Rate Limiting
Sliding Window and Fixed Window Rate Limiters:
from datetime import datetime
import time
import uuid

import redis


class RateLimiter:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def is_allowed(self, user_id: str, max_requests: int = 100,
                   window_seconds: int = 60) -> bool:
        """Check if request is allowed (sliding window)."""
        key = f"rate_limit:{user_id}"
        now = time.time()
        window_start = now - window_seconds

        pipe = self.redis.pipeline()
        # Remove old entries
        pipe.zremrangebyscore(key, 0, window_start)
        # Count requests in window
        pipe.zcard(key)
        # Add current request; the unique suffix keeps requests with the
        # same timestamp from overwriting each other in the sorted set
        pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
        # Set expiry
        pipe.expire(key, window_seconds)
        results = pipe.execute()

        # results[1] is the count before the current request was added
        request_count = results[1]
        return request_count < max_requests

    def fixed_window_limiter(self, user_id: str, max_requests: int = 100,
                             window_seconds: int = 60) -> bool:
        """Fixed window rate limiter (simpler, less accurate)."""
        key = f"rate_limit:fixed:{user_id}"
        current_window = int(time.time() / window_seconds)
        window_key = f"{key}:{current_window}"

        pipe = self.redis.pipeline()
        pipe.incr(window_key)
        pipe.expire(window_key, window_seconds * 2)
        results = pipe.execute()

        # results[0] is the count including the current request
        request_count = results[0]
        return request_count <= max_requests
5. Session Management
Redis Session Store:
import json
import uuid
from datetime import datetime, timezone
from typing import Optional

import redis


class SessionStore:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.default_ttl = 3600  # 1 hour

    def create_session(self, user_id: str, data: dict,
                       ttl: Optional[int] = None) -> str:
        """Create new session."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"
        session_data = {
            'user_id': user_id,
            'created_at': datetime.now(timezone.utc).isoformat(),
            **data
        }
        self.redis.setex(
            session_key,
            ttl or self.default_ttl,
            json.dumps(session_data)
        )
        # Index by user
        self.redis.sadd(f"user_sessions:{user_id}", session_id)
        return session_id

    def get_session(self, session_id: str) -> Optional[dict]:
        """Get session data."""
        session_key = f"session:{session_id}"
        data = self.redis.get(session_key)
        if data:
            return json.loads(data)
        return None

    def update_session(self, session_id: str, data: dict):
        """Update session data."""
        session = self.get_session(session_id)
        if session:
            session.update(data)
            session_key = f"session:{session_id}"
            # Preserve the remaining TTL; fall back to the default if none is set
            ttl = self.redis.ttl(session_key)
            self.redis.setex(
                session_key,
                ttl if ttl > 0 else self.default_ttl,
                json.dumps(session)
            )

    def destroy_session(self, session_id: str):
        """Destroy session."""
        session = self.get_session(session_id)
        if session:
            session_key = f"session:{session_id}"
            user_id = session.get('user_id')
            self.redis.delete(session_key)
            if user_id:
                self.redis.srem(f"user_sessions:{user_id}", session_id)
6. Pub/Sub Messaging
Redis Pub/Sub:
import json
import threading

import redis


class PubSubManager:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.pubsub = self.redis.pubsub()
        self.listeners = {}

    def publish(self, channel: str, message: dict):
        """Publish message to channel."""
        self.redis.publish(channel, json.dumps(message))

    def subscribe(self, channel: str, callback):
        """Subscribe to channel with callback."""
        if channel not in self.listeners:
            self.pubsub.subscribe(channel)
            self.listeners[channel] = []
        self.listeners[channel].append(callback)

    def start_listening(self):
        """Start listening for messages in background thread.

        Call this after at least one subscribe().
        """
        def listen():
            for message in self.pubsub.listen():
                if message['type'] == 'message':
                    channel = message['channel']
                    # Channel names are bytes unless the client was
                    # created with decode_responses=True
                    if isinstance(channel, bytes):
                        channel = channel.decode('utf-8')
                    data = json.loads(message['data'])
                    for callback in self.listeners.get(channel, []):
                        try:
                            callback(data)
                        except Exception as e:
                            print(f"Error in callback: {e}")

        thread = threading.Thread(target=listen, daemon=True)
        thread.start()
Memcached Implementation
Basic Memcached Usage
Python Example:
import json

from pymemcache.client.base import Client


class MemcachedManager:
    def __init__(self, host: str = 'localhost', port: int = 11211):
        self.client = Client(
            (host, port),
            serializer=self._serialize,
            deserializer=self._deserialize,
            connect_timeout=5,
            timeout=2
        )

    def _serialize(self, key, value):
        """Serialize value for storage."""
        # The returned flag records the encoding: 1 = plain string, 2 = JSON
        if isinstance(value, str):
            return value.encode('utf-8'), 1
        return json.dumps(value).encode('utf-8'), 2

    def _deserialize(self, key, value, flags):
        """Deserialize value from storage."""
        if flags == 1:
            return value.decode('utf-8')
        if flags == 2:
            return json.loads(value.decode('utf-8'))
        return value

    def get(self, key: str):
        """Get value from cache."""
        return self.client.get(key)

    def set(self, key: str, value, ttl: int = 3600):
        """Set value in cache."""
        return self.client.set(key, value, expire=ttl)

    def delete(self, key: str):
        """Delete key from cache."""
        return self.client.delete(key)

    def increment(self, key: str, delta: int = 1):
        """Increment counter."""
        return self.client.incr(key, delta)

    def get_multi(self, keys: list):
        """Get multiple keys."""
        return self.client.get_many(keys)

    def set_multi(self, mapping: dict, ttl: int = 3600):
        """Set multiple keys."""
        return self.client.set_many(mapping, expire=ttl)
Cache Performance Optimization
1. Cache Warming
Preload Critical Data:
import json

import redis


class CacheWarmer:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def warm_cache(self, data_loader):
        """Warm cache with frequently accessed data."""
        # Load hot data
        hot_items = await data_loader.get_popular_items(limit=1000)

        # Queue all writes and send them in one round-trip
        pipe = self.redis.pipeline()
        for item in hot_items:
            key = f"item:{item['id']}"
            pipe.setex(key, 3600, json.dumps(item))
        pipe.execute()

    def warm_on_startup(self):
        """Warm cache on application startup."""
        # Load configuration
        # Preload user sessions
        # Cache static content
        pass
2. Cache Compression
Compress Large Values:
import gzip
import pickle

import redis


class CompressedCache:
    """Needs a client created with decode_responses=False (values are bytes)."""

    def __init__(self, redis_client: redis.Redis,
                 compression_threshold: int = 1024):
        self.redis = redis_client
        self.threshold = compression_threshold

    def set_compressed(self, key: str, value, ttl: int = 3600):
        """Set value with automatic compression."""
        # pickle is only safe for data written by trusted code
        serialized = pickle.dumps(value)
        if len(serialized) > self.threshold:
            # Compress large values
            compressed = gzip.compress(serialized)
            self.redis.setex(f"{key}:c", ttl, compressed)
            # Remove any stale uncompressed copy
            self.redis.delete(key)
        else:
            self.redis.setex(key, ttl, serialized)
            # Remove any stale compressed copy
            self.redis.delete(f"{key}:c")

    def get_compressed(self, key: str):
        """Get value with automatic decompression."""
        # Try compressed version first
        compressed = self.redis.get(f"{key}:c")
        if compressed:
            decompressed = gzip.decompress(compressed)
            return pickle.loads(decompressed)

        # Try uncompressed
        data = self.redis.get(key)
        if data:
            return pickle.loads(data)
        return None
3. Pipeline Batching
Batch Operations for Performance:
import json

import redis


class BatchCache:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def batch_get(self, keys: list) -> dict:
        """Get multiple keys in single round-trip."""
        pipe = self.redis.pipeline()
        for key in keys:
            pipe.get(key)
        results = pipe.execute()
        return {
            key: json.loads(value) if value else None
            for key, value in zip(keys, results)
        }

    def batch_set(self, items: dict, ttl: int = 3600):
        """Set multiple keys in single round-trip."""
        pipe = self.redis.pipeline()
        for key, value in items.items():
            pipe.setex(key, ttl, json.dumps(value))
        pipe.execute()
Monitoring and Debugging
Redis Monitoring
Performance Metrics:
import redis


class RedisMonitor:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def get_stats(self) -> dict:
        """Get Redis statistics."""
        info = self.redis.info()
        return {
            'used_memory': info['used_memory_human'],
            'connected_clients': info['connected_clients'],
            'total_commands_processed': info['total_commands_processed'],
            'keyspace_hits': info['keyspace_hits'],
            'keyspace_misses': info['keyspace_misses'],
            'hit_rate': self._calculate_hit_rate(info),
            'evicted_keys': info.get('evicted_keys', 0),
            'expired_keys': info.get('expired_keys', 0)
        }

    def _calculate_hit_rate(self, info: dict) -> float:
        """Calculate cache hit rate as a percentage."""
        hits = info.get('keyspace_hits', 0)
        misses = info.get('keyspace_misses', 0)
        total = hits + misses
        if total == 0:
            return 0.0
        return (hits / total) * 100

    def monitor_slow_log(self, count: int = 10):
        """Get slow queries."""
        return self.redis.slowlog_get(count)
Best Practices
1. Key Naming Conventions
- Use descriptive, hierarchical names: user:123:profile
- Include version in keys: v1:user:123
- Use consistent separators (colons recommended)
- Keep keys short but meaningful
- Avoid special characters
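One way to keep these conventions consistent is to build every key through a single helper. The sketch below is illustrative only: the name build_key, its version parameter, and the allowed-character rule are assumptions made for this example, not part of any Redis or Memcached API.

```python
import re

# Assumed policy: key parts may contain only letters, digits, '_', '.' and '-'.
_SAFE_PART = re.compile(r"[A-Za-z0-9_.-]+")


def build_key(*parts, version=None, sep=":"):
    """Join parts into a hierarchical cache key, optionally version-prefixed."""
    if not parts:
        raise ValueError("at least one key part is required")
    cleaned = [str(part) for part in parts]
    for part in cleaned:
        # Reject separators, whitespace and other special characters
        if not _SAFE_PART.fullmatch(part):
            raise ValueError(f"unsafe key part: {part!r}")
    if version is not None:
        cleaned.insert(0, f"v{version}")
    return sep.join(cleaned)
```

For example, build_key("user", 123, "profile") yields user:123:profile, and build_key("user", 123, version=1) yields v1:user:123.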
2. TTL Management
- Always set expiration for cache keys
- Use appropriate TTL based on data volatility
- Implement cache warming for critical data
- Monitor eviction rates
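Choosing a TTL by data volatility can be centralized in a small lookup. This is a minimal sketch: the volatility classes, the TTL values, and the ttl_for helper are hypothetical and should be tuned per application. The optional random jitter is an addition not described above; it spreads out the expiry of keys that were written at the same time.

```python
import random

# Hypothetical volatility classes and TTLs in seconds; tune for your data.
TTL_BY_VOLATILITY = {
    "static": 24 * 3600,  # e.g. configuration, reference data
    "moderate": 3600,     # e.g. user profiles
    "volatile": 60,       # e.g. counters, live status
}


def ttl_for(volatility, jitter_ratio=0.0, rng=random):
    """Return a TTL in seconds for a volatility class, optionally jittered."""
    if volatility not in TTL_BY_VOLATILITY:
        raise KeyError(f"unknown volatility class: {volatility!r}")
    if not 0 <= jitter_ratio < 1:
        raise ValueError("jitter_ratio must be in [0, 1)")
    base = TTL_BY_VOLATILITY[volatility]
    spread = base * jitter_ratio
    # Shift the TTL by up to +/- spread, never going below one second
    return max(1, int(base + rng.uniform(-spread, spread)))
```

The result can be passed straight to setex, for example setex(key, ttl_for("moderate", jitter_ratio=0.1), value).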
3. Connection Pooling
- Always use connection pools
- Configure appropriate pool size
- Set reasonable timeouts
- Handle connection failures gracefully
4. Error Handling
- Implement fallback to database on cache failure
- Log cache errors separately
- Use circuit breakers for cache dependencies
- Gracefully degrade when cache is unavailable
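The points above can be combined into a thin wrapper that falls back to a loader (for example a database query) when the cache fails, and stops calling the cache for a while after repeated failures. This is a sketch under stated assumptions: the class name CacheWithFallback, its parameters, and the duck-typed cache object with get(key) and set(key, value) are hypothetical, and production code would catch the client's specific error types rather than Exception.

```python
import logging
import time

logger = logging.getLogger("cache")  # separate logger for cache errors


class CacheWithFallback:
    """Read via a cache, falling back to a loader when the cache fails."""

    def __init__(self, cache, failure_threshold=3, reset_after=30.0,
                 clock=time.monotonic):
        self.cache = cache  # any object with get(key) and set(key, value)
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after  # seconds the breaker stays open
        self.clock = clock
        self.failures = 0
        self.opened_at = None

    def _circuit_open(self):
        if self.opened_at is None:
            return False
        if self.clock() - self.opened_at >= self.reset_after:
            # Cool-down elapsed: close the breaker and try the cache again
            self.opened_at = None
            self.failures = 0
            return False
        return True

    def _record_failure(self, exc):
        logger.warning("cache error: %s", exc)
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()

    def get(self, key, loader):
        """Return the cached value, or loader() on a miss or cache failure."""
        if self._circuit_open():
            return loader()  # skip the cache entirely while the breaker is open
        try:
            value = self.cache.get(key)
            self.failures = 0
        except Exception as exc:
            self._record_failure(exc)
            return loader()
        if value is not None:
            return value
        value = loader()
        try:
            self.cache.set(key, value)
        except Exception as exc:
            self._record_failure(exc)
        return value
```

The application keeps serving data from the loader while the cache is down, at the cost of higher load on the backing store.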
5. Security
- Enable password authentication
- Use TLS/SSL for connections
- Implement access controls
- Isolate cache instances by environment
- Regularly update Redis/Memcached
6. Memory Management
- Set maxmemory limits
- Configure eviction policies (LRU, LFU)
- Monitor memory usage
- Use compression for large values
- Clean up expired keys
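Memory limits and the eviction policy can be set in redis.conf or at runtime. The sketch below applies them at runtime through a client's config_set method, as provided by redis-py. The function name apply_memory_limits and its default values are assumptions for illustration, and some managed Redis services may not permit CONFIG SET at all.

```python
# Eviction policies accepted by the Redis maxmemory-policy setting
VALID_POLICIES = {
    "noeviction",
    "allkeys-lru", "allkeys-lfu", "allkeys-random",
    "volatile-lru", "volatile-lfu", "volatile-random", "volatile-ttl",
}


def apply_memory_limits(client, maxmemory="256mb", policy="allkeys-lru"):
    """Set maxmemory and the eviction policy on a running Redis server."""
    if policy not in VALID_POLICIES:
        raise ValueError(f"unknown eviction policy: {policy!r}")
    # client is expected to expose config_set(name, value), as redis-py does
    client.config_set("maxmemory", maxmemory)
    client.config_set("maxmemory-policy", policy)
    return {"maxmemory": maxmemory, "maxmemory-policy": policy}
```

Settings applied this way last until the server restarts unless they are also written to the configuration file.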
7. Data Consistency
- Use appropriate cache invalidation
- Implement version tagging
- Consider cache stampede prevention
- Use locks for critical operations
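A cache stampede happens when many callers miss the same key at once and all rebuild it. One way to limit this is a short-lived lock so that only one caller rebuilds while the others wait briefly. The sketch below is hypothetical: get_or_rebuild and its parameters are illustrative names, the client is assumed to behave like a redis-py client created with decode_responses=True, and values are assumed to be strings. The lock release here is a simple get-then-delete, which is not atomic; a Lua script makes it atomic.

```python
import time
import uuid


def get_or_rebuild(client, key, loader, ttl=3600, lock_ttl=10,
                   wait=0.05, max_wait=2.0):
    """Return the cached value; let only one caller rebuild it on a miss."""
    value = client.get(key)
    if value is not None:
        return value

    lock_key = f"lock:{key}"
    token = uuid.uuid4().hex
    # SET with nx/ex: only one caller obtains the rebuild lock
    if client.set(lock_key, token, nx=True, ex=lock_ttl):
        try:
            value = loader()
            client.set(key, value, ex=ttl)
            return value
        finally:
            # Best-effort release: delete the lock only if we still own it
            if client.get(lock_key) == token:
                client.delete(lock_key)

    # Another caller is rebuilding: poll the cache for a short time
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        time.sleep(wait)
        value = client.get(key)
        if value is not None:
            return value
    # Gave up waiting; load directly rather than fail the request
    return loader()
```

The lock TTL bounds how long a crashed rebuilder can block others, and the final fallback trades some duplicate load for availability.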
Common Anti-Patterns to Avoid
- Caching everything: Only cache frequently accessed data
- No TTL: Always set expiration times
- Caching mutable data: Be careful with frequently changing data
- Large values: Compress or split large objects
- No monitoring: Track hit rates and performance
- Single point of failure: Use replication and clustering
- Ignoring cache misses: Optimize for cache miss scenarios
Deliverables
When implementing caching, ensure you deliver:
1. Cache Configuration
   - Connection settings and pools
   - Eviction policies and memory limits
   - Replication and clustering setup
2. Cache Layer Implementation
   - Cache strategy selection (cache-aside, write-through, etc.)
   - Key naming conventions
   - TTL strategies
3. Integration Code
   - Database integration
   - API/service integration
   - Error handling and fallbacks
4. Monitoring Setup
   - Performance metrics
   - Hit/miss rate tracking
   - Memory usage monitoring
5. Documentation
   - Cache architecture diagram
   - Key patterns and conventions
   - Troubleshooting guide
Quality Checklist
Before completing a caching implementation, verify:
- Appropriate cache strategy selected and implemented
- Connection pooling configured properly
- TTL set for all cache keys
- Key naming conventions documented and consistent
- Error handling and fallback mechanisms in place
- Cache invalidation strategy implemented
- Memory limits and eviction policies configured
- Monitoring and metrics collection set up
- Security (authentication, encryption) enabled
- Performance testing completed
- Cache hit rate is acceptable (>80% for hot data)
- Documentation includes architecture and patterns
- High availability configured (Sentinel/Cluster)
- Backup and disaster recovery plan documented