Fusion Worker
Background worker service that processes tasks from Redis queues
👤 Sai Tharun
Fusion Worker is the background task processing service. It consumes jobs from Redis queues and executes them: sending SMS messages, emails, push notifications, and WhatsApp messages, and making webhook requests.
Repository Information
| Property | Value |
|---|---|
| Repository | fusion_worker |
| Language | Python 3.6+ |
| Main Branch | main |
| Project | CrelioHealth-Fusion (CFN) |
| Created | April 2021 |
Clone Repository
git clone git@bitbucket.org:creliohealth-repo/fusion_worker.git
Tech Stack
| Category | Technology | Purpose |
|---|---|---|
| Runtime | Python 3.6+ | Application runtime |
| Queue Backend | Redis 2.10+ | Job queue storage |
| Queue Library | RQ (Custom Fork) | Task queue processing |
| AWS SDK | Boto3 | SES, SNS, S3 integration |
| HTTP Client | Requests | Webhook calls |
| Logging | Elasticsearch 5.x | Centralized logging |
| Error Tracking | Raven (Sentry) | Exception monitoring |
| SMS | 50+ Provider APIs | SMS delivery (MSG91, Twilio, etc.) |
| Email | AWS SES | Email delivery |
| Push Notifications | FCM / APNs | Mobile push notifications |
System Role
Directory Structure
fusion_worker/
├── run_worker.py # Worker entry point
├── config.py # Configuration loader
├── rq/ # Custom RQ implementation
│ ├── worker.py # Enhanced worker with retry logic
│ ├── job.py # Job class
│ ├── queue.py # Queue class
│ ├── connections.py # Redis connection management
│ └── cli/ # CLI commands
│ └── cli.py # Worker CLI
├── tasks/ # Task implementations
│ ├── sms.py # SMS sending logic
│ ├── _email_v2.py # Email via AWS SES
│ ├── push_notification.py # Push notifications
│ ├── webhooks.py # HTTP webhooks
│ ├── whatsapp.py # WhatsApp messaging
│ ├── plugin_sms.py # SMS provider plugins
│ ├── plugin_whatsapp.py # WhatsApp plugins
│ ├── sms_gateways.py # SMS gateway definitions
│ └── maintenance.py # Queue maintenance tasks
├── wrappers/ # Utility wrappers
│ ├── Logger.py # Elasticsearch logging
│ └── _utils.py # Helper functions
├── certificates/ # SSL certificates
└── instance/ # Environment configs
    ├── production.py # Production settings
    └── local.py # Local development settings
Running the Worker
Local Development
# Create virtual environment
python3.6 -m venv venv
source venv/bin/activate
# Install dependencies
pip install -r requirement.txt
# Set environment variables
export CRELIO_DEPLOYMENT_ZONE="IN"
export CRELIO_DEPLOYMENT_MODE="LOCAL"
# Run worker for all queues
python run_worker.py worker -c instance --customq all
Using Procfile
worker: CRELIO_DEPLOYMENT_ZONE="IN" CRELIO_DEPLOYMENT_MODE="LOCAL" python run_worker.py worker -c instance --customq all
CLI Options
python run_worker.py worker [OPTIONS]
Options:
  -c, --config TEXT        Module containing RQ settings (e.g., 'instance')
  --customq TEXT           Specific queue to work on, or 'all' for all queues
  -b, --burst              Run in burst mode (quit when queues are empty)
  -n, --name TEXT          Worker name
  --logging_level TEXT     Logging level (default: INFO)
  -v, --verbose            Show more output
  -q, --quiet              Show less output
  --results-ttl INTEGER    Result TTL in seconds
  --worker-ttl INTEGER     Worker TTL in seconds
Worker Execution Flow
Key Differences from Standard RQ
1. Priority-Based Dequeuing
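Standard RQ pops jobs from a Redis list in FIFO order (LPOP). Fusion Worker instead reads the highest-scored entry from a sorted set and claims it with an atomic removal:
# Get highest priority job (highest score first)
_items = connection.zrevrange(current_queue, 0, 0)
if _items:
    _item = _items[0]
    # Atomic removal: ZREM returns 1 only for the worker that removed the entry
    if connection.zrem(current_queue, _item) == 1:
        # Successfully claimed the job
        result = _item
2. Scheduler Queue Handling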
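To compare the two dequeue paths without a Redis server, here is a minimal sketch. `FakeRedis` is a hypothetical in-memory stand-in that mimics only the handful of commands involved, `dequeue` is an illustrative helper rather than the worker's actual method, and the key `rq:queue:sms` is an assumed name. Only `rq:queue:scheduler` comes from the worker code shown in this section.

```python
from collections import deque

SCHEDULER_QUEUE = "rq:queue:scheduler"  # key name used by the worker for the list-based queue


class FakeRedis:
    """Hypothetical in-memory stand-in for the few Redis commands used below."""

    def __init__(self):
        self.lists = {}  # key -> deque (Redis list)
        self.zsets = {}  # key -> {member: score} (Redis sorted set)

    def rpush(self, key, member):
        self.lists.setdefault(key, deque()).append(member)

    def lpop(self, key):
        items = self.lists.get(key)
        return items.popleft() if items else None

    def zadd(self, key, mapping):
        self.zsets.setdefault(key, {}).update(mapping)

    def zrevrange(self, key, start, stop):
        # Highest score first; stop index is inclusive, as in Redis
        scores = self.zsets.get(key, {})
        ordered = sorted(scores, key=lambda m: (scores[m], m), reverse=True)
        return ordered[start:stop + 1]

    def zrem(self, key, member):
        # 1 if the member was removed, 0 if it was already gone
        return 1 if self.zsets.get(key, {}).pop(member, None) is not None else 0


def dequeue(conn, queue_key):
    """Return the next job id from queue_key, or None if nothing is waiting."""
    if queue_key == SCHEDULER_QUEUE:
        # Scheduler queue: plain list, first in, first out
        return conn.lpop(queue_key)
    while True:
        top = conn.zrevrange(queue_key, 0, 0)
        if not top:
            return None
        # Only the caller whose ZREM returns 1 owns the job; otherwise look again
        if conn.zrem(queue_key, top[0]) == 1:
            return top[0]


conn = FakeRedis()
conn.zadd("rq:queue:sms", {"job-low": 1, "job-high": 10})
conn.rpush(SCHEDULER_QUEUE, "sched-1")
conn.rpush(SCHEDULER_QUEUE, "sched-2")
print(dequeue(conn, "rq:queue:sms"))
print(dequeue(conn, SCHEDULER_QUEUE))
```

The loop around the peek-and-remove step matters when several workers share a queue: a worker that loses the race for one entry simply tries the next highest one instead of returning empty-handed.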
The scheduler queue uses a different mechanism (list-based):
if current_queue == 'rq:queue:scheduler':
    _item = self.lpop([current_queue], timeout, connection)
else:
    _items = connection.zrevrange(current_queue, 0, 0)
3. Enhanced Retry Logic
def handle_job_failure(self, job, queue, started_job_registry=None, retry_flag=False):
    if job.retryCount < 10 and retry_flag:
        # Schedule retry
        job.retryCount += 1
        job.save()
        requests.post(settings.FUSION_SERVER + '/retryTask/', json=job._args)
    else:
        # Call failure callback and move to failed queue
        if job.failed_task_handler:
            self.failed_task_callback(job)
        self.custom_failed_handle(job, started_job_registry)
4. Callback System
# On success
def success_task_callback(self, job):
    job_meta = {
        'description': job.description,
        'job_id': job.id,
        'status': job._status,
        'retry_attempts': job.retryCount
    }
    requests.post(job.success_task_handler, data=job_meta)

# On failure
def failed_task_callback(self, job):
    job_meta = {
        'description': job.description,
        'failure_trace': job.exc_info,
        'job_id': job.id,
        'status': job._status
    }
    requests.post(job.failed_task_handler, data=job_meta)
Queues Configuration
By default, workers process all queues except dictionary and report:
queues = list(set(queues) - set(['dictionary', 'report']))
Available Queues
| Queue | Description |
|---|---|
| sms | SMS messages |
| email | Email messages |
| pn | Push notifications |
| webhook | HTTP webhook requests |
| whatsapp | WhatsApp messages |
| scheduler | Scheduled jobs (special handling) |
| schedulerSet | Scheduler sorted set |
| dictionary | Dictionary mapping (excluded by default) |
| report | Report processing (excluded by default) |
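The default exclusion can be sketched as a small helper. `select_queues` and `EXCLUDED_BY_DEFAULT` are hypothetical names, and the way `--customq` maps onto this logic is an assumption based on the CLI help text, not the worker's actual code. The result is sorted only to make the output deterministic, since a set difference has no defined order.

```python
# Queue names from the Available Queues table
AVAILABLE = [
    "sms", "email", "pn", "webhook", "whatsapp",
    "scheduler", "schedulerSet", "dictionary", "report",
]

# Queues skipped unless requested explicitly (hypothetical constant name)
EXCLUDED_BY_DEFAULT = {"dictionary", "report"}


def select_queues(available, customq="all"):
    """Resolve a --customq style value into the list of queues to work on."""
    if customq == "all":
        # Same set difference as the worker's one-liner, sorted for stable output
        return sorted(set(available) - EXCLUDED_BY_DEFAULT)
    if customq not in available:
        raise ValueError("unknown queue: {!r}".format(customq))
    # An explicitly named queue is used as-is, even if excluded by default
    return [customq]


print(select_queues(AVAILABLE))
print(select_queues(AVAILABLE, "report"))
```

Under this assumption, the dictionary and report queues only get processed by a worker started specifically for them.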
Error Handling
Retryable Exceptions
These exceptions trigger automatic retry:
- ClientError (AWS)
- ConnectTimeout
- ConnectionError
- ReadTimeout
Non-Retryable Exceptions
All other exceptions move the job directly to the failed queue.
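Combined with the retry limit of 10 from the retry logic, the decision for a failed job can be sketched as a pure function. `next_action` and `MAX_RETRIES` are hypothetical names. The demo uses built-in exceptions as stand-ins so it runs without third-party packages; in the worker the tuple would presumably hold `botocore.exceptions.ClientError` and the `requests.exceptions` classes `ConnectTimeout`, `ConnectionError`, and `ReadTimeout`.

```python
MAX_RETRIES = 10  # mirrors the `retryCount < 10` check in handle_job_failure


def next_action(exc, retry_count, retryable_types):
    """Return "retry" or "fail" for a job that raised exc."""
    # Retry only if the exception is retryable AND the retry budget is not used up
    if isinstance(exc, retryable_types) and retry_count < MAX_RETRIES:
        return "retry"
    return "fail"


# Built-in stand-ins for the retryable network and AWS errors (assumption for the demo)
RETRYABLE = (ConnectionError, TimeoutError)

print(next_action(ConnectionError("network down"), 0, RETRYABLE))   # retryable, budget left
print(next_action(ConnectionError("network down"), 10, RETRYABLE))  # retryable, budget used up
print(next_action(ValueError("bad payload"), 0, RETRYABLE))         # not retryable
```

Passing the exception types in as a parameter keeps the decision testable without importing the AWS or HTTP libraries.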
Whitelist Checking
For webhook tasks, URLs are validated against a whitelist:
def isWhitelisted(job):
    if job._args[0]['payload']['task'] != 4:
        return 1  # Non-webhook tasks always pass
    test = re.match(settings.regex2, job._args[0]['payload']['url'])
    if 'staging' in job._args[0]['payload']['url']:
        # Adds 10 to retryCount. With the `retryCount < 10` check in
        # handle_job_failure, this means staging URLs get no further retries.
        job.retryCount += 10
        return 1
    # ... additional validation
Maintenance Tasks
The worker runs periodic maintenance every 24 hours:
@property
def should_run_maintenance_tasks(self):
    if self.last_cleaned_at is None:
        return True
    if (utcnow() - self.last_cleaned_at) > timedelta(hours=24):
        return True
    return False
What Maintenance Does
- Clean registries - Remove stale jobs from started/finished registries
- Remove failed jobs - Clean up old entries from failed queue
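def clean_registries(self):
    def remove(job_id):
        # Delete the job hash, then drop its id from the failed queue list
        if self.connection.delete(f"rq:job:{job_id}"):
            self.connection.lrem("rq:queue:failed", 0, job_id)
    for queue in self.queues:
        # Calls the module-level clean_registries helper (as in standard RQ), not this method
        clean_registries(queue)
    self.last_cleaned_at = utcnow()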
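The 24-hour check from should_run_maintenance_tasks can be exercised in isolation by passing the clock in as an argument. This is a sketch, not the worker's code: `should_run_maintenance` and `MAINTENANCE_INTERVAL` are hypothetical names, and the timestamps are made up for the demo.

```python
from datetime import datetime, timedelta

MAINTENANCE_INTERVAL = timedelta(hours=24)  # interval stated in this section


def should_run_maintenance(last_cleaned_at, now):
    """Return True if maintenance has never run or ran more than 24 hours ago."""
    if last_cleaned_at is None:
        return True
    # Strictly greater than: exactly 24 hours after the last run is not yet due
    return (now - last_cleaned_at) > MAINTENANCE_INTERVAL


last_run = datetime(2024, 1, 1, 12, 0, 0)
print(should_run_maintenance(None, last_run))
print(should_run_maintenance(last_run, last_run + timedelta(hours=23)))
print(should_run_maintenance(last_run, last_run + timedelta(hours=25)))
```

Because last_cleaned_at starts as None, a freshly started worker runs maintenance on its first pass, so frequent restarts trigger cleanup more often than once a day.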