gRPC Contracts
All ContextUnity services communicate via gRPC using a universal ContextUnit envelope — every RPC accepts and returns contextunity.core.ContextUnit. Domain-specific data flows through the payload dict.
ContextUnit Envelope Pattern
    // Every service uses ContextUnit as input and output
    rpc Search(contextunity.core.ContextUnit) returns (stream contextunity.core.ContextUnit);
    rpc Upsert(contextunity.core.ContextUnit) returns (contextunity.core.ContextUnit);

This pattern unifies security (the token travels with every call), provenance tracking, and error handling across the entire mesh.
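To make the envelope idea concrete, here is a minimal sketch in plain Python. The Envelope class and its token, provenance, and error fields are hypothetical stand-ins chosen for illustration; they are not the real ContextUnit schema, which is defined in contextunit.proto. Only the payload dict comes from the description above.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Envelope:
    """Hypothetical stand-in for ContextUnit; field names are assumptions."""
    payload: dict[str, Any] = field(default_factory=dict)
    token: Optional[str] = None                          # auth token carried with the call
    provenance: list[str] = field(default_factory=list)  # handlers that touched the unit
    error: Optional[str] = None                          # populated instead of raising


def handle_upsert(request: Envelope) -> Envelope:
    """Toy handler: the same envelope type goes in and comes out."""
    trail = request.provenance + ["brain.upsert"]
    if request.token is None:
        # The failure is reported inside the envelope, not via a separate type.
        return Envelope(provenance=trail, error="missing token")
    doc_id = request.payload.get("id", "generated-id")
    return Envelope(
        payload={"id": doc_id, "status": "ok"},
        token=request.token,
        provenance=trail,
    )


ok = handle_upsert(Envelope(payload={"id": "doc-1"}, token="t-123"))
denied = handle_upsert(Envelope(payload={"id": "doc-1"}))
```

Because every handler shares one message type, cross-cutting checks such as the token test in this toy handler can be written once and applied to any RPC.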
Proto Definitions
All proto files live in packages/core/protos/:
brain.proto (17 RPCs)

    service BrainService {
      // Knowledge
      rpc Search(ContextUnit) returns (stream ContextUnit);
      rpc GraphSearch(ContextUnit) returns (ContextUnit);
      rpc CreateKGRelation(ContextUnit) returns (ContextUnit);
      rpc Upsert(ContextUnit) returns (ContextUnit);

      // Memory
      rpc QueryMemory(ContextUnit) returns (stream ContextUnit);
      rpc AddEpisode(ContextUnit) returns (ContextUnit);
      rpc GetRecentEpisodes(ContextUnit) returns (stream ContextUnit);
      rpc UpsertFact(ContextUnit) returns (ContextUnit);
      rpc GetUserFacts(ContextUnit) returns (stream ContextUnit);

      // News
      rpc UpsertNewsItem(ContextUnit) returns (ContextUnit);
      rpc GetNewsItems(ContextUnit) returns (stream ContextUnit);
      rpc UpsertNewsPost(ContextUnit) returns (ContextUnit);
      rpc CheckNewsPostExists(ContextUnit) returns (ContextUnit);

      // Traces
      rpc LogTrace(ContextUnit) returns (ContextUnit);
      rpc GetTraces(ContextUnit) returns (stream ContextUnit);

      // Taxonomy
      rpc UpsertTaxonomy(ContextUnit) returns (ContextUnit);
      rpc GetTaxonomy(ContextUnit) returns (stream ContextUnit);
    }

commerce.proto (8 RPCs)

    service CommerceService {
      rpc GetProduct(ContextUnit) returns (ContextUnit);
      rpc UpdateProduct(ContextUnit) returns (ContextUnit);
      rpc GetProducts(ContextUnit) returns (stream ContextUnit);
      rpc UpsertDealerProduct(ContextUnit) returns (ContextUnit);
      rpc UpdateEnrichment(ContextUnit) returns (ContextUnit);
      rpc TriggerHarvest(ContextUnit) returns (ContextUnit);
      rpc GetPendingVerifications(ContextUnit) returns (stream ContextUnit);
      rpc SubmitVerification(ContextUnit) returns (ContextUnit);
    }

worker.proto (3 RPCs)

    service WorkerService {
      rpc StartWorkflow(ContextUnit) returns (ContextUnit);
      rpc GetTaskStatus(ContextUnit) returns (ContextUnit);
      rpc ExecuteCode(ContextUnit) returns (ContextUnit);
    }

Additional Services
| Proto | Service | Purpose |
|---|---|---|
| router.proto | RouterService | Dispatch, invoke graph, plugin management |
| admin.proto | AdminService | Agent management, traces, memory, health (contextunity.view) |
| shield.proto | ShieldService | Firewall scan, token validation, policy management |
| zero.proto | ZeroService | PII anonymization, deanonymization, session management |
| contextunit.proto | (none) | Base ContextUnit message definition |
Proto Compilation
After modifying any .proto file, regenerate stubs immediately:
    uv run python scripts/build_protos.py

This generates *_pb2.py and *_pb2_grpc.py files for all 8 protos.
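The contents of scripts/build_protos.py are not shown here. As a rough sketch of what a script like this typically does, the snippet below builds the argument list for the protoc entry point shipped with grpcio-tools, which emits one *_pb2.py and one *_pb2_grpc.py per input file. The helper names and the output directory are assumptions made for illustration and may not match the real script.

```python
from pathlib import Path


def protoc_args(proto_dir: str, out_dir: str, proto_files: list[str]) -> list[str]:
    """Build an argv list for grpc_tools.protoc.main (hypothetical helper)."""
    return [
        "grpc_tools.protoc",             # argv[0] placeholder, as for any CLI
        f"-I{proto_dir}",                # where imports between protos resolve
        f"--python_out={out_dir}",       # emits *_pb2.py (messages)
        f"--grpc_python_out={out_dir}",  # emits *_pb2_grpc.py (stubs, servicers)
        *sorted(proto_files),
    ]


def build(proto_dir: str, out_dir: str) -> int:
    """Compile every .proto under proto_dir and return protoc's exit code."""
    from grpc_tools import protoc  # provided by the grpcio-tools package

    files = [str(path) for path in Path(proto_dir).glob("*.proto")]
    return protoc.main(protoc_args(proto_dir, out_dir, files))


# Building the arguments has no side effects; build() is what invokes protoc.
args = protoc_args("packages/core/protos", "out", ["worker.proto", "brain.proto"])
```

Keeping argument construction separate from the protoc call makes the command easy to inspect or log before anything is generated.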
Importing Stubs
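    import asyncio

    import grpc

    # Correct: always import from contextunity.core
    from contextunity.core import brain_pb2, brain_pb2_grpc
    from contextunity.core import contextunit_pb2
    from contextunity.core import ContextUnit  # assumed export path for the SDK class


    async def main() -> None:
        # Create client
        channel = grpc.aio.insecure_channel("localhost:50051")
        stub = brain_pb2_grpc.BrainServiceStub(channel)

        # All RPCs use ContextUnit
        unit = ContextUnit(payload={"tenant_id": "my_app", "query": "..."})
        response = await stub.Upsert(unit.to_protobuf(contextunit_pb2))


    # await is only valid inside a coroutine, so run the client through asyncio
    asyncio.run(main())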
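The Upsert call above is unary. RPCs declared with returns (stream ContextUnit), such as Search, are server-streaming; with grpc.aio the call object is an async iterator, so responses are read with async for. The sketch below shows that consumption pattern with a fake stream in place of a live stub call, so it runs without a server. The helper name collect and its limit parameter are illustrative assumptions, not part of the ContextUnity SDK.

```python
import asyncio
from typing import AsyncIterator, Optional, TypeVar

T = TypeVar("T")


async def collect(stream: AsyncIterator[T], limit: Optional[int] = None) -> list[T]:
    """Drain an async stream into a list, stopping early after `limit` items."""
    items: list[T] = []
    if limit is not None and limit <= 0:
        return items
    async for item in stream:
        items.append(item)
        if limit is not None and len(items) >= limit:
            break
    return items


async def fake_search() -> AsyncIterator[dict]:
    """Stand-in for stub.Search(request); yields three fake results."""
    for rank in range(3):
        yield {"rank": rank}


all_hits = asyncio.run(collect(fake_search()))
top_two = asyncio.run(collect(fake_search(), limit=2))
```

With a real stub, the same helper would presumably be called as await collect(stub.Search(unit.to_protobuf(contextunit_pb2))). When stopping early on a live call, it is usually worth keeping a reference to the call object and calling its cancel() method so the server can stop producing results.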