RQ System Architecture
Deep dive into the Redis Queue (RQ) system powering Fusion
Fusion uses a custom fork of RQ (Redis Queue) that adds priority queues and extra job-management fields such as retry counts, log tags, and success/failure handlers. This document explains in technical detail how the queue system works.
Core Concepts
Redis as Queue Backend
All queue state (queues, jobs, workers, and scheduled jobs) is stored in Redis under the following key patterns:
| Key Pattern | Data Structure | Purpose |
|---|---|---|
| `rq:queue:<name>` | Sorted Set (ZSET) | Priority queues for tasks |
| `rq:job:<job_id>` | Hash | Job data and metadata |
| `rq:job:<job_id>:dependents` | Set | Jobs dependent on this job |
| `rq:scheduler:scheduled_jobs` | Sorted Set | Scheduled jobs (score = Unix timestamp) |
| `rq:worker:<name>` | Hash | Worker registration and heartbeat |
| `rq:workers` | Set | All registered workers |
| `rq:queues` | Set | All known queue keys |
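The key names in the table are plain string conventions. Here is a minimal sketch of helpers that build them. The helper and constant names are hypothetical and are not part of RQ's or Fusion's API.

```python
# Hypothetical helpers that build the Redis key names from the table above.
QUEUE_PREFIX = "rq:queue:"
JOB_PREFIX = "rq:job:"
WORKER_PREFIX = "rq:worker:"
SCHEDULED_JOBS_KEY = "rq:scheduler:scheduled_jobs"


def queue_key(name: str) -> str:
    """Sorted set holding the jobs of one priority queue."""
    return f"{QUEUE_PREFIX}{name}"


def job_key(job_id: str) -> str:
    """Hash holding a job's data and metadata."""
    return f"{JOB_PREFIX}{job_id}"


def dependents_key(job_id: str) -> str:
    """Set of job ids that depend on this job."""
    return f"{job_key(job_id)}:dependents"


def worker_key(name: str) -> str:
    """Hash holding a worker's registration and heartbeat."""
    return f"{WORKER_PREFIX}{name}"
```

For example, `queue_key("sms")` yields `rq:queue:sms`, which matches the `origin` value used in the job hash.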
Priority Queue Implementation
Standard RQ stores each queue as a Redis List (FIFO). Fusion instead stores queues as Sorted Sets (ZSET), which lets it process jobs by priority.
How It Works
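Conceptually, enqueueing a job is a `ZADD` that uses the job's priority as the score, and popping takes the member with the highest score. The in-memory model below imitates those Redis operations so you can try the ordering without a Redis server. The class and function names are hypothetical, and the model assumes the score is simply the priority value.

One detail matters here. Redis orders members with equal scores lexicographically, so `ZREVRANGE` returns ties in reverse lexicographic order of the job id, not in enqueue (FIFO) order. The model reproduces that behavior.

```python
from typing import Dict, Optional


class InMemorySortedSet:
    """Hypothetical stand-in for one Redis ZSET such as rq:queue:sms."""

    def __init__(self) -> None:
        self._scores: Dict[str, float] = {}

    def zadd(self, mapping: Dict[str, float]) -> None:
        # Like ZADD: insert members or update their scores
        self._scores.update(mapping)

    def zrevrange_first(self) -> Optional[str]:
        # Like ZREVRANGE key 0 0: highest score first; equal scores are
        # ordered by member in reverse lexicographic order, as in Redis
        if not self._scores:
            return None
        return max(self._scores.items(), key=lambda kv: (kv[1], kv[0]))[0]

    def zrem(self, member: str) -> int:
        # Like ZREM: 1 if the member was removed, 0 if it was absent
        return 0 if self._scores.pop(member, None) is None else 1


def enqueue(zset: InMemorySortedSet, job_id: str, priority: int) -> None:
    zset.zadd({job_id: priority})


def pop(zset: InMemorySortedSet) -> Optional[str]:
    # Same peek-then-ZREM pattern as the pop() in "Atomic Pop Operation"
    while True:
        item = zset.zrevrange_first()
        if item is None:
            return None  # queue is empty
        if zset.zrem(item) == 1:
            return item
```

Suppose two jobs `a` and `b` are enqueued with the same priority. `b` pops first, so jobs within one priority level are not guaranteed to run in enqueue order unless the score also encodes time.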
Priority Levels
```python
class PriorityQueue:
    PRIORITY_HIGH = 2      # OTP, reminders: processed first
    PRIORITY_MODERATE = 1  # Standard messages
    PRIORITY_LOW = 0       # Campaigns, bulk operations: processed last
```

Atomic Pop Operation
To avoid race conditions when several workers poll the same queue, `pop()` first peeks at the top item and then claims it with `ZREM`, which is atomic:
```python
def pop(self):
    try:
        # Peek at the highest-priority job (highest score)
        _item = self.connection.zrevrange(self._key, 0, 0)[0]
        # ZREM is atomic: only one worker can remove the item and get 1 back
        if self.connection.zrem(self._key, _item) == 1:
            return _item
        else:
            # Another worker removed it first; retry
            return self.pop()
    except IndexError:
        return None  # Queue is empty
```

Job Lifecycle
Job Statuses
| Status | Description |
|---|---|
| `queued` | Job is waiting in queue |
| `started` | Worker is executing the job |
| `finished` | Job completed successfully |
| `failed` | Job execution failed |
| `deferred` | Waiting for dependency to complete |
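The statuses form a small state machine. The sketch below encodes the transitions the table implies: a deferred job becomes queued once its dependency completes, and a started job ends as finished or failed. The exact transition set is an assumption, not Fusion's documented rules. In particular, modelling a retry or requeue as `failed` → `queued` is assumed.

```python
from enum import Enum


class JobStatus(str, Enum):
    QUEUED = "queued"
    STARTED = "started"
    FINISHED = "finished"
    FAILED = "failed"
    DEFERRED = "deferred"


# Assumed transitions; FAILED -> QUEUED models a retry or a manual requeue.
ALLOWED_TRANSITIONS = {
    JobStatus.DEFERRED: {JobStatus.QUEUED},
    JobStatus.QUEUED: {JobStatus.STARTED},
    JobStatus.STARTED: {JobStatus.FINISHED, JobStatus.FAILED},
    JobStatus.FAILED: {JobStatus.QUEUED},
    JobStatus.FINISHED: set(),
}


def can_transition(current: JobStatus, new: JobStatus) -> bool:
    """Return True if moving from `current` to `new` is allowed."""
    return new in ALLOWED_TRANSITIONS[current]
```

Because `JobStatus` subclasses `str`, the raw `status` field read from the job hash converts directly, as in `JobStatus("failed")`.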
Job Data Structure
Each job is stored as a Redis Hash with the following fields:
```python
{
    'data': <pickled tuple of (func_name, instance, args, kwargs)>,
    'created_at': '2024-01-15T10:30:00',
    'origin': 'sms',            # Queue name
    'original_queue': 'sms',    # Original queue (for requeue)
    'description': 'tasks.sms.sendSMS(...)',
    'enqueued_at': '2024-01-15T10:30:00',
    'started_at': '2024-01-15T10:30:01',
    'ended_at': '2024-01-15T10:30:02',
    'status': 'finished',
    'result': <pickled return value>,
    'exc_info': <exception traceback if failed>,
    'timeout': 180,
    'result_ttl': 500,
    # Custom Fusion fields
    'priority': 2,
    'retryCount': 7,
    'log_tag': 'SMS_OTP',
    'failed_task_handler': 'https://...',
    'success_task_handler': 'https://...'
}
```

Retry Mechanism
Fusion retries failed jobs automatically, using exponential backoff between attempts.
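The backoff formula itself is not given here. The sketch below assumes a delay that doubles with each attempt, starting from a hypothetical 30-second base and capped at one hour. `retry_delay` and `should_retry` are illustrative names.

The sketch matches exceptions by class name against the types listed under Retry Configuration. It walks the MRO so that subclasses also count, which avoids importing boto3 or requests. One consequence is that Python's built-in `ConnectionError` matches by name as well.

```python
# Names of the retryable exception types listed under Retry Configuration.
RETRYABLE_NAMES = {"ClientError", "ConnectTimeout", "ConnectionError", "ReadTimeout"}
DEFAULT_RETRY_COUNT = 7
BASE_DELAY_SECONDS = 30    # hypothetical base delay
MAX_DELAY_SECONDS = 3600   # hypothetical cap


def should_retry(exc: BaseException, attempt: int,
                 max_retries: int = DEFAULT_RETRY_COUNT) -> bool:
    """Retry listed exception types (or subclasses) while retries remain.

    `attempt` is the number of retries already made (0 before the first retry).
    """
    retryable = any(cls.__name__ in RETRYABLE_NAMES for cls in type(exc).__mro__)
    return retryable and attempt < max_retries


def retry_delay(attempt: int) -> int:
    """Exponential backoff: BASE_DELAY_SECONDS * 2**attempt, capped."""
    return min(BASE_DELAY_SECONDS * 2 ** attempt, MAX_DELAY_SECONDS)
```

With these assumed constants, the seven retries wait 30, 60, 120, 240, 480, 960 and 1920 seconds. Any exception not on the list fails the job on the first error.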
Retry Configuration
```python
# Default retry count
job.retryCount = 7
```

Retryable exceptions:
- ClientError (AWS)
- ConnectTimeout
- ConnectionError
- ReadTimeout

All other exceptions are non-retryable and fail the job immediately.

Scheduler System
The scheduler runs as a separate process, periodically checking for jobs that are due:
Scheduler Flow
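Scheduled jobs live in the `rq:scheduler:scheduled_jobs` sorted set, scored by Unix timestamp, so finding due jobs is a score-range query. The sketch below shows one scheduler tick written against redis-py's `zrangebyscore`, `zrem`, `hget` and `zadd`; the `zadd` mapping form requires redis-py 3.0+.

Three things here are assumptions, not Fusion's documented behavior:
- The function name `enqueue_due_jobs`.
- The choice to push a due job into `rq:queue:<origin>`, using its `priority` field as the score.
- A client created with `decode_responses=True`.

A small in-memory test double is included so the tick can run without Redis.

```python
import time
from typing import Dict, List, Optional

SCHEDULED_KEY = "rq:scheduler:scheduled_jobs"


def enqueue_due_jobs(conn, now: Optional[float] = None) -> List[str]:
    """Move every job whose scheduled time has passed into its priority queue."""
    now = time.time() if now is None else now
    moved = []
    # Scores are Unix timestamps, so "due" means score <= now
    for job_id in conn.zrangebyscore(SCHEDULED_KEY, "-inf", now):
        # ZREM is atomic: if two schedulers race, only one gets 1 back
        if conn.zrem(SCHEDULED_KEY, job_id) != 1:
            continue
        job_key = f"rq:job:{job_id}"
        queue_name = conn.hget(job_key, "origin")
        priority = float(conn.hget(job_key, "priority") or 0)
        conn.zadd(f"rq:queue:{queue_name}", {job_id: priority})
        moved.append(job_id)
    return moved


class InMemoryRedis:
    """Test double implementing only the four calls used above."""

    def __init__(self) -> None:
        self.zsets: Dict[str, Dict[str, float]] = {}
        self.hashes: Dict[str, Dict[str, str]] = {}

    def zadd(self, name: str, mapping: Dict[str, float]) -> None:
        self.zsets.setdefault(name, {}).update(mapping)

    def zrangebyscore(self, name: str, min_score, max_score) -> List[str]:
        lo, hi = float(min_score), float(max_score)
        members = self.zsets.get(name, {})
        ordered = sorted(members.items(), key=lambda kv: (kv[1], kv[0]))
        return [m for m, score in ordered if lo <= score <= hi]

    def zrem(self, name: str, member: str) -> int:
        return 0 if self.zsets.get(name, {}).pop(member, None) is None else 1

    def hget(self, name: str, key: str) -> Optional[str]:
        return self.hashes.get(name, {}).get(key)
```

Claiming each job with `ZREM` before enqueueing it means that if two scheduler processes ever ran at once, each due job would still be enqueued only once.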
Scheduling Options
- `enqueue_at()`: execute at a specific datetime
- `enqueue_in()`: execute after a time delta
- `cron()`: run on a recurring schedule defined with cron syntax
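Both one-off options reduce to the same value: a Unix timestamp stored as the job's score in `rq:scheduler:scheduled_jobs`. The sketch below shows only that conversion, not job creation. The helper names are hypothetical, and treating naive datetimes as UTC is an assumption.

```python
from datetime import datetime, timedelta, timezone


def score_for_enqueue_at(scheduled_time: datetime) -> int:
    """Score for enqueue_at(): the target time as a Unix timestamp in seconds."""
    if scheduled_time.tzinfo is None:
        # Assumption: naive datetimes are interpreted as UTC
        scheduled_time = scheduled_time.replace(tzinfo=timezone.utc)
    return int(scheduled_time.timestamp())


def score_for_enqueue_in(delay: timedelta, now: datetime) -> int:
    """Score for enqueue_in(): `now` plus the delay, as a Unix timestamp."""
    return score_for_enqueue_at(now + delay)
```

For example, 2024-01-15 10:30:00 UTC maps to the score 1705314600, and a five-minute `enqueue_in()` from that moment maps to 1705314900.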
Cron Jobs
Cron jobs are defined in `wrappers/scheduler_task.py`:
```python
# Run maintenance every 5 minutes
add(scheduler, {}, '*/5 * * * *',
    'tasks.maintenance.run_maintenance_on_failed_queue',
    id='redis_maintenance_on_failed_queue')
```

Worker Process Model
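Workers use a fork-based execution model for isolation. Each job runs in a child process forked from the worker, so a crash inside a job does not take down the worker itself.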
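The sketch below shows the fork-per-job idea on a POSIX system; `os.fork` is not available on Windows. The parent forks, the child runs the job and exits with a status code, and the parent waits for the child to finish. `run_in_fork` is a hypothetical name. A real worker also enforces the job timeout, records the result and updates its heartbeat, none of which is shown here.

```python
import os
from typing import Callable


def run_in_fork(job: Callable[[], object]) -> int:
    """Run `job` in a forked child and return the child's exit code.

    An exception inside the job only terminates the child; the parent survives.
    """
    pid = os.fork()
    if pid == 0:
        # Child process: run the job, then exit without returning to the caller
        try:
            job()
        except BaseException:
            os._exit(1)
        os._exit(0)
    # Parent process: block until the child exits
    _, status = os.waitpid(pid, 0)
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    return -1  # the child was killed by a signal
```

Calling `run_in_fork(lambda: 1 / 0)` returns 1 while the calling process keeps running. This is the isolation property the worker relies on.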
Worker States
| State | Description |
|---|---|
| `started` | Worker just started |
| `idle` | Waiting for jobs |
| `busy` | Executing a job |
| `suspended` | Paused by `rq suspend` |
Heartbeat
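Workers send heartbeats to Redis to show that they are alive. Each heartbeat resets the expiry of the worker's `rq:worker:<name>` key to at least 420 seconds. If a worker stops sending heartbeats, its key expires once that TTL runs out: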
```python
def heartbeat(self, timeout=0):
    # Never expire sooner than the default worker TTL (420 seconds)
    timeout = max(timeout, self.default_worker_ttl)
    self.connection.expire(self.key, timeout)
```

Failed Queue
Failed jobs are moved to a special queue for inspection and retry:
The failed queue is stored under the key `rq:queue:failed`. Failed jobs can be requeued in two ways:

```python
failed_queue.requeue(job_id)      # To the original queue (List)
failed_queue.requeue_set(job_id)  # To the priority queue (Sorted Set)
```

Connection Management
Fusion uses a connection stack pattern for managing Redis connections:
```python
# Push a connection for the current context
push_connection(redis_conn)
# Get the current connection
conn = get_current_connection()
# Pop it when done
pop_connection()

# Or use the context manager
with Connection(redis_conn):
    # Use the connection here
    pass
```

Logging & Monitoring
Elasticsearch Logging
All job events are logged to Elasticsearch:
Index: `active-fusion-logs`, type: `logs`

```json
{
  "job_id": "uuid",
  "task": "SMS",
  "task_type": "OTP",
  "status": "queued|processing|completed|failed|retry",
  "host": "worker-hostname",
  "log_time": "2024-01-15T10:30:00",
  "message": "Status message",
  "payload": "...",
  "job_result": "..."
}
```

Log Events
| Event | Method | Status |
|---|---|---|
| Job enqueued | Log.start() | queued |
| Execution started | Log.execution_start() | processing |
| Execution succeeded | Log.execution_success() | completed |
| Execution failed | Log.execution_failed() | failed |
| Retry scheduled | Log.execution_retry() | retry |
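Each `Log.*` method writes a document shaped like the Elasticsearch example above. The sketch below builds such a document and maps each method to its `status` value from the table. `build_log_doc` and its parameters are hypothetical, and formatting `log_time` in UTC is an assumption. Sending the document to Elasticsearch is left out.

```python
import socket
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# Status written by each Log method, per the Log Events table
STATUS_BY_EVENT = {
    "start": "queued",
    "execution_start": "processing",
    "execution_success": "completed",
    "execution_failed": "failed",
    "execution_retry": "retry",
}


def build_log_doc(event: str, job_id: str, task: str, task_type: str,
                  message: str, payload: str = "",
                  job_result: Optional[str] = None) -> Dict[str, Any]:
    """Hypothetical builder for one active-fusion-logs document."""
    return {
        "job_id": job_id,
        "task": task,
        "task_type": task_type,
        "status": STATUS_BY_EVENT[event],
        "host": socket.gethostname(),
        # Same layout as the example (2024-01-15T10:30:00); UTC assumed
        "log_time": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S"),
        "message": message,
        "payload": payload,
        "job_result": job_result,
    }
```

An unknown event name raises `KeyError`, so a typo in a call site fails loudly instead of producing a document with no status.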
Best Practices
1. Choosing Priority
HIGH (2): user-facing, time-sensitive
- OTP SMS
- Password reset emails
- Critical reminders

MODERATE (1): standard operations
- Report delivery
- Appointment confirmations
- Standard webhooks

LOW (0): background and bulk operations
- Marketing campaigns
- Batch notifications
- Analytics webhooks

2. Setting Timeouts
```python
# Default job timeout: 180 seconds
# Default webhook timeout: 15 seconds
# For slow operations, increase the timeout:
payload = {
    'url': '...',
    'timeout': 60  # seconds
}
```

3. Using Callbacks
```python
# For tracking delivery status:
payload = {
    'callback': 1,
    'callback_url': 'https://your-api.com/delivery-status'
}

# For error handling:
data = {
    'failed_task_handler': 'https://your-api.com/handle-failure',
    'success_task_handler': 'https://your-api.com/handle-success'
}
```
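This document does not say how the handler URLs are called. The sketch below assumes the worker POSTs a small JSON body to `success_task_handler` or `failed_task_handler` when a job ends. The body fields and the name `build_handler_request` are hypothetical. The function only builds the request with the standard library, so you can inspect it without network access; to send it, pass it to `urllib.request.urlopen(req, timeout=...)`.

```python
import json
import urllib.request
from typing import Any, Dict, Optional


def build_handler_request(data: Dict[str, Any], job_id: str, succeeded: bool,
                          detail: str = "") -> Optional[urllib.request.Request]:
    """Build a POST to the matching handler, or return None if none is configured."""
    key = "success_task_handler" if succeeded else "failed_task_handler"
    url = data.get(key)
    if not url:
        return None
    body = json.dumps({"job_id": job_id, "succeeded": succeeded, "detail": detail})
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Returning `None` when no handler is set keeps jobs that do not define callbacks free of extra HTTP calls.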