Using Celery for Async Registration Batch Processing
Symptom Profile & Operational Impact
During high-velocity ticket drops, corporate bulk uploads, or multi-track conference check-ins, registration platforms routinely exhibit a 30–120 second payment sync gap. Operators observe attendees marked as REGISTERED in the CRM while badge printers queue PAYMENT_PENDING jobs. Concurrently, webhook payloads arrive out of sequence, synchronous form API polling exhausts connection pools, and memory bottlenecks trigger OOM kills on application servers. The operational fallout is predictable: duplicate badge prints, orphaned payment records, and manual reconciliation queues that scale linearly with attendee volume.
Root Cause & Architectural Shift
The underlying failure stems from coupling form ingestion, payment verification, and badge template rendering within a single synchronous request thread. When a registration payload arrives, the system blocks on external gateway latency, holds database row locks during schema validation, and retries webhook verification without idempotency guarantees. This tight coupling creates cascading backpressure. The resolution is a decoupled, event-driven architecture in which registration ingestion and payment reconciliation run as independent, retry-safe task boundaries. Celery provides the necessary task-queue semantics, but production reliability demands explicit chunking, deterministic idempotency keys, and compensating transactions rather than naive fire-and-forget dispatch.
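The compensating-transaction idea can be illustrated without Celery: each step is paired with an undo action, and when a later step fails, the steps that already succeeded are undone newest-first. This is a minimal sketch; the step names (`hold_inventory`, `charge_card`, `confirm_registration`) are hypothetical, and in a real pipeline each forward and compensating action would be its own retry-safe task rather than an in-process call.

```python
from typing import Callable, List, Tuple

# A saga step: (name, forward action, compensating action)
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step], log: List[str]) -> bool:
    """Run steps in order; if one fails, undo the completed ones newest-first."""
    completed: List[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action()
        except Exception:
            log.append(f"failed:{name}")
            # Undo only what actually succeeded, in reverse order
            for done_name, _, compensate in reversed(completed):
                compensate()
                log.append(f"compensated:{done_name}")
            return False
        log.append(f"done:{name}")
        completed.append(step)
    return True


def noop() -> None:
    pass


def duplicate_attendee() -> None:
    raise ValueError("attendee already registered")


# Hypothetical flow: the business-rule check fails after the card was charged
events: List[str] = []
ok = run_saga(
    [
        ("hold_inventory", noop, noop),          # undo: release the held ticket
        ("charge_card", noop, noop),             # undo: refund
        ("confirm_registration", duplicate_attendee, noop),
    ],
    events,
)
print(ok)  # False
print(events)
```

Compensations should themselves be idempotent, because with late acknowledgement a crashed worker can cause the same undo to run twice.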
Implementation Guide
Step 1: Idempotent Task Routing & Chunking
Batch registration payloads must be segmented to prevent broker memory exhaustion and guarantee partial failure isolation. Monolithic payloads monopolize worker threads and trigger ACK timeouts. Each chunk carries a deterministic batch_id and idempotency_key derived from the source form submission hash.
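```python
import hashlib
import json
import logging
import os
from typing import Any, Dict, List

import redis
from celery import Celery
from pydantic import BaseModel, ValidationError

logger = logging.getLogger(__name__)

BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")

app = Celery(
    "event_reg",
    broker=BROKER_URL,
    # Celery's SQLAlchemy result backend needs the "db+" prefix before the SQLAlchemy URL
    backend=os.getenv("CELERY_RESULT_BACKEND", "db+postgresql+psycopg2://user:pass@db/events"),
)

app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_acks_late=True,              # ack only after the task returns
    task_reject_on_worker_lost=True,  # requeue the message if the child process dies
    worker_prefetch_multiplier=1,     # one reserved message per process
    task_time_limit=300,
    task_soft_time_limit=240,
    # Retry delay and retry cap are task attributes; annotations apply them to every task
    task_annotations={"*": {"default_retry_delay": 15, "max_retries": 5}},
)

# Idempotency store (assumption: reuses the broker's Redis instance)
redis_client = redis.Redis.from_url(BROKER_URL)
PROCESSING_TTL = 300   # matches task_time_limit, so a crashed run's claim expires with it
DONE_TTL = 24 * 3600   # remember completed chunks for a day


class RegistrationChunk(BaseModel):
    batch_id: str
    idempotency_key: str
    records: List[Dict[str, Any]]


def content_hash(obj: Any) -> str:
    # Canonical JSON (sorted keys, fixed separators) so equal payloads hash identically
    canonical = json.dumps(obj, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode()).hexdigest()


def chunk_payload(records: List[Dict[str, Any]], chunk_size: int = 50) -> List[RegistrationChunk]:
    # Deterministic batch_id: resubmitting the same upload reproduces the same IDs
    batch_id = content_hash(records)
    chunks = []
    for i in range(0, len(records), chunk_size):
        subset = records[i : i + chunk_size]
        # Key covers the batch, the chunk offset, and every record, not just the first
        key = content_hash({"batch_id": batch_id, "offset": i, "records": subset})
        chunks.append(RegistrationChunk(batch_id=batch_id, idempotency_key=key, records=subset))
    return chunks


@app.task(bind=True, name="tasks.process_registration_chunk", max_retries=5)
def process_registration_chunk(self, chunk_data: Dict[str, Any]) -> Dict[str, Any]:
    # The JSON serializer delivers a plain dict; rebuild the model inside the task
    chunk = RegistrationChunk(**chunk_data)
    claim_key = f"reg:chunk:{chunk.idempotency_key}"
    # Idempotency guard: SET NX succeeds only for the first run to claim this key
    if not redis_client.set(claim_key, "processing", nx=True, ex=PROCESSING_TTL):
        if redis_client.get(claim_key) == b"done":
            return {"batch_id": chunk.batch_id, "status": "skipped"}
        # Another run holds the claim (or died holding it): retry once it could have expired
        raise self.retry(countdown=PROCESSING_TTL)
    try:
        logger.info("Processing chunk %s | %d records", chunk.batch_id, len(chunk.records))
        # Validate each record and hand it to the payment pipeline (Step 4)
        for record in chunk.records:
            validate_and_dispatch(record)
        redis_client.set(claim_key, "done", ex=DONE_TTL)
        return {"batch_id": chunk.batch_id, "status": "completed", "processed": len(chunk.records)}
    except Exception as exc:
        # Release the claim so the retried run is not blocked by the guard.
        # Records before the failure run again, so per-record dispatch must be idempotent too.
        redis_client.delete(claim_key)
        logger.error("Chunk %s failed: %s", chunk.batch_id, exc)
        # Linear backoff: 15 s, 30 s, 45 s, ...
        raise self.retry(exc=exc, countdown=(self.request.retries + 1) * 15)


# Dispatch plain dicts so the JSON serializer can encode them, e.g.:
# for chunk in chunk_payload(uploaded_records):
#     process_registration_chunk.delay(chunk.model_dump())
```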
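Deriving keys from content rather than from a random `uuid.uuid4()` is what makes a resubmitted upload land on the same chunks. The stdlib-only sketch below re-implements just the key derivation (no Celery, no Pydantic) and checks that a resubmission with its dict keys in a different order produces identical batch and chunk keys. The function names `digest` and `chunk_keys` are illustrative and not part of the pipeline.

```python
import hashlib
import json
from typing import Any, Dict, List, Tuple


def digest(obj: Any) -> str:
    # sort_keys makes the hash independent of dict key order
    canonical = json.dumps(obj, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode()).hexdigest()


def chunk_keys(records: List[Dict[str, Any]], chunk_size: int = 50) -> Tuple[str, List[str]]:
    """Return (batch_id, one idempotency key per chunk), derived purely from content."""
    batch_id = digest(records)
    keys = [
        digest({"batch_id": batch_id, "offset": i, "records": records[i : i + chunk_size]})
        for i in range(0, len(records), chunk_size)
    ]
    return batch_id, keys


upload = [{"email": f"user{n}@example.com", "ticket_type": "GENERAL"} for n in range(120)]
# Same data, different key order (e.g. a re-serialized resubmission)
resubmitted = [{"ticket_type": r["ticket_type"], "email": r["email"]} for r in upload]

first = chunk_keys(upload)
second = chunk_keys(resubmitted)
print(len(first[1]))    # 3 chunks: offsets 0, 50, 100
print(first == second)  # True
```

Including the offset in each key keeps two chunks with identical contents distinct, whereas keying a chunk on a single record collides whenever two chunks start with the same submission.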
Step 2: Worker Concurrency & Memory Guardrails
OOM kills occur when workers accumulate unbounded ORM sessions, large JSON payloads, or unreleased file descriptors. Enforce strict memory ceilings and connection recycling.
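```python
# celery_worker_config.py (applies to the `app` instance defined in Step 1)
app.conf.update(
    worker_max_tasks_per_child=100,      # Replace each child process after 100 tasks to shed leaked memory
    worker_max_memory_per_child=512000,  # KiB (~500 MiB); child is replaced after the task that crosses it finishes
    worker_concurrency=4,                # Tune to CPU cores; avoid oversubscription
    broker_connection_retry_on_startup=True,
    result_expires=3600,                 # With the database backend, celery beat's backend_cleanup task deletes older results
)
```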
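These settings give a rough host-level memory budget. `worker_max_memory_per_child` is expressed in KiB, and Celery replaces a child only after the task that pushed it over the limit finishes, so every child can briefly exceed the ceiling. The sketch below treats that per-task overshoot and the parent-process overhead as assumed, workload-specific inputs; neither is a Celery setting.

```python
def worst_case_rss_mib(concurrency: int, max_memory_per_child_kib: int,
                       task_overshoot_mib: float, parent_mib: float) -> float:
    """Estimate peak resident memory of one prefork worker node, in MiB."""
    per_child_mib = max_memory_per_child_kib / 1024  # the setting is in KiB
    # Each child may sit just under the ceiling and then run one more task
    return concurrency * (per_child_mib + task_overshoot_mib) + parent_mib


# Step 2 values; the 100 MiB overshoot and 150 MiB parent overhead are assumptions
budget = worst_case_rss_mib(concurrency=4, max_memory_per_child_kib=512000,
                            task_overshoot_mib=100, parent_mib=150)
print(f"{budget:.0f} MiB")  # 4 * (500 + 100) + 150 = 2550 MiB
```

If that estimate approaches the container or VM limit, lower `worker_concurrency` or the per-child ceiling before the kernel's OOM killer makes the choice for you.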
Startup Command:

```bash
celery -A event_reg worker \
  --loglevel=INFO \
  --concurrency=4 \
  --max-memory-per-child=512000 \
  --max-tasks-per-child=100 \
  --pool=prefork \
  --without-gossip --without-mingle --without-heartbeat
```
Step 3: Payment Webhook Reconciliation & Retry Semantics
Webhooks arrive out-of-order due to network jitter and gateway retries. Implement database-level upserts with explicit state transitions to prevent race conditions.
```python
import psycopg2
from celery.exceptions import Ignore


@app.task(bind=True, name="tasks.reconcile_payment_webhook", max_retries=8)
def reconcile_payment_webhook(self, webhook_payload: Dict[str, Any]) -> str:
    txn_id = webhook_payload.get("transaction_id")
    status = webhook_payload.get("status")
    if not txn_id or not status:
        logger.error("Dropping malformed webhook payload: %r", webhook_payload)
        raise Ignore()

    try:
        conn = psycopg2.connect(os.environ["DB_URL"])
        try:
            # `with conn` commits on success and rolls back on error, but does not close
            with conn, conn.cursor() as cur:
                # Atomic upsert with a state-machine guard: COMPLETED rows are never overwritten
                cur.execute(
                    """
                    INSERT INTO payment_ledger (txn_id, status, updated_at, idempotency_hash)
                    VALUES (%s, %s, NOW(), %s)
                    ON CONFLICT (txn_id) DO UPDATE SET
                        status = EXCLUDED.status,
                        updated_at = NOW()
                    WHERE payment_ledger.status != 'COMPLETED'
                    RETURNING status
                    """,
                    (txn_id, status, webhook_payload.get("idempotency_key")),
                )
                # RETURNING yields no row when the WHERE guard skipped the update
                changed = cur.fetchone() is not None
        finally:
            conn.close()
    except psycopg2.OperationalError as e:
        logger.warning("DB connection lost, retrying: %s", e)
        raise self.retry(exc=e, countdown=2 ** self.request.retries)

    # Print only on a real transition, so duplicate COMPLETED webhooks cannot double-print
    if changed and status == "COMPLETED":
        trigger_badge_print.delay(txn_id)  # badge-print task, defined elsewhere
    return f"Reconciled {txn_id} -> {status}" if changed else f"Skipped {txn_id}: already COMPLETED"
```
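The SQL guard above protects only the `COMPLETED` state. Making every allowed move explicit as a transition table shows how out-of-order webhooks resolve deterministically. In this plain-Python sketch, the statuses other than `COMPLETED` (`PENDING`, `FAILED`, `REFUNDED`) and the rules between them are assumptions for illustration, not any gateway's documented lifecycle.

```python
from typing import Dict, FrozenSet, List, Optional

# Hypothetical lifecycle: current status -> statuses allowed to replace it
ALLOWED: Dict[Optional[str], FrozenSet[str]] = {
    None: frozenset({"PENDING", "FAILED", "COMPLETED"}),  # no ledger row yet
    "PENDING": frozenset({"FAILED", "COMPLETED"}),
    "FAILED": frozenset({"COMPLETED"}),                   # a later successful capture
    "COMPLETED": frozenset({"REFUNDED"}),
    "REFUNDED": frozenset(),                              # terminal
}


def apply_webhook(current: Optional[str], incoming: str) -> Optional[str]:
    """Return the next status, or keep the current one if the move is not allowed."""
    if incoming in ALLOWED.get(current, frozenset()):
        return incoming
    return current


def replay(statuses: List[str]) -> Optional[str]:
    """Fold webhook statuses, in arrival order, into a final ledger status."""
    state: Optional[str] = None
    for status in statuses:
        state = apply_webhook(state, status)
    return state


print(replay(["PENDING", "COMPLETED", "PENDING"]))  # COMPLETED: stale PENDING ignored
print(replay(["COMPLETED", "PENDING"]))             # COMPLETED: out-of-order PENDING ignored
print(replay(["FAILED", "PENDING"]))                # FAILED
```

To keep the check and the write atomic, the same table can be enforced in the database by widening the upsert's `WHERE` clause so that each incoming `EXCLUDED.status` is accepted only from its allowed predecessor states.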
Step 4: Schema Validation & Error Categorization
Reject malformed payloads early. Categorize failures into RETRYABLE (transient network), FATAL (schema violation), and MANUAL_REVIEW (business rule conflict). Route accordingly.
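```python
from enum import Enum

from pydantic import EmailStr, Field


class ErrorCategory(str, Enum):
    RETRYABLE = "RETRYABLE"          # transient network or gateway failure
    FATAL = "FATAL"                  # schema violation; a retry cannot fix it
    MANUAL_REVIEW = "MANUAL_REVIEW"  # business-rule conflict for an operator


class RegistrationPayload(BaseModel):
    email: EmailStr                  # needs the email-validator package
    ticket_type: str = Field(..., min_length=3, max_length=50)
    quantity: int = Field(..., ge=1, le=10)
    payment_method: str


def log_error(category: ErrorCategory, record: Dict[str, Any], exc: Exception) -> None:
    # Minimal placeholder; a production version would also persist the record for triage
    logger.error("[%s] registration failed (%s): %s", category.value, record.get("email"), exc)


def validate_and_dispatch(record: Dict[str, Any]) -> None:
    try:
        validated = RegistrationPayload(**record)
        # Proceed to the payment gateway with `validated`
    except ValidationError as e:
        log_error(ErrorCategory.FATAL, record, e)
    except (TimeoutError, ConnectionError) as e:
        # Transient: re-raise so the chunk task retries. Add your gateway
        # client's timeout / HTTP 5xx exception types to this tuple.
        log_error(ErrorCategory.RETRYABLE, record, e)
        raise
    except Exception as e:
        log_error(ErrorCategory.MANUAL_REVIEW, record, e)
```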
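"Route accordingly" can be made concrete by mapping each category to a destination. The sketch below classifies by exception type (rather than by matching substrings of the error message) and returns a queue name. The queue names and the `SchemaError` stand-in are hypothetical, and it carries its own copy of `ErrorCategory` so it runs standalone; in Celery, the returned name could be passed as the `queue` argument of `apply_async`.

```python
from enum import Enum


class ErrorCategory(str, Enum):
    RETRYABLE = "RETRYABLE"
    FATAL = "FATAL"
    MANUAL_REVIEW = "MANUAL_REVIEW"


class SchemaError(ValueError):
    """Stand-in for a validation failure such as pydantic.ValidationError."""


# Hypothetical destination queue per category
ROUTES = {
    ErrorCategory.RETRYABLE: "registration.retry",
    ErrorCategory.FATAL: "registration.dead_letter",
    ErrorCategory.MANUAL_REVIEW: "registration.review",
}


def categorize(exc: Exception) -> ErrorCategory:
    # Exception types are a stabler signal than the wording of error messages
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return ErrorCategory.RETRYABLE
    if isinstance(exc, SchemaError):
        return ErrorCategory.FATAL
    return ErrorCategory.MANUAL_REVIEW


def route(exc: Exception) -> str:
    return ROUTES[categorize(exc)]


print(route(TimeoutError("gateway timed out")))       # registration.retry
print(route(SchemaError("quantity must be <= 10")))   # registration.dead_letter
print(route(KeyError("seat already assigned")))       # registration.review
```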
Incident Response & Rollback Procedures
Fast Incident Resolution
- Identify Backpressure: `celery -A event_reg inspect active --json`
- Purge Stale Tasks: `celery -A event_reg purge` (only if tasks are idempotent and safe to drop)
- Force Worker Restart: `sudo systemctl restart celery-worker`
- Clear Redis Broker Memory: `redis-cli MEMORY PURGE && redis-cli FLUSHDB` (use with caution: FLUSHDB deletes every key in the selected database, including all queued tasks, so verify no webhooks are in flight)
- DB Reconciliation Script:

```sql
-- Find orphaned PAYMENT_PENDING records older than 1 hour
SELECT id, email, created_at FROM registrations
WHERE payment_status = 'PAYMENT_PENDING' AND created_at < NOW() - INTERVAL '1 hour'
ORDER BY created_at ASC;
```

Re-dispatch via: `process_registration_chunk.delay(RegistrationChunk(batch_id="manual_fix", idempotency_key=f"manual_fix-{uuid.uuid4().hex}", records=[...]).model_dump())`. Pass the chunk as a dict, because the JSON serializer cannot encode a Pydantic object, and use a fresh key (with `uuid` imported) for each re-dispatch so the idempotency guard does not skip a repeat fix.
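The orphan query and the manual re-dispatch can be bridged in code. This sketch applies the same predicate and ordering as the SQL above to rows already fetched, then groups their ids into chunks of at most 50 (the `chunk_payload` default). The row shape mirrors the `registrations` columns the query uses; loading each id's full form submission before wrapping it in a `RegistrationChunk` is left out, because that schema is not shown.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List


def orphan_id_chunks(rows: List[Dict[str, Any]], now: datetime,
                     max_age: timedelta = timedelta(hours=1),
                     chunk_size: int = 50) -> List[List[int]]:
    """Group ids of stale PAYMENT_PENDING rows, oldest first, into re-dispatch chunks."""
    cutoff = now - max_age
    # Same predicate and ORDER BY as the SQL query
    stale = sorted(
        (r for r in rows
         if r["payment_status"] == "PAYMENT_PENDING" and r["created_at"] < cutoff),
        key=lambda r: r["created_at"],
    )
    ids = [r["id"] for r in stale]
    return [ids[i : i + chunk_size] for i in range(0, len(ids), chunk_size)]


now = datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc)
rows = [
    {"id": 1, "payment_status": "PAYMENT_PENDING", "created_at": now - timedelta(hours=3)},
    {"id": 2, "payment_status": "COMPLETED", "created_at": now - timedelta(hours=5)},
    {"id": 3, "payment_status": "PAYMENT_PENDING", "created_at": now - timedelta(minutes=20)},
    {"id": 4, "payment_status": "PAYMENT_PENDING", "created_at": now - timedelta(hours=2)},
]
print(orphan_id_chunks(rows, now))                # [[1, 4]]
print(orphan_id_chunks(rows, now, chunk_size=1))  # [[1], [4]]
```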
Rollback to Synchronous Fallback
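If the Celery broker fails or workers crash repeatedly:
- Disable async routing in the API gateway: set `REGISTRATION_ASYNC_MODE=false` in the gateway's environment and restart it (`export REGISTRATION_ASYNC_MODE=false` only affects the current shell and processes started from it).
- Route `/register` directly to the legacy sync handler.
- Stop the workers: `celery -A event_reg control shutdown`. This does not flush the queue; pending messages stay in the broker until they are consumed or purged.
- Verify connection pool limits in PgBouncer or SQLAlchemy (e.g., SQLAlchemy `pool_size=10, max_overflow=5`).
- Monitor `psql -c "SELECT count(*) FROM pg_stat_activity WHERE state = 'active';"` to confirm the sync load is within capacity.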
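A `/register` handler might honor the `REGISTRATION_ASYNC_MODE` switch along these lines. This is a minimal sketch: `handle_register` and the two stand-in handlers are hypothetical placeholders for the Celery dispatch and the legacy synchronous handler. The flag is read from the process environment on each call, which still requires the new value to be present in that process's environment, typically via a restart.

```python
import os
from typing import Any, Callable, Dict, Mapping, Optional

Handler = Callable[[Dict[str, Any]], str]


def async_mode_enabled(env: Optional[Mapping[str, str]] = None) -> bool:
    """Read REGISTRATION_ASYNC_MODE; anything other than an explicit 'off' value means async."""
    source = os.environ if env is None else env
    value = source.get("REGISTRATION_ASYNC_MODE", "true")
    return value.strip().lower() not in {"false", "0", "no", "off"}


def handle_register(payload: Dict[str, Any], enqueue_async: Handler,
                    process_sync: Handler, env: Optional[Mapping[str, str]] = None) -> str:
    """Route one /register request to the Celery path or the legacy sync handler."""
    if async_mode_enabled(env):
        return enqueue_async(payload)
    return process_sync(payload)


# Hypothetical stand-ins for the Celery dispatch and the legacy handler
def fake_enqueue(payload: Dict[str, Any]) -> str:
    return f"queued:{payload['email']}"


def fake_sync(payload: Dict[str, Any]) -> str:
    return f"sync:{payload['email']}"


request = {"email": "a@example.com"}
print(handle_register(request, fake_enqueue, fake_sync, env={"REGISTRATION_ASYNC_MODE": "false"}))
print(handle_register(request, fake_enqueue, fake_sync, env={}))
```

Defaulting to async means a missing variable cannot silently move production onto the slower path. Flip the default if your fallback policy is the opposite.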
Post-Incident Validation
- Confirm `payment_ledger` row counts match CRM `REGISTERED` counts.
- Verify badge printer queue depth returns to < 5.
- Review `celery -A event_reg inspect stats` for memory leaks or task rejection spikes.
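Matching row counts can hide offsetting errors, so comparing identifiers is a stricter check. The sketch below diffs the set of completed transaction ids in `payment_ledger` against the set the CRM marks `REGISTERED`. It assumes both systems share a transaction id; how each side's ids are fetched is left open.

```python
from typing import Dict, Iterable, Set


def reconcile(ledger_completed: Iterable[str], crm_registered: Iterable[str]) -> Dict[str, Set[str]]:
    """Report ids present on one side but missing on the other."""
    ledger, crm = set(ledger_completed), set(crm_registered)
    return {
        # Paid but never marked REGISTERED: candidates for a CRM re-sync
        "missing_in_crm": ledger - crm,
        # REGISTERED without a completed payment: the payment sync gap
        "missing_in_ledger": crm - ledger,
    }


report = reconcile(["t1", "t2", "t3"], ["t2", "t3", "t4"])
print(sorted(report["missing_in_crm"]), sorted(report["missing_in_ledger"]))  # ['t1'] ['t4']
```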