Hooks¶
Junjo hooks let you observe workflow lifecycle events with ordinary Python callbacks. Hooks are optional and do not control workflow execution. OpenTelemetry remains a built-in runtime concern even when you use hooks.
Simple completion logging¶
from junjo import BaseState, BaseStore, Graph, Hooks, Node, Workflow
from junjo.hooks import WorkflowCompletedEvent
class MyState(BaseState):
message: str = "hello"
class MyStore(BaseStore[MyState]):
pass
class MyNode(Node[MyStore]):
async def service(self, store: MyStore) -> None:
return None
def create_graph() -> Graph:
node = MyNode()
return Graph(source=node, sinks=[node], edges=[])
hooks = Hooks()
def log_completed(event: WorkflowCompletedEvent[MyState]) -> None:
print(
event.hook_name,
{
"workflow_name": event.name,
"run_id": event.run_id,
"executable_definition_id": event.executable_definition_id,
"executable_runtime_id": event.executable_runtime_id,
"executable_structural_id": event.executable_structural_id,
"enclosing_graph_structural_id": event.enclosing_graph_structural_id,
"trace_id": event.trace_id,
"span_id": event.span_id,
"message": event.result.state.message,
},
)
hooks.on_workflow_completed(log_completed)
workflow = Workflow[MyState, MyStore](
name="My Workflow",
graph_factory=create_graph,
store_factory=lambda: MyStore(initial_state=MyState()),
hooks=hooks,
)
result = await workflow.execute()
Common hook payloads¶
Hook callbacks receive one immutable event object. Useful fields include:
event.run_id: the unique execution id for this runevent.executable_definition_id: the stable id of the workflow, subflow, or node definitionevent.hook_name: the lifecycle hook name such asworkflow_completedevent.executable_runtime_id: the runtime id of the executable that fired the hookevent.executable_structural_id: the stable structural id of the executable that fired the hookevent.enclosing_graph_structural_id: the stable structural id of the graph enclosing the executableevent.parent_executable_definition_id: the stable definition id of the parent workflow, subflow, or concurrent executable when the hook fires inside a nested execution scopeevent.parent_executable_runtime_id/event.parent_executable_structural_id: parent executable identities when the hook fires inside a nested execution scopeevent.trace_id/event.span_id: OpenTelemetry correlation idsevent.result.state: final state on completion hooksevent.state: detached state snapshot on failure, cancellation, and state-changed hooksevent.patch: JSON patch string onon_state_changedevent.error: original exception on failure hooksevent.reason: cancellation reason on cancellation hooks
Hook event state payloads are Python-side lifecycle data, not serialized
OpenTelemetry payloads. event.state is a detached copied state object for
in-process inspection inside your callback. That means telemetry-oriented state
serialization choices such as field exclusion or truncation do not
automatically change what appears on event.state.
For on_state_changed, the executable identity fields describe the active
workflow, subflow, node, or concurrent executable that actually triggered the
state update. The parent executable fields describe the containing execution
scope around that update.
Base example¶
The base example app registers every public hook with simple logging so you can see the complete lifecycle payloads that fire during a normal run:
workflow hooks
subflow hooks
node hooks
run-concurrent hooks
state-changed hooks