Skip to content

Hook Functions Reference

Hook functions are the interface between Section 1 and Section 2. Each template defines the hooks relevant to its pod type. Section 2 calls your hook at the right moment; you return the result and Section 2 handles delivery.

All hooks

Hook Called When Pod Types Parameters Returns
on_trigger(payload, headers) POST /trigger or auto-trigger fires Gateway, Initiator payload: dict, headers: dict dict (forwarded downstream)
on_receive_relay(payload) WoSP ingress message arrives Relay / Processor payload: dict dict (forwarded downstream)
on_receive_fan_out(payload) WoSP ingress before broadcast Distributor / Router payload: dict dict (broadcast to all egress pods)
on_aggregate(partials) All N upstream messages received Aggregator partials: list[dict] dict (forwarded downstream)
on_receive_terminator(payload) WoSP ingress, final hop Sink, Terminator payload: dict None (no egress)

Hook functions run in the asyncio event loop

Use await for any I/O operations (HTTP calls, database writes, file reads). Blocking calls in hooks will stall message delivery across the entire network until the call returns.


on_trigger

Called by: Gateway pods (POST /trigger) and the auto-trigger coroutine at startup.

Stub:

def on_trigger(payload, headers):
    return payload

What to implement: - Authentication: validate API keys or tokens from headers - Input validation: check schema, reject invalid payloads - Payload transformation: add metadata, generate task IDs, reshape the dict

What NOT to implement: - Anything that calls WoSP, Envoy, or Kubernetes APIs - Routing decisions (topology is fixed at composition time)

Example — authentication and enrichment:

import os, uuid

def on_trigger(payload, headers):
    api_key = headers.get("x-api-key")
    if api_key != os.environ.get("EXPECTED_KEY"):
        raise ValueError("Unauthorized")
    return {
        "task_id": str(uuid.uuid4()),
        "source": "gateway",
        "data": payload
    }


on_receive_relay

Called by: Relay/Processor pods when a WoSP message arrives from upstream.

Stub:

def on_receive_relay(payload):
    return payload

What to implement: - Data transformation: enrich, filter, reshape - Audit logging: record the payload before forwarding - Calls to external services (use await for async calls)

Example — enrichment:

import datetime

def on_receive_relay(payload):
    payload["processed_at"] = datetime.datetime.utcnow().isoformat()
    payload["processed_by"] = "middleware"
    return payload


on_receive_fan_out

Called by: Distributor/Router pods before broadcasting to all N downstream pods.

Stub:

def on_receive_fan_out(payload):
    return payload

What to implement: - Pre-broadcast transformation: add routing metadata, set priority fields - Filtering: conditionally suppress broadcast (return None or empty dict to skip)

Example — routing metadata:

def on_receive_fan_out(payload):
    payload["broadcast_id"] = str(uuid.uuid4())
    payload["dispatched_at"] = time.time()
    return payload


on_aggregate

Called by: Aggregator pods after all N upstream messages have arrived.

Parameters: partials — a list of N dicts, one per upstream source, in arrival order.

Stub:

def on_aggregate(partials):
    return {"results": partials}

What to implement: - Merge N partial results into a single combined result - Compute aggregate metrics (sum, average, consensus) - Dispatch to an external webhook or queue after aggregation

Example — merge and compute:

def on_aggregate(partials):
    scores = [p.get("score", 0) for p in partials]
    return {
        "count": len(partials),
        "average_score": sum(scores) / len(scores),
        "all_results": partials
    }


on_receive_terminator

Called by: Sink and Terminator pods on the final hop. No egress — this is the end of the network.

Stub:

def on_receive_terminator(payload):
    pass  # No return value; no downstream to forward to

What to implement: - Store results (database write, file write) - Dispatch to external systems (webhook, queue, email) - Emit metrics or telemetry

Example — webhook dispatch:

import httpx

async def on_receive_terminator(payload):
    async with httpx.AsyncClient() as client:
        await client.post(
            os.environ["RESULT_WEBHOOK_URL"],
            json=payload,
            timeout=10.0
        )