API Reference¶
Core API¶
Junjo: A Python library for building and managing complex graph workflows.
This library provides the building blocks and tools for wrapping Python functions into nodes, edges, and graphs that can be executed by a workflow.
This library also produces annotated OpenTelemetry spans to help make sense of execution telemetry.
- class junjo.Condition[source]¶
Abstract base class for edge conditions in a workflow graph.
Implement a concrete condition that determines whether a transition along an edge should occur based only on the current state.
This is designed to be used with the Edge class, which represents a directed edge in the workflow graph. The condition is evaluated when determining whether to transition from the tail node to the head node.
StateT is the state type that the condition evaluates against. It should be a subclass of BaseState.
Conditions should follow these rules:
The condition should be stateless and depend only on the current state.
Do not use side effects in the condition, such as network calls or database queries.
The condition should be a pure function of the state.
Example
class MyCondition(Condition[MyState]):
    def evaluate(self, state: MyState) -> bool:
        return state.some_property == "some_value"

my_condition = MyCondition()

edges = [
    Edge(tail=node_1, head=node_2, condition=my_condition),
    Edge(tail=node_2, head=node_3),
]
- class junjo.Graph(source: Node | _NestableWorkflow, sinks: list[Node | _NestableWorkflow], edges: list[Edge])[source]¶
Bases: object
Represents a directed graph of nodes and edges, defining the structure and flow of a workflow.
The Graph class is a fundamental component in Junjo, responsible for encapsulating the relationships between different processing units (Nodes or Subflows) and the conditions under which transitions between them occur. It holds references to the entry point (source) and explicit terminal nodes (sinks) of the graph, as well as a list of all edges that connect the nodes.
Graph definitions are immutable after construction. Junjo stores the source, sinks, and edges as read-only graph-shape metadata so the cached compiled snapshot cannot silently drift away from the graph definition.
- Parameters:
source (Node | _NestableWorkflow) – The starting node or subflow of the graph. Execution of the workflow begins here.
sinks (list[Node | _NestableWorkflow]) – The explicit terminal nodes or subflows of the graph. Execution completes successfully only when one of these executables is reached.
edges (list[Edge]) – A list of Edge instances that define the connections and transition logic between nodes in the graph.
Example
from junjo import Node, Edge, Graph, BaseStore, Condition, BaseState

# Define a simple state (can be more complex in real scenarios)
class MyWorkflowState(BaseState):
    count: int | None = None

# Define a simple store
class MyWorkflowStore(BaseStore[MyWorkflowState]):
    async def set_count(self, payload: int) -> None:
        await self.set_state({"count": payload})

# Define some simple nodes
class FirstNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        print("First Node Executed")

class CountItemsNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        # In a real scenario, you might get items from state and count them
        await store.set_count(5)  # Example count
        print("Counted items")

class EvenItemsNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        print("Path taken for even items count.")

class OddItemsNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        print("Path taken for odd items count.")

class FinalNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        print("Final Node Executed")

# Define a condition
class CountIsEven(Condition[MyWorkflowState]):
    def evaluate(self, state: MyWorkflowState) -> bool:
        if state.count is None:
            return False
        return state.count % 2 == 0

# Instantiate the nodes
first_node = FirstNode()
count_items_node = CountItemsNode()
even_items_node = EvenItemsNode()
odd_items_node = OddItemsNode()
final_node = FinalNode()

# Create the workflow graph
workflow_graph = Graph(
    source=first_node,
    sinks=[final_node],
    edges=[
        Edge(tail=first_node, head=count_items_node),
        Edge(tail=count_items_node, head=even_items_node, condition=CountIsEven()),
        Edge(tail=count_items_node, head=odd_items_node),  # Fallback
        Edge(tail=even_items_node, head=final_node),
        Edge(tail=odd_items_node, head=final_node),
    ]
)
- property source: Node | _NestableWorkflow¶
Return the immutable source executable for this graph definition.
- property sinks: tuple[Node | _NestableWorkflow, ...]¶
Return the immutable terminal executables for this graph definition.
- compile() CompiledGraph[source]¶
Compile this graph into one canonical structural snapshot.
The compiled snapshot is cached per Graph instance and becomes the structural source of truth for validation, traversal, and serialization.
Compiling a graph does not execute node logic or evaluate edge conditions. It only normalizes the graph structure into a single, immutable representation.
- Returns:
The compiled structural snapshot for this graph instance.
- Return type:
CompiledGraph
- Raises:
GraphCompilationError – If Junjo cannot build a consistent structural representation of the graph.
- validate() None[source]¶
Validate the graph’s structural shape and declared terminal nodes.
This validation pass is intentionally structural. It does not execute node logic or edge conditions. Instead, it checks the graph topology using the declared edges, source, and sinks.
Validation currently enforces:
every declared sink is reachable from the source
declared sinks do not have outgoing edges
every reachable non-sink node has at least one outgoing edge
nested subflow graphs validate recursively
Cycles are allowed as long as the graph still has a reachable sink and no reachable non-sink dead ends.
- Raises:
GraphValidationError – If the graph shape violates Junjo’s current validation rules.
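The rules above can be sketched as a standalone reachability check over a plain adjacency mapping. This is an illustrative approximation of the documented checks, not Junjo’s actual implementation; the function name and message strings are hypothetical:

```python
from collections import deque

def check_graph_shape(source: str, sinks: set[str], edges: list[tuple[str, str]]) -> list[str]:
    """Return a list of structural violations (empty means the shape is valid)."""
    outgoing: dict[str, list[str]] = {}
    for tail, head in edges:
        outgoing.setdefault(tail, []).append(head)

    # Breadth-first reachability from the source
    reachable = {source}
    queue = deque([source])
    while queue:
        node = queue.popleft()
        for head in outgoing.get(node, []):
            if head not in reachable:
                reachable.add(head)
                queue.append(head)

    problems: list[str] = []
    for sink in sinks:
        if sink not in reachable:
            problems.append(f"sink {sink!r} is not reachable from the source")
        if outgoing.get(sink):
            problems.append(f"sink {sink!r} has outgoing edges")
    for node in reachable - sinks:
        if not outgoing.get(node):
            problems.append(f"reachable non-sink node {node!r} has no outgoing edge")
    return problems
```

Note that a cycle (an edge back to an earlier node) produces no violation here, matching the documented rule that cycles are allowed as long as a sink stays reachable and no reachable non-sink dead-ends.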
- async get_next_node(store: BaseStore, current_node: Node | _NestableWorkflow) Node | _NestableWorkflow[source]¶
Retrieve the next node or subflow in the graph for the given current executable.
This method checks the edges connected to the current executable and resolves the next executable based on the conditions defined in those edges.
Junjo uses ordered first-match traversal semantics:
outgoing edges are considered in the order they were declared
the first edge whose condition resolves to a next executable wins
later edges are not evaluated once a match is found
If no outgoing edge resolves and the current executable is not already a declared sink, this method raises GraphValidationError.
- Parameters:
store (BaseStore) – The store instance to use for resolving the next executable.
current_node (Node | _NestableWorkflow) – The current node or subflow in the graph.
- Returns:
The next node or subflow in the graph.
- Return type:
Node | _NestableWorkflow
- serialize_to_json_string() str[source]¶
Convert the graph to a neutral serialized JSON string.
The serialized representation treats RunConcurrent instances as subgraphs and includes nested subflow graphs as well.
The serialized payload includes explicit runtime and structural identities for the graph, nodes, and edges. Nested subflow nodes also include their child graph structural id plus explicit source and sink runtime and structural ids.
- Returns:
A JSON string containing the graph structure.
- Return type:
str
- Raises:
GraphSerializationError – If the graph payload cannot be converted into JSON.
- to_mermaid() str[source]¶
Render the graph as Mermaid flowchart syntax.
Mermaid rendering consumes the compiled graph snapshot directly instead of routing through serialized JSON. The returned string contains one flowchart with:
the overview graph
concurrent executables rendered as Mermaid subgraphs
disconnected detail subgraphs for each subflow
Node, concurrent, and subflow identifiers are structural, which keeps Mermaid output stable across repeated fresh graph builds with the same topology.
- Returns:
Mermaid flowchart syntax for the compiled graph.
- Return type:
str
- to_dot_notation() str[source]¶
Render the Junjo graph as a main overview digraph plus one additional digraph for each subflow.
DOT rendering consumes the compiled graph snapshot directly instead of routing through serialized JSON. Rendered node and subflow identifiers are structural, which keeps DOT output stable across repeated fresh graph builds with the same topology.
- export_graphviz_assets(out_dir: str | Path = 'graphviz_out', fmt: str = 'svg', dot_cmd: str = 'dot', open_html: bool = False, clean: bool = True) dict[str, Path][source]¶
Render every digraph produced by to_dot_notation() and build a gallery HTML page whose headings use the human labels (e.g. “SampleSubflow”) instead of raw digraph identifiers.
Graphviz export renders directly from the compiled graph snapshot. It does not depend on the serialized graph JSON payload, which keeps the rendering path aligned with the same canonical structure used for validation and traversal.
- class junjo.CompiledGraph(graph_structural_id: str, source_node_runtime_id: str, sink_node_runtime_ids: tuple[str, ...], compiled_nodes: tuple[CompiledNode, ...], compiled_nodes_by_runtime_id: Mapping[str, CompiledNode], compiled_edges: tuple[CompiledEdge, ...], outgoing_compiled_edges_by_tail_runtime_id: Mapping[str, tuple[CompiledEdge, ...]], reachable_node_runtime_ids: frozenset[str])[source]¶
Bases: object
The canonical structural representation of a single Graph instance.
A compiled graph is immutable and normalized for graph-facing features:
validation
traversal adjacency lookups
serialization
rendering
Runtime graph objects still define the graph, but compiled snapshots are the single structural source of truth for all graph operations.
- compiled_nodes: tuple[CompiledNode, ...]¶
- compiled_nodes_by_runtime_id: Mapping[str, CompiledNode]¶
- compiled_edges: tuple[CompiledEdge, ...]¶
- outgoing_compiled_edges_by_tail_runtime_id: Mapping[str, tuple[CompiledEdge, ...]]¶
- class junjo.CompiledNode(node_runtime_id: str, node_structural_id: str, node_type_name: str, node_label: str, node_runtime_ref: Node | _NestableWorkflow, is_concurrent_subgraph: bool = False, is_subflow: bool = False, child_node_runtime_ids: tuple[str, ...] = (), compiled_subflow_graph: CompiledGraph | None = None)[source]¶
Bases: object
A normalized structural node within a compiled graph snapshot.
Compiled nodes capture the graph-facing metadata Junjo needs for validation, serialization, and rendering while preserving the original runtime node or subflow reference for execution-time operations.
- node_runtime_ref: Node | _NestableWorkflow¶
- compiled_subflow_graph: CompiledGraph | None = None¶
- class junjo.CompiledEdge(edge_structural_id: str, edge_ordinal: int, tail_node_runtime_id: str, tail_node_structural_id: str, head_node_runtime_id: str, head_node_structural_id: str, edge_condition_label: str | None, edge_runtime_ref: Edge)[source]¶
Bases: object
A normalized structural edge within a compiled graph snapshot.
Compiled edges preserve the original declared ordering through edge_ordinal and keep a reference to the runtime Edge object so traversal can still evaluate conditions against the run-local store.
- exception junjo.GraphValidationError[source]¶
Bases: GraphError, ValueError
Raised when a graph shape or traversal outcome violates Junjo’s graph rules.
This includes invalid constructor inputs such as an empty sinks list and runtime traversal situations where execution dead-ends on a non-sink node.
- exception junjo.GraphCompilationError[source]¶
Bases: GraphError
Raised when a graph cannot be compiled into a canonical structural form.
Compilation failures are structural normalization failures, not traversal or validation failures. This exception is used when Junjo cannot produce a consistent compiled graph snapshot from the runtime graph definition.
- exception junjo.GraphSerializationError[source]¶
Bases: GraphError
Raised when a graph cannot be converted into its serialized JSON form.
This exception is used when Junjo successfully builds an in-memory graph payload, but json.dumps cannot serialize part of that payload. This is typically caused by attaching non-JSON-serializable values to graph-facing metadata such as node labels.
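A minimal sketch of the underlying failure mode this exception wraps: json.dumps raises TypeError when a payload contains a non-JSON-serializable value. The Emulator class here is just a stand-in for any such object:

```python
import json

class Emulator:
    """Stand-in for an arbitrary non-serializable object attached to metadata."""

payload = {"node_label": Emulator()}  # a non-JSON-serializable node label

try:
    json.dumps(payload)
    serialized = True
except TypeError:
    serialized = False  # Junjo surfaces this as GraphSerializationError
```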
- exception junjo.GraphRenderError[source]¶
Bases: GraphError
Raised when a graph cannot be rendered into an output format.
This includes Graphviz command failures and failures while producing Mermaid or DOT output from the compiled graph snapshot.
- class junjo.Hooks[source]¶
Bases: object
Registry for optional Junjo lifecycle callbacks.
Hooks are observers. They do not create spans or control workflow execution. If a hook callback raises, Junjo keeps execution isolated and continues dispatching the remaining callbacks for that hook.
To use them, register one or more callbacks and pass the registry to a workflow or subflow.
Example
import logging

hooks = Hooks()
logger = logging.getLogger(__name__)

def log_completed(event: WorkflowCompletedEvent[MyState]) -> None:
    logger.info("%s %s", event.hook_name, event.result.state.model_dump())

hooks.on_workflow_completed(log_completed)

workflow = Workflow(..., hooks=hooks)
- on_workflow_started(callback: Callable[[WorkflowStartedEvent], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for the start of a top-level workflow run.
- on_workflow_completed(callback: Callable[[WorkflowCompletedEvent[StateT]], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for successful workflow completion.
- on_workflow_failed(callback: Callable[[WorkflowFailedEvent[StateT]], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for workflow failures.
- on_workflow_cancelled(callback: Callable[[WorkflowCancelledEvent[StateT]], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for workflow cancellation.
- on_subflow_started(callback: Callable[[SubflowStartedEvent], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for subflow start events.
- on_subflow_completed(callback: Callable[[SubflowCompletedEvent[StateT]], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for successful subflow completion.
- on_subflow_failed(callback: Callable[[SubflowFailedEvent[StateT]], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for subflow failures.
- on_subflow_cancelled(callback: Callable[[SubflowCancelledEvent[StateT]], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for subflow cancellation.
- on_node_started(callback: Callable[[NodeStartedEvent], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for node start events.
- on_node_completed(callback: Callable[[NodeCompletedEvent], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for successful node completion.
- on_node_failed(callback: Callable[[NodeFailedEvent], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for node failures.
- on_node_cancelled(callback: Callable[[NodeCancelledEvent], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for node cancellation.
- on_run_concurrent_started(callback: Callable[[RunConcurrentStartedEvent], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for RunConcurrent start events.
- on_run_concurrent_completed(callback: Callable[[RunConcurrentCompletedEvent], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for successful RunConcurrent completion.
- on_run_concurrent_failed(callback: Callable[[RunConcurrentFailedEvent], None | Awaitable[None]]) Callable[[], None][source]¶
Register a callback for RunConcurrent failures.
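Each on_* registration above returns an unsubscribe callable, and a raising callback does not stop the remaining callbacks from being dispatched. Both behaviors can be sketched with a minimal stand-in registry (MiniHooks is hypothetical, not Junjo’s Hooks class):

```python
from typing import Callable

class MiniHooks:
    def __init__(self) -> None:
        self._callbacks: list[Callable[[str], None]] = []

    def on_event(self, callback: Callable[[str], None]) -> Callable[[], None]:
        self._callbacks.append(callback)
        def unsubscribe() -> None:
            self._callbacks.remove(callback)
        return unsubscribe  # registration hands back an unsubscribe callable

    def dispatch(self, event: str) -> None:
        for callback in list(self._callbacks):
            try:
                callback(event)
            except Exception:
                pass  # isolate failures; keep dispatching remaining callbacks

seen: list[str] = []
hooks = MiniHooks()

def bad(event: str) -> None:
    raise RuntimeError("hook failure")

def good(event: str) -> None:
    seen.append(event)

hooks.on_event(bad)
unsubscribe_good = hooks.on_event(good)
hooks.dispatch("started")    # bad raises, good still runs
unsubscribe_good()
hooks.dispatch("completed")  # good is no longer registered
```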
- class junjo.GraphFactory(*args, **kwargs)[source]¶
Bases: Protocol, Generic[_CovariantGraphT]
A callable that returns a new instance of a workflow’s graph.
This factory is invoked at the beginning of each execute() call to ensure a fresh, isolated graph for that execution. This is critical for concurrency safety because nodes and subflows are runtime objects with per-run identities.
- class junjo.StoreFactory(*args, **kwargs)[source]¶
Bases: Protocol, Generic[_CovariantStoreT]
A callable that returns a new instance of a workflow’s store.
This factory is invoked at the beginning of each execute() call to ensure fresh, isolated state for that execution.
- class junjo.ExecutionResult(run_id: str, definition_id: str, name: str, state: StateT, node_execution_counts: Mapping[str, int])[source]¶
Bases: Generic[StateT]
Immutable snapshot of a completed workflow or subflow execution.
ExecutionResult is the public post-run API for accessing final state and execution metadata without exposing live runtime objects like the internal store or graph.
The result includes:
run_id: The unique identifier for this specific execution.
definition_id: The stable identifier of the workflow or subflow definition.
name: The configured workflow or subflow name.
state: The detached final state snapshot for the completed execution.
node_execution_counts: Current-scope execution counts keyed by executable id.
- state: StateT¶
- class junjo.Workflow(graph_factory: GraphFactory[Graph], store_factory: StoreFactory[StoreT], max_iterations: int = 100, hooks: Hooks | None = None, name: str | None = None)[source]¶
Bases: _NestableWorkflow[StateT, StoreT, None, None]
A Workflow is a top-level, executable collection of nodes and edges arranged as a graph. It manages its own state and store, distinct from any parent or sub-workflows.
This class is generic and requires two type parameters for a convenient and type-safe developer experience:
StateT is the type of state managed by this workflow and should be a subclass of BaseState.
StoreT is the store type used by this workflow and should be a subclass of BaseStore.
Every call to execute() creates a fresh graph, a fresh store, and a fresh execution context. That makes a Workflow instance a reusable definition or blueprint rather than a mutable live-run container.
- Parameters:
name (str | None, optional) – An optional name for the workflow. If not provided, the class name is used.
graph_factory (GraphFactory[Graph]) – A callable that returns a new instance of the workflow’s graph (Graph). This factory is invoked at the beginning of each execute() call to ensure a fresh, isolated graph for that execution.
store_factory (StoreFactory[StoreT]) – A callable that returns a new instance of the workflow’s store (StoreT). This factory is invoked at the beginning of each execute() call to ensure a fresh store for that execution.
max_iterations (int, optional) – The maximum number of times any single node or executable may run within one workflow execution. This helps detect accidental loops. Defaults to 100.
hooks (Hooks | None, optional) – An optional Hooks registry for observing workflow lifecycle events. Hooks are optional observers; they do not control OpenTelemetry instrumentation or workflow execution.
Example without hooks
workflow = Workflow[MyState, MyStore](
    name="demo_base_workflow",
    graph_factory=create_my_graph,
    store_factory=lambda: MyStore(initial_state=MyState()),
)

result = await workflow.execute()
print(result.state.model_dump_json())
Example with hooks
import logging

hooks = Hooks()
logger = logging.getLogger(__name__)

def log_completed(event) -> None:
    logger.info(
        "workflow completed %s %s %s",
        event.name,
        event.run_id,
        event.result.state.model_dump(),
    )

hooks.on_workflow_completed(log_completed)

workflow = Workflow[MyState, MyStore](
    name="configured_workflow",
    graph_factory=create_my_graph,
    store_factory=lambda: MyStore(initial_state=MyState()),
    hooks=hooks,
)
Passing Parameters to Factories
To provide parameters to your graph_factory or store_factory when you create a workflow, wrap the factory call in a lambda. This creates an argument-less factory that closes over the dependencies you want to inject while preserving the fresh-per-run execution model.

def create_graph_with_dependency(emulator: Emulator) -> Graph:
    return Graph(...)

my_emulator = Emulator()

workflow = Workflow[MyState, MyStore](
    name="configured_workflow",
    graph_factory=lambda: create_graph_with_dependency(my_emulator),
    store_factory=lambda: MyStore(initial_state=MyState()),
)
- class junjo.Subflow(graph_factory: GraphFactory[Graph], store_factory: StoreFactory[StoreT], max_iterations: int = 100, hooks: Hooks | None = None, name: str | None = None)[source]¶
Bases: _NestableWorkflow[StateT, StoreT, ParentStateT, ParentStoreT], ABC
A Subflow is a workflow that:
Executes within a parent workflow or parent subflow.
Has its own isolated state and store.
Can interact with its parent workflow state before and after execution via pre_run_actions() and post_run_actions().
Like top-level workflows, subflows create a fresh graph and a fresh store for every execution. The child run is isolated from the parent store except for the explicit handoff points provided by the pre- and post-run hooks.
- Parameters:
name (str | None, optional) – An optional name for the subflow. If not provided, the class name is used.
graph_factory (GraphFactory[Graph]) – A callable that returns a new instance of the subflow’s graph (Graph). This factory is invoked at the beginning of each execute() call to ensure a fresh, isolated graph for that execution.
store_factory (StoreFactory[StoreT]) – A callable that returns a new instance of the subflow’s store (StoreT). This factory is invoked at the beginning of each execute() call to ensure a fresh store for that execution.
max_iterations (int, optional) – The maximum number of times any single node or executable may run within one subflow execution. Defaults to 100.
hooks (Hooks | None, optional) – An optional Hooks registry for observing lifecycle events emitted by this subflow.
Example
class ExampleSubflow(
    Subflow[SubflowState, SubflowStore, ParentState, ParentStore]
):
    async def pre_run_actions(self, parent_store, subflow_store):
        parent_state = await parent_store.get_state()
        await subflow_store.set_parameter(
            {"parameter": parent_state.parameter}
        )

    async def post_run_actions(self, parent_store, subflow_store):
        subflow_state = await subflow_store.get_state()
        await parent_store.set_subflow_result(subflow_state.result)

example_subflow = ExampleSubflow(
    graph_factory=create_example_subflow_graph,
    store_factory=lambda: ExampleSubflowStore(
        initial_state=ExampleSubflowState()
    ),
)
- abstractmethod async pre_run_actions(parent_store: ParentStoreT, subflow_store: StoreT) None[source]¶
This method is called before the subflow runs.
This is where you can pass initial state values from the parent workflow to the subflow store for this specific run.
- Parameters:
parent_store (ParentStoreT) – The parent store to interact with.
subflow_store (StoreT) – The store for this specific subflow execution.
- abstractmethod async post_run_actions(parent_store: ParentStoreT, subflow_store: StoreT) None[source]¶
This method is called after the subflow has run.
This is where you can update the parent store with the results of the child workflow.
- Parameters:
parent_store (ParentStoreT) – The parent store to update.
subflow_store (StoreT) – The store for this specific subflow execution.
- class junjo.Node[source]¶
Nodes are the building blocks of a workflow. They represent a single unit of work that can be executed within the context of a workflow.
Place business logic to be executed by the node in service(). Junjo wraps that service method with OpenTelemetry tracing, error handling, and lifecycle hook dispatch during execute().
The Node type is meant to remain decoupled from your application’s domain logic. While you can place business logic directly in the service() method, it is recommended that you call a service function located in a separate module. This keeps nodes easy to test, easier to understand, and focused on orchestration rather than implementation detail.
StoreT is the workflow store type that will be passed into this node during execution.
Nodes have three main responsibilities:
The workflow passes the run-local store to the node’s execute() method.
execute() manages tracing, lifecycle hooks, and error handling.
service() performs the side effects for this unit of work.
Example implementation
class SaveMessageNode(Node[MessageWorkflowStore]):
    async def service(self, store) -> None:
        state = await store.get_state()
        sentiment = await get_message_sentiment(state.message)
        await store.set_message_sentiment(sentiment)
- property patches: list[JsonPatch]¶
Returns the list of state patches that have been applied by this node.
- abstractmethod async service(store: StoreT) None[source]¶
This is the main logic of the node.
The concrete implementation of this method should contain the side effects that this node will perform. The method is called by execute(), which is responsible for tracing, lifecycle dispatch, and error handling.
The service() method should not be called directly. Instead, it should be called by the execute() method of the node.
DO NOT EXECUTE node.service() DIRECTLY! Use node.execute() instead.
- Parameters:
store (StoreT) – The run-local store passed to the node’s service function.
- async execute(store: StoreT, parent_id: str) None[source]¶
Execute the node’s service() method with tracing and lifecycle dispatch.
This method acquires a tracer, opens a node span, emits lifecycle events, and then calls service(). Terminal lifecycle hooks are dispatched before the span closes so hook failure telemetry can stay attached to the real node span.
- Parameters:
store (StoreT) – The run-local store for the current workflow execution.
parent_id (str) – The identifier of the parent workflow or subflow. This becomes the parent id recorded on the node span.
- class junjo.RunConcurrent(name: str, items: list[Node | Subflow])[source]¶
Bases: Node
Execute a list of nodes or subflows concurrently.
An instance of RunConcurrent can be added to a workflow graph the same way as any other node. Under the hood it starts one task per child item and waits for them to settle.
If one child fails, Junjo cancels all still-pending siblings and re-raises the original failure. Cancelled siblings are marked as cancelled in telemetry rather than as errors so traces tell the full story of the failure boundary.
- Parameters:
name (str) – A human-readable name for this concurrent execution group.
items (list[Node | Subflow]) – The nodes and subflows to execute concurrently.
Example
node_1 = NodeOne()
node_2 = NodeTwo()
node_3 = NodeThree()

run_concurrent = RunConcurrent(
    name="Concurrent Execution",
    items=[node_1, node_2, node_3],
)
- async service(store: BaseStore) None[source]¶
Execute the provided nodes and subflows concurrently.
Child items receive the same run-local store. If one item fails, all still-pending siblings are cancelled and the original failure is re-raised once the cancellations have been drained.
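The cancel-siblings-and-re-raise behavior can be sketched with plain asyncio. run_concurrently here is a simplified stand-in for RunConcurrent, without stores, spans, or lifecycle hooks:

```python
import asyncio

async def run_concurrently(coros: list) -> None:
    tasks = [asyncio.ensure_future(c) for c in coros]
    try:
        await asyncio.gather(*tasks)
    except Exception:
        for task in tasks:
            task.cancel()  # cancel still-pending siblings
        # drain the cancellations so nothing is left running
        await asyncio.gather(*tasks, return_exceptions=True)
        raise  # re-raise the original failure

async def main() -> list[str]:
    events: list[str] = []

    async def ok() -> None:
        await asyncio.sleep(0.05)
        events.append("ok finished")  # never reached: cancelled first

    async def boom() -> None:
        await asyncio.sleep(0.01)
        raise ValueError("child failed")

    try:
        await run_concurrently([ok(), boom()])
    except ValueError:
        events.append("original failure re-raised")
    return events

events = asyncio.run(main())
```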
- class junjo.BaseState[source]¶
Bases: BaseModel
Common base for states, with no unknown fields allowed.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
- model_config = {'extra': 'forbid'}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class junjo.BaseStore(initial_state: StateT)[source]¶
Bases: Generic[StateT]
BaseStore represents a generic store for managing the state of a workflow. It is designed to be subclassed with a specific state type (a Pydantic model derived from BaseState).
The store is responsible for:
Managing the state of a workflow or subflow execution.
Making immutable updates to that state safely in a concurrent environment.
Validating committed updates against the underlying Pydantic model.
The store uses an asyncio.Lock to ensure that state updates are concurrency-safe. Each committed update is derived, validated, and applied against the exact locked state version it modifies, which prevents stale validate-then-apply races under concurrent execution.
Subclass BaseStore with your own state type and expose domain-specific actions that call set_state().
Example
class MyWorkflowState(BaseState):
    user_input: str
    processed_data: dict | None = None

class MyWorkflowStore(BaseStore[MyWorkflowState]):
    async def set_processed_data(self, data: dict) -> None:
        await self.set_state({"processed_data": data})
- Parameters:
initial_state (StateT) – The initial state of the store.
- async get_state() StateT[source]¶
Return a detached deep copy of the current state.
This method follows the immutability principle for read access: callers receive a deep snapshot that can be inspected freely without mutating the live store. Store updates must still flow through store actions and set_state().
- async get_state_json() str[source]¶
Return the current state as a JSON string.
This is useful for logging, tracing, or serializing the current state without manually calling model_dump_json on a snapshot.
- async set_state(update: dict) None[source]¶
Update the store’s state with a dictionary of changes.
The update is merged with the current locked state, validated against the store’s Pydantic model, and committed atomically if it changes the state. This method also emits OpenTelemetry set_state events and lifecycle state-changed hooks when a commit occurs.
- Parameters:
update (dict) – A dictionary of updates to apply to the state.
class MessageWorkflowState(BaseState):
    received_message: Message

class MessageWorkflowStore(BaseStore[MessageWorkflowState]):
    async def set_received_message(self, payload: Message) -> None:
        await self.set_state({"received_message": payload})

payload = Message(...)
await store.set_received_message(payload)
Note
Validation happens while the store lock is held, so every committed update is validated against the exact state version it will be applied to.
Note
If the resulting state is unchanged, the store remains untouched and an empty patch is recorded in telemetry.
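The locked validate-then-apply pattern described above can be sketched with a plain dict and an asyncio.Lock. MiniStore and its validate function are illustrative stand-ins for BaseStore and Pydantic validation, not Junjo’s implementation:

```python
import asyncio

def validate(state: dict) -> None:
    # Stand-in for Pydantic model validation
    if state.get("count", 0) < 0:
        raise ValueError("count must be non-negative")

class MiniStore:
    def __init__(self, initial_state: dict) -> None:
        self._state = dict(initial_state)
        self._lock = asyncio.Lock()

    async def get_state(self) -> dict:
        async with self._lock:
            return dict(self._state)  # detached copy for read access

    async def set_state(self, update: dict) -> None:
        async with self._lock:
            candidate = {**self._state, **update}
            validate(candidate)           # validated against the exact locked state
            if candidate != self._state:  # commit only if the state actually changes
                self._state = candidate

async def main() -> dict:
    store = MiniStore({"count": 0})
    # Concurrent updates stay consistent because each one holds the lock
    # through derive, validate, and commit.
    await asyncio.gather(*(store.set_state({"count": i}) for i in range(1, 4)))
    return await store.get_state()

final_state = asyncio.run(main())
```

Because validation happens while the lock is held, no update can be validated against one state version and applied to another.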
- class junjo.Edge(tail: Node | _NestableWorkflow, head: Node | _NestableWorkflow, condition: Condition[StateT] | None = None)[source]¶
Bases: object
Represents a directed edge in the workflow graph.
An edge connects a tail node to a head node, optionally with a condition that determines whether the transition from tail to head should occur.
- Parameters:
tail (Node | _NestableWorkflow) – The source node of the edge where the transition originates.
head (Node | _NestableWorkflow) – The destination node of the edge where the transition leads.
condition (Condition[StateT] | None) – An optional condition that determines whether the transition from tail to head should occur. If None, the transition is always valid.
- async next_node(store: BaseStore) Node | _NestableWorkflow | None[source]¶
Determine the next node in the workflow based on this edge’s condition.
- Parameters:
store (BaseStore) – The store instance to use when resolving the next node.
- Returns:
The next node if the transition is valid, otherwise None.
- Return type:
Node | _NestableWorkflow | None
Telemetry API¶
- class junjo.telemetry.junjo_otel_exporter.JunjoOtelExporter(host: str, port: str, api_key: str, insecure: bool = False)[source]¶
Bases: object
Configure Junjo AI Studio OTLP components for an existing OpenTelemetry setup.
Junjo is designed to be compatible with existing OpenTelemetry configurations by adding to an existing configuration instead of creating a new one.
In normal applications, the tracer provider and meter provider remain the top-level owners of shutdown. Call TracerProvider.shutdown() and MeterProvider.shutdown() when the process is terminating.
flush() is available for manual immediate export when you truly need it, such as in short-lived scripts or tests.
shutdown() is a wrapper-local helper that shuts down only the Junjo-owned span processor and metric reader.
- Parameters:
host (str) – The hostname of the Junjo AI Studio ingestion endpoint.
port (str) – The port of the ingestion endpoint.
api_key (str) – The Junjo AI Studio API key used to authenticate telemetry export.
insecure (bool, optional) – If True, connect without TLS (for local development). Defaults to False.
Local Development
To send telemetry to a local Junjo AI Studio instance, such as one running through Docker Compose:
import os

from junjo.telemetry.junjo_otel_exporter import JunjoOtelExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry import trace

# Retrieve API key from environment
JUNJO_AI_STUDIO_API_KEY = os.getenv("JUNJO_AI_STUDIO_API_KEY")

# Option 1: Using localhost
junjo_exporter_local = JunjoOtelExporter(
    host="localhost",
    port="50051",
    api_key=JUNJO_AI_STUDIO_API_KEY,
    insecure=True,
)

# Option 2: Using Docker service name (if running in same Docker network)
junjo_exporter_docker = JunjoOtelExporter(
    host="junjo-ai-studio-ingestion",  # Docker service name
    port="50051",
    api_key=JUNJO_AI_STUDIO_API_KEY,
    insecure=True,
)

# Add to your tracer provider
provider = TracerProvider()
provider.add_span_processor(junjo_exporter_local.span_processor)
trace.set_tracer_provider(provider)
Production Deployment
For a production environment with TLS enabled, such as one behind a reverse proxy like Caddy:
import os

from junjo.telemetry.junjo_otel_exporter import JunjoOtelExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry import trace

# Retrieve API key from environment
JUNJO_AI_STUDIO_API_KEY = os.getenv("JUNJO_AI_STUDIO_API_KEY")

junjo_exporter_prod = JunjoOtelExporter(
    host="ingestion.junjo.example.com",  # Your domain
    port="443",                          # HTTPS port
    api_key=JUNJO_AI_STUDIO_API_KEY,
    insecure=False,                      # TLS enabled
)

# Add to your tracer provider
provider = TracerProvider()
provider.add_span_processor(junjo_exporter_prod.span_processor)
trace.set_tracer_provider(provider)
Initializes the JunjoOtelExporter.
- property span_processor: BatchSpanProcessor¶
Returns the configured span processor.
- property metric_reader: PeriodicExportingMetricReader¶
Returns the configured metric reader.
- shutdown(timeout_millis: float = 30000) bool[source]¶
Shut down the Junjo-owned telemetry components.
In most applications, the preferred terminal lifecycle is to shut down the owning
TracerProvider and MeterProvider. This helper is provided for cases where you need to shut down only the Junjo-owned span processor and metric reader directly.
- Parameters:
timeout_millis (float) – Maximum time to wait for shutdown in milliseconds. Defaults to 30000ms (30 seconds).
- Returns:
True if shutdown completed successfully, False otherwise.
- Return type:
bool
- flush(timeout_millis: float = 120000) bool[source]¶
Force a manual drain of pending telemetry.
This method blocks until pending telemetry is exported or the timeout is reached. Use it only when you need an immediate manual export, such as in tests, short-lived scripts, or other environments where the normal provider shutdown lifecycle is not the right fit.
In normal applications, shut down the owning
TracerProvider and MeterProvider instead of using flush() as the standard exit path.
- Parameters:
timeout_millis (float) – Maximum time to wait for flush in milliseconds. Defaults to 120000ms (120 seconds) to match the exporter timeout and allow for retries.
- Returns:
True if all telemetry was flushed successfully, False otherwise. Failures are logged through the junjo.telemetry logger.
- Return type:
bool
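The flush-versus-shutdown distinction above can be illustrated with a pure-Python stand-in (no OpenTelemetry dependency; the class and method names here are illustrative, not the real SDK types):

```python
# Stand-in span processor: flush() drains pending telemetry but leaves the
# processor usable, while shutdown() performs a final drain and then stops.
class StubSpanProcessor:
    def __init__(self):
        self.pending = []
        self.exported = []
        self.shut_down = False

    def on_end(self, span):
        self.pending.append(span)

    def force_flush(self, timeout_millis=120000):
        # flush(): immediate manual export; keeps accepting new spans.
        self.exported.extend(self.pending)
        self.pending.clear()
        return True

    def shutdown(self, timeout_millis=30000):
        # shutdown(): final drain, then the processor is terminal.
        self.force_flush(timeout_millis)
        self.shut_down = True
        return True

proc = StubSpanProcessor()
proc.on_end("span-1")
proc.force_flush()   # manual drain; processor still usable afterwards
proc.on_end("span-2")
proc.shutdown()      # terminal; in real apps this happens via provider shutdown
print(proc.exported)  # ['span-1', 'span-2']
```

This is why the standard exit path is provider shutdown: it performs the final flush for you, and flush() is only needed when you want an immediate export mid-run.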
- class junjo.workflow._NestableWorkflow(graph_factory: GraphFactory[Graph], store_factory: StoreFactory[StoreT], max_iterations: int = 100, hooks: Hooks | None = None, name: str | None = None)[source]¶
Bases:
Generic[StateT, StoreT, ParentStateT, ParentStoreT]
Shared execution implementation for Workflow and Subflow.
This type should not be used directly. Consumers should instantiate Workflow or subclass Subflow.
- property span_type: JunjoOtelSpanTypes¶
Returns the OpenTelemetry span type for this executable.
- async execute(parent_store: ParentStoreT | None = None, parent_id: str | None = None, *, validate_graph: bool = True) ExecutionResult[StateT][source]¶
Execute the workflow or subflow and return the final execution snapshot.
Each call creates a fresh graph, a fresh store, a fresh run id, and a fresh lifecycle dispatcher. This keeps the workflow definition itself immutable and safe to reuse across concurrent runs.
- Parameters:
parent_store (ParentStoreT | None) – The parent store when executing a subflow. Top-level workflows should omit this argument.
parent_id (str | None) – The parent workflow or subflow identifier when nested.
validate_graph (bool) – Whether to run
Graph.validate() on the fresh graph before execution starts. Defaults to True.
- Returns:
A detached snapshot of the completed execution, including the final state and current-scope execution counts.
- Return type:
ExecutionResult[StateT]
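The fresh-per-call behavior described above can be sketched with plain Python stand-ins (illustrative only, not the junjo API): because the definition holds a store factory rather than a store, concurrent runs never share mutable state.

```python
# Sketch: a reusable workflow definition that builds a fresh store per
# execution, so concurrent runs cannot interfere with each other.
import asyncio

class SketchWorkflow:
    def __init__(self, store_factory):
        # The definition owns a factory, never a live store instance.
        self.store_factory = store_factory

    async def execute(self, increments):
        store = self.store_factory()  # fresh store for this run only
        for n in increments:
            store["count"] += n
        return store  # detached result snapshot for this run

wf = SketchWorkflow(lambda: {"count": 0})

async def main():
    # Two concurrent runs of the same immutable definition.
    return await asyncio.gather(wf.execute([1, 2]), wf.execute([10]))

a, b = asyncio.run(main())
print(a, b)  # {'count': 3} {'count': 10}
```

Had the definition held a single shared store instead of a factory, the two runs would have accumulated into the same counter; the factory is what makes the definition safe to reuse.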