ServicesFusion Worker

Task Implementations

Detailed documentation of all task handlers in Fusion Worker

👤 Sai Tharun

Task Implementations

This document details all the task handlers implemented in Fusion Worker, including their payloads, providers, and behavior.


SMS Tasks (tasks/sms.py)

Overview

SMS tasks are routed to different providers based on configuration. The system supports 50+ SMS providers across different regions.

Functions

sendSMS(payload)

Main SMS sending function that routes to the appropriate provider.

Payload Structure:

{
  "payload": {
    "contact": 9876543210,
    "message": "Your message here",
    "countryCode": 91,
    "senderId": "CRELIO",
    "config": {
      "provider": "msg91"
    },
    "provider_id": "provider-uuid",
    "credit_deducted": 1,
    "callback_url": "https://..."
  }
}

sendOTPSMS(payload)

Specialized function for OTP delivery with fast channel support.

Special Handling:

  • Uses MSG91's OTP API template for Indian numbers
  • Supports isOTP flag for instant delivery
  • Supports context and flowAuth for transactional SMS

Provider Routing

{
    "sns": send_sms_sns,              # AWS SNS (International)
    "msg91": send_sms_msg91,          # India
    "twilio_sms": send_twilio_sms,    # International
    "msegat": send_sms_msegat,        # Saudi Arabia
    "textlocal": send_sms_textlocal,  # UK/India
    "clicksend": send_sms_clicksend,  # Australia
    # ... 50+ more providers
}

Country Code Handling

Non-Indian numbers automatically switch to AWS SNS:

if str(country_code) != "91":
    if obj.get("config", {}).get("provider") == "msg91":
        obj.get("config", {}).update({"provider": "sns"})

Provider Whitelist

Providers can be restricted by country code:

provider_mapping = json.loads(conn.hget(cache_key, provider_id))
if country_code not in provider_mapping:
    return "countryCode is not whitelisted for provider"

Email Tasks (tasks/_email_v2.py)

Overview

Emails are sent via AWS SES (Simple Email Service) with support for templates, attachments, and tracking.

Functions

send_plain_email(payload)

Send HTML emails without attachments.

Payload Structure:

{
  "payload": {
    "to": ["recipient@example.com"],
    "cc": [],
    "bcc": [],
    "subject": "Email Subject",
    "message": "<html>Email body</html>",
    "sender_name": "CrelioHealth",
    "config": {
      "config_fields": [
        {"name": "sender_id_name", "value": "Sender Name"},
        {"name": "sender_id_email", "value": "sender@domain.com"},
        {"name": "reply_to", "value": "reply@domain.com"}
      ]
    },
    "callback_url": "https://..."
  }
}

send_template_attachment_email(payload)

Send emails with attachments and/or templates.

Additional Fields:

{
  "files": [
    "https://s3.amazonaws.com/.../file1.pdf",
    "https://s3.amazonaws.com/.../file2.xlsx"
  ],
  "template": "<html>Template HTML</html>",
  "is_template": 1
}

Email Validation

Emails are validated and cleaned before sending:

EMAIL_REGEX = r"^\w+([\.-]?\w+)*@\w+([\.-]?\w+)*(\.\w{2,3})+$"

BLACKLIST_EMAILS = [
    'lifecare1@gmail.com',
    'na@na.com',
    # System and test emails
]

Attachment Handling

  1. Files are downloaded from S3
  2. Attached using MIME multipart
  3. Deleted after sending (both locally and from S3)

SES Configuration

email_client = boto3.client('ses', 
    region_name='us-east-1',
    aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
    aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY
)

# Uses configuration set for tracking
ConfigurationSetName='lh_email_tracker'

Error States

ErrorDescription
Message Rejected by SESSES rejected the email
Mail From domain is not verifiedDomain not verified in SES
Configuration set sending pausedConfig set is paused
Account sending paused exceptionSES account is paused
Server Error while sending EmailGeneric error
PublishedEmail sent successfully

Push Notification Tasks (tasks/push_notification.py)

Overview

Push notifications are sent to Android via FCM (Firebase Cloud Messaging) and iOS via APNs (Apple Push Notification service).

Function

sendNotification(payload)

Payload Structure:

{
  "payload": {
    "android": {
      "collapse_key": "notification_type",
      "api_key": "FCM_SERVER_KEY",
      "registration_ids": ["device_token_1", "device_token_2"],
      "data": {
        "message": "Notification message",
        "category": "Category",
        "custom_data": {}
      }
    },
    "ios": {
      "category": "notification_type",
      "certificate": "certificate.pem",
      "tokens": ["device_token_1"],
      "aps": {
        "alert": "Notification message",
        "badge": 1,
        "sound": "default"
      }
    }
  }
}

