Worker Architecture
Deep dive into Fusion Worker's internal architecture and processing model
Fusion Worker Architecture
Fusion Worker is a multi-process, fork-based task execution system that consumes jobs from Redis priority queues and executes them in isolated child processes.
High-Level Architecture
Process Model
Fusion Worker uses a fork-based execution model for process isolation and safety.
Why Fork?
- Isolation - Each job runs in its own process; crashes don't affect the main worker
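- Memory Safety - Memory a job allocates, including anything it leaks, is reclaimed by the OS when the child exits
- Timeout Enforcement - The parent can kill a hung child process
- Clean State - Each job starts from a fresh copy of the parent's state, and nothing the job changes carries over to the next job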
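The bullets above can be illustrated with a minimal, standard-library sketch of the fork-and-wait pattern on a POSIX system (Python 3.9+ for `os.waitstatus_to_exitcode`). The function `run_in_child`, its polling loop and the choice of SIGKILL are assumptions for illustration. They are not Fusion Worker's own code.

```python
import os
import signal
import time
from typing import Callable


def run_in_child(job: Callable[[], None], timeout: float, poll: float = 0.05) -> int:
    """Run `job` in a forked child process and return its exit code.

    0 = success, 1 = the job raised, negative = killed by that signal number.
    (Hypothetical helper for illustration.)
    """
    pid = os.fork()
    if pid == 0:
        # Child ("work horse"): run the job, then leave with os._exit so
        # none of the parent's cleanup handlers run inside the child.
        code = 0
        try:
            job()
        except BaseException:
            code = 1
        os._exit(code)

    # Parent: poll without blocking so the timeout can be enforced.
    deadline = time.monotonic() + timeout
    while True:
        finished_pid, status = os.waitpid(pid, os.WNOHANG)
        if finished_pid == pid:
            return os.waitstatus_to_exitcode(status)
        if time.monotonic() >= deadline:
            os.kill(pid, signal.SIGKILL)    # hung job: kill the child
            _, status = os.waitpid(pid, 0)  # reap it so no zombie remains
            return os.waitstatus_to_exitcode(status)
        time.sleep(poll)
```

A job that raises yields `1`. A job that sleeps past `timeout` yields `-signal.SIGKILL`. In every case the parent process keeps running, which is the isolation property listed above.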
Fork Flow
Worker State Machine
Worker States
| State | Description | Worker hash field |
|---|---|---|
| started | Worker just initialized | `state: started` |
| idle | Waiting for jobs | `state: idle` |
| busy | Executing a job | `state: busy` |
| suspended | Paused by admin | `state: suspended` |
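The source code uses a `WorkerStatus` enum (for example `WorkerStatus.BUSY` in `prepare_job_execution`), but this page does not show its definition. The sketch below mirrors the table under two assumptions: the enum values are the lowercase strings above, and the state is written to the `state` field of the worker hash through a redis-py client. The `set_state` free function is hypothetical.

```python
from enum import Enum


class WorkerStatus(str, Enum):
    """Worker states from the table; the value is what is stored in Redis."""
    STARTED = "started"
    IDLE = "idle"
    BUSY = "busy"
    SUSPENDED = "suspended"


def set_state(conn, worker_key: str, state: WorkerStatus, pipeline=None) -> None:
    """Write `state` into the worker hash (e.g. rq:worker:<hostname>.<pid>).

    `conn` is assumed to be a redis-py client. Passing a pipeline batches the
    write with other commands, as prepare_job_execution does.
    """
    target = pipeline if pipeline is not None else conn
    target.hset(worker_key, "state", state.value)
```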
Queue Processing
Priority Queue Mechanics
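Unlike standard RQ, which uses Redis Lists (FIFO), Fusion Worker uses Sorted Sets (ZSET) for priority-based processing. Each job is a ZSET member scored by its priority, and workers take the highest-scoring member first.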
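This page does not show how jobs get into a priority queue. The sketch below illustrates the enqueue side under stated assumptions: a redis-py client (3.0+ `zadd(name, mapping)` signature), the `rq:queue:<name>` key pattern used elsewhere on this page, and the job's priority used directly as its score. Both helper names are hypothetical.

```python
def enqueue(conn, queue_name: str, job_id: str, priority: float) -> None:
    """Add a job to a priority queue; a higher score is dequeued first."""
    # ZADD takes a {member: score} mapping; re-adding a job updates its score.
    conn.zadd(f"rq:queue:{queue_name}", {job_id: priority})


def peek_highest(conn, queue_name: str):
    """Return the highest-priority job id without removing it, or None."""
    items = conn.zrevrange(f"rq:queue:{queue_name}", 0, 0)
    return items[0] if items else None
```

Members with equal scores are not served FIFO. Redis orders ties lexicographically by member, so among jobs with equal priority `ZREVRANGE` returns the lexicographically largest job id first.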
Atomic Dequeue
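To prevent race conditions between multiple workers, reading and claiming a job are separate steps. Several workers may read the same top job with ZREVRANGE, but ZREM is atomic, so only one of them gets a return value of 1 and owns the job: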
```python
def work(self):
    # `connection` and `current_queue` are defined in surrounding code omitted from this excerpt
    while True:
        # Get highest priority job (highest ZSET score)
        _items = connection.zrevrange(current_queue, 0, 0)
        if len(_items) > 0:
            _item = _items[0]
        else:
            continue  # Queue empty

        # Atomically remove - only one worker can succeed
        if connection.zrem(current_queue, _item) == 1:
            # We got the job
            result = _item
        else:
            # Another worker got it first
            continue
```
Queue Round-Robin
Workers iterate through all queues in round-robin fashion:
```python
queue_keys = [q.key for q in self.queues]
queues_length = len(queue_keys)
current_queue_index = -1

while True:
    current_queue_index += 1
    if current_queue_index >= queues_length:
        current_queue_index = 0
    current_queue = queue_keys[current_queue_index]
    # Try to get job from this queue
    # If empty, move to next queue
```
Special Scheduler Queue Handling
The scheduler queue uses list-based (FIFO) processing:
```python
if current_queue == 'rq:queue:scheduler':
    # Use BLPOP for scheduler queue
    _item = self.lpop([current_queue], timeout, connection)
else:
    # Use ZREVRANGE for priority queues
    _items = connection.zrevrange(current_queue, 0, 0)
```
Job Execution Pipeline
Preparation Phase
```python
def prepare_job_execution(self, job):
    # Job timeout (default 180 s) plus a 60 s grace period
    timeout = (job.timeout or 180) + 60
    with self.connection.pipeline() as pipeline:
        # Mark worker as busy
        self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
        # Track current job
        self.set_current_job_id(job.id, pipeline=pipeline)
        # Extend worker TTL
        self.heartbeat(timeout, pipeline=pipeline)
        # Add to started registry
        registry = StartedJobRegistry(job.origin, self.connection)
        registry.add(job, timeout, pipeline=pipeline)
        # Update job status
        job.set_status(JobStatus.STARTED, pipeline=pipeline)
        pipeline.execute()
```
Execution Phase (Work Horse)
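`perform_job` runs the task inside `self.death_penalty_class(job.timeout or 180)`, and this page does not show that class. Assuming it behaves like RQ's SIGALRM-based `UnixSignalDeathPenalty`, a minimal stand-in looks like the sketch below. The names `AlarmDeathPenalty` and `JobTimeoutError` are hypothetical.

```python
import signal


class JobTimeoutError(Exception):
    """Raised inside the job when it exceeds its time limit."""


class AlarmDeathPenalty:
    """Context manager that aborts its block after `timeout` seconds.

    Relies on SIGALRM, so it works only on POSIX and only in the main
    thread, which is where a forked work horse runs the job.
    """

    def __init__(self, timeout: float):
        self.timeout = timeout

    def _on_alarm(self, signum, frame):
        raise JobTimeoutError(f"Job exceeded {self.timeout}s")

    def __enter__(self):
        self._previous = signal.signal(signal.SIGALRM, self._on_alarm)
        signal.setitimer(signal.ITIMER_REAL, self.timeout)
        return self

    def __exit__(self, exc_type, exc, tb):
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, self._previous)
        return False  # never swallow exceptions, including the timeout
```

Under this assumption, the timeout exception is not one of the network errors listed in `perform_job`'s first `except` clause. A timed-out job would therefore reach the generic `except Exception` branch and be treated as non-retryable.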
```python
def perform_job(self, job, queue):
    self.prepare_job_execution(job)
    push_connection(self.connection)
    started_job_registry = StartedJobRegistry(job.origin, self.connection)
    try:
        # death_penalty_class aborts job.perform() once the timeout elapses
        with self.death_penalty_class(job.timeout or 180):
            rv = job.perform()  # Execute the actual task
        job.ended_at = utcnow()
        job._result = rv
        self.handle_job_success(job, queue, started_job_registry)
    except (ClientError, ConnectTimeout, ConnectionError, ReadTimeout):
        # Retryable exceptions (transient network/API errors)
        self.handle_job_failure(job, queue, retry_flag=True)
    except Exception:
        # Non-retryable - move to failed queue
        self.handle_job_failure(job, queue, retry_flag=False)
    finally:
        pop_connection()
```
Retry System
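The retry rule implemented in `handle_job_failure` combines two conditions: the exception must be retryable, and the job must have fewer than 10 recorded retries. Isolating the rule as a pure function makes it easy to unit-test. The sketch below is illustrative only. The names are hypothetical, and the builtins `ConnectionError` and `TimeoutError` stand in for the real exception classes so it runs without boto or requests installed.

```python
# Stand-ins for the retryable errors caught in perform_job
# (ClientError, ConnectTimeout, ConnectionError, ReadTimeout).
RETRYABLE_ERRORS = (ConnectionError, TimeoutError)
MAX_RETRIES = 10


def should_retry(exc: BaseException, retry_count: int) -> bool:
    """Retry only transient errors, and only while retry_count < MAX_RETRIES."""
    return isinstance(exc, RETRYABLE_ERRORS) and retry_count < MAX_RETRIES
```

Assuming `retryCount` starts at 0, a job that hits a retryable error every time runs 11 times in total: the first attempt plus 10 retries. It then moves to the failed queue.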
Retry Implementation
```python
def handle_job_failure(self, job, queue, retry_flag=False):
    if job.retryCount < 10 and retry_flag:
        # Schedule retry
        job.retryCount += 1
        job.save()
        # Call Fusion to schedule retry in 2 minutes
        requests.post(
            settings.FUSION_SERVER + '/retryTask/',
            json=job._args
        )
        # Log retry event
        Log.execution_retry(
            job_id=job.id,
            message=f'Job failed at retry attempt {job.retryCount}'
        )
    else:
        # Max retries exceeded or non-retryable
        if job.failed_task_handler:
            self.failed_task_callback(job)
        # Move to failed queue; this runs inside perform_job's except block,
        # so sys.exc_info() still returns the original exception
        self.move_to_failed_queue(job, *sys.exc_info())
```
Signal Handling
Parent Process (Worker)
```python
def _install_signal_handlers(self):
    signal.signal(signal.SIGINT, self.request_stop)   # Ctrl+C
    signal.signal(signal.SIGTERM, self.request_stop)  # kill
```
Child Process (Work Horse)
```python
def setup_work_horse_signals(self):
    # Ignore Ctrl+C - parent handles it
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # Default SIGTERM handling
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
```
Warm Shutdown
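The handlers above route SIGINT and SIGTERM to `request_stop` in the parent, while the work horse ignores SIGINT. The body of `request_stop` is not shown on this page. The sketch below assumes it follows RQ's convention: the first signal requests a warm shutdown, in which the current job finishes and no new job is taken. A second signal forces a cold shutdown that kills the work horse. `ShutdownController` and its attributes are hypothetical names.

```python
import os
import signal


class ShutdownController:
    """First SIGINT/SIGTERM: warm shutdown. Second one: cold shutdown."""

    def __init__(self):
        self.stop_requested = False
        self.horse_pid = 0  # pid of the running work horse, 0 when idle

    def install(self) -> None:
        signal.signal(signal.SIGINT, self.request_stop)
        signal.signal(signal.SIGTERM, self.request_stop)

    def request_stop(self, signum, frame) -> None:
        if self.stop_requested:
            # Second signal: cold shutdown, kill the horse and exit now.
            if self.horse_pid:
                os.kill(self.horse_pid, signal.SIGKILL)
            raise SystemExit(1)
        # First signal: warm shutdown, let the current job finish.
        self.stop_requested = True

    def should_take_next_job(self) -> bool:
        # The main loop checks this before dequeuing another job.
        return not self.stop_requested
```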
Heartbeat System
Workers send periodic heartbeats to prevent being marked as dead:
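```python
def heartbeat(self, timeout=0, pipeline=None):
    # Never set a TTL shorter than the default worker TTL (420 seconds)
    timeout = max(timeout, self.default_worker_ttl)
    # prepare_job_execution passes a pipeline to batch this with other writes
    connection = pipeline if pipeline is not None else self.connection
    connection.expire(self.key, timeout)
```
Worker Registration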
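Registration creates the worker hash with a TTL, and each heartbeat pushes that expiry forward. The sketch below writes the fields shown next. It assumes a redis-py client (3.5+ for `hset(..., mapping=...)`), RQ's convention of listing full worker keys in the `rq:workers` set, and an initial state of `started` as in the state table. `register_worker` is a hypothetical name.

```python
import os
import socket
from datetime import datetime, timezone


def register_worker(conn, queues, ttl: int = 420) -> str:
    """Create the worker hash, add it to rq:workers, and start its TTL."""
    key = f"rq:worker:{socket.gethostname()}.{os.getpid()}"
    with conn.pipeline() as pipe:
        pipe.hset(key, mapping={
            "birth": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            "queues": ",".join(queues),
            "state": "started",
        })
        pipe.sadd("rq:workers", key)
        pipe.expire(key, ttl)  # heartbeats extend this; a crash lets it lapse
        pipe.execute()
    return key
```

Only the hash expires. The `rq:workers` set member remains, so a key listed in `rq:workers` whose hash no longer exists should be read as a dead worker.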
```
# Worker key structure
rq:worker:<hostname>.<pid>

# Hash fields
{
    'birth': '2024-01-15T10:30:00Z',
    'queues': 'sms,email,pn,webhook,whatsapp',
    'state': 'idle',
    'current_job': 'job-uuid-if-busy'
}
```
Monitoring & Observability
Key Redis Keys to Monitor
| Key | Description |
|---|---|
| `rq:workers` | Set of all registered workers |
| `rq:worker:<name>` | Hash holding an individual worker's status |
| `rq:queue:<name>` | Pending jobs; queue length via `ZCARD` |
| `rq:queue:failed` | Failed jobs; its length is the failed-job count |
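The scheduler queue is a list, and this page does not say whether `rq:queue:failed` is a list or a sorted set. A monitoring script is therefore safer checking each key's type before reading its length. The sketch assumes a redis-py client. `queue_depth` and `live_workers` are hypothetical helper names, and `live_workers` assumes, as in RQ, that `rq:workers` holds full worker keys.

```python
def queue_depth(conn, key: str) -> int:
    """Length of a queue key: ZCARD for sorted sets, LLEN for lists."""
    key_type = conn.type(key)
    if isinstance(key_type, bytes):  # clients without decode_responses
        key_type = key_type.decode()
    if key_type == "zset":
        return conn.zcard(key)
    if key_type == "list":
        return conn.llen(key)
    return 0  # "none": the key does not exist, i.e. the queue is empty


def live_workers(conn) -> list:
    """Members of rq:workers whose hash still exists (TTL not expired)."""
    names = [w.decode() if isinstance(w, bytes) else w
             for w in conn.smembers("rq:workers")]
    return sorted(w for w in names if conn.exists(w))
```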
Elasticsearch Logs
All job lifecycle events are logged to Elasticsearch, in the following index and with these event types:
```json
{
  "index": "active-fusion-logs",
  "events": [
    "queued",
    "processing",
    "completed",
    "failed",
    "retry"
  ]
}
```
CLI Commands
```bash
# View queue status
python run_worker.py info

# View workers
python run_worker.py info -W

# Suspend all workers
python run_worker.py suspend

# Resume workers
python run_worker.py resume

# Requeue failed jobs
python run_worker.py requeue --all
```
Scaling Considerations
Horizontal Scaling
Run multiple worker processes on the same or different machines:
```bash
# Machine 1
python run_worker.py worker -c instance --customq all

# Machine 2
python run_worker.py worker -c instance --customq all

# Machine 3 (specialized for webhook queue only)
python run_worker.py worker -c instance --customq webhook
```
Queue Isolation
For high-volume queues, run dedicated workers:
```bash
# SMS-only workers (high priority)
python run_worker.py worker -c instance --customq sms

# Email workers
python run_worker.py worker -c instance --customq email

# Webhook workers
python run_worker.py worker -c instance --customq webhook
```
Resource Tuning
| Setting | Default | Description |
|---|---|---|
| worker_ttl | 420s | Worker heartbeat TTL |
| result_ttl | 500s | How long to keep job results |
| job.timeout | 180s | Max job execution time |
| Sleep interval | 3s | Time to wait when queues are empty |
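These defaults interact. `prepare_job_execution` adds 60 s to the job timeout for the started registry and the heartbeat, and `heartbeat` never sets a TTL below `worker_ttl`. The small calculator below is derived from the code excerpts on this page and shows the effective values. The function and constant names are hypothetical.

```python
DEFAULT_JOB_TIMEOUT = 180  # job.timeout default (s)
DEFAULT_WORKER_TTL = 420   # worker_ttl default (s)
GRACE = 60                 # slack added in prepare_job_execution (s)


def busy_ttls(job_timeout=None) -> dict:
    """TTLs in effect while a job runs, per prepare_job_execution/heartbeat."""
    limit = job_timeout or DEFAULT_JOB_TIMEOUT
    registry_ttl = limit + GRACE
    return {
        "death_penalty": limit,                                # job is killed after this
        "started_registry": registry_ttl,                      # StartedJobRegistry entry
        "worker_key": max(registry_ttl, DEFAULT_WORKER_TTL),  # heartbeat TTL
    }
```

With the defaults, the worker key TTL stays at 420 s because 180 + 60 = 240 < 420. Only jobs with a timeout above 360 s push it higher. For example, a 600 s job gives a 660 s registry entry and worker TTL.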