Skip to content

gRPC Contracts

All ContextUnity services communicate via gRPC using a universal ContextUnit envelope — every RPC accepts and returns contextunity.core.ContextUnit. Domain-specific data flows through the payload dict.

ContextUnit Envelope Pattern

// Every service uses ContextUnit as input and output
rpc Search(contextunity.core.ContextUnit) returns (stream contextunity.core.ContextUnit);
rpc Upsert(contextunity.core.ContextUnit) returns (contextunity.core.ContextUnit);

This pattern unifies security (token travels with every call), provenance tracking, and error handling across the entire mesh.

Proto Definitions

All proto files live in packages/core/protos/:

brain.proto — 17 RPCs

service BrainService {
// Knowledge
rpc Search(ContextUnit) returns (stream ContextUnit);
rpc GraphSearch(ContextUnit) returns (ContextUnit);
rpc CreateKGRelation(ContextUnit) returns (ContextUnit);
rpc Upsert(ContextUnit) returns (ContextUnit);
// Memory
rpc QueryMemory(ContextUnit) returns (stream ContextUnit);
rpc AddEpisode(ContextUnit) returns (ContextUnit);
rpc GetRecentEpisodes(ContextUnit) returns (stream ContextUnit);
rpc UpsertFact(ContextUnit) returns (ContextUnit);
rpc GetUserFacts(ContextUnit) returns (stream ContextUnit);
// News
rpc UpsertNewsItem(ContextUnit) returns (ContextUnit);
rpc GetNewsItems(ContextUnit) returns (stream ContextUnit);
rpc UpsertNewsPost(ContextUnit) returns (ContextUnit);
rpc CheckNewsPostExists(ContextUnit) returns (ContextUnit);
// Traces
rpc LogTrace(ContextUnit) returns (ContextUnit);
rpc GetTraces(ContextUnit) returns (stream ContextUnit);
// Taxonomy
rpc UpsertTaxonomy(ContextUnit) returns (ContextUnit);
rpc GetTaxonomy(ContextUnit) returns (stream ContextUnit);
}

commerce.proto — 8 RPCs

service CommerceService {
rpc GetProduct(ContextUnit) returns (ContextUnit);
rpc UpdateProduct(ContextUnit) returns (ContextUnit);
rpc GetProducts(ContextUnit) returns (stream ContextUnit);
rpc UpsertDealerProduct(ContextUnit) returns (ContextUnit);
rpc UpdateEnrichment(ContextUnit) returns (ContextUnit);
rpc TriggerHarvest(ContextUnit) returns (ContextUnit);
rpc GetPendingVerifications(ContextUnit) returns (stream ContextUnit);
rpc SubmitVerification(ContextUnit) returns (ContextUnit);
}

worker.proto — 3 RPCs

service WorkerService {
rpc StartWorkflow(ContextUnit) returns (ContextUnit);
rpc GetTaskStatus(ContextUnit) returns (ContextUnit);
rpc ExecuteCode(ContextUnit) returns (ContextUnit);
}

Additional Services

ProtoServicePurpose
router.protoRouterServiceDispatch, invoke graph, plugin management
admin.protoAdminServiceAgent management, traces, memory, health (contextunity.view)
shield.protoShieldServiceFirewall scan, token validation, policy management
zero.protoZeroServicePII anonymization, deanonymization, session management
contextunit.protoBase ContextUnit message definition

Proto Compilation

After modifying any .proto file, regenerate stubs immediately:

Terminal window
uv run python scripts/build_protos.py

This generates *_pb2.py and *_pb2_grpc.py files for all 8 protos.

Importing Stubs

# Correct — always import from contextunity.core
from contextunity.core import brain_pb2, brain_pb2_grpc
from contextunity.core import contextunit_pb2
# Create client
channel = grpc.aio.insecure_channel("localhost:50051")
stub = brain_pb2_grpc.BrainServiceStub(channel)
# All RPCs use ContextUnit
unit = ContextUnit(payload={"tenant_id": "my_app", "query": "..."})
response = await stub.Upsert(unit.to_protobuf(contextunit_pb2))