Workflows & Activities
ContextWorker uses Temporal.io for durable workflow execution. Workflows survive process restarts, network failures, and server crashes.
Dual-Mode Operation
Unlike stateless HTTP requests, background tasks (like downloading 100,000 product catalogs) can take hours and are subject to network drops or pod evictions. ContextWorker uses HashiCorp/Temporal to guarantee that if a worker dies, the workflow will resume exactly where it left off on another machine.
Worker runs in two distinct processes:
1. gRPC Service (default)
python -m contextunity.workerStarts the gRPC WorkerService for receiving workflow triggers from other services.
2. Temporal Worker
# All discovered modulespython -m contextunity.worker --temporal
# Specific modules onlypython -m contextunity.worker --temporal --modules harvest gardenerStarts Temporal workers that execute registered workflows and activities.
gRPC Service
service WorkerService { rpc StartWorkflow(ContextUnit) returns (ContextUnit); rpc GetTaskStatus(ContextUnit) returns (ContextUnit); rpc ExecuteCode(ContextUnit) returns (ContextUnit); // Planned}StartWorkflow
Routes to appropriate Temporal workflow by workflow_type:
from contextunity.core import ContextUnit
unit = ContextUnit( payload={ "workflow_type": "harvest", "supplier_code": "camping-trade", "tenant_id": "my_project", }, provenance=["commerce:trigger"],)
response = stub.StartWorkflow(unit.to_protobuf(worker_pb2))workflow_id = response.payload["workflow_id"]GetTaskStatus
Query workflow execution status by workflow_id.
Module Registry
To keep the Worker codebase decoupled and clean, it uses a modular registry. You don’t dump all your code into a massive tasks.py file. Instead, domain-specific plugins (like commerce or medical) register their temporal workflows and activities dynamically at boot.
from contextunity.worker.core.registry import WorkerRegistry, ModuleConfig
# Commerce module registers its workflowsregistry = WorkerRegistry()registry.register(ModuleConfig( name="harvest", workflows=[HarvestWorkflow], activities=[fetch_products, transform_data, sync_to_brain],))Modules are discovered at startup and can be filtered via --modules flag.