Data Flow and Lifecycle
Request lifecycle, data flow patterns, and integration event flows in crelio-app
This document describes how data flows through the crelio-app system, from API request to database persistence and external system synchronization.
Request Lifecycle
Standard API Request Flow
Concrete Example: Patient Registration
Evidence: registration.py
Transaction Boundaries
Transaction.atomic Usage
Transactions are applied at the view level, not at the model level:
# patient/views/registration.py
class IdProofsView(View):
    @transaction.atomic
    def post(self, request, lab_user_id=None, ...):
        patient = UserDetails.objects.get(labId_id=lab_id, labUserId=lab_user_id)
        patient.save_id_proofs(payload.get("idProofs", []))
        return JsonResponse({"status": "Success"})
Files Using transaction.atomic
Code analysis finds transaction.atomic in 50+ locations. Representative files by domain:
| Domain | Files | Pattern |
|---|---|---|
| Patient | registration.py, attachments.py, allowed_tests.py | CRUD operations |
| Finance | bill_update.py, insurance_group_views.py, claims_management.py | Bill/claim updates |
| Report | amend_report.py, sample_rerun.py, smart_report.py | Report modifications |
| Admin | domain.py, sub_domains.py, outsource.py | Configuration updates |
Transaction Pattern
# Standard pattern
@transaction.atomic
def post(self, request, *args, **kwargs):
    # All database operations here are atomic
    instance = Model.objects.get(pk=id)
    instance.set_values(payload)
    instance.save()
    RelatedModel.objects.filter(...).update(...)
    return JsonResponse({...})
[!WARNING] Side Effect Ordering: after_save() runs INSIDE the transaction. If ES sync fails, the transaction is NOT rolled back. Consider queueing side effects to Fusion for critical operations.
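One way to keep side effects out of the transaction is to register them as callbacks that fire only after a successful commit; Django exposes this idea as transaction.on_commit. The sketch below imitates that behaviour in plain Python so it runs standalone. AtomicBlock, queue_to_fusion, and fusion_queue are hypothetical stand-ins for illustration, not crelio-app code.

```python
class AtomicBlock:
    """Toy stand-in for a database transaction (hypothetical, not Django)."""

    def __init__(self):
        self._callbacks = []

    def on_commit(self, func):
        # Remember the side effect; do not run it yet.
        self._callbacks.append(func)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            # "Commit": only now do the deferred side effects run.
            for func in self._callbacks:
                func()
        # On error the callbacks are simply dropped ("rollback").
        self._callbacks = []
        return False  # never swallow the exception


fusion_queue = []  # hypothetical in-memory substitute for the Fusion queue


def queue_to_fusion(job):
    fusion_queue.append(job)


def save_patient(patient_id, fail=False):
    with AtomicBlock() as tx:
        tx.on_commit(lambda: queue_to_fusion({"type": "es_sync", "id": patient_id}))
        if fail:
            raise ValueError("database write failed")


save_patient(1)
try:
    save_patient(2, fail=True)
except ValueError:
    pass

print(fusion_queue)  # only the job for patient 1 was queued
```

With this ordering, a failed write never leaves a queued job behind, and a failed job cannot affect the committed write.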
Validation Flow
Model-Level Validation
Validation is split across several narrowly scoped methods:
# patient/models/user_details.py
class UserDetails(BaseModel):
    def validate(self, *args, **kwargs):
        """Master validation method"""
        errors = []
        errors.extend(self.validate_age())
        errors.extend(self.validate_contact_info())
        errors.extend(self.validate_national_id())
        if kwargs.get("payload"):
            self.validate_insurance(kwargs["payload"].get("insuranceList", []))
        if errors:
            raise ValidationError(errors)

    def validate_patient_action(self, *args, **kwargs):
        """Permission validation"""
        session = kwargs.get("session", {})
        if self.is_new and not session.get("userAddNewPatientFlag"):
            raise ValidationError("No permission to create patient")

    def validate_strict_check(self, *args, **kwargs):
        """Field-level access control"""
        # Check if user can modify sensitive fields
        ...
Validation Categories
| Category | Methods | Purpose |
|---|---|---|
| Input | validate_age(), validate_contact_info() | Data format/content |
| Permission | validate_patient_action() | User access control |
| Business Rule | validate_strict_check() | Domain constraints |
| Cross-Entity | validate_insurance() | Related data validation |
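The master-method pattern above collects errors from each check and raises once, so the caller sees every problem in a single response. Here is a minimal standalone sketch of that idea. The Patient class, the ValidationError stand-in, and the concrete rules (age range, 10-digit contact) are assumptions made for illustration, not the real validation logic.

```python
from dataclasses import dataclass


class ValidationError(Exception):
    """Minimal stand-in for the framework's ValidationError."""

    def __init__(self, errors):
        super().__init__("; ".join(errors))
        self.errors = list(errors)


@dataclass
class Patient:
    age: int
    contact: str

    def validate_age(self):
        # Return a list so the caller can merge results from every check.
        # The 0-150 range is a made-up rule for this sketch.
        return [] if 0 <= self.age <= 150 else ["age out of range"]

    def validate_contact_info(self):
        # The 10-digit rule is likewise hypothetical.
        ok = self.contact.isdigit() and len(self.contact) == 10
        return [] if ok else ["invalid contact number"]

    def validate(self):
        errors = []
        errors.extend(self.validate_age())
        errors.extend(self.validate_contact_info())
        if errors:
            raise ValidationError(errors)


Patient(age=30, contact="9876543210").validate()  # passes silently

try:
    Patient(age=200, contact="abc").validate()
except ValidationError as exc:
    print(exc.errors)  # both failures are reported together
```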
Data Access Patterns
QuerySet Optimization
The codebase uses select_related and prefetch_related extensively:
# finance/models/billing.py
@classmethod
def get_bills(cls, lab_id, from_date, to_date, ...):
    qs = cls.objects.filter(
        labId_id=lab_id,
        registrationDate__range=(from_date, to_date)
    ).select_related(
        "userDetailsId",
        "orgId",
        "branch"
    ).prefetch_related(
        "billinginfo_set",
        "billingicd_set"
    )
    return qs
Common N+1 Risk Zones
[!CAUTION] These patterns can cause N+1 queries unless related objects are loaded up front:
| Location | Risk | Mitigation |
|---|---|---|
| Report list views | Loading report formats per report | Use prefetch_related("reportformat_set") |
| Bill serialization | Loading user details | Use select_related("userDetailsId") |
| Device results | Loading lab report relations | Batch queries in method |
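To make the N+1 cost concrete, the sketch below counts queries for the two strategies using the standard-library sqlite3 module. The patient and bill tables and their rows are invented for this example; only the query-count comparison matters.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE patient (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE bill (id INTEGER PRIMARY KEY, patient_id INTEGER, amount REAL);
    INSERT INTO patient VALUES (1, 'Asha'), (2, 'Ravi'), (3, 'Meera');
    INSERT INTO bill VALUES (10, 1, 500.0), (11, 2, 750.0), (12, 3, 120.0);
    """
)

query_count = 0


def run(sql, params=()):
    """Execute a query and count it, so the two strategies can be compared."""
    global query_count
    query_count += 1
    return conn.execute(sql, params).fetchall()


# N+1: one query for the bills, then one more per bill for its patient.
query_count = 0
naive = []
for bill_id, patient_id, amount in run("SELECT id, patient_id, amount FROM bill ORDER BY id"):
    (name,) = run("SELECT name FROM patient WHERE id = ?", (patient_id,))[0]
    naive.append((bill_id, name, amount))
naive_queries = query_count

# Joined: a single query fetches bills together with their patients,
# which is what select_related does for foreign keys.
query_count = 0
joined = run(
    "SELECT b.id, p.name, b.amount FROM bill b "
    "JOIN patient p ON p.id = b.patient_id ORDER BY b.id"
)
joined_queries = query_count

print(naive_queries, joined_queries)  # 4 queries versus 1
```

Both strategies return the same rows; the naive loop issues one extra query per bill, so its cost grows with the size of the list.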
Where QuerySets Live
| Pattern | Location | Example |
|---|---|---|
| Model methods | Model.get_*() classmethods | Billing.get_bills() |
| Proxy models | Proxy.get_*() | PatientReport.get_reports() |
| View inline | Simple filters | Model.objects.filter(lab_id=id) |
| Managers | Complex reusable queries | PatientOverviewManager |
Async Job Architecture
Fusion Worker Integration
External system calls are queued to the Fusion worker:
Communication Flow
# communication/base.py
class CommunicationBase:
    def send(self, lab_id=None, is_report=0, ...):
        """Queue communications to Fusion"""
        self.validate_triggers()
        self.prepare_user_meta()
        self.prepare_lab_meta()
        job_ids = []
        for trigger in enabled_triggers:
            # Prepare payload
            sms_meta = self.prepare_sms(trigger)
            whatsapp_meta = self.prepare_whatsapp(trigger)
            email_meta = self.prepare_email(trigger)
            # Queue to Fusion
            job_id = FusionClient().queue_communication(
                sms=sms_meta,
                whatsapp=whatsapp_meta,
                email=email_meta
            )
            job_ids.append(job_id)
        return job_ids
Job Types
| Job Type | Trigger | Handler |
|---|---|---|
| SMS | should_trigger_sms | Fusion → SMS providers |
| WhatsApp | should_trigger_whatsapp | Fusion → Twilio/Pinnacle/Interakt |
| Email | should_trigger_email | Fusion → SMTP/SendGrid |
| Webhooks | webhook_enabled on model | Fusion → HTTP POST |
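The queueing loop in the communication flow can be exercised without a real worker by swapping in an in-memory client. The sketch below is an illustration under assumptions: FakeFusionClient, the trigger dictionaries, and the payload shapes are hypothetical, and only the "one job per enabled trigger" behaviour mirrors the excerpt above.

```python
import itertools


class FakeFusionClient:
    """Hypothetical in-memory stand-in for the Fusion client."""

    def __init__(self):
        self.jobs = []
        self._ids = itertools.count(1)

    def queue_communication(self, **channels):
        # Drop channels that have no payload, then record the job.
        payload = {name: meta for name, meta in channels.items() if meta}
        job_id = next(self._ids)
        self.jobs.append((job_id, payload))
        return job_id


def send(client, triggers, recipient):
    """Queue one job per enabled trigger and return the job IDs."""
    job_ids = []
    for trigger in triggers:
        if not trigger["enabled"]:
            continue
        text = trigger["template"].format(**recipient)
        job_ids.append(
            client.queue_communication(
                sms={"to": recipient["phone"], "text": text} if trigger["sms"] else None,
                email={"to": recipient["email"], "body": text} if trigger["email"] else None,
            )
        )
    return job_ids


client = FakeFusionClient()
triggers = [
    {"enabled": True, "sms": True, "email": False, "template": "Hi {name}, your bill is ready."},
    {"enabled": False, "sms": True, "email": True, "template": "Hi {name}, report signed."},
    {"enabled": True, "sms": False, "email": True, "template": "Hi {name}, your report is ready."},
]
recipient = {"name": "Asha", "phone": "9876543210", "email": "asha@example.com"}

print(send(client, triggers, recipient))  # one job ID per enabled trigger
```

Because the request only records jobs and returns their IDs, slow or failing providers never block the API response.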
Integration Event Flow
Outbound Integrations
Inbound Integrations (Webhooks)
Integration Retry Mechanism
# integration/models/integration_directory.py
class IntegrationDirectory(DocumentDBModelBase):
    auto_retry_error_messages = (
        "No response",
        "Connection Timeout",
        ...
    )

    @classmethod
    def get_valid_auto_retry_logs(cls, lab_id, valid_integrations, ...):
        """Find failed logs eligible for retry"""
        ...

    @classmethod
    def update_retry_details_return_count(cls, lab_id, log_id, record, ...):
        """Update retry count and details"""
        ...
Elasticsearch Sync
Write Path
# patient/models/user_details.py
def update_es_record(self, whatsapp_consent=0):
    """Sync patient to ES"""
    es_client = get_client("es")
    payload = {
        "labId": self.labId_id,
        "userDetailsId": self.id,
        "fullName": self.fullName,
        # ... more fields
    }
    if self.is_new_instance:
        es_client.create(index="patients", doc=payload)
    else:
        es_client.update(index="patients", doc_id=self.id, doc=payload)
ES Indexes Used
| Index | Purpose | Model |
|---|---|---|
| patients | Patient search | UserDetails |
| activity_logs | Audit trail | ActivityLog |
| lab_reports | Report search | LabReportRelation |
| lab_users | Staff search | LabUser |
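The create-or-update branching of the write path can be tried out against an in-memory index. This is a rough sketch under assumptions: FakeSearchClient and its method signatures (including the doc_id argument on create) are hypothetical and do not represent the real ES client used by crelio-app.

```python
class FakeSearchClient:
    """Hypothetical in-memory stand-in for a search index client."""

    def __init__(self):
        self.indexes = {}

    def create(self, index, doc_id, doc):
        docs = self.indexes.setdefault(index, {})
        if doc_id in docs:
            raise KeyError(f"{index}/{doc_id} already exists")
        docs[doc_id] = dict(doc)

    def update(self, index, doc_id, doc):
        # Partial update: merge new fields into the stored document.
        self.indexes[index][doc_id].update(doc)


def sync_patient(client, patient, is_new):
    """Create the document for a new patient, otherwise update it in place."""
    payload = {
        "labId": patient["lab_id"],
        "userDetailsId": patient["id"],
        "fullName": patient["full_name"],
    }
    if is_new:
        client.create(index="patients", doc_id=patient["id"], doc=payload)
    else:
        client.update(index="patients", doc_id=patient["id"], doc=payload)


client = FakeSearchClient()
patient = {"id": 7, "lab_id": 1, "full_name": "Asha Rao"}
sync_patient(client, patient, is_new=True)

patient["full_name"] = "Asha R. Rao"
sync_patient(client, patient, is_new=False)

print(client.indexes["patients"][7]["fullName"])
```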
Cache Patterns
Redis Cache Usage
# core/cache.py
class CustomRedisClusterCache(RedisCache):
    """Custom cache with cluster support"""

    def get(self, key, default=None):
        ...

    def hget(self, name, key):
        """Hash get for model instance caching"""
        ...

    def hset(self, name, key, value):
        """Hash set for model instance caching"""
        ...
Model Instance Caching
# core/models/base.py
class BaseModel:
    allow_individual_instance_caching = False
    cache_key = "Model_CentreId{lab_id}_List"

    @classmethod
    def get_cached_instance(cls, instance_id, serializer=None, ...):
        """Fetch from cache or database"""
        instance = cache.hget(cls.cache_key, instance_id)
        if not instance:
            instance = cls.get(pk=instance_id)
            cache.hset(cls.cache_key, instance_id, json.dumps(instance))
        return instance

    @classmethod
    def reset_cache_instance(cls, instance_id):
        """Invalidate cache on update"""
        cache.hdel(cls.cache_key, instance_id)
Cache Invalidation Points
| Event | Action | Location |
|---|---|---|
| Model save | Invalidate instance | after_save() if caching enabled |
| Bulk update | Manual invalidation | View/service layer |
| Settings change | Full cache clear | Admin action |
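The read-through and invalidation behaviour described in this section can be sketched with a dictionary standing in for Redis hashes. Everything here is an assumption for illustration: FakeHashCache, the database dict, and the cache key value are hypothetical, and records are stored as JSON strings and decoded on a hit.

```python
import json


class FakeHashCache:
    """Hypothetical dict-backed stand-in for a Redis hash cache."""

    def __init__(self):
        self._hashes = {}

    def hget(self, name, key):
        return self._hashes.get(name, {}).get(str(key))

    def hset(self, name, key, value):
        self._hashes.setdefault(name, {})[str(key)] = value

    def hdel(self, name, key):
        self._hashes.get(name, {}).pop(str(key), None)


cache = FakeHashCache()
database = {42: {"id": 42, "name": "CBC"}}  # pretend table
db_reads = 0


def get_cached_instance(cache_key, instance_id):
    """Return the record from cache, loading and caching it on a miss."""
    global db_reads
    raw = cache.hget(cache_key, instance_id)
    if raw is not None:
        return json.loads(raw)
    db_reads += 1
    record = database[instance_id]
    cache.hset(cache_key, instance_id, json.dumps(record))
    return record


def reset_cache_instance(cache_key, instance_id):
    """Drop the cached copy so the next read goes back to the database."""
    cache.hdel(cache_key, instance_id)


key = "Test_CentreId1_List"
get_cached_instance(key, 42)          # miss: reads the database
get_cached_instance(key, 42)          # hit: served from cache
database[42]["name"] = "CBC (updated)"
reset_cache_instance(key, 42)         # invalidate after the update
fresh = get_cached_instance(key, 42)  # miss again: sees the new value
print(db_reads, fresh["name"])
```

Skipping the reset_cache_instance call in this sketch would leave the old name in the cache, which is why bulk updates that bypass save need the manual invalidation listed in the table.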
Activity Log Flow
Logging to Elasticsearch
# core/models/activity_log_base.py
class ActivityLogBase:
    def add_activity_log(self, action, message="", session={}, ...):
        """Log activity to ES"""
        payload = {
            "activity_text": message.format(**self.__dict__),
            "lab_id": session.get("labId"),
            "lab_user_id": session.get("loginUser"),
            "log_category_id": self.category_id_mapper.get(action),
            **self.prepare_activity_log_payload(action, session)
        }
        ActivityLog(**payload).save()  # → ES
Activity Categories
# report/models/reflex_test_config.py
class ReflexTestConfiguration(BaseModel):
    category_id_mapper = {
        "created": 628,
        "updated": 629,
        "triggered": 630,
        "deleted": 634,
        "enabled": 644,
        "disabled": 645,
    }
Device Integration Flow
HL7/ASTM Result Processing
Key Methods
# interfacing/models/device_results_validation.py
class DeviceResultsValidation:
    @classmethod
    def save_device_results(cls, payload):
        """Store incoming device results for validation"""
        ...

    @classmethod
    def release_parameters(cls, lab_details, device_id, parameters):
        """Release validated results to reports"""
        ...

    @classmethod
    def post_process_report(cls, report_detail, logs, lab_details):
        """Apply post-processing rules (auto-sign, etc.)"""
        ...
File Evidence
Core Flow Components
- base.py - Lifecycle hooks
- view.py - GenericView base
- cache.py - Redis cache
- clients.py - Service clients
Transaction Usage
Async/Integration
- base.py - CommunicationBase
- client.py - Fusion client
- integration_directory.py
- device_results_validation.py