ServicesFusion

Fusion

Task Queue API Server - The central hub for scheduling and routing background tasks

👤 Sai Tharun

Fusion Service

Fusion is a Flask-based task queue API server that serves as the central hub for scheduling and routing background tasks in CrelioHealth. It provides HTTP endpoints for enqueueing tasks like SMS, Email, Push Notifications, Webhooks, and WhatsApp messages into Redis queues for processing by Fusion Worker.

Repository Information

PropertyValue
Repositoryfusion
LanguagePython 3.6+
Main Branchmain
ProjectCrelioHealth-Fusion (CFN)
FrameworkFlask
Queue BackendRedis (RQ - Redis Queue)

Clone Repository

git clone git@bitbucket.org:creliohealth-repo/fusion.git

Tech Stack

CategoryTechnologyPurpose
RuntimePython 3.6+Application runtime
Web FrameworkFlask 0.12HTTP API server
Queue BackendRedis 3.5Job queue storage
Queue LibraryRQ (Custom Fork)Task queue management
SchedulerRQ SchedulerDelayed/Cron job scheduling
AWS SDKBoto3AWS Secrets Manager integration
LoggingElasticsearch 6.xCentralized logging
MonitoringRaven (Sentry)Error tracking
Process ManagerSupervisorProduction process management
WSGI ServerGunicorn + GeventProduction HTTP server

System Architecture Overview


Key Components

1. Flask API Server (app.py)

The main entry point that exposes the following HTTP endpoints:

EndpointMethodDescription
/queue/POSTImmediately enqueue tasks for processing
/schedule/POSTSchedule tasks for future execution
/jobInfo/POSTGet information about a specific job
/cancelJob/POSTCancel scheduled jobs
/rescheduleJob/POSTReschedule a job to a new time
/mapping/POSTQueue dictionary mapping tasks
/retryTask/POSTRetry a failed task
/update_consent/POSTUpdate WhatsApp consent status

2. Priority Queue (wrappers/PriorityQueue.py)

A custom priority-based queue implementation using Redis Sorted Sets (ZSET):

# Priority Levels
PRIORITY_HIGH = 2      # OTP, Reminders
PRIORITY_MODERATE = 1  # Standard messages
PRIORITY_LOW = 0       # Campaigns, Bulk operations

Jobs are stored with their priority as the score, allowing higher-priority jobs to be processed first using ZREVRANGE.

3. RQ Scheduler (rq_scheduler/scheduler.py)

Handles time-based job scheduling:

  • enqueue_at() - Schedule a job for a specific datetime
  • enqueue_in() - Schedule a job after a time delta
  • cron() - Schedule recurring jobs using cron syntax
  • Jobs are stored in rq:scheduler:scheduled_jobs sorted set

4. Task Types and Routing

# Task to Queue Mapping
Task 1 (SMS)      → Queue: 'sms'      → tasks.sms.sendSMS
Task 2 (EMAIL)    → Queue: 'email'    → tasks._email_v2.send_plain_email
Task 3 (PN)       → Queue: 'pn'       → tasks.push_notification.sendNotification
Task 4 (WEBHOOK)  → Queue: 'webhook'  → tasks.webhooks.GET/POST/PUT/DELETE
Task 6 (WHATSAPP) → Queue: 'whatsapp' → tasks.whatsapp.sendWhatsapp

Directory Structure

fusion/
├── app.py                    # Main Flask application
├── config.py                 # AWS Secrets Manager configuration
├── run_worker.py             # Worker entry point
├── run_schduler.py           # Scheduler entry point
├── run_dashboard.py          # RQ Dashboard entry point
├── rq/                       # Custom RQ implementation
│   ├── queue.py              # Queue class with priority support
│   ├── job.py                # Job class with retry & callbacks
│   ├── worker.py             # Worker class
│   ├── connections.py        # Redis connection management
│   └── cli/                  # CLI commands
├── rq_scheduler/             # Scheduler implementation
│   ├── scheduler.py          # Main scheduler logic
│   └── utils.py              # Utility functions
├── wrappers/                 # Utility wrappers
│   ├── PriorityQueue.py      # Priority queue implementation
│   ├── Logger.py             # Elasticsearch logging
│   ├── scheduler_task.py     # Cron job definitions
│   └── _utils.py             # Helper functions
├── instance/                 # Environment configs
│   ├── production.py         # Production settings
│   └── local.py              # Local development settings
└── dashboard/                # RQ Dashboard

Running Fusion

Local Development

# Create virtual environment
python3.6 -m venv venv
source venv/bin/activate

# Install dependencies
pip install -r req.txt

# Set environment variables
export CRELIO_DEPLOYMENT_ZONE="IN"
export CRELIO_DEPLOYMENT_MODE="LOCAL"

# Run Flask server
flask run -h 0.0.0.0 -p 8002 --reload

# Run scheduler (separate terminal)
python run_schduler.py

# Run dashboard (separate terminal)
python run_dashboard.py

Using Procfile

# Web server
web: CRELIO_DEPLOYMENT_ZONE="IN" CRELIO_DEPLOYMENT_MODE="LOCAL" FLASK_APP=app.py flask run -h 0.0.0.0 -p 8002 --reload

# Scheduler
scheduler: python run_schduler.py

# Dashboard
dashboard: python run_dashboard.py

Configuration

Environment Variables

VariableDescriptionExample
CRELIO_DEPLOYMENT_ZONEDeployment regionIN, US, EU, UAE, KSA
CRELIO_DEPLOYMENT_MODEDeployment modeprod, staging, LOCAL

Secrets Management

Secrets are fetched from AWS Secrets Manager based on the deployment zone:

# Secret naming convention
secret_name = f"{deployment_mode}/{deployment_region}/fusion".lower()
# Example: "prod/in/fusion"

For local development, secrets are loaded from secrets.json in the parent directory.

On this page