Workflows & Activities

ContextWorker uses Temporal.io for durable workflow execution. Workflows survive process restarts, network failures, and server crashes.

Dual-Mode Operation

Unlike stateless HTTP requests, background tasks (such as downloading 100,000 product catalogs) can take hours and are exposed to network drops and pod evictions. ContextWorker uses Temporal to guarantee that if a worker dies, the workflow resumes exactly where it left off on another machine.
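To make the resume guarantee concrete, here is a toy sketch in plain Python of the underlying idea: each completed step is recorded in a durable history, and a restarted run skips the steps already recorded. This is a simplified illustration, not Temporal's API or its actual replay mechanism; `run_workflow`, `history`, and the step names are hypothetical.

```python
from typing import Callable, Optional

# Hypothetical stand-in for a durable event history. Temporal persists this
# server-side; here it is just a dict that outlives the simulated crash.
History = dict[str, str]


def run_workflow(steps: list[tuple[str, Callable[[], str]]],
                 history: History,
                 crash_after: Optional[str] = None) -> History:
    """Run steps in order, skipping any whose result is already recorded."""
    for name, step in steps:
        if name in history:
            continue  # completed before the crash: do not re-run
        history[name] = step()
        if name == crash_after:
            raise RuntimeError(f"worker died after step {name!r}")
    return history


calls: list[str] = []


def make_step(name: str) -> Callable[[], str]:
    def step() -> str:
        calls.append(name)  # track how often each step really executes
        return f"{name}:done"
    return step


steps = [(n, make_step(n)) for n in ("fetch", "transform", "sync")]
history: History = {}

try:
    run_workflow(steps, history, crash_after="fetch")  # first worker dies
except RuntimeError:
    pass

run_workflow(steps, history)  # a second worker picks up the same history
print(calls)
```

Each step executes once even though the workflow was started twice, which is the property the paragraph above describes.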

The Worker runs as two distinct processes:

1. gRPC Service (default)

python -m contextunity.worker

Starts the gRPC WorkerService for receiving workflow triggers from other services.

2. Temporal Worker

# All discovered modules
python -m contextunity.worker --temporal
# Specific modules only
python -m contextunity.worker --temporal --modules harvest gardener

Starts Temporal workers that execute registered workflows and activities.
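The two invocations can be read as a small command-line contract. The sketch below uses `argparse` to show one way such flags could be parsed. It is an assumption about the shape of the CLI, not ContextWorker's actual entry point, and `build_parser` and `parse_mode` are hypothetical helpers.

```python
import argparse
from typing import Optional


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="contextunity.worker")
    # Without --temporal the process is assumed to start the gRPC service.
    parser.add_argument("--temporal", action="store_true",
                        help="run Temporal workers instead of the gRPC service")
    # Zero or more module names; omitted means "all discovered modules".
    parser.add_argument("--modules", nargs="*", default=None,
                        help="restrict Temporal workers to these modules")
    return parser


def parse_mode(argv: list[str]) -> tuple[str, Optional[list[str]]]:
    """Return (mode, modules), where modules=None means no filter."""
    args = build_parser().parse_args(argv)
    mode = "temporal" if args.temporal else "grpc"
    return mode, args.modules


print(parse_mode([]))
print(parse_mode(["--temporal"]))
print(parse_mode(["--temporal", "--modules", "harvest", "gardener"]))
```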

gRPC Service

service WorkerService {
  rpc StartWorkflow(ContextUnit) returns (ContextUnit);
  rpc GetTaskStatus(ContextUnit) returns (ContextUnit);
  rpc ExecuteCode(ContextUnit) returns (ContextUnit);  // Planned
}

StartWorkflow

Routes the request to the appropriate Temporal workflow based on the workflow_type field of the payload:

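from contextunity.core import ContextUnit

# `stub` (a WorkerService gRPC client stub) and `worker_pb2` (the generated
# protobuf module) are assumed to be created elsewhere; setup is not shown.
unit = ContextUnit(
    payload={
        "workflow_type": "harvest",  # selects the Temporal workflow to start
        "supplier_code": "camping-trade",
        "tenant_id": "my_project",
    },
    provenance=["commerce:trigger"],
)
response = stub.StartWorkflow(unit.to_protobuf(worker_pb2))
workflow_id = response.payload["workflow_id"]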
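Routing by `workflow_type` amounts to a lookup from a string to a workflow. A minimal sketch, assuming a plain dictionary as the routing table; the `ROUTES` mapping, the `route` function, and the workflow name it returns are illustrative and not taken from ContextWorker's source:

```python
# Hypothetical routing table: workflow_type -> workflow name.
ROUTES: dict[str, str] = {
    "harvest": "HarvestWorkflow",
}


def route(payload: dict) -> str:
    """Pick the workflow for a payload, rejecting unknown types early."""
    workflow_type = payload.get("workflow_type")
    if workflow_type is None:
        raise ValueError("payload is missing 'workflow_type'")
    try:
        return ROUTES[workflow_type]
    except KeyError:
        raise ValueError(f"unknown workflow_type: {workflow_type!r}") from None


print(route({"workflow_type": "harvest", "supplier_code": "camping-trade"}))
```

Failing fast on an unknown or missing type keeps a malformed trigger from reaching Temporal at all.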

GetTaskStatus

Queries the execution status of a workflow by its workflow_id.
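Callers often poll a status endpoint like this one until the workflow finishes. The sketch below shows that loop in isolation. It assumes a `get_status` callable that wraps the RPC and a set of terminal status strings; both are hypothetical, as are the status values themselves, because the response schema is not described here.

```python
import time
from typing import Callable

# Assumed terminal states; the real status vocabulary may differ.
TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}


def wait_for_workflow(get_status: Callable[[str], str],
                      workflow_id: str,
                      interval: float = 1.0,
                      max_polls: int = 60) -> str:
    """Poll get_status(workflow_id) until a terminal state or the poll limit."""
    for attempt in range(max_polls):
        status = get_status(workflow_id)
        if status in TERMINAL:
            return status
        if attempt < max_polls - 1:
            time.sleep(interval)  # no need to sleep after the final poll
    raise TimeoutError(f"{workflow_id} still running after {max_polls} polls")


# Stand-in for the RPC: reports RUNNING twice, then COMPLETED.
_responses = iter(["RUNNING", "RUNNING", "COMPLETED"])
result = wait_for_workflow(lambda _id: next(_responses), "wf-123", interval=0.0)
print(result)
```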

Module Registry

To keep its codebase decoupled, the Worker uses a modular registry. Rather than collecting all code in one large tasks.py file, domain-specific plugins (such as commerce or medical) register their Temporal workflows and activities dynamically at boot.

from contextunity.worker.core.registry import WorkerRegistry, ModuleConfig

# Commerce module registers its workflows.
# HarvestWorkflow and the activity functions are defined in the module's
# own code (imports not shown).
registry = WorkerRegistry()
registry.register(ModuleConfig(
    name="harvest",
    workflows=[HarvestWorkflow],
    activities=[fetch_products, transform_data, sync_to_brain],
))

Modules are discovered at startup and can be filtered with the --modules flag.
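Filtering by `--modules` can be pictured as selecting registered modules by name. A minimal sketch with a stand-in registry; `ToyModule`, `ToyRegistry`, and `select` are hypothetical and only mirror the shape of the `ModuleConfig` registration shown earlier, not the real `WorkerRegistry` implementation:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyModule:
    name: str
    workflows: list = field(default_factory=list)
    activities: list = field(default_factory=list)


class ToyRegistry:
    def __init__(self) -> None:
        self._modules: dict[str, ToyModule] = {}

    def register(self, module: ToyModule) -> None:
        if module.name in self._modules:
            raise ValueError(f"module {module.name!r} already registered")
        self._modules[module.name] = module

    def select(self, names: Optional[list[str]] = None) -> list[ToyModule]:
        """Return all modules, or only the named ones (unknown names fail)."""
        if names is None:
            return list(self._modules.values())
        missing = [n for n in names if n not in self._modules]
        if missing:
            raise KeyError(f"unknown modules: {missing}")
        return [self._modules[n] for n in names]


registry = ToyRegistry()
for name in ("harvest", "gardener", "medical"):
    registry.register(ToyModule(name=name))

selected = registry.select(["harvest", "gardener"])
print([m.name for m in selected])
```

Rejecting unknown names, rather than silently ignoring them, is a design choice in this sketch: a typo in the module list then fails at startup instead of leaving a workflow without a worker.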

Workflow Pattern

[Diagram: worker-workflows]