Webhook Tasks (tasks/webhooks.py)

Overview

Webhook tasks make HTTP requests to external endpoints with support for all HTTP methods.

Functions

GET(meta)

def GET(meta):
    payload = meta['payload']
    headers = payload.get('headers', {})
    timeout = payload.get('timeout', 15)
    
    r = requests.get(
        payload['url'], 
        params=payload['data'], 
        timeout=timeout, 
        headers=headers
    )

POST(meta)

Supports both query params and body:

if int(payload['isBody']):
    r = requests.post(url, data=json.dumps(payload['args']), headers=headers)
else:
    r = requests.post(url, data=payload['args'], headers=headers)

PUT(meta) and DELETE(meta)

Similar structure to GET/POST.

Payload Structure

{
  "payload": {
    "url": "https://api.example.com/endpoint",
    "data": {"param": "value"},
    "args": {"body_key": "body_value"},
    "isBody": 1,
    "timeout": 15,
    "callback": 1,
    "callback_url": "https://callback.url",
    "headers": {
      "Authorization": "Bearer token",
      "Content-Type": "application/json"
    },
    "body": {
      "callback_headers": {},
      "callback_payload": {}
    }
  }
}

Integration Payload

For integration tracking, include:

{
  "args": {
    "integration_payload": {
      "integration_callback_url": "https://...",
      "metadata": {}
    }
  }
}

Error Handling

5xx errors trigger a retry:

if r.status_code in range(499, 599):
    raise ReadTimeout(message)

Callback Response

When callback=1, the response is sent to callback_url:

requests.post(
    url=payload['callback_url'], 
    data={
        'resp': r.text, 
        'status_code': r.status_code
    }
)

WhatsApp Tasks (tasks/whatsapp.py)

Overview

WhatsApp messages are sent through various WhatsApp Business API providers.

Functions

sendWhatsapp(payload)

Send text-only WhatsApp messages.

sendWhatsapp_withFile(payload)

Send WhatsApp messages with media attachments.

Payload Structure:

{
  "payload": {
    "contact": 9876543210,
    "countryCode": 91,
    "message": "Hello!",
    "template_name": "template_id",
    "template_params": ["param1", "param2"],
    "media_url": "https://...",
    "config": {
      "provider": "provider_name"
    }
  }
}

Maintenance Tasks (tasks/maintenance.py)

Function

run_maintenance_on_failed_queue()

Scheduled to run every 5 minutes, this task cleans up the failed queue.

# Scheduled via cron
add(scheduler, {}, '*/5 * * * *', 
    'tasks.maintenance.run_maintenance_on_failed_queue',
    id='redis_maintenance_on_failed_queue')

Adding a New SMS Provider

To add a new SMS provider:

  1. Create the provider function in tasks/plugin_sms.py:
def send_sms_newprovider(obj):
    contact = obj.get('contact')
    message = obj.get('message')
    country_code = obj.get('countryCode')
    
    # API call to provider
    response = requests.post(
        'https://api.newprovider.com/send',
        json={
            'to': f'+{country_code}{contact}',
            'message': message,
            'api_key': settings.NEW_PROVIDER_API_KEY
        }
    )
    
    return response.json()
  1. Add to provider mapping in tasks/sms.py:
return {
    # ... existing providers
    "newprovider": send_sms_newprovider,
}[sms_provider["provider"]](obj)
  1. Import the function:
from tasks.plugin_sms import (
    # ... existing imports
    send_sms_newprovider,
)

Logging

All tasks log to Elasticsearch via the Log class:

from wrappers.Logger import Log
Log = Log()

# Log events
Log.execution_start(job_id, payload, task_type, task, log_tag)
Log.execution_success(job_id, payload, task_type, task, result, log_tag)
Log.execution_failed(job_id, payload, task_type, task, result, message, log_tag)
Log.execution_retry(job_id, payload, task_type, task, result, message, log_tag)
Log.execution_warnings(job_id, task_type, task, payload, message, log_tag)

Log Structure

{
  "job_id": "uuid",
  "task": "SMS",
  "task_type": "OTP",
  "status": "completed",
  "host": "worker-hostname",
  "log_time": "2024-01-15T10:30:00",
  "message": "Execution Successful",
  "job_result": "Response from provider",
  "tags": "SMS_OTP"
}

On this page