Fusion
Task Queue API Server - The central hub for scheduling and routing background tasks
Fusion Service
Fusion is a Flask-based task queue API server that serves as the central hub for scheduling and routing background tasks in CrelioHealth. It provides HTTP endpoints for enqueueing tasks like SMS, Email, Push Notifications, Webhooks, and WhatsApp messages into Redis queues for processing by Fusion Worker.
Repository Information
| Property | Value |
|---|---|
| Repository | fusion |
| Language | Python 3.6+ |
| Main Branch | main |
| Project | CrelioHealth-Fusion (CFN) |
| Framework | Flask |
| Queue Backend | Redis (RQ - Redis Queue) |
Clone Repository
git clone git@bitbucket.org:creliohealth-repo/fusion.gitTech Stack
| Category | Technology | Purpose |
|---|---|---|
| Runtime | Python 3.6+ | Application runtime |
| Web Framework | Flask 0.12 | HTTP API server |
| Queue Backend | Redis 3.5 | Job queue storage |
| Queue Library | RQ (Custom Fork) | Task queue management |
| Scheduler | RQ Scheduler | Delayed/Cron job scheduling |
| AWS SDK | Boto3 | AWS Secrets Manager integration |
| Logging | Elasticsearch 6.x | Centralized logging |
| Monitoring | Raven (Sentry) | Error tracking |
| Process Manager | Supervisor | Production process management |
| WSGI Server | Gunicorn + Gevent | Production HTTP server |
System Architecture Overview
Key Components
1. Flask API Server (app.py)
The main entry point that exposes the following HTTP endpoints:
| Endpoint | Method | Description |
|---|---|---|
/queue/ | POST | Immediately enqueue tasks for processing |
/schedule/ | POST | Schedule tasks for future execution |
/jobInfo/ | POST | Get information about a specific job |
/cancelJob/ | POST | Cancel scheduled jobs |
/rescheduleJob/ | POST | Reschedule a job to a new time |
/mapping/ | POST | Queue dictionary mapping tasks |
/retryTask/ | POST | Retry a failed task |
/update_consent/ | POST | Update WhatsApp consent status |
2. Priority Queue (wrappers/PriorityQueue.py)
A custom priority-based queue implementation using Redis Sorted Sets (ZSET):
# Priority Levels
PRIORITY_HIGH = 2 # OTP, Reminders
PRIORITY_MODERATE = 1 # Standard messages
PRIORITY_LOW = 0 # Campaigns, Bulk operationsJobs are stored with their priority as the score, allowing higher-priority jobs to be processed first using ZREVRANGE.
3. RQ Scheduler (rq_scheduler/scheduler.py)
Handles time-based job scheduling:
enqueue_at()- Schedule a job for a specific datetimeenqueue_in()- Schedule a job after a time deltacron()- Schedule recurring jobs using cron syntax- Jobs are stored in
rq:scheduler:scheduled_jobssorted set
4. Task Types and Routing
# Task to Queue Mapping
Task 1 (SMS) → Queue: 'sms' → tasks.sms.sendSMS
Task 2 (EMAIL) → Queue: 'email' → tasks._email_v2.send_plain_email
Task 3 (PN) → Queue: 'pn' → tasks.push_notification.sendNotification
Task 4 (WEBHOOK) → Queue: 'webhook' → tasks.webhooks.GET/POST/PUT/DELETE
Task 6 (WHATSAPP) → Queue: 'whatsapp' → tasks.whatsapp.sendWhatsappDirectory Structure
fusion/
├── app.py # Main Flask application
├── config.py # AWS Secrets Manager configuration
├── run_worker.py # Worker entry point
├── run_schduler.py # Scheduler entry point
├── run_dashboard.py # RQ Dashboard entry point
├── rq/ # Custom RQ implementation
│ ├── queue.py # Queue class with priority support
│ ├── job.py # Job class with retry & callbacks
│ ├── worker.py # Worker class
│ ├── connections.py # Redis connection management
│ └── cli/ # CLI commands
├── rq_scheduler/ # Scheduler implementation
│ ├── scheduler.py # Main scheduler logic
│ └── utils.py # Utility functions
├── wrappers/ # Utility wrappers
│ ├── PriorityQueue.py # Priority queue implementation
│ ├── Logger.py # Elasticsearch logging
│ ├── scheduler_task.py # Cron job definitions
│ └── _utils.py # Helper functions
├── instance/ # Environment configs
│ ├── production.py # Production settings
│ └── local.py # Local development settings
└── dashboard/ # RQ DashboardRunning Fusion
Local Development
# Create virtual environment
python3.6 -m venv venv
source venv/bin/activate
# Install dependencies
pip install -r req.txt
# Set environment variables
export CRELIO_DEPLOYMENT_ZONE="IN"
export CRELIO_DEPLOYMENT_MODE="LOCAL"
# Run Flask server
flask run -h 0.0.0.0 -p 8002 --reload
# Run scheduler (separate terminal)
python run_schduler.py
# Run dashboard (separate terminal)
python run_dashboard.pyUsing Procfile
# Web server
web: CRELIO_DEPLOYMENT_ZONE="IN" CRELIO_DEPLOYMENT_MODE="LOCAL" FLASK_APP=app.py flask run -h 0.0.0.0 -p 8002 --reload
# Scheduler
scheduler: python run_schduler.py
# Dashboard
dashboard: python run_dashboard.pyConfiguration
Environment Variables
| Variable | Description | Example |
|---|---|---|
CRELIO_DEPLOYMENT_ZONE | Deployment region | IN, US, EU, UAE, KSA |
CRELIO_DEPLOYMENT_MODE | Deployment mode | prod, staging, LOCAL |
Secrets Management
Secrets are fetched from AWS Secrets Manager based on the deployment zone:
# Secret naming convention
secret_name = f"{deployment_mode}/{deployment_region}/fusion".lower()
# Example: "prod/in/fusion"For local development, secrets are loaded from secrets.json in the parent directory.