
Fusion Worker

Background worker service that processes tasks from Redis queues

Author: Sai Tharun


Fusion Worker is the background task-processing service that consumes jobs from Redis queues and executes the actual work: sending SMS, emails, push notifications and WhatsApp messages, and making webhook requests.

Repository Information

| Property | Value |
|---|---|
| Repository | fusion_worker |
| Language | Python 3.6+ |
| Main Branch | main |
| Project | CrelioHealth-Fusion (CFN) |
| Created | April 2021 |

Clone Repository

git clone git@bitbucket.org:creliohealth-repo/fusion_worker.git

Tech Stack

| Category | Technology | Purpose |
|---|---|---|
| Runtime | Python 3.6+ | Application runtime |
| Queue Backend | Redis (redis-py 2.10+) | Job queue storage |
| Queue Library | RQ (custom fork) | Task queue processing |
| AWS SDK | Boto3 | SES, SNS, S3 integration |
| HTTP Client | Requests | Webhook calls |
| Logging | Elasticsearch 5.x | Centralized logging |
| Error Tracking | Raven (Sentry) | Exception monitoring |
| SMS | 50+ provider APIs | SMS delivery (MSG91, Twilio, etc.) |
| Email | AWS SES | Email delivery |
| Push Notifications | FCM / APNs | Mobile push notifications |

System Role


Directory Structure

fusion_worker/
├── run_worker.py              # Worker entry point
├── config.py                  # Configuration loader
├── rq/                        # Custom RQ implementation
│   ├── worker.py              # Enhanced worker with retry logic
│   ├── job.py                 # Job class
│   ├── queue.py               # Queue class
│   ├── connections.py         # Redis connection management
│   └── cli/                   # CLI commands
│       └── cli.py             # Worker CLI
├── tasks/                     # Task implementations
│   ├── sms.py                 # SMS sending logic
│   ├── _email_v2.py           # Email via AWS SES
│   ├── push_notification.py   # Push notifications
│   ├── webhooks.py            # HTTP webhooks
│   ├── whatsapp.py            # WhatsApp messaging
│   ├── plugin_sms.py          # SMS provider plugins
│   ├── plugin_whatsapp.py     # WhatsApp plugins
│   ├── sms_gateways.py        # SMS gateway definitions
│   └── maintenance.py         # Queue maintenance tasks
├── wrappers/                  # Utility wrappers
│   ├── Logger.py              # Elasticsearch logging
│   └── _utils.py              # Helper functions
├── certificates/              # SSL certificates
└── instance/                  # Environment configs
    ├── production.py          # Production settings
    └── local.py               # Local development settings

Running the Worker

Local Development

# Create virtual environment
python3.6 -m venv venv
source venv/bin/activate

# Install dependencies
pip install -r requirement.txt

# Set environment variables
export CRELIO_DEPLOYMENT_ZONE="IN"
export CRELIO_DEPLOYMENT_MODE="LOCAL"

# Run worker for all queues
python run_worker.py worker -c instance --customq all

Using Procfile

worker: CRELIO_DEPLOYMENT_ZONE="IN" CRELIO_DEPLOYMENT_MODE="LOCAL" python run_worker.py worker -c instance --customq all

CLI Options

python run_worker.py worker [OPTIONS]

Options:
  -c, --config TEXT       Module containing RQ settings (e.g., 'instance')
  --customq TEXT          Specific queue to work on, or 'all' for all queues
  -b, --burst             Run in burst mode (quit when queues are empty)
  -n, --name TEXT         Worker name
  --logging_level TEXT    Logging level (default: INFO)
  -v, --verbose           Show more output
  -q, --quiet             Show less output
  --results-ttl INTEGER   Result TTL in seconds
  --worker-ttl INTEGER    Worker TTL in seconds

Worker Execution Flow


Key Differences from Standard RQ

1. Priority-Based Dequeuing

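Standard RQ keeps each queue in a Redis list and dequeues with LPOP/BLPOP, so jobs come out in FIFO order. Fusion Worker keeps its queues in sorted sets: it reads the highest-scoring (highest-priority) job with ZREVRANGE and claims it with ZREM: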

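# Peek at the highest-priority job (highest score in the sorted set)
_items = connection.zrevrange(current_queue, 0, 0)

if _items:
    _item = _items[0]
    # ZREM is atomic: it returns 1 only for the worker that actually removed
    # the member, so two workers cannot claim the same job
    if connection.zrem(current_queue, _item) == 1:
        # Successfully claimed the job
        result = _item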
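A self-contained sketch of the claim step, for trying the logic without a Redis server. `claim_highest_priority` and `InMemorySortedSets` are hypothetical names: the stand-in only mimics the two redis-py calls used above (`zrevrange` and `zrem`), and the bounded retry loop when another worker wins the race is an assumption, not something the fork is known to do. A real `redis.Redis` connection could be passed in place of the stand-in.

```python
class InMemorySortedSets:
    """Demo-only stand-in for the redis-py sorted-set calls used by the claim loop."""

    def __init__(self):
        self._data = {}  # key -> {member: score}

    def zadd(self, key, mapping):
        self._data.setdefault(key, {}).update(mapping)

    def zrevrange(self, key, start, end):
        # Members ordered by score, highest first; `end` is inclusive as in Redis.
        # This stand-in only supports non-negative indices.
        members = self._data.get(key, {})
        ordered = sorted(members, key=members.get, reverse=True)
        return ordered[start:end + 1]

    def zrem(self, key, member):
        # Returns 1 if the member was removed, 0 if it was already gone.
        return 0 if self._data.get(key, {}).pop(member, None) is None else 1


def claim_highest_priority(connection, queue_key, max_attempts=5):
    """Claim the highest-priority job id from a sorted-set queue, or return None."""
    for _ in range(max_attempts):
        items = connection.zrevrange(queue_key, 0, 0)
        if not items:
            return None  # queue is empty
        candidate = items[0]
        # Only the worker whose ZREM actually removes the member gets 1 back,
        # so two workers can never both claim the same job.
        if connection.zrem(queue_key, candidate) == 1:
            return candidate
        # Another worker removed it between the two calls; look again.
    return None


conn = InMemorySortedSets()
conn.zadd("rq:queue:sms", {"job-low": 1, "job-high": 5})
print(claim_highest_priority(conn, "rq:queue:sms"))  # job-high
print(claim_highest_priority(conn, "rq:queue:sms"))  # job-low
print(claim_highest_priority(conn, "rq:queue:sms"))  # None
```

Because the read and the removal are separate calls, losing a race costs the worker only another lookup, never a duplicate send.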

2. Scheduler Queue Handling

The scheduler queue (rq:queue:scheduler) is a plain list rather than a sorted set, so it is still popped with LPOP:

if current_queue == 'rq:queue:scheduler':
    _item = self.lpop([current_queue], timeout, connection)
else:
    _items = connection.zrevrange(current_queue, 0, 0)

3. Enhanced Retry Logic

import requests

def handle_job_failure(self, job, queue, started_job_registry=None, retry_flag=False):
    # retry_flag marks a retryable error; each job is retried at most 10 times
    if job.retryCount < 10 and retry_flag:
        job.retryCount += 1
        job.save()
        # Ask the Fusion server to re-enqueue the task with its original arguments
        requests.post(settings.FUSION_SERVER + '/retryTask/', json=job._args)
    else:
        # Notify the failure callback (if one is set), then move the job to the failed queue
        if job.failed_task_handler:
            self.failed_task_callback(job)
        self.custom_failed_handle(job, started_job_registry)
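The retry decision can be pulled out into a pure function, which makes the retry budget easy to reason about: with retry_flag set, a job that keeps failing runs 11 times in total (the first attempt plus 10 retries) before it is moved to the failed queue. `failure_action` and `simulate_always_failing_job` are hypothetical helpers for illustration, and the simulation assumes the retried job keeps its retryCount between attempts, which the job.save() call above suggests but does not confirm.

```python
MAX_RETRIES = 10  # mirrors the `job.retryCount < 10` check in handle_job_failure


def failure_action(retry_count, retry_flag, max_retries=MAX_RETRIES):
    """Return "retry" or "fail" for a job that has just raised an exception."""
    if retry_flag and retry_count < max_retries:
        return "retry"
    return "fail"


def simulate_always_failing_job(retry_flag, initial_retry_count=0):
    """Count executions of a job that fails every time it runs.

    Assumes the retried job keeps its retryCount between attempts.
    """
    retry_count = initial_retry_count
    executions = 1  # the first run
    while failure_action(retry_count, retry_flag) == "retry":
        retry_count += 1
        executions += 1
    return executions, retry_count


print(simulate_always_failing_job(retry_flag=True))   # (11, 10)
print(simulate_always_failing_job(retry_flag=False))  # (1, 0)
```

A job whose retryCount already starts at 10 or more gets no retries at all, even when retry_flag is set.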

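4. Callback System

A job may carry success_task_handler and failed_task_handler URLs. When the job succeeds, or fails without being retried, the worker POSTs the job's metadata to the matching URL: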

import requests

# On success: POST job metadata (form-encoded via data=) to the success URL
def success_task_callback(self, job):
    job_meta = {
        'description': job.description,
        'job_id': job.id,
        'status': job._status,
        'retry_attempts': job.retryCount
    }
    requests.post(job.success_task_handler, data=job_meta)

# On failure: POST job metadata plus the exception trace to the failure URL
def failed_task_callback(self, job):
    job_meta = {
        'description': job.description,
        'failure_trace': job.exc_info,
        'job_id': job.id,
        'status': job._status
    }
    requests.post(job.failed_task_handler, data=job_meta)
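Because both callbacks pass the dict as `data=`, requests sends it as an application/x-www-form-urlencoded body, not JSON. The receiving endpoint therefore has to read form fields, and it sees every value (including retry_attempts) as a string. The sketch below previews roughly what goes over the wire using only the standard library; the `success_meta` helper and the sample values are hypothetical.

```python
from urllib.parse import parse_qs, urlencode


def success_meta(description, job_id, status, retry_attempts):
    """Fields sent to success_task_handler, in the same order as the callback above."""
    return {
        "description": description,
        "job_id": job_id,
        "status": status,
        "retry_attempts": retry_attempts,
    }


# requests.post(url, data=<dict>) form-encodes the dict, roughly like this:
body = urlencode(success_meta("send_sms", "abc123", "finished", 0))
print(body)  # description=send_sms&job_id=abc123&status=finished&retry_attempts=0

# What a receiving server sees after parsing the form body: all values are strings
fields = parse_qs(body)
print(fields["retry_attempts"])  # ['0']
```

The same applies to failure_trace: the traceback text arrives percent-encoded in a form field.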

Queues Configuration

By default, workers process all queues except dictionary and report:

queues = list(set(queues) - set(['dictionary', 'report']))
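`set(queues) - set([...])` removes the two queues but also throws away the original ordering. The resulting list order is arbitrary and can even differ between processes, because string hashing is randomized per process by default. If queue order matters, for example when the worker polls queues in list order as upstream RQ does, a list comprehension keeps it. `default_queues` is a hypothetical helper, and the names below are taken from the Available Queues list.

```python
EXCLUDED_BY_DEFAULT = ("dictionary", "report")


def default_queues(queues, excluded=EXCLUDED_BY_DEFAULT):
    """Drop the excluded queue names while keeping the caller's order."""
    skip = set(excluded)
    return [name for name in queues if name not in skip]


all_queues = ["sms", "email", "pn", "webhook", "whatsapp",
              "scheduler", "schedulerSet", "dictionary", "report"]
print(default_queues(all_queues))
# ['sms', 'email', 'pn', 'webhook', 'whatsapp', 'scheduler', 'schedulerSet']
```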

Available Queues

| Queue | Description |
|---|---|
| sms | SMS messages |
| email | Email messages |
| pn | Push notifications |
| webhook | HTTP webhook requests |
| whatsapp | WhatsApp messages |
| scheduler | Scheduled jobs (special handling) |
| schedulerSet | Scheduler sorted set |
| dictionary | Dictionary mapping (excluded by default) |
| report | Report processing (excluded by default) |

Error Handling

Retryable Exceptions

These exceptions are treated as transient and trigger an automatic retry (up to 10 retries per job):

  • ClientError (AWS)
  • ConnectTimeout
  • ConnectionError
  • ReadTimeout

Non-Retryable Exceptions

All other exceptions move the job directly to the failed queue.
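A sketch of the classification rule. It assumes ClientError is botocore's `ClientError` (raised by Boto3) and the other three come from `requests.exceptions`; in the worker itself one would normally catch those classes directly, e.g. `except (ClientError, ConnectTimeout, ConnectionError, ReadTimeout)`. Matching on class names here keeps the sketch free of those dependencies, and walking the MRO also covers subclasses (requests' `ConnectTimeout`, for instance, derives from its `ConnectionError`). `is_retryable` is a hypothetical helper.

```python
RETRYABLE_EXCEPTION_NAMES = frozenset(
    {"ClientError", "ConnectTimeout", "ConnectionError", "ReadTimeout"}
)


def is_retryable(exc):
    """True if exc's class, or any of its base classes, has a retryable name.

    Name matching avoids importing botocore/requests in this sketch. Note that
    it also matches Python's built-in ConnectionError and its subclasses.
    """
    return any(cls.__name__ in RETRYABLE_EXCEPTION_NAMES for cls in type(exc).__mro__)


print(is_retryable(ConnectionRefusedError()))  # True (built-in ConnectionError subclass)
print(is_retryable(ValueError("bad payload")))  # False
```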

Whitelist Checking

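For webhook tasks (payload task type 4), the target URL is matched against a whitelist pattern (settings.regex2). All other task types pass the check. URLs containing "staging" are accepted as well, but adding 10 to retryCount pushes it past the limit checked in handle_job_failure (retryCount < 10), so a failing staging webhook is not retried:

import re

def isWhitelisted(job):
    payload = job._args[0]['payload']
    if payload['task'] != 4:
        return 1  # Non-webhook tasks always pass

    test = re.match(settings.regex2, payload['url'])
    if 'staging' in payload['url']:
        # Push retryCount to 10 or more so handle_job_failure never retries staging webhooks
        job.retryCount += 10
        return 1
    # ... additional validation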

Maintenance Tasks

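The worker checks whether periodic maintenance is due with the property below. Maintenance is due on the first check after startup and then whenever more than 24 hours have passed since the last run: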

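from datetime import timedelta
from rq.utils import utcnow  # helper from upstream RQ; assumed to exist in the fork

@property
def should_run_maintenance_tasks(self):
    # Due on the first check after startup...
    if self.last_cleaned_at is None:
        return True
    # ...and again once more than 24 hours have passed since the last run
    if (utcnow() - self.last_cleaned_at) > timedelta(hours=24):
        return True
    return False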
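Passing the current time in, rather than calling utcnow() inside, turns the same rule into a pure function that is easy to test. `maintenance_due` is a hypothetical helper; note that the comparison is strict, so a gap of exactly 24 hours is not yet due.

```python
from datetime import datetime, timedelta

MAINTENANCE_INTERVAL = timedelta(hours=24)


def maintenance_due(last_cleaned_at, now, interval=MAINTENANCE_INTERVAL):
    """Same rule as should_run_maintenance_tasks, with `now` passed in."""
    if last_cleaned_at is None:
        return True  # never cleaned since startup
    return (now - last_cleaned_at) > interval  # strictly more than 24 hours


now = datetime(2021, 4, 2, 12, 0)
print(maintenance_due(None, now))                       # True
print(maintenance_due(now - timedelta(hours=23), now))  # False
print(maintenance_due(now - timedelta(hours=24), now))  # False (not strictly greater)
print(maintenance_due(now - timedelta(hours=25), now))  # True
```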

What Maintenance Does

  1. Clean registries - Remove stale jobs from started/finished registries
  2. Remove failed jobs - Clean up old entries from failed queue
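
from rq.registry import clean_registries  # module-level helper in upstream RQ; assumed in the fork
from rq.utils import utcnow

class Worker:  # excerpt (class name assumed); only the maintenance method is shown
    def clean_registries(self):
        # Helper that deletes a job hash and drops its id from the failed queue
        # (its call site is not part of this excerpt)
        def remove(job_id):
            if self.connection.delete(f"rq:job:{job_id}"):
                self.connection.lrem("rq:queue:failed", 0, job_id)

        # Calls the imported module-level function, not this method
        for queue in self.queues:
            clean_registries(queue)
        self.last_cleaned_at = utcnow()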
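Registry cleanup is easiest to picture with the layout upstream RQ uses: each registry is a sorted set whose score is the job's expiry timestamp, and cleaning removes every member whose score is at or below the current time (in Redis, `ZREMRANGEBYSCORE key 0 now`). Assuming the fork keeps that layout, which this page does not confirm, the sketch below models one registry as a plain dict. `clean_expired` is a hypothetical helper, not the fork's code.

```python
def clean_expired(registry, now):
    """Remove and return job ids whose expiry score is <= now.

    `registry` maps job_id -> expiry timestamp and stands in for a sorted set.
    """
    expired = [job_id for job_id, expires_at in registry.items() if expires_at <= now]
    for job_id in expired:
        del registry[job_id]
    return expired


started = {"job-a": 100.0, "job-b": 200.0, "job-c": 150.0}
print(clean_expired(started, now=150.0))  # ['job-a', 'job-c']
print(started)                            # {'job-b': 200.0}
```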
