
Worker Architecture

Deep dive into Fusion Worker's internal architecture and processing model

👤 Sai Tharun

Fusion Worker Architecture

Fusion Worker is a multi-process, fork-based task execution system that consumes jobs from Redis priority queues and executes them in isolated child processes.


High-Level Architecture


Process Model

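Fusion Worker uses a fork-based execution model: the long-lived parent process forks a short-lived child (the work horse) for each job, which isolates the worker from whatever the job does.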

Why Fork?

  1. Isolation - Each job runs in its own process; crashes don't affect the main worker
  2. Memory Safety - Child process memory is freed on exit
  3. Timeout Enforcement - Parent can kill hung child processes
  4. Clean State - Each job starts with a fresh process state

Fork Flow
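
A minimal sketch of the flow, assuming a POSIX system: the parent forks, the child runs the job and exits, and the parent waits with a deadline and kills the child if it overruns. The helper name run_in_child and the 10 ms polling interval are illustrative and are not taken from the Fusion Worker source.

```python
import os
import signal
import time


def run_in_child(task, timeout_seconds):
    """Run task() in a forked child; return 'ok', 'failed', or 'timeout'."""
    pid = os.fork()
    if pid == 0:
        # Child (work horse): run the task, then exit immediately so the
        # child never falls back into the parent's code path.
        try:
            task()
            os._exit(0)
        except BaseException:
            os._exit(1)

    # Parent: poll the child until it exits or the deadline passes.
    deadline = time.monotonic() + timeout_seconds
    while True:
        finished_pid, status = os.waitpid(pid, os.WNOHANG)
        if finished_pid == pid:
            if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0:
                return "ok"
            return "failed"
        if time.monotonic() >= deadline:
            os.kill(pid, signal.SIGKILL)  # timeout enforcement
            os.waitpid(pid, 0)            # reap the killed child
            return "timeout"
        time.sleep(0.01)
```

A crash or a hang in the task only affects the child: the parent sees an exit status or a timeout and can go on to the next job.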


Worker State Machine

Worker States

| State | Description | Redis Key Value |
|---|---|---|
| started | Worker just initialized | state: started |
| idle | Waiting for jobs | state: idle |
| busy | Executing a job | state: busy |
| suspended | Paused by admin | state: suspended |

Queue Processing

Priority Queue Mechanics

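Unlike standard RQ, which stores each queue as a Redis list (FIFO), Fusion Worker stores each queue as a sorted set (ZSET) so that jobs are dequeued by priority. Workers read with ZREVRANGE, so the member with the highest score is processed first.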

Atomic Dequeue

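ZREVRANGE only reads the top job, so several workers can see the same item at once. The ZREM that follows decides ownership: it returns 1 for exactly one worker, and every other worker moves on: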

def work(self):
    while True:
        # Get highest priority job
        _items = connection.zrevrange(current_queue, 0, 0)
        if len(_items) > 0:
            _item = _items[0]
        else:
            continue  # Queue empty
        
        # Atomically remove - only one worker can succeed
        if connection.zrem(current_queue, _item) == 1:
            # We got the job
            result = _item
        else:
            # Another worker got it first
            continue
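
The claim step can be tried without a Redis server. The sketch below is an illustration, not Fusion Worker code: InMemorySortedSets is a hypothetical stand-in that mimics only the zadd, zrevrange, and zrem calls (with the same argument shapes as redis-py), and claim_next is an illustrative name.

```python
class InMemorySortedSets:
    """Stand-in for the three sorted-set calls used below (non-negative indexes only)."""

    def __init__(self):
        self._sets = {}

    def zadd(self, key, mapping):
        self._sets.setdefault(key, {}).update(mapping)

    def zrevrange(self, key, start, end):
        # Highest score first; ties broken by member in reverse order, as in Redis.
        ranked = sorted(self._sets.get(key, {}).items(),
                        key=lambda pair: (pair[1], pair[0]), reverse=True)
        return [member for member, _score in ranked][start:end + 1]

    def zrem(self, key, member):
        removed = self._sets.get(key, {}).pop(member, None)
        return 1 if removed is not None else 0


def claim_next(connection, queue_key):
    """Claim the highest-scored member, or return None when the queue is empty."""
    while True:
        items = connection.zrevrange(queue_key, 0, 0)
        if not items:
            return None
        if connection.zrem(queue_key, items[0]) == 1:
            return items[0]  # this caller won the race
        # Another worker removed the item first; look at the new top item.


queue = InMemorySortedSets()
queue.zadd("rq:queue:sms", {"job-low": 1, "job-high": 10})
first = claim_next(queue, "rq:queue:sms")
second = claim_next(queue, "rq:queue:sms")
third = claim_next(queue, "rq:queue:sms")
```

Against a real server, redis-py returns members as bytes unless the client is created with decode_responses=True. Redis 5.0 and later also provide ZPOPMAX, which reads and removes the top member in a single command; whether that fits Fusion Worker depends on the Redis version in use.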

Queue Round-Robin

Workers iterate through all queues in round-robin fashion:

queue_keys = [q.key for q in self.queues]
queues_length = len(queue_keys)
current_queue_index = -1

while True:
    current_queue_index += 1
    if current_queue_index >= queues_length:
        current_queue_index = 0
    
    current_queue = queue_keys[current_queue_index]
    # Try to get job from this queue
    # If empty, move to next queue
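
The same rotation can be written as a function that makes one full pass over the queues and reports whether anything was found. This is a sketch under assumptions: next_job and try_dequeue are hypothetical names, and plain Python lists stand in for the Redis queues.

```python
def next_job(queue_keys, try_dequeue, start_index=0):
    """Visit each queue once, starting at start_index.

    Returns (job, next_start_index); job is None when every queue was empty.
    """
    count = len(queue_keys)
    for offset in range(count):
        index = (start_index + offset) % count
        job = try_dequeue(queue_keys[index])
        if job is not None:
            # Resume after the queue that produced a job, so no queue is starved.
            return job, (index + 1) % count
    return None, start_index


pending = {"sms": ["s1"], "email": [], "webhook": ["w1", "w2"]}


def pop_from(name):
    return pending[name].pop(0) if pending[name] else None


keys = ["sms", "email", "webhook"]
order = []
cursor = 0
while True:
    job, cursor = next_job(keys, pop_from, cursor)
    if job is None:
        break  # a long-running worker would sleep here and poll again
    order.append(job)
```

Returning None after a full pass gives the caller a natural place to apply the sleep interval instead of spinning on empty queues.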

Special Scheduler Queue Handling

The scheduler queue is the one exception: it remains a Redis list and is consumed in FIFO order with a list pop instead of a sorted-set read:

if current_queue == 'rq:queue:scheduler':
    # Use BLPOP for scheduler queue
    _item = self.lpop([current_queue], timeout, connection)
else:
    # Use ZREVRANGE for priority queues
    _items = connection.zrevrange(current_queue, 0, 0)

Job Execution Pipeline

Preparation Phase

def prepare_job_execution(self, job):
    timeout = (job.timeout or 180) + 60
    
    with self.connection.pipeline() as pipeline:
        # Mark worker as busy
        self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
        
        # Track current job
        self.set_current_job_id(job.id, pipeline=pipeline)
        
        # Extend worker TTL
        self.heartbeat(timeout, pipeline=pipeline)
        
        # Add to started registry
        registry = StartedJobRegistry(job.origin, self.connection)
        registry.add(job, timeout, pipeline=pipeline)
        
        # Update job status
        job.set_status(JobStatus.STARTED, pipeline=pipeline)
        
        pipeline.execute()

Execution Phase (Work Horse)

def perform_job(self, job, queue):
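    self.prepare_job_execution(job)
    push_connection(self.connection)
    # Same registry the job was added to in prepare_job_execution
    started_job_registry = StartedJobRegistry(job.origin, self.connection)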
    
    try:
        with self.death_penalty_class(job.timeout or 180):
            rv = job.perform()  # Execute the actual task
        
        job.ended_at = utcnow()
        job._result = rv
        
        self.handle_job_success(job, queue, started_job_registry)
        
    except (ClientError, ConnectTimeout, ConnectionError, ReadTimeout):
        # Retryable exceptions
        self.handle_job_failure(job, queue, retry_flag=True)
        
    except Exception:
        # Non-retryable - move to failed queue
        self.handle_job_failure(job, queue, retry_flag=False)
        
    finally:
        pop_connection()

Retry System

Retry Implementation

def handle_job_failure(self, job, queue, retry_flag=False):
    if job.retryCount < 10 and retry_flag:
        # Schedule retry
        job.retryCount += 1
        job.save()
        
        # Call Fusion to schedule retry in 2 minutes
        requests.post(
            settings.FUSION_SERVER + '/retryTask/', 
            json=job._args
        )
        
        # Log retry event
        Log.execution_retry(
            job_id=job.id,
            message=f'Job failed at retry attempt {job.retryCount}'
        )
    else:
        # Max retries exceeded or non-retryable
        if job.failed_task_handler:
            self.failed_task_callback(job)
        
        # Move to failed queue
        self.move_to_failed_queue(job, *sys.exc_info())
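
The retry decision above reduces to two checks. The sketch below restates it as a pure function and works out the resulting retry budget. The constant and function names are illustrative; the limit of 10 and the 2-minute delay are taken from the code and comment above.

```python
MAX_RETRIES = 10           # from the `retryCount < 10` check
RETRY_DELAY_SECONDS = 120  # "schedule retry in 2 minutes"


def next_action(retry_count, retry_flag):
    """Return 'retry' or 'fail' for a job that has just raised."""
    if retry_flag and retry_count < MAX_RETRIES:
        return "retry"
    return "fail"


# A job that keeps raising retryable errors: retry counts 0..9 are retried,
# anything after that goes to the failed queue.
attempts = [next_action(count, retry_flag=True) for count in range(12)]

# Lower bound on the scheduling delay accumulated before the job finally fails
# (execution time of each attempt comes on top of this).
min_total_delay = MAX_RETRIES * RETRY_DELAY_SECONDS
```

With these values a persistently failing retryable job spends at least 20 minutes in the retry cycle before it reaches the failed queue, while a non-retryable failure goes there immediately.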

Signal Handling

Parent Process (Worker)

def _install_signal_handlers(self):
    signal.signal(signal.SIGINT, self.request_stop)   # Ctrl+C
    signal.signal(signal.SIGTERM, self.request_stop)  # kill

Child Process (Work Horse)

def setup_work_horse_signals(self):
    # Ignore Ctrl+C - parent handles it
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # Default SIGTERM handling
    signal.signal(signal.SIGTERM, signal.SIG_DFL)

Warm Shutdown
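
A warm shutdown is commonly implemented by letting the signal handler do nothing except record the request, and having the main loop check that flag between jobs. The sketch below shows the pattern under the assumption that request_stop works this way in Fusion Worker; StoppableLoop and the sample jobs are hypothetical.

```python
import signal


class StoppableLoop:
    def __init__(self, jobs):
        self.jobs = list(jobs)
        self.stop_requested = False
        self.completed = []

    def request_stop(self, signum, frame):
        # Runs as the signal handler: record the request and return at once.
        self.stop_requested = True

    def install_signal_handlers(self):
        signal.signal(signal.SIGINT, self.request_stop)
        signal.signal(signal.SIGTERM, self.request_stop)

    def run(self):
        # The flag is checked only between jobs, so a job that has already
        # started always runs to completion.
        while self.jobs and not self.stop_requested:
            job = self.jobs.pop(0)
            job(self)
            self.completed.append(job.__name__)


def first_job(loop):
    pass


def second_job(loop):
    # Simulate SIGTERM arriving while this job is running.
    loop.request_stop(signal.SIGTERM, None)


def third_job(loop):
    pass


loop = StoppableLoop([first_job, second_job, third_job])
loop.run()
```

The job that was running when the signal arrived completes, and the remaining job stays queued for another worker.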


Heartbeat System

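Every heartbeat resets the TTL on the worker's Redis key. If a worker stops sending heartbeats, the key expires and the worker is treated as dead:

def heartbeat(self, timeout=0, pipeline=None):
    # Never set a TTL shorter than the default worker TTL (420 seconds)
    timeout = max(timeout, self.default_worker_ttl)
    # Queue the EXPIRE on the caller's pipeline when one is passed in
    connection = pipeline if pipeline is not None else self.connection
    connection.expire(self.key, timeout)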
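
To see which TTL a worker ends up with while it runs a job, the two rules can be combined: prepare_job_execution requests the job timeout plus 60 seconds, and heartbeat never goes below the default worker TTL. The function and constant names in this sketch are illustrative; the numbers come from the code and defaults in this document.

```python
DEFAULT_WORKER_TTL = 420   # seconds, default worker TTL
DEFAULT_JOB_TIMEOUT = 180  # seconds, used when job.timeout is not set
GRACE_SECONDS = 60         # margin added in prepare_job_execution


def worker_ttl_for(job_timeout=None):
    """TTL applied to the worker key while a job with this timeout runs."""
    requested = (job_timeout or DEFAULT_JOB_TIMEOUT) + GRACE_SECONDS
    return max(requested, DEFAULT_WORKER_TTL)


default_ttl = worker_ttl_for()     # 180 + 60 = 240, raised to 420
medium_ttl = worker_ttl_for(360)   # 360 + 60 = 420
long_ttl = worker_ttl_for(600)     # 600 + 60 = 660
```

Only jobs with a timeout above 360 seconds extend the worker key beyond the default TTL.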

Worker Registration

# Worker key structure
rq:worker:<hostname>.<pid>

# Hash fields
{
    'birth': '2024-01-15T10:30:00Z',
    'queues': 'sms,email,pn,webhook,whatsapp',
    'state': 'idle',
    'current_job': 'job-uuid-if-busy'
}

Monitoring & Observability

Key Redis Keys to Monitor

| Key | Description |
|---|---|
| rq:workers | Set of all registered workers |
| rq:worker:<name> | Individual worker status |
| rq:queue:<name> | Queue length (ZCARD) |
| rq:queue:failed | Failed jobs count |
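
A small polling helper for these keys might look like the sketch below. It assumes a redis-py style client is passed in as connection, and that the members of rq:workers are the full worker keys (as in upstream RQ); both function names are hypothetical.

```python
def queue_depths(connection, queue_names):
    """Number of pending jobs per priority queue, read with ZCARD."""
    return {name: connection.zcard(f"rq:queue:{name}") for name in queue_names}


def worker_states(connection):
    """Map each registered worker key to the `state` field of its hash."""
    states = {}
    for worker_key in connection.smembers("rq:workers"):
        states[worker_key] = connection.hget(worker_key, "state")
    return states
```

The scheduler queue is list-based, so ZCARD does not apply to it and its length has to be read with a list command such as LLEN. With a default redis-py client the returned keys and states are bytes; pass decode_responses=True when creating the client to get strings.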

Elasticsearch Logs

All job events are logged:

{
  "index": "active-fusion-logs",
  "events": [
    "queued",
    "processing", 
    "completed",
    "failed",
    "retry"
  ]
}

CLI Commands

# View queue status
python run_worker.py info

# View workers
python run_worker.py info -W

# Suspend all workers
python run_worker.py suspend

# Resume workers
python run_worker.py resume

# Requeue failed jobs
python run_worker.py requeue --all

Scaling Considerations

Horizontal Scaling

Run multiple worker processes on the same or different machines:

# Machine 1
python run_worker.py worker -c instance --customq all

# Machine 2
python run_worker.py worker -c instance --customq all

# Machine 3 (specialized for webhook queue only)
python run_worker.py worker -c instance --customq webhook

Queue Isolation

For high-volume queues, run dedicated workers:

# SMS-only workers (high priority)
python run_worker.py worker -c instance --customq sms

# Email workers
python run_worker.py worker -c instance --customq email

# Webhook workers
python run_worker.py worker -c instance --customq webhook

Resource Tuning

| Setting | Default | Description |
|---|---|---|
| worker_ttl | 420s | Worker heartbeat TTL |
| result_ttl | 500s | How long to keep job results |
| job.timeout | 180s | Max job execution time |
| Sleep interval | 3s | Time to wait when queues are empty |
