RQ System Architecture

Deep dive into the Redis Queue (RQ) system powering Fusion

👤 Sai Tharun

Fusion uses a custom fork of RQ (Redis Queue) that adds priority queues and enhanced job management. This document explains in detail how the queue system works.


Core Concepts

Redis as Queue Backend

All queues are stored in Redis using specific key patterns:

| Key Pattern | Data Structure | Purpose |
|---|---|---|
| rq:queue:<name> | Sorted Set (ZSET) | Priority queues for tasks |
| rq:job:<job_id> | Hash | Job data and metadata |
| rq:job:<job_id>:dependents | Set | Jobs dependent on this job |
| rq:scheduler:scheduled_jobs | Sorted Set | Scheduled jobs (score = unix timestamp) |
| rq:worker:<name> | Hash | Worker registration and heartbeat |
| rq:workers | Set | All registered workers |
| rq:queues | Set | All known queue keys |
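
To make the naming scheme concrete, here is a minimal sketch of helpers that build these keys. The prefixes come from the table above; the helper names are hypothetical and are not taken from the Fusion codebase.

```python
# Prefixes as listed in the key-pattern table; helper names are illustrative only.
QUEUE_PREFIX = "rq:queue:"
JOB_PREFIX = "rq:job:"
WORKER_PREFIX = "rq:worker:"


def queue_key(name: str) -> str:
    """Key of the sorted set that holds a queue's job IDs."""
    return f"{QUEUE_PREFIX}{name}"


def job_key(job_id: str) -> str:
    """Key of the hash that holds one job's data and metadata."""
    return f"{JOB_PREFIX}{job_id}"


def dependents_key(job_id: str) -> str:
    """Key of the set of jobs that wait on this job."""
    return f"{job_key(job_id)}:dependents"


def worker_key(name: str) -> str:
    """Key of the hash that holds a worker's registration and heartbeat."""
    return f"{WORKER_PREFIX}{name}"


def queue_name_from_key(key: str) -> str:
    """Inverse of queue_key(): strip the prefix to recover the queue name."""
    if not key.startswith(QUEUE_PREFIX):
        raise ValueError(f"not a queue key: {key!r}")
    return key[len(QUEUE_PREFIX):]
```

Keeping key construction in one place makes it harder for a producer and a worker to disagree about where a job lives.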

Priority Queue Implementation

Unlike standard RQ, which uses Redis Lists (FIFO), Fusion uses Sorted Sets (ZSET) so that jobs are processed in priority order.

How It Works

Priority Levels

class PriorityQueue:
    PRIORITY_HIGH = 2      # OTP, Reminders - processed first
    PRIORITY_MODERATE = 1  # Standard messages
    PRIORITY_LOW = 0       # Campaigns, bulk operations - processed last

Atomic Pop Operation

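Several workers may read the same head item at the same time. The read and the remove are separate commands, so ZREM decides the winner: only the worker whose ZREM returns 1 owns the job, and the others retry.

def pop(self):
    # Loop instead of recursing so heavy contention cannot exhaust the call stack
    while True:
        # Read the highest-priority job (highest score) without removing it
        items = self.connection.zrevrange(self._key, 0, 0)
        if not items:
            return None  # Queue is empty

        # ZREM is atomic: it returns 1 only for the worker that removed the item
        if self.connection.zrem(self._key, items[0]) == 1:
            return items[0]
        # Another worker removed it first; try the next item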
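
The claim-by-ZREM pattern can be exercised without a Redis server. The sketch below uses a small in-memory stand-in (an assumption for illustration, not a Redis client) that implements only `zadd`, `zrevrange`, and `zrem`, with a lock so each call is atomic like a single Redis command. Several threads then drain the queue, and each job should be claimed exactly once.

```python
import threading


class InMemoryZSet:
    """Stand-in for the three sorted-set calls used below; holds a single set
    and ignores the key argument. Not a Redis client."""

    def __init__(self):
        self._scores = {}
        self._lock = threading.Lock()  # makes each call atomic, like one Redis command

    def zadd(self, key, mapping):
        with self._lock:
            self._scores.update(mapping)

    def zrevrange(self, key, start, end):
        with self._lock:
            # Highest score first; ties are broken by member, descending
            ordered = sorted(self._scores, key=lambda m: (self._scores[m], m), reverse=True)
            return ordered[start:end + 1]

    def zrem(self, key, member):
        with self._lock:
            return 1 if self._scores.pop(member, None) is not None else 0


def pop(conn, key):
    """Read the head, then claim it with ZREM; retry if another worker won."""
    while True:
        items = conn.zrevrange(key, 0, 0)
        if not items:
            return None
        if conn.zrem(key, items[0]) == 1:
            return items[0]


def drain_with_workers(conn, key, worker_count=4):
    """Let several threads pop until the queue is empty; return one list per thread."""
    claimed = [[] for _ in range(worker_count)]

    def work(bucket):
        while (job_id := pop(conn, key)) is not None:
            bucket.append(job_id)

    threads = [threading.Thread(target=work, args=(bucket,)) for bucket in claimed]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return claimed


conn = InMemoryZSet()
conn.zadd("rq:queue:sms", {f"job-{i}": i % 3 for i in range(300)})  # scores 0, 1, 2
claimed = drain_with_workers(conn, "rq:queue:sms")
all_claimed = [job_id for bucket in claimed for job_id in bucket]
```

On Redis 5.0 and later, `ZPOPMAX` performs the read and the remove in a single command, which would avoid the retry entirely; whether the Fusion fork can rely on it depends on the deployed Redis version.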

Job Lifecycle

Job Statuses

| Status | Description |
|---|---|
| queued | Job is waiting in queue |
| started | Worker is executing the job |
| finished | Job completed successfully |
| failed | Job execution failed |
| deferred | Waiting for dependency to complete |

Job Data Structure

Each job is stored as a Redis Hash with the following fields:

{
    'data': <pickled tuple of (func_name, instance, args, kwargs)>,
    'created_at': '2024-01-15T10:30:00',
    'origin': 'sms',                    # Queue name
    'original_queue': 'sms',            # Original queue (for requeue)
    'description': 'tasks.sms.sendSMS(...)',
    'enqueued_at': '2024-01-15T10:30:00',
    'started_at': '2024-01-15T10:30:01',
    'ended_at': '2024-01-15T10:30:02',
    'status': 'finished',
    'result': <pickled return value>,
    'exc_info': <exception traceback if failed>,
    'timeout': 180,
    'result_ttl': 500,
    
    # Custom Fusion fields
    'priority': 2,
    'retryCount': 7,
    'log_tag': 'SMS_OTP',
    'failed_task_handler': 'https://...',
    'success_task_handler': 'https://...'
}
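
As an illustration of the `data` field, the sketch below pickles and unpickles a `(func_name, instance, args, kwargs)` tuple as described above. The helper names, the argument values, and the trimmed-down hash are assumptions for the example; a real job hash carries the full set of fields.

```python
import pickle


def encode_job_data(func_name, args=(), kwargs=None, instance=None):
    """Pickle the call description in the (func_name, instance, args, kwargs) layout."""
    return pickle.dumps((func_name, instance, tuple(args), kwargs or {}))


def decode_job_data(blob):
    """Unpickle the 'data' field back into its four parts."""
    func_name, instance, args, kwargs = pickle.loads(blob)
    return func_name, instance, args, kwargs


# A trimmed-down job hash; the argument values are placeholders.
job_hash = {
    "data": encode_job_data(
        "tasks.sms.sendSMS",
        args=("+10000000000",),
        kwargs={"template": "otp"},
    ),
    "origin": "sms",
    "status": "queued",
    "priority": 2,
    "retryCount": 7,
}

func_name, instance, args, kwargs = decode_job_data(job_hash["data"])
```

Because unpickling can execute arbitrary code, job hashes should only ever be read from a Redis instance that untrusted parties cannot write to.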

Retry Mechanism

Fusion implements automatic retry with exponential backoff:

Retry Configuration

# Default retry count
job.retryCount = 7

# Retryable exceptions:
#   - ClientError (AWS)
#   - ConnectTimeout
#   - ConnectionError
#   - ReadTimeout

# Non-retryable (immediate failure):
#   - All other exceptions
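
The source does not state the backoff parameters, so the following is only a sketch of how an exponential schedule over seven retries could look. The 2-second base delay, the 300-second cap, and matching exceptions by class name are all assumptions made for illustration.

```python
# Names taken from the retryable list above. Matching by class name is a
# simplification so the sketch needs no third-party imports.
RETRYABLE_NAMES = {"ClientError", "ConnectTimeout", "ConnectionError", "ReadTimeout"}


def is_retryable(exc: BaseException) -> bool:
    """True if the exception's class name is in the retryable list."""
    return type(exc).__name__ in RETRYABLE_NAMES


def backoff_delay(attempt: int, base_seconds: float = 2.0, max_seconds: float = 300.0) -> float:
    """Delay before retry number `attempt` (1-based): base * 2**(attempt - 1), capped.

    base_seconds and max_seconds are assumed values, not Fusion's configuration.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return min(base_seconds * 2 ** (attempt - 1), max_seconds)


# Delays for the default retry count of 7
delays = [backoff_delay(attempt) for attempt in range(1, 8)]
```

With these assumed values the delays double from 2 seconds up to 128 seconds, and the cap only takes effect from the ninth attempt onward.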

Scheduler System

The scheduler runs as a separate process and periodically checks for jobs that are due.

Scheduler Flow

Scheduling Options

  1. enqueue_at() - Execute at specific datetime
  2. enqueue_in() - Execute after time delta
  3. cron() - Recurring execution using cron syntax
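
Since scheduled jobs live in a sorted set scored by unix timestamp, finding due jobs amounts to a range query up to the current time. The sketch below models that selection with a plain dict instead of Redis; the function names and job IDs are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def score_for(run_at: datetime) -> float:
    """Sorted-set score for a scheduled job: its run time as a unix timestamp."""
    return run_at.timestamp()


def due_job_ids(scheduled: dict, now: datetime) -> list:
    """Job IDs whose score is <= now, earliest first.

    Mirrors what a ZRANGEBYSCORE from 0 to `now` would return on the real set.
    """
    cutoff = now.timestamp()
    due = [(score, job_id) for job_id, score in scheduled.items() if score <= cutoff]
    return [job_id for _, job_id in sorted(due)]


now = datetime(2024, 1, 15, 10, 30, tzinfo=timezone.utc)
scheduled = {
    "reminder": score_for(now - timedelta(minutes=1)),  # like enqueue_at() with a past time
    "report": score_for(now + timedelta(minutes=10)),   # like enqueue_in() with a 10-minute delta
    "cleanup": score_for(now),                          # due exactly now
}
due = due_job_ids(scheduled, now)
```

Using timezone-aware datetimes avoids the scheduler and the producers disagreeing about what "now" means when hosts run in different time zones.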

Cron Jobs

Defined in wrappers/scheduler_task.py:

# Run maintenance every 5 minutes
add(scheduler, {}, '*/5 * * * *', 
    'tasks.maintenance.run_maintenance_on_failed_queue',
    id='redis_maintenance_on_failed_queue')

Worker Process Model

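Workers use a fork-based execution model for isolation. As in upstream RQ, the worker forks a child process to run each job, so a crash in the job does not take down the worker itself.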
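
A minimal sketch of the fork-and-wait idea follows, assuming a Unix host (`os.fork` is not available on Windows). The function name is hypothetical, and a real worker also handles timeouts, signals, and result storage, which are omitted here.

```python
import os


def run_in_child(func, *args, **kwargs) -> bool:
    """Run func in a forked child process and report whether it exited cleanly.

    The parent only waits; any exception or crash stays inside the child.
    """
    pid = os.fork()
    if pid == 0:
        # Child process: run the job, then leave immediately without
        # running the parent's cleanup handlers.
        try:
            func(*args, **kwargs)
        except BaseException:
            os._exit(1)
        os._exit(0)

    # Parent process: block until the child finishes and inspect its exit status.
    _, status = os.waitpid(pid, 0)
    return os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0


succeeded = run_in_child(lambda: None)
failed = run_in_child(lambda: 1 / 0)
```

The second call raises inside the child only, so the parent keeps running and simply sees a non-zero exit status.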

Worker States

| State | Description |
|---|---|
| started | Worker just started |
| idle | Waiting for jobs |
| busy | Executing a job |
| suspended | Paused by rq suspend |

Heartbeat

Workers send heartbeats to Redis to indicate they're alive:

def heartbeat(self, timeout=0):
    timeout = max(timeout, self.default_worker_ttl)  # 420 seconds
    self.connection.expire(self.key, timeout)
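
The effect of the heartbeat is that a worker key disappears once its TTL lapses without a refresh. The sketch below models that with an in-memory registry and an explicit clock value so the behavior is deterministic; the class and method names are hypothetical, and only the 420-second default comes from the snippet above.

```python
DEFAULT_WORKER_TTL = 420  # seconds, as in the heartbeat snippet


class WorkerRegistry:
    """In-memory model of key expiry; `now` is passed in instead of read from a clock."""

    def __init__(self):
        self._expires_at = {}

    def heartbeat(self, worker: str, now: float, timeout: float = 0) -> None:
        """Refresh the worker's expiry, never using a TTL below the default."""
        ttl = max(timeout, DEFAULT_WORKER_TTL)
        self._expires_at[worker] = now + ttl

    def alive(self, now: float) -> list:
        """Workers whose key would still exist at time `now`."""
        return sorted(name for name, expiry in self._expires_at.items() if expiry > now)


registry = WorkerRegistry()
registry.heartbeat("worker-a", now=0)               # default TTL of 420 seconds
registry.heartbeat("worker-b", now=0, timeout=600)  # a longer TTL, e.g. for a long job
```

A worker running a long job needs a heartbeat timeout that exceeds the job's duration, otherwise it would look dead while still busy.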

Failed Queue

Failed jobs are moved to a special queue for inspection and retry:

# Queue name
rq:queue:failed

# Requeue operations
failed_queue.requeue(job_id)      # To original queue (List)
failed_queue.requeue_set(job_id)  # To priority queue (Sorted Set)
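
The source does not show what `requeue_set` does internally. The sketch below is one plausible reading, modeled on plain dicts and sets rather than Redis: remove the job from the failed set, reset its status, and add it to its original queue with its priority as the score. All data structures and values here are assumptions.

```python
def requeue_set(job_id, jobs, failed_ids, queues):
    """Move a failed job back onto its original priority queue.

    jobs:       job_id -> dict of job fields (stands in for the job hash)
    failed_ids: set of job IDs currently in the failed queue
    queues:     queue name -> {job_id: priority} (stands in for the sorted sets)
    """
    if job_id not in failed_ids:
        raise KeyError(f"{job_id} is not in the failed queue")

    job = jobs[job_id]
    failed_ids.discard(job_id)

    # Reset the failure state before the job becomes visible to workers again
    job["status"] = "queued"
    job["exc_info"] = None

    target = job["original_queue"]
    queues.setdefault(target, {})[job_id] = job["priority"]
    return target


jobs = {
    "job-1": {
        "original_queue": "sms",
        "priority": 2,
        "status": "failed",
        "exc_info": "ReadTimeout",
    }
}
failed_ids = {"job-1"}
queues = {}
target = requeue_set("job-1", jobs, failed_ids, queues)
```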

Connection Management

Fusion uses a connection stack pattern for managing Redis connections:

# Push connection for current context
push_connection(redis_conn)

# Get current connection
conn = get_current_connection()

# Pop when done
pop_connection()

# Context manager
with Connection(redis_conn):
    # Use connection
    pass
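
For readers unfamiliar with the pattern, here is a minimal sketch of how such a connection stack can be built. It reuses the function names shown above but is not the Fusion or RQ implementation; in particular it uses a plain module-level list, whereas a production version would normally keep the stack thread-local.

```python
from contextlib import contextmanager

_connection_stack = []  # plain list for brevity; not thread-local


def push_connection(conn):
    """Make conn the current connection."""
    _connection_stack.append(conn)


def pop_connection():
    """Remove and return the current connection."""
    return _connection_stack.pop()


def get_current_connection():
    """Return the connection on top of the stack, or None if the stack is empty."""
    return _connection_stack[-1] if _connection_stack else None


@contextmanager
def Connection(conn):
    """Push conn for the duration of the with-block and always pop it afterwards."""
    push_connection(conn)
    try:
        yield
    finally:
        popped = pop_connection()
        assert popped is conn, "unbalanced push/pop inside the with-block"


# Strings stand in for Redis client objects in this sketch.
seen = []
with Connection("primary"):
    seen.append(get_current_connection())
    with Connection("replica"):
        seen.append(get_current_connection())
    seen.append(get_current_connection())
seen.append(get_current_connection())
```

The context manager is the safer entry point because the `finally` clause restores the previous connection even if the block raises.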

Logging & Monitoring

Elasticsearch Logging

All job events are logged to Elasticsearch:

# Index: active-fusion-logs
# Type: logs

{
    "job_id": "uuid",
    "task": "SMS",
    "task_type": "OTP", 
    "status": "queued|processing|completed|failed|retry",
    "host": "worker-hostname",
    "log_time": "2024-01-15T10:30:00",
    "message": "Status message",
    "payload": "...",
    "job_result": "..."
}

Log Events

| Event | Method | Status |
|---|---|---|
| Job enqueued | Log.start() | queued |
| Execution started | Log.execution_start() | processing |
| Execution succeeded | Log.execution_success() | completed |
| Execution failed | Log.execution_failed() | failed |
| Retry scheduled | Log.execution_retry() | retry |
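
The mapping from log method to status can be captured in a small lookup, sketched below together with a helper that assembles a document in the shape shown earlier. The helper name and its parameters are hypothetical, and sending the document to Elasticsearch is left out.

```python
import socket
from datetime import datetime

# Log method name -> status value, as listed in the table above
EVENT_STATUS = {
    "start": "queued",
    "execution_start": "processing",
    "execution_success": "completed",
    "execution_failed": "failed",
    "execution_retry": "retry",
}


def build_log_document(event, job_id, task, task_type, message,
                       payload="", job_result="", log_time=None, host=None):
    """Assemble one log document; raises KeyError for an unknown event name."""
    return {
        "job_id": job_id,
        "task": task,
        "task_type": task_type,
        "status": EVENT_STATUS[event],
        "host": host or socket.gethostname(),
        "log_time": (log_time or datetime.now()).isoformat(timespec="seconds"),
        "message": message,
        "payload": payload,
        "job_result": job_result,
    }


doc = build_log_document(
    "execution_retry",
    job_id="uuid",
    task="SMS",
    task_type="OTP",
    message="Retry scheduled",
    log_time=datetime(2024, 1, 15, 10, 30),
    host="worker-hostname",
)
```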

Best Practices

1. Choosing Priority

# HIGH (2) - User-facing, time-sensitive
- OTP SMS
- Password reset emails
- Critical reminders

# MODERATE (1) - Standard operations
- Report delivery
- Appointment confirmations
- Standard webhooks

# LOW (0) - Background, bulk operations
- Marketing campaigns
- Batch notifications
- Analytics webhooks

2. Setting Timeouts

# Default timeout: 180 seconds
# Webhook default: 15 seconds

# For slow operations, increase timeout:
payload = {
    'url': '...',
    'timeout': 60  # 60 seconds
}

3. Using Callbacks

# For tracking delivery status:
payload = {
    'callback': 1,
    'callback_url': 'https://your-api.com/delivery-status'
}

# For error handling:
data = {
    'failed_task_handler': 'https://your-api.com/handle-failure',
    'success_task_handler': 'https://your-api.com/handle-success'
}
