Task Implementations
Detailed documentation of all task handlers in Fusion Worker
Task Implementations
This document details all the task handlers implemented in Fusion Worker, including their payloads, providers, and behavior.
SMS Tasks (tasks/sms.py)
Overview
SMS tasks are routed to different providers based on configuration. The system supports 50+ SMS providers across different regions.
Functions
sendSMS(payload)
Main SMS sending function that routes to the appropriate provider.
Payload Structure:
{
"payload": {
"contact": 9876543210,
"message": "Your message here",
"countryCode": 91,
"senderId": "CRELIO",
"config": {
"provider": "msg91"
},
"provider_id": "provider-uuid",
"credit_deducted": 1,
"callback_url": "https://..."
}
}sendOTPSMS(payload)
Specialized function for OTP delivery with fast channel support.
Special Handling:
- Uses MSG91's OTP API template for Indian numbers
- Supports
isOTPflag for instant delivery - Supports
contextandflowAuthfor transactional SMS
Provider Routing
{
"sns": send_sms_sns, # AWS SNS (International)
"msg91": send_sms_msg91, # India
"twilio_sms": send_twilio_sms, # International
"msegat": send_sms_msegat, # Saudi Arabia
"textlocal": send_sms_textlocal, # UK/India
"clicksend": send_sms_clicksend, # Australia
# ... 50+ more providers
}Country Code Handling
Non-Indian numbers automatically switch to AWS SNS:
if str(country_code) != "91":
if obj.get("config", {}).get("provider") == "msg91":
obj.get("config", {}).update({"provider": "sns"})Provider Whitelist
Providers can be restricted by country code:
provider_mapping = json.loads(conn.hget(cache_key, provider_id))
if country_code not in provider_mapping:
return "countryCode is not whitelisted for provider"Email Tasks (tasks/_email_v2.py)
Overview
Emails are sent via AWS SES (Simple Email Service) with support for templates, attachments, and tracking.
Functions
send_plain_email(payload)
Send HTML emails without attachments.
Payload Structure:
{
"payload": {
"to": ["recipient@example.com"],
"cc": [],
"bcc": [],
"subject": "Email Subject",
"message": "<html>Email body</html>",
"sender_name": "CrelioHealth",
"config": {
"config_fields": [
{"name": "sender_id_name", "value": "Sender Name"},
{"name": "sender_id_email", "value": "sender@domain.com"},
{"name": "reply_to", "value": "reply@domain.com"}
]
},
"callback_url": "https://..."
}
}send_template_attachment_email(payload)
Send emails with attachments and/or templates.
Additional Fields:
{
"files": [
"https://s3.amazonaws.com/.../file1.pdf",
"https://s3.amazonaws.com/.../file2.xlsx"
],
"template": "<html>Template HTML</html>",
"is_template": 1
}Email Validation
Emails are validated and cleaned before sending:
EMAIL_REGEX = r"^\w+([\.-]?\w+)*@\w+([\.-]?\w+)*(\.\w{2,3})+$"
BLACKLIST_EMAILS = [
'lifecare1@gmail.com',
'na@na.com',
# System and test emails
]Attachment Handling
- Files are downloaded from S3
- Attached using MIME multipart
- Deleted after sending (both locally and from S3)
SES Configuration
email_client = boto3.client('ses',
region_name='us-east-1',
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY
)
# Uses configuration set for tracking
ConfigurationSetName='lh_email_tracker'Error States
| Error | Description |
|---|---|
Message Rejected by SES | SES rejected the email |
Mail From domain is not verified | Domain not verified in SES |
Configuration set sending paused | Config set is paused |
Account sending paused exception | SES account is paused |
Server Error while sending Email | Generic error |
Published | Email sent successfully |
Push Notification Tasks (tasks/push_notification.py)
Overview
Push notifications are sent to Android via FCM (Firebase Cloud Messaging) and iOS via APNs (Apple Push Notification service).
Function
sendNotification(payload)
Payload Structure:
{
"payload": {
"android": {
"collapse_key": "notification_type",
"api_key": "FCM_SERVER_KEY",
"registration_ids": ["device_token_1", "device_token_2"],
"data": {
"message": "Notification message",
"category": "Category",
"custom_data": {}
}
},
"ios": {
"category": "notification_type",
"certificate": "certificate.pem",
"tokens": ["device_token_1"],
"aps": {
"alert": "Notification message",
"badge": 1,
"sound": "default"
}
}
}
}Webhook Tasks (tasks/webhooks.py)
Overview
Webhook tasks make HTTP requests to external endpoints with support for all HTTP methods.
Functions
GET(meta)
def GET(meta):
payload = meta['payload']
headers = payload.get('headers', {})
timeout = payload.get('timeout', 15)
r = requests.get(
payload['url'],
params=payload['data'],
timeout=timeout,
headers=headers
)POST(meta)
Supports both query params and body:
if int(payload['isBody']):
r = requests.post(url, data=json.dumps(payload['args']), headers=headers)
else:
r = requests.post(url, data=payload['args'], headers=headers)PUT(meta) and DELETE(meta)
Similar structure to GET/POST.
Payload Structure
{
"payload": {
"url": "https://api.example.com/endpoint",
"data": {"param": "value"},
"args": {"body_key": "body_value"},
"isBody": 1,
"timeout": 15,
"callback": 1,
"callback_url": "https://callback.url",
"headers": {
"Authorization": "Bearer token",
"Content-Type": "application/json"
},
"body": {
"callback_headers": {},
"callback_payload": {}
}
}
}Integration Payload
For integration tracking, include:
{
"args": {
"integration_payload": {
"integration_callback_url": "https://...",
"metadata": {}
}
}
}Error Handling
5xx errors trigger a retry:
if r.status_code in range(499, 599):
raise ReadTimeout(message)Callback Response
When callback=1, the response is sent to callback_url:
requests.post(
url=payload['callback_url'],
data={
'resp': r.text,
'status_code': r.status_code
}
)WhatsApp Tasks (tasks/whatsapp.py)
Overview
WhatsApp messages are sent through various WhatsApp Business API providers.
Functions
sendWhatsapp(payload)
Send text-only WhatsApp messages.
sendWhatsapp_withFile(payload)
Send WhatsApp messages with media attachments.
Payload Structure:
{
"payload": {
"contact": 9876543210,
"countryCode": 91,
"message": "Hello!",
"template_name": "template_id",
"template_params": ["param1", "param2"],
"media_url": "https://...",
"config": {
"provider": "provider_name"
}
}
}Maintenance Tasks (tasks/maintenance.py)
Function
run_maintenance_on_failed_queue()
Scheduled to run every 5 minutes, this task cleans up the failed queue.
# Scheduled via cron
add(scheduler, {}, '*/5 * * * *',
'tasks.maintenance.run_maintenance_on_failed_queue',
id='redis_maintenance_on_failed_queue')Adding a New SMS Provider
To add a new SMS provider:
- Create the provider function in
tasks/plugin_sms.py:
def send_sms_newprovider(obj):
contact = obj.get('contact')
message = obj.get('message')
country_code = obj.get('countryCode')
# API call to provider
response = requests.post(
'https://api.newprovider.com/send',
json={
'to': f'+{country_code}{contact}',
'message': message,
'api_key': settings.NEW_PROVIDER_API_KEY
}
)
return response.json()- Add to provider mapping in
tasks/sms.py:
return {
# ... existing providers
"newprovider": send_sms_newprovider,
}[sms_provider["provider"]](obj)- Import the function:
from tasks.plugin_sms import (
# ... existing imports
send_sms_newprovider,
)Logging
All tasks log to Elasticsearch via the Log class:
from wrappers.Logger import Log
Log = Log()
# Log events
Log.execution_start(job_id, payload, task_type, task, log_tag)
Log.execution_success(job_id, payload, task_type, task, result, log_tag)
Log.execution_failed(job_id, payload, task_type, task, result, message, log_tag)
Log.execution_retry(job_id, payload, task_type, task, result, message, log_tag)
Log.execution_warnings(job_id, task_type, task, payload, message, log_tag)Log Structure
{
"job_id": "uuid",
"task": "SMS",
"task_type": "OTP",
"status": "completed",
"host": "worker-hostname",
"log_time": "2024-01-15T10:30:00",
"message": "Execution Successful",
"job_result": "Response from provider",
"tags": "SMS_OTP"
}