Hook Functions Reference
Hook functions are the interface between Section 1 and Section 2. Each template defines the hooks relevant to its pod type. Section 2 calls your hook at the right moment; you return the result and Section 2 handles delivery.
All hooks
| Hook | Called When | Pod Types | Parameters | Returns |
|---|---|---|---|---|
| on_trigger(payload, headers) | POST /trigger or auto-trigger fires | Gateway, Initiator | payload: dict, headers: dict | dict (forwarded downstream) |
| on_receive_relay(payload) | WoSP ingress message arrives | Relay, Processor | payload: dict | dict (forwarded downstream) |
| on_receive_fan_out(payload) | WoSP ingress before broadcast | Distributor, Router | payload: dict | dict (broadcast to all egress pods) |
| on_aggregate(partials) | All N upstream messages received | Aggregator | partials: list[dict] | dict (forwarded downstream) |
| on_receive_terminator(payload) | WoSP ingress, final hop | Sink, Terminator | payload: dict | None (no egress) |
Hook functions run in the asyncio event loop
Use await for any I/O operations (HTTP calls, database writes, file reads). Blocking calls in hooks will stall message delivery across the entire network until the call returns.
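As a sketch of the warning above, and assuming the framework awaits hooks declared with `async def` (the guidance to use `await` implies this, but it is not stated outright), a blocking call can be moved to a worker thread with `asyncio.to_thread` so the event loop stays free. The function `slow_lookup` is a hypothetical stand-in for any synchronous client call.

```python
import asyncio
import time


def slow_lookup(key):
    # Hypothetical blocking call (e.g. a synchronous SDK or a file read).
    time.sleep(0.1)
    return {"key": key, "status": "ok"}


async def on_receive_relay(payload):
    # Run the blocking call in a worker thread; the event loop can keep
    # handling other messages while this hook waits for the result.
    result = await asyncio.to_thread(slow_lookup, payload.get("key"))
    payload["lookup"] = result
    return payload
```

`asyncio.to_thread` requires Python 3.9 or later; on older versions `loop.run_in_executor` serves the same purpose.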
on_trigger
Called by: Gateway pods (POST /trigger) and the auto-trigger coroutine at startup.
Stub:
What to implement:

- Authentication: validate API keys or tokens from headers
- Input validation: check schema, reject invalid payloads
- Payload transformation: add metadata, generate task IDs, reshape the dict

What NOT to implement:

- Anything that calls WoSP, Envoy, or Kubernetes APIs
- Routing decisions (topology is fixed at composition time)
Example — authentication and enrichment:
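import hmac
import os
import uuid

def on_trigger(payload, headers):
    # Reject the request unless the caller's key matches the configured one.
    # An unset EXPECTED_KEY must never authorize a request that carries no key.
    expected = os.environ.get("EXPECTED_KEY")
    api_key = headers.get("x-api-key")
    if not expected or not api_key or not hmac.compare_digest(api_key.encode(), expected.encode()):
        raise ValueError("Unauthorized")
    # Wrap the original payload with a fresh task ID and a source tag.
    return {
        "task_id": str(uuid.uuid4()),
        "source": "gateway",
        "data": payload,
    }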
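Input validation, listed above but not covered by the example, could be sketched as follows. The required fields (`job` and `items`) are hypothetical and stand in for whatever schema your pod expects.

```python
def validate_payload(payload):
    """Raise ValueError if the payload does not match the (hypothetical) schema."""
    if not isinstance(payload, dict):
        raise ValueError("payload must be a JSON object")
    if not isinstance(payload.get("job"), str) or not payload["job"]:
        raise ValueError("'job' must be a non-empty string")
    if not isinstance(payload.get("items"), list):
        raise ValueError("'items' must be a list")


def on_trigger(payload, headers):
    # Validate first so malformed input never reaches downstream pods.
    validate_payload(payload)
    return {"source": "gateway", "data": payload}
```

Raising `ValueError` mirrors the authentication example; how the framework reports the error to the caller is not specified here.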
on_receive_relay
Called by: Relay/Processor pods when a WoSP message arrives from upstream.
Stub:
What to implement:

- Data transformation: enrich, filter, reshape
- Audit logging: record the payload before forwarding
- Calls to external services (use await for async calls)
Example — enrichment:
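import datetime

def on_receive_relay(payload):
    # Stamp the payload with a timezone-aware UTC timestamp
    # (datetime.utcnow() is deprecated as of Python 3.12).
    payload["processed_at"] = datetime.datetime.now(datetime.timezone.utc).isoformat()
    payload["processed_by"] = "middleware"
    return payload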
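Audit logging before forwarding could look like the sketch below, which uses the standard `logging` module. The logger name `relay.audit` is an arbitrary choice for illustration, not a framework convention.

```python
import json
import logging

# Hypothetical logger name; configure handlers and levels as your deployment requires.
audit_log = logging.getLogger("relay.audit")


def on_receive_relay(payload):
    # Record the payload exactly as received, before any transformation.
    # default=str keeps the log call from failing on non-JSON values.
    audit_log.info("received payload: %s", json.dumps(payload, sort_keys=True, default=str))
    return payload
```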
on_receive_fan_out
Called by: Distributor/Router pods before broadcasting to all N downstream pods.
Stub:
What to implement:

- Pre-broadcast transformation: add routing metadata, set priority fields
- Filtering: conditionally suppress broadcast (return None or empty dict to skip)
Example — routing metadata:
import time
import uuid

def on_receive_fan_out(payload):
    # Tag the message so downstream pods can correlate the broadcast.
    payload["broadcast_id"] = str(uuid.uuid4())
    payload["dispatched_at"] = time.time()
    return payload
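The filtering case can be sketched the same way. This relies on the behaviour stated above, that returning `None` skips the broadcast; the `priority` field and the threshold value are hypothetical.

```python
import time
import uuid

MIN_PRIORITY = 5  # hypothetical threshold, not a framework setting


def on_receive_fan_out(payload):
    # Suppress the broadcast for low-priority messages.
    if payload.get("priority", 0) < MIN_PRIORITY:
        return None
    payload["broadcast_id"] = str(uuid.uuid4())
    payload["dispatched_at"] = time.time()
    return payload
```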
on_aggregate
Called by: Aggregator pods after all N upstream messages have arrived.
Parameters: partials — a list of N dicts, one per upstream source, in arrival order.
Stub:
What to implement:

- Merge N partial results into a single combined result
- Compute aggregate metrics (sum, average, consensus)
- Dispatch to an external webhook or queue after aggregation
Example — merge and compute:
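def on_aggregate(partials):
    # Partials without a "score" key count as 0.
    scores = [p.get("score", 0) for p in partials]
    return {
        "count": len(partials),
        # Guard against an empty list to avoid ZeroDivisionError.
        "average_score": sum(scores) / len(scores) if scores else 0.0,
        "all_results": partials,
    }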
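For the consensus case, a minimal sketch using a majority vote over a hypothetical `label` field in each partial:

```python
from collections import Counter


def on_aggregate(partials):
    # Collect the votes; partials without a "label" key are ignored.
    labels = [p["label"] for p in partials if "label" in p]
    if not labels:
        return {"count": len(partials), "consensus": None, "agreement": 0.0}
    # most_common(1) yields the most frequent label; ties go to the
    # label that was seen first, i.e. the earliest arrival.
    label, votes = Counter(labels).most_common(1)[0]
    return {
        "count": len(partials),
        "consensus": label,
        "agreement": votes / len(labels),
    }
```

Because `partials` is in arrival order, the tie-break favours whichever upstream pod responded first; sort the list on a stable key beforehand if that matters.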
on_receive_terminator
Called by: Sink and Terminator pods on the final hop. No egress — this is the end of the network.
Stub:
What to implement:

- Store results (database write, file write)
- Dispatch to external systems (webhook, queue, email)
- Emit metrics or telemetry
Example — webhook dispatch:
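A minimal sketch of such a dispatch, using only the standard library. It assumes hooks may be declared with `async def`; the `WEBHOOK_URL` environment variable, its default value, and the helper names `build_request` and `_post` are hypothetical. The blocking HTTP call runs in a worker thread so the event loop is not stalled.

```python
import asyncio
import json
import os
import urllib.request

# Hypothetical: the webhook endpoint is read from an environment variable.
WEBHOOK_URL = os.environ.get("WEBHOOK_URL", "http://localhost:9000/results")


def build_request(payload, url=WEBHOOK_URL):
    # Serialize the final payload as a JSON POST body.
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def _post(request):
    # Blocking call; urlopen raises HTTPError on 4xx/5xx responses.
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


async def on_receive_terminator(payload):
    # Send the result off the event loop, then return None (no egress).
    await asyncio.to_thread(_post, build_request(payload))
    return None
```

An async HTTP client would avoid the worker thread entirely; the standard library is used here only to keep the sketch dependency-free.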