Data Flow and Lifecycle

Request lifecycle, data flow patterns, and integration event flows in crelio-app

This document describes how data flows through the crelio-app system, from API request to database persistence and external system synchronization.

Request Lifecycle

Standard API Request Flow

Concrete Example: Patient Registration

Evidence: registration.py


Transaction Boundaries

transaction.atomic Usage

Transactions are applied at the view level, not at the model level:

# patient/views/registration.py
class IdProofsView(View):
    
    @transaction.atomic
    def post(self, request, lab_user_id=None, ...):
        # lab_id and payload are defined in code elided from this excerpt.
        patient = UserDetails.objects.get(labId_id=lab_id, labUserId=lab_user_id)
        # An exception raised anywhere in post() rolls back every write made here.
        patient.save_id_proofs(payload.get("idProofs", []))
        return JsonResponse({"status": "Success"})

Files Using transaction.atomic

Based on code analysis, transaction.atomic is used in 50+ locations:

| Domain | Files | Pattern |
| --- | --- | --- |
| Patient | registration.py, attachments.py, allowed_tests.py | CRUD operations |
| Finance | bill_update.py, insurance_group_views.py, claims_management.py | Bill/claim updates |
| Report | amend_report.py, sample_rerun.py, smart_report.py | Report modifications |
| Admin | domain.py, sub_domains.py, outsource.py | Configuration updates |

Transaction Pattern

# Standard pattern
@transaction.atomic
def post(self, request, *args, **kwargs):
    # All database operations here are atomic
    instance = Model.objects.get(pk=id)
    instance.set_values(payload)
    instance.save()
    RelatedModel.objects.filter(...).update(...)
    return JsonResponse({...})

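[!WARNING] Side Effect Ordering: after_save() runs inside the transaction, but the ES sync it triggers is not part of the database transaction. A failed ES sync does not by itself roll back the database write, and a database rollback does not undo an ES write that has already gone through, so the two stores can drift apart. Consider queueing side effects to Fusion for critical operations.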
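
One way to reduce the risk described in the warning is to defer external side effects until the database commit has succeeded. The sketch below illustrates the idea with the standard-library sqlite3 module rather than the project's own code. The atomic helper, the patient table, and the synced list (standing in for the ES index) are all assumptions made for illustration. Django offers transaction.on_commit for the same purpose.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def atomic(conn):
    """Commit on success, roll back on error, then run deferred callbacks."""
    callbacks = []
    try:
        # Callers receive a function for registering post-commit side effects.
        yield callbacks.append
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    # Reached only after a successful commit.
    for callback in callbacks:
        callback()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE patient (id INTEGER PRIMARY KEY, name TEXT)")
conn.commit()

synced = []  # hypothetical stand-in for the search index

# Successful request: the row is committed, then the side effect runs.
with atomic(conn) as on_commit:
    conn.execute("INSERT INTO patient (name) VALUES (?)", ("Asha",))
    on_commit(lambda: synced.append("Asha"))

# Failing request: the row is rolled back and the side effect never runs.
try:
    with atomic(conn) as on_commit:
        conn.execute("INSERT INTO patient (name) VALUES (?)", ("Ravi",))
        on_commit(lambda: synced.append("Ravi"))
        raise ValueError("validation failed")
except ValueError:
    pass

rows = [row[0] for row in conn.execute("SELECT name FROM patient")]
```

After both requests, rows and synced each contain only "Asha": the rolled-back insert left no trace in either store.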


Validation Flow

Model-Level Validation

Validation is split across several single-purpose methods, which the master validate() method aggregates:

# patient/models/user_details.py
class UserDetails(BaseModel):
    
    def validate(self, *args, **kwargs):
        """Master validation method"""
        errors = []
        errors.extend(self.validate_age())
        errors.extend(self.validate_contact_info())
        errors.extend(self.validate_national_id())
        
        if kwargs.get("payload"):
            self.validate_insurance(kwargs["payload"].get("insuranceList", []))
        
        if errors:
            raise ValidationError(errors)
    
    def validate_patient_action(self, *args, **kwargs):
        """Permission validation"""
        session = kwargs.get("session", {})
        if self.is_new and not session.get("userAddNewPatientFlag"):
            raise ValidationError("No permission to create patient")
    
    def validate_strict_check(self, *args, **kwargs):
        """Field-level access control"""
        # Check if user can modify sensitive fields
        ...

Validation Categories

| Category | Methods | Purpose |
| --- | --- | --- |
| Input | validate_age(), validate_contact_info() | Data format/content |
| Permission | validate_patient_action() | User access control |
| Business Rule | validate_strict_check() | Domain constraints |
| Cross-Entity | validate_insurance() | Related data validation |
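
The aggregation pattern used by validate() can be sketched without Django. In this minimal example each validator returns a list of messages and the master method raises once with everything it collected. PatientDraft, collect_errors, the local ValidationError class, and the specific rules (age range, 10-digit phone) are hypothetical and are not taken from the codebase.

```python
class ValidationError(Exception):
    """Carries every problem found, not just the first one."""

    def __init__(self, errors):
        super().__init__("; ".join(errors))
        self.errors = list(errors)


class PatientDraft:
    """Hypothetical stand-in for a model with per-concern validators."""

    def __init__(self, age, phone):
        self.age = age
        self.phone = phone

    def validate_age(self):
        # Each validator returns a list so the caller can merge results.
        if 0 <= self.age <= 150:
            return []
        return ["age must be between 0 and 150"]

    def validate_contact_info(self):
        if self.phone.isdigit() and len(self.phone) == 10:
            return []
        return ["phone must be exactly 10 digits"]

    def validate(self):
        errors = []
        errors.extend(self.validate_age())
        errors.extend(self.validate_contact_info())
        if errors:
            raise ValidationError(errors)


def collect_errors(draft):
    """Return the list of validation errors, or [] when the draft is valid."""
    try:
        draft.validate()
    except ValidationError as exc:
        return exc.errors
    return []


valid_result = collect_errors(PatientDraft(34, "9876543210"))
invalid_result = collect_errors(PatientDraft(-1, "12ab"))
```

Collecting before raising lets the caller report all problems in a single response instead of one per round trip.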

Data Access Patterns

QuerySet Optimization

The codebase uses select_related and prefetch_related extensively:

# finance/models/billing.py
@classmethod
def get_bills(cls, lab_id, from_date, to_date, ...):
    qs = cls.objects.filter(
        labId_id=lab_id,
        registrationDate__range=(from_date, to_date)
    ).select_related(
        "userDetailsId",
        "orgId",
        "branch"
    ).prefetch_related(
        "billinginfo_set",
        "billingicd_set"
    )
    return qs

Common N+1 Risk Zones

[!CAUTION] These patterns can cause N+1 queries unless related objects are loaded up front:

| Location | Risk | Mitigation |
| --- | --- | --- |
| Report list views | Loading report formats per report | Use prefetch_related("reportformat_set") |
| Bill serialization | Loading user details | Use select_related("userDetailsId") |
| Device results | Loading lab report relations | Batch queries in method |
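
To make the N+1 cost concrete, the following sketch counts SQL statements for a per-row lookup versus a single JOIN, which is what select_related issues for a foreign key. It uses the standard-library sqlite3 module. The schema, the QueryCounter helper, and the function names are assumptions for illustration and do not mirror the project's tables.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE patient (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE bill (id INTEGER PRIMARY KEY, patient_id INTEGER);
    INSERT INTO patient VALUES (1, 'Asha'), (2, 'Ravi'), (3, 'Meera');
    INSERT INTO bill VALUES (10, 1), (11, 2), (12, 3);
    """
)


class QueryCounter:
    """Runs SQL and counts how many statements were issued."""

    def __init__(self, connection):
        self.connection = connection
        self.count = 0

    def run(self, sql, params=()):
        self.count += 1
        return self.connection.execute(sql, params).fetchall()


def bills_with_names_n_plus_one(db):
    # One query for the bills, then one more per bill for its patient.
    bills = db.run("SELECT id, patient_id FROM bill ORDER BY id")
    return [
        (bill_id, db.run("SELECT name FROM patient WHERE id = ?", (patient_id,))[0][0])
        for bill_id, patient_id in bills
    ]


def bills_with_names_joined(db):
    # A single JOIN fetches the related row alongside each bill.
    return db.run(
        "SELECT bill.id, patient.name FROM bill "
        "JOIN patient ON patient.id = bill.patient_id ORDER BY bill.id"
    )


slow, fast = QueryCounter(conn), QueryCounter(conn)
slow_rows = bills_with_names_n_plus_one(slow)
fast_rows = bills_with_names_joined(fast)
```

Both functions return the same rows, but with three bills the per-row version issues four statements and the joined version issues one. The gap grows linearly with the number of bills.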

Where QuerySets Live

| Pattern | Location | Example |
| --- | --- | --- |
| Model methods | Model.get_*() classmethods | Billing.get_bills() |
| Proxy models | Proxy.get_*() | PatientReport.get_reports() |
| View inline | Simple filters | Model.objects.filter(lab_id=id) |
| Managers | Complex reusable queries | PatientOverviewManager |

Async Job Architecture

Fusion Worker Integration

External system calls are queued to the Fusion worker:

Communication Flow

# communication/base.py
class CommunicationBase:
    
    def send(self, lab_id=None, is_report=0, ...):
        """Queue communications to Fusion"""
        self.validate_triggers()
        self.prepare_user_meta()
        self.prepare_lab_meta()
        
        job_ids = []
        for trigger in enabled_triggers:
            # Prepare payload
            sms_meta = self.prepare_sms(trigger)
            whatsapp_meta = self.prepare_whatsapp(trigger)
            email_meta = self.prepare_email(trigger)
            
            # Queue to Fusion
            job_id = FusionClient().queue_communication(
                sms=sms_meta,
                whatsapp=whatsapp_meta,
                email=email_meta
            )
            job_ids.append(job_id)
        
        return job_ids
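
The queueing loop in send() can be exercised in isolation with an in-memory queue. This is a sketch under stated assumptions: InMemoryJobQueue is a hypothetical stand-in and not the Fusion client, and the trigger and recipient dictionaries use made-up field names.

```python
import uuid
from collections import deque


class InMemoryJobQueue:
    """Hypothetical stand-in for a queue client; not the Fusion API."""

    def __init__(self):
        self.jobs = deque()

    def queue_communication(self, **channels):
        job_id = uuid.uuid4().hex
        # Keep only the channels that actually have a payload.
        payload = {name: meta for name, meta in channels.items() if meta}
        self.jobs.append({"id": job_id, "channels": payload})
        return job_id


def send(queue, triggers, recipient):
    """Queue one job per enabled trigger and return the job ids."""
    job_ids = []
    for trigger in triggers:
        if not trigger["enabled"]:
            continue
        text = trigger["template"].format(**recipient)
        job_ids.append(
            queue.queue_communication(
                sms={"to": recipient["phone"], "text": text} if trigger["sms"] else None,
                email={"to": recipient["email"], "body": text} if trigger["email"] else None,
            )
        )
    return job_ids


queue = InMemoryJobQueue()
triggers = [
    {"enabled": True, "sms": True, "email": False, "template": "Hi {name}, bill ready"},
    {"enabled": False, "sms": True, "email": True, "template": "unused"},
    {"enabled": True, "sms": True, "email": True, "template": "Hi {name}, report ready"},
]
recipient = {"name": "Asha", "phone": "9876543210", "email": "asha@example.com"}
job_ids = send(queue, triggers, recipient)
```

The request path only enqueues work and returns job ids, so slow or failing providers never block the API response. Delivery and retries belong to the worker.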

Job Types

| Job Type | Trigger | Handler |
| --- | --- | --- |
| SMS | should_trigger_sms | Fusion → SMS providers |
| WhatsApp | should_trigger_whatsapp | Fusion → Twilio/Pinnacle/Interakt |
| Email | should_trigger_email | Fusion → SMTP/SendGrid |
| Webhooks | webhook_enabled on model | Fusion → HTTP POST |

Integration Event Flow

Outbound Integrations

Inbound Integrations (Webhooks)

Integration Retry Mechanism

# integration/models/integration_directory.py
class IntegrationDirectory(DocumentDBModelBase):
    
    auto_retry_error_messages = (
        "No response",
        "Connection Timeout",
        ...
    )
    
    @classmethod
    def get_valid_auto_retry_logs(cls, lab_id, valid_integrations, ...):
        """Find failed logs eligible for retry"""
        ...
    
    @classmethod
    def update_retry_details_return_count(cls, lab_id, log_id, record, ...):
        """Update retry count and details"""
        ...
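
The method bodies above are elided, so the following is only a sketch of how retry eligibility of this kind is commonly decided: retry when the error looks transient and the attempt budget is not exhausted. The function names, the log field names, and MAX_RETRIES are assumptions. Only the two error messages come from the excerpt.

```python
AUTO_RETRY_ERROR_MESSAGES = ("No response", "Connection Timeout")
MAX_RETRIES = 3  # assumption: the real limit is not given in this document


def select_retryable_logs(logs, valid_integrations):
    """Return logs whose error is transient and whose retry budget remains."""
    return [
        log
        for log in logs
        if log["integration"] in valid_integrations
        and log["error"] in AUTO_RETRY_ERROR_MESSAGES
        and log["retry_count"] < MAX_RETRIES
    ]


def record_retry_attempt(log, error=None):
    """Record one retry attempt and return the new retry count."""
    log["retry_count"] += 1
    log["error"] = error  # None means the retry succeeded
    return log["retry_count"]


logs = [
    {"id": 1, "integration": "lis", "error": "No response", "retry_count": 0},
    {"id": 2, "integration": "lis", "error": "Invalid payload", "retry_count": 0},
    {"id": 3, "integration": "lis", "error": "Connection Timeout", "retry_count": 3},
    {"id": 4, "integration": "crm", "error": "No response", "retry_count": 1},
]
eligible = select_retryable_logs(logs, valid_integrations={"lis"})
eligible_ids = [log["id"] for log in eligible]
new_count = record_retry_attempt(eligible[0], error="No response")
```

Only log 1 qualifies here: log 2 failed with a non-transient error, log 3 has used its budget, and log 4 belongs to an integration that was not requested.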

Elasticsearch Sync

Write Path

# patient/models/user_details.py
def update_es_record(self, whatsapp_consent=0):
    """Sync patient to ES"""
    es_client = get_client("es")
    
    payload = {
        "labId": self.labId_id,
        "userDetailsId": self.id,
        "fullName": self.fullName,
        # ... more fields
    }
    
    if self.is_new_instance:
        es_client.create(index="patients", doc=payload)
    else:
        es_client.update(index="patients", doc_id=self.id, doc=payload)
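
The create-or-update branch in the write path can be illustrated with an in-memory index. InMemoryIndex and sync_patient are hypothetical: they imitate the shape of the calls above, are not the project's ES client, and make no claim about how that client assigns document ids.

```python
class InMemoryIndex:
    """Hypothetical stand-in for a search index keyed by document id."""

    def __init__(self):
        self.docs = {}

    def create(self, doc_id, doc):
        if doc_id in self.docs:
            raise KeyError(f"document {doc_id} already exists")
        self.docs[doc_id] = dict(doc)

    def update(self, doc_id, doc):
        # Partial update: merge the new fields into the stored document.
        self.docs[doc_id].update(doc)


def sync_patient(index, patient, is_new):
    """Build the search payload and create or update the indexed document."""
    payload = {
        "labId": patient["lab_id"],
        "userDetailsId": patient["id"],
        "fullName": patient["full_name"],
    }
    if is_new:
        index.create(patient["id"], payload)
    else:
        index.update(patient["id"], payload)
    return payload


index = InMemoryIndex()
patient = {"id": 101, "lab_id": 42, "full_name": "Asha Rao"}
sync_patient(index, patient, is_new=True)

patient["full_name"] = "Asha R. Rao"
sync_patient(index, patient, is_new=False)
stored = index.docs[101]
```

Keying the indexed document by the primary key keeps later updates addressable without a search round trip.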

ES Indexes Used

| Index | Purpose | Model |
| --- | --- | --- |
| patients | Patient search | UserDetails |
| activity_logs | Audit trail | ActivityLog |
| lab_reports | Report search | LabReportRelation |
| lab_users | Staff search | LabUser |

Cache Patterns

Redis Cache Usage

# core/cache.py
class CustomRedisClusterCache(RedisCache):
    """Custom cache with cluster support"""
    
    def get(self, key, default=None):
        ...
    
    def hget(self, name, key):
        """Hash get for model instance caching"""
        ...
    
    def hset(self, name, key, value):
        """Hash set for model instance caching"""
        ...

Model Instance Caching

# core/models/base.py
class BaseModel:
    allow_individual_instance_caching = False
    cache_key = "Model_CentreId{lab_id}_List"
    
    @classmethod
    def get_cached_instance(cls, instance_id, serializer=None, ...):
        """Fetch from cache or database"""
        instance = cache.hget(cls.cache_key, instance_id)
        if not instance:
            instance = cls.get(pk=instance_id)
            cache.hset(cls.cache_key, instance_id, json.dumps(instance))
        return instance
    
    @classmethod
    def reset_cache_instance(cls, instance_id):
        """Invalidate cache on update"""
        cache.hdel(cls.cache_key, instance_id)
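
The excerpt above is simplified: it stores the instance with json.dumps but returns the raw cached value on a hit. A runnable sketch of the same read-through pattern, with explicit serialization on write and deserialization on read, could look like the following. HashCache is a hypothetical in-memory stand-in for the Redis hash commands, and CachedLookup, the load callable, and the sample record are assumptions for illustration.

```python
import json


class HashCache:
    """Hypothetical in-memory stand-in for Redis hash commands."""

    def __init__(self):
        self.hashes = {}

    def hget(self, name, key):
        return self.hashes.get(name, {}).get(key)

    def hset(self, name, key, value):
        self.hashes.setdefault(name, {})[key] = value

    def hdel(self, name, key):
        self.hashes.get(name, {}).pop(key, None)


class CachedLookup:
    def __init__(self, cache, cache_key, load):
        self.cache = cache
        self.cache_key = cache_key
        self.load = load  # callable that reads one record from the database
        self.db_reads = 0

    def get_cached_instance(self, instance_id):
        raw = self.cache.hget(self.cache_key, str(instance_id))
        if raw is not None:
            return json.loads(raw)  # hit: decode the stored JSON
        self.db_reads += 1
        record = self.load(instance_id)  # miss: fall back to the database
        self.cache.hset(self.cache_key, str(instance_id), json.dumps(record))
        return record

    def reset_cache_instance(self, instance_id):
        self.cache.hdel(self.cache_key, str(instance_id))


database = {7: {"id": 7, "name": "CBC"}}
lookup = CachedLookup(HashCache(), "Test_CentreId42_List", lambda pk: dict(database[pk]))

first = lookup.get_cached_instance(7)   # miss, reads the database
second = lookup.get_cached_instance(7)  # hit, served from the cache
database[7]["name"] = "Complete Blood Count"
lookup.reset_cache_instance(7)          # invalidate after the update
third = lookup.get_cached_instance(7)   # miss again, sees the new value
```

Without the reset_cache_instance call, the third read would keep returning the stale name, which is why every write path needs an invalidation point.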

Cache Invalidation Points

| Event | Action | Location |
| --- | --- | --- |
| Model save | Invalidate instance | after_save() if caching enabled |
| Bulk update | Manual invalidation | View/service layer |
| Settings change | Full cache clear | Admin action |

Activity Log Flow

Logging to Elasticsearch

# core/models/activity_log_base.py
class ActivityLogBase:
    
    def add_activity_log(self, action, message="", session={}, ...):
        """Log activity to ES"""
        payload = {
            "activity_text": message.format(**self.__dict__),
            "lab_id": session.get("labId"),
            "lab_user_id": session.get("loginUser"),
            "log_category_id": self.category_id_mapper.get(action),
            **self.prepare_activity_log_payload(action, session)
        }
        ActivityLog(**payload).save()  # → ES

Activity Categories

# report/models/reflex_test_config.py
class ReflexTestConfiguration(BaseModel):
    category_id_mapper = {
        "created": 628,
        "updated": 629,
        "triggered": 630,
        "deleted": 634,
        "enabled": 644,
        "disabled": 645,
    }
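
Putting the two pieces together, the sketch below builds an activity log payload by formatting a message template with the instance's attributes and resolving the category id from the action name. ReflexRule and build_activity_log are hypothetical. The category ids are the ones listed above. Unlike the excerpt, session defaults to None instead of a dict, since a mutable default is shared between calls in Python.

```python
class ReflexRule:
    """Hypothetical model-like object that knows its own log categories."""

    category_id_mapper = {
        "created": 628,
        "updated": 629,
        "deleted": 634,
    }

    def __init__(self, name, test_code):
        self.name = name
        self.test_code = test_code

    def build_activity_log(self, action, message="", session=None):
        session = session or {}
        return {
            # Placeholders in the template are filled from instance attributes.
            "activity_text": message.format(**vars(self)),
            "lab_id": session.get("labId"),
            "lab_user_id": session.get("loginUser"),
            # Unknown actions resolve to None rather than raising.
            "log_category_id": self.category_id_mapper.get(action),
        }


rule = ReflexRule(name="TSH reflex", test_code="TSH")
payload = rule.build_activity_log(
    "updated",
    message="Reflex rule {name} for {test_code} was updated",
    session={"labId": 42, "loginUser": 7},
)
unknown = rule.build_activity_log("archived")
```

Because the template is filled from instance attributes, a placeholder that names a missing attribute raises KeyError at log time, so templates are worth covering in tests.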

Device Integration Flow

HL7/ASTM Result Processing

Key Methods

# interfacing/models/device_results_validation.py
class DeviceResultsValidation:
    
    @classmethod
    def save_device_results(cls, payload):
        """Store incoming device results for validation"""
        ...
    
    @classmethod
    def release_parameters(cls, lab_details, device_id, parameters):
        """Release validated results to reports"""
        ...
    
    @classmethod
    def post_process_report(cls, report_detail, logs, lab_details):
        """Apply post-processing rules (auto-sign, etc.)"""
        ...

File Evidence

Core Flow Components

Transaction Usage

Async/Integration
