API Reference

Core API

Junjo: A python library for building and managing complex Graph Workflows.

This library provides the building blocks and tools for wrapping python functions into nodes, edges, and graphs that can be executed by a workflow.

This library also produces annotated Opentelemetry Spans to help make sense of execution telemetry.

class junjo.Condition[source]

Bases: Generic[StateT], ABC

Abstract base class for edge conditions in a workflow graph.

Implement a concrete condition that determines whether a transition along an edge should occur based only on the current state.

This is designed to be used with the Edge class, which represents a directed edge in the workflow graph. The condition is evaluated when determining whether to transition from the tail node to the head node.

StateT is the state type that the condition evaluates against. It should be a subclass of BaseState.

Conditions should follow these rules:

  • The condition should be stateless and depend only on the current state.

  • Do not use side effects in the condition, such as network calls or database queries.

  • The condition should be a pure function of the state.

Example

class MyCondition(Condition[MyState]):
    def evaluate(self, state: MyState) -> bool:
        return state.some_property == "some_value"

my_condition = MyCondition()
edges = [
    Edge(tail=node_1, head=node_2, condition=my_condition),
    Edge(tail=node_2, head=node_3),
]
abstractmethod evaluate(state: StateT) bool[source]

Evaluate whether the transition should occur based on the current workflow state.

Parameters:

state (StateT) – The current workflow state.

Returns:

True if the transition should occur, False otherwise.

Return type:

bool

class junjo.Graph(source: Node | _NestableWorkflow, sinks: list[Node | _NestableWorkflow], edges: list[Edge])[source]

Bases: object

Represents a directed graph of nodes and edges, defining the structure and flow of a workflow.

The Graph class is a fundamental component in Junjo, responsible for encapsulating the relationships between different processing units (Nodes or Subflows) and the conditions under which transitions between them occur.

It holds references to the entry point (source) and explicit terminal nodes (sinks) of the graph, as well as a list of all edges that connect the nodes.

Graph definitions are immutable after construction. Junjo stores the source, sinks, and edges as read-only graph-shape metadata so the cached compiled snapshot cannot silently drift away from the graph definition.

Parameters:
  • source (Node | _NestableWorkflow) – The starting node or subflow of the graph. Execution of the workflow begins here.

  • sinks (list[Node | _NestableWorkflow]) – The explicit terminal nodes or subflows of the graph. Execution completes successfully only when one of these executables is reached.

  • edges (list[Edge]) – A list of Edge instances that define the connections and transition logic between nodes in the graph.

Example

from junjo import Node, Edge, Graph, BaseStore, Condition, BaseState

# Define a simple state (can be more complex in real scenarios)
class MyWorkflowState(BaseState):
    count: int | None = None

# Define a simple store
class MyWorkflowStore(BaseStore[MyWorkflowState]):
    async def set_count(self, payload: int) -> None:
        await self.set_state({"count": payload})

# Define some simple nodes
class FirstNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        print("First Node Executed")

class CountItemsNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        # In a real scenario, you might get items from state and count them
        await store.set_count(5) # Example count
        print("Counted items")

class EvenItemsNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        print("Path taken for even items count.")

class OddItemsNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        print("Path taken for odd items count.")

class FinalNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        print("Final Node Executed")

# Define a condition
class CountIsEven(Condition[MyWorkflowState]):
    def evaluate(self, state: MyWorkflowState) -> bool:
        if state.count is None:
            return False
        return state.count % 2 == 0

# Instantiate the nodes
first_node = FirstNode()
count_items_node = CountItemsNode()
even_items_node = EvenItemsNode()
odd_items_node = OddItemsNode()
final_node = FinalNode()

# Create the workflow graph
workflow_graph = Graph(
    source=first_node,
    sinks=[final_node],
    edges=[
        Edge(tail=first_node, head=count_items_node),
        Edge(tail=count_items_node, head=even_items_node, condition=CountIsEven()),
        Edge(tail=count_items_node, head=odd_items_node), # Fallback
        Edge(tail=even_items_node, head=final_node),
        Edge(tail=odd_items_node, head=final_node),
    ]
)
property source: Node | _NestableWorkflow

Return the immutable source executable for this graph definition.

property sinks: tuple[Node | _NestableWorkflow, ...]

Return the immutable terminal executables for this graph definition.

property edges: tuple[Edge, ...]

Return the immutable ordered edge set for this graph definition.

compile() CompiledGraph[source]

Compile this graph into one canonical structural snapshot.

The compiled snapshot is cached per Graph instance and becomes the structural source of truth for validation, traversal, and serialization.

Compiling a graph does not execute node logic or evaluate edge conditions. It only normalizes the graph structure into a single, immutable representation.

Returns:

The compiled structural snapshot for this graph instance.

Return type:

CompiledGraph

Raises:

GraphCompilationError – If Junjo cannot build a consistent structural representation of the graph.

validate() None[source]

Validate the graph’s structural shape and declared terminal nodes.

This validation pass is intentionally structural. It does not execute node logic or edge conditions. Instead, it checks the graph topology using the declared edges, source, and sinks.

Validation currently enforces:

  • every declared sink is reachable from the source

  • declared sinks do not have outgoing edges

  • every reachable non-sink node has at least one outgoing edge

  • nested subflow graphs validate recursively

Cycles are allowed as long as the graph still has a reachable sink and no reachable non-sink dead ends.

Raises:

GraphValidationError – If the graph shape violates Junjo’s current validation rules.

async get_next_node(store: BaseStore, current_node: Node | _NestableWorkflow) Node | _NestableWorkflow[source]

Retrieve the next node or subflow in the graph for the given current executable.

This method checks the edges connected to the current executable and resolves the next executable based on the conditions defined in those edges.

Junjo uses ordered first-match traversal semantics:

  • outgoing edges are considered in the order they were declared

  • the first edge whose condition resolves to a next executable wins

  • later edges are not evaluated once a match is found

If no outgoing edge resolves and the current executable is not already a declared sink, this method raises GraphValidationError.

Parameters:
  • store (BaseStore) – The store instance to use for resolving the next executable.

  • current_node (Node | _NestableWorkflow) – The current node or subflow in the graph.

Returns:

The next node or subflow in the graph.

Return type:

Node | _NestableWorkflow

serialize_to_json_string() str[source]

Convert the graph to a neutral serialized JSON string.

The serialized representation treats RunConcurrent instances as subgraphs and includes nested subflow graphs as well.

The serialized payload includes explicit runtime and structural identities for the graph, nodes, and edges. Nested subflow nodes also include their child graph structural id plus explicit source and sink runtime and structural ids.

Returns:

A JSON string containing the graph structure.

Return type:

str

Raises:

GraphSerializationError – If the graph payload cannot be converted into JSON.

to_mermaid() str[source]

Render the graph as Mermaid flowchart syntax.

Mermaid rendering consumes the compiled graph snapshot directly instead of routing through serialized JSON. The returned string contains one flowchart with:

  • the overview graph

  • concurrent executables rendered as Mermaid subgraphs

  • disconnected detail subgraphs for each subflow

Node, concurrent, and subflow identifiers are structural, which keeps Mermaid output stable across repeated fresh graph builds with the same topology.

Returns:

Mermaid flowchart syntax for the compiled graph.

Return type:

str

to_dot_notation() str[source]

Render the Junjo graph as a main overview digraph plus one additional digraph for each subflow.

DOT rendering consumes the compiled graph snapshot directly instead of routing through serialized JSON. Rendered node and subflow identifiers are structural, which keeps DOT output stable across repeated fresh graph builds with the same topology.

export_graphviz_assets(out_dir: str | Path = 'graphviz_out', fmt: str = 'svg', dot_cmd: str = 'dot', open_html: bool = False, clean: bool = True) dict[str, Path][source]

Render every digraph produced by to_dot_notation() and build a gallery HTML page whose headings use the human labels (e.g. “SampleSubflow”) instead of raw digraph identifiers.

Graphviz export renders directly from the compiled graph snapshot. It does not depend on the serialized graph JSON payload, which keeps the rendering path aligned with the same canonical structure used for validation and traversal.

Returns:

An ordered mapping of digraph name to rendered file path, in encounter order.

Return type:

dict[str, Path]

class junjo.CompiledGraph(graph_structural_id: str, source_node_runtime_id: str, sink_node_runtime_ids: tuple[str, ...], compiled_nodes: tuple[CompiledNode, ...], compiled_nodes_by_runtime_id: Mapping[str, CompiledNode], compiled_edges: tuple[CompiledEdge, ...], outgoing_compiled_edges_by_tail_runtime_id: Mapping[str, tuple[CompiledEdge, ...]], reachable_node_runtime_ids: frozenset[str])[source]

Bases: object

The canonical structural representation of a single Graph instance.

A compiled graph is immutable and normalized for graph-facing features:

  • validation

  • traversal adjacency lookups

  • serialization

  • rendering

Runtime graph objects still define the graph, but compiled snapshots are the single structural source of truth for all graph operations.

graph_structural_id: str
source_node_runtime_id: str
sink_node_runtime_ids: tuple[str, ...]
compiled_nodes: tuple[CompiledNode, ...]
compiled_nodes_by_runtime_id: Mapping[str, CompiledNode]
compiled_edges: tuple[CompiledEdge, ...]
outgoing_compiled_edges_by_tail_runtime_id: Mapping[str, tuple[CompiledEdge, ...]]
reachable_node_runtime_ids: frozenset[str]
class junjo.CompiledNode(node_runtime_id: str, node_structural_id: str, node_type_name: str, node_label: str, node_runtime_ref: Node | _NestableWorkflow, is_concurrent_subgraph: bool = False, is_subflow: bool = False, child_node_runtime_ids: tuple[str, ...] = (), compiled_subflow_graph: CompiledGraph | None = None)[source]

Bases: object

A normalized structural node within a compiled graph snapshot.

Compiled nodes capture the graph-facing metadata Junjo needs for validation, serialization, and rendering while preserving the original runtime node or subflow reference for execution-time operations.

node_runtime_id: str
node_structural_id: str
node_type_name: str
node_label: str
node_runtime_ref: Node | _NestableWorkflow
is_concurrent_subgraph: bool = False
is_subflow: bool = False
child_node_runtime_ids: tuple[str, ...] = ()
compiled_subflow_graph: CompiledGraph | None = None
class junjo.CompiledEdge(edge_structural_id: str, edge_ordinal: int, tail_node_runtime_id: str, tail_node_structural_id: str, head_node_runtime_id: str, head_node_structural_id: str, edge_condition_label: str | None, edge_runtime_ref: Edge)[source]

Bases: object

A normalized structural edge within a compiled graph snapshot.

Compiled edges preserve the original declared ordering through edge_ordinal and keep a reference to the runtime Edge object so traversal can still evaluate conditions against the run-local store.

edge_structural_id: str
edge_ordinal: int
tail_node_runtime_id: str
tail_node_structural_id: str
head_node_runtime_id: str
head_node_structural_id: str
edge_condition_label: str | None
edge_runtime_ref: Edge
exception junjo.GraphValidationError[source]

Bases: GraphError, ValueError

Raised when a graph shape or traversal outcome violates Junjo’s graph rules.

This includes invalid constructor inputs such as an empty sinks list and runtime traversal situations where execution dead-ends on a non-sink node.

exception junjo.GraphCompilationError[source]

Bases: GraphError

Raised when a graph cannot be compiled into a canonical structural form.

Compilation failures are structural normalization failures, not traversal or validation failures. This exception is used when Junjo cannot produce a consistent compiled graph snapshot from the runtime graph definition.

exception junjo.GraphSerializationError[source]

Bases: GraphError

Raised when a graph cannot be converted into its serialized JSON form.

This exception is used when Junjo successfully builds an in-memory graph payload, but json.dumps cannot serialize part of that payload. This is typically caused by attaching non-JSON-serializable values to graph-facing metadata such as node labels.

exception junjo.GraphRenderError[source]

Bases: GraphError

Raised when a graph cannot be rendered into an output format.

This includes Graphviz command failures and failures while producing Mermaid or DOT output from the compiled graph snapshot.

class junjo.Hooks[source]

Bases: object

Registry for optional Junjo lifecycle callbacks.

Hooks are observers. They do not create spans or control workflow execution. If a hook callback raises, Junjo keeps execution isolated and continues dispatching the remaining callbacks for that hook.

To use them, register one or more callbacks and pass the registry to a workflow or subflow.

Example

import logging

hooks = Hooks()
logger = logging.getLogger(__name__)

def log_completed(event: WorkflowCompletedEvent[MyState]) -> None:
    logger.info("%s %s", event.hook_name, event.result.state.model_dump())

hooks.on_workflow_completed(log_completed)

workflow = Workflow(..., hooks=hooks)
on_workflow_started(callback: Callable[[WorkflowStartedEvent], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for the start of a top-level workflow run.

on_workflow_completed(callback: Callable[[WorkflowCompletedEvent[StateT]], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for successful workflow completion.

on_workflow_failed(callback: Callable[[WorkflowFailedEvent[StateT]], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for workflow failures.

on_workflow_cancelled(callback: Callable[[WorkflowCancelledEvent[StateT]], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for workflow cancellation.

on_subflow_started(callback: Callable[[SubflowStartedEvent], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for subflow start events.

on_subflow_completed(callback: Callable[[SubflowCompletedEvent[StateT]], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for successful subflow completion.

on_subflow_failed(callback: Callable[[SubflowFailedEvent[StateT]], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for subflow failures.

on_subflow_cancelled(callback: Callable[[SubflowCancelledEvent[StateT]], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for subflow cancellation.

on_node_started(callback: Callable[[NodeStartedEvent], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for node start events.

on_node_completed(callback: Callable[[NodeCompletedEvent], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for successful node completion.

on_node_failed(callback: Callable[[NodeFailedEvent], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for node failures.

on_node_cancelled(callback: Callable[[NodeCancelledEvent], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for node cancellation.

on_run_concurrent_started(callback: Callable[[RunConcurrentStartedEvent], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for RunConcurrent start events.

on_run_concurrent_completed(callback: Callable[[RunConcurrentCompletedEvent], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for successful RunConcurrent completion.

on_run_concurrent_failed(callback: Callable[[RunConcurrentFailedEvent], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for RunConcurrent failures.

on_run_concurrent_cancelled(callback: Callable[[RunConcurrentCancelledEvent], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for RunConcurrent cancellation.

on_state_changed(callback: Callable[[StateChangedEvent[StateT]], None | Awaitable[None]]) Callable[[], None][source]

Register a callback for committed state updates.

class junjo.GraphFactory(*args, **kwargs)[source]

Bases: Protocol, Generic[_CovariantGraphT]

A callable that returns a new instance of a workflow’s graph.

This factory is invoked at the beginning of each execute() or execute() call to ensure a fresh, isolated graph for that execution. This is critical for concurrency safety because nodes and subflows are runtime objects with per-run identities.

class junjo.StoreFactory(*args, **kwargs)[source]

Bases: Protocol, Generic[_CovariantStoreT]

A callable that returns a new instance of a workflow’s store.

This factory is invoked at the beginning of each execute() or execute() call to ensure fresh, isolated state for that execution.

class junjo.ExecutionResult(run_id: str, definition_id: str, name: str, state: StateT, node_execution_counts: Mapping[str, int])[source]

Bases: Generic[StateT]

Immutable snapshot of a completed workflow or subflow execution.

ExecutionResult is the public post-run API for accessing final state and execution metadata without exposing live runtime objects like the internal store or graph.

The result includes:

  • run_id: The unique identifier for this specific execution.

  • definition_id: The stable identifier of the workflow or subflow definition.

  • name: The configured workflow or subflow name.

  • state: The detached final state snapshot for the completed execution.

  • node_execution_counts: Current-scope execution counts keyed by executable id.

run_id: str
definition_id: str
name: str
state: StateT
node_execution_counts: Mapping[str, int]
class junjo.Workflow(graph_factory: GraphFactory[Graph], store_factory: StoreFactory[StoreT], max_iterations: int = 100, hooks: Hooks | None = None, name: str | None = None)[source]

Bases: _NestableWorkflow[StateT, StoreT, None, None]

A Workflow is a top-level, executable collection of nodes and edges arranged as a graph. It manages its own state and store, distinct from any parent or sub-workflows.

This class is generic and requires two type parameters for a convenient and type-safe developer experience:

StateT is the type of state managed by this workflow and should be a subclass of BaseState. StoreT is the store type used by this workflow and should be a subclass of BaseStore.

Every call to execute() creates a fresh graph, a fresh store, and a fresh execution context. That makes a Workflow instance a reusable definition or blueprint rather than a mutable live-run container.

Parameters:
  • name (str | None, optional) – An optional name for the workflow. If not provided, the class name is used.

  • graph_factory (GraphFactory[Graph]) – A callable that returns a new instance of the workflow’s graph (Graph). This factory is invoked at the beginning of each execute() call to ensure a fresh, isolated graph for that execution.

  • store_factory (StoreFactory[StoreT]) – A callable that returns a new instance of the workflow’s store (StoreT). This factory is invoked at the beginning of each execute() call to ensure a fresh store for that execution.

  • max_iterations (int, optional) – The maximum number of times any single node or executable may run within one workflow execution. This helps detect accidental loops. Defaults to 100.

  • hooks (Hooks | None, optional) – An optional Hooks registry for observing workflow lifecycle events. Hooks are optional observers; they do not control OpenTelemetry instrumentation or workflow execution.

Example without hooks

workflow = Workflow[MyState, MyStore](
    name="demo_base_workflow",
    graph_factory=create_my_graph,
    store_factory=lambda: MyStore(initial_state=MyState()),
)

result = await workflow.execute()
print(result.state.model_dump_json())

Example with hooks

import logging

hooks = Hooks()
logger = logging.getLogger(__name__)

def log_completed(event) -> None:
    logger.info(
        "workflow completed %s %s %s",
        event.name,
        event.run_id,
        event.result.state.model_dump(),
    )

hooks.on_workflow_completed(log_completed)

workflow = Workflow[MyState, MyStore](
    name="configured_workflow",
    graph_factory=create_my_graph,
    store_factory=lambda: MyStore(initial_state=MyState()),
    hooks=hooks,
)

Passing Parameters to Factories

To provide parameters to your graph_factory or store_factory when you create a workflow, wrap the factory call in a lambda. This creates an argument-less factory that closes over the dependencies you want to inject while preserving the fresh-per-run execution model.

def create_graph_with_dependency(emulator: Emulator) -> Graph:
    return Graph(...)

my_emulator = Emulator()

workflow = Workflow[MyState, MyStore](
    name="configured_workflow",
    graph_factory=lambda: create_graph_with_dependency(my_emulator),
    store_factory=lambda: MyStore(initial_state=MyState()),
)
class junjo.Subflow(graph_factory: GraphFactory[Graph], store_factory: StoreFactory[StoreT], max_iterations: int = 100, hooks: Hooks | None = None, name: str | None = None)[source]

Bases: _NestableWorkflow[StateT, StoreT, ParentStateT, ParentStoreT], ABC

A Subflow is a workflow that:

  • Executes within a parent workflow or parent subflow.

  • Has its own isolated state and store.

  • Can interact with its parent workflow state before and after execution via pre_run_actions() and post_run_actions().

Like top-level workflows, subflows create a fresh graph and a fresh store for every execution. The child run is isolated from the parent store except for the explicit handoff points provided by the pre- and post-run hooks.

Parameters:
  • name (str | None, optional) – An optional name for the subflow. If not provided, the class name is used.

  • graph_factory (GraphFactory[Graph]) – A callable that returns a new instance of the subflow’s graph (Graph). This factory is invoked at the beginning of each execute() call to ensure a fresh, isolated graph for that execution.

  • store_factory (StoreFactory[StoreT]) – A callable that returns a new instance of the subflow’s store (StoreT). This factory is invoked at the beginning of each execute() call to ensure a fresh store for that execution.

  • max_iterations (int, optional) – The maximum number of times any single node or executable may run within one subflow execution. Defaults to 100.

  • hooks (Hooks | None, optional) – An optional Hooks registry for observing lifecycle events emitted by this subflow.

Example

class ExampleSubflow(
    Subflow[SubflowState, SubflowStore, ParentState, ParentStore]
):
    async def pre_run_actions(self, parent_store, subflow_store):
        parent_state = await parent_store.get_state()
        await subflow_store.set_parameter(
            {"parameter": parent_state.parameter}
        )

    async def post_run_actions(self, parent_store, subflow_store):
        subflow_state = await subflow_store.get_state()
        await parent_store.set_subflow_result(subflow_state.result)

example_subflow = ExampleSubflow(
    graph_factory=create_example_subflow_graph,
    store_factory=lambda: ExampleSubflowStore(
        initial_state=ExampleSubflowState()
    ),
)
abstractmethod async pre_run_actions(parent_store: ParentStoreT, subflow_store: StoreT) None[source]

This method is called before the subflow has run.

This is where you can pass initial state values from the parent workflow to the subflow store for this specific run.

Parameters:
  • parent_store (ParentStoreT) – The parent store to interact with.

  • subflow_store (StoreT) – The store for this specific subflow execution.

abstractmethod async post_run_actions(parent_store: ParentStoreT, subflow_store: StoreT) None[source]

This method is called after the subflow has run.

This is where you can update the parent store with the results of the child workflow.

Parameters:
  • parent_store (ParentStoreT) – The parent store to update.

  • subflow_store (StoreT) – The store for this specific subflow execution.

class junjo.Node[source]

Bases: Generic[StoreT], ABC

Nodes are the building blocks of a workflow. They represent a single unit of work that can be executed within the context of a workflow.

Place business logic to be executed by the node in service(). Junjo wraps that service method with OpenTelemetry tracing, error handling, and lifecycle hook dispatch during execute().

The Node type is meant to remain decoupled from your application’s domain logic. While you can place business logic directly in the service() method, it is recommended that you call a service function located in a separate module. This keeps nodes easy to test, easier to understand, and focused on orchestration rather than implementation detail.

StoreT is the workflow store type that will be passed into this node during execution.

Nodes have three main responsibilities:

  • The workflow passes the run-local store to the node’s execute() method.

  • execute() manages tracing, lifecycle hooks, and error handling.

  • service() performs the side effects for this unit of work.

Example implementation

class SaveMessageNode(Node[MessageWorkflowStore]):
    async def service(self, store) -> None:
        state = await store.get_state()
        sentiment = await get_message_sentiment(state.message)
        await store.set_message_sentiment(sentiment)
property id: str

Returns the unique identifier for the node.

property name: str

Returns the name of the node class instance.

property patches: list[JsonPatch]

Returns the list of state patches that have been applied by this node.

add_patch(patch: JsonPatch) None[source]

Adds a patch to the list of patches.

abstractmethod async service(store: StoreT) None[source]

This is the main logic of the node.

The concrete implementation of this method should contain the side effects that this node will perform. The method is called by execute(), which is responsible for tracing, lifecycle dispatch, and error handling.

The service() method should not be called directly. Instead, it should be called by the execute() method of the node.

DO NOT EXECUTE node.service() DIRECTLY! Use node.execute() instead.

Parameters:

store (StoreT) – The run-local store passed to the node’s service function.

async execute(store: StoreT, parent_id: str) None[source]

Execute the node’s service() method with tracing and lifecycle dispatch.

This method acquires a tracer, opens a node span, emits lifecycle events, and then calls service(). Terminal lifecycle hooks are dispatched before the span closes so hook failure telemetry can stay attached to the real node span.

Parameters:
  • store (StoreT) – The run-local store for the current workflow execution.

  • parent_id (str) – The identifier of the parent workflow or subflow. This becomes the parent id recorded on the node span.

class junjo.RunConcurrent(name: str, items: list[Node | Subflow])[source]

Bases: Node

Execute a list of nodes or subflows concurrently.

An instance of RunConcurrent can be added to a workflow graph the same way as any other node. Under the hood it starts one task per child item and waits for them to settle.

If one child fails, Junjo cancels all still-pending siblings and re-raises the original failure. Cancelled siblings are marked as cancelled in telemetry rather than as errors so traces tell the full story of the failure boundary.

Parameters:
  • name (str) – The name of this collection of concurrently executed nodes.

  • items (list[Node | Subflow]) – A list of nodes or subflows to execute concurrently.

node_1 = NodeOne()
node_2 = NodeTwo()
node_3 = NodeThree()

run_concurrent = RunConcurrent(
    name="Concurrent Execution",
    items=[node_1, node_2, node_3],
)
property id: str

Returns the unique identifier for the node or subflow.

property name: str

Returns the configured name of this concurrent execution group.

async service(store: BaseStore) None[source]

Execute the provided nodes and subflows concurrently.

Child items receive the same run-local store. If one item fails, all still-pending siblings are cancelled and the original failure is re-raised once the cancellations have been drained.

async execute(store: BaseStore, parent_id: str) None[source]

Execute the RunConcurrent node with tracing and lifecycle dispatch.

This wraps service() with a dedicated run-concurrent span and the same started/completed/failed/cancelled lifecycle semantics used by ordinary nodes.

Parameters:
  • store (BaseStore) – The run-local store for the current workflow execution.

  • parent_id (str) – The parent workflow or subflow identifier.

class junjo.BaseState[source]

Bases: BaseModel

Common base for states, with no unknown fields allowed.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class junjo.BaseStore(initial_state: StateT)[source]

Bases: Generic[StateT]

BaseStore represents a generic store for managing the state of a workflow. It is designed to be subclassed with a specific state type (a Pydantic model derived from BaseState).

The store is responsible for:

  • Managing the state of a workflow or subflow execution.

  • Making immutable updates to that state safely in a concurrent environment.

  • Validating committed updates against the underlying Pydantic model.

The store uses an asyncio.Lock to ensure that state updates are concurrency-safe. Each committed update is derived, validated, and applied against the exact locked state version it modifies, which prevents stale validate-then-apply races under concurrent execution.

Subclass BaseStore with your own state type and expose domain-specific actions that call set_state().

Example

class MyWorkflowState(BaseState):
    user_input: str
    processed_data: dict | None = None

class MyWorkflowStore(BaseStore[MyWorkflowState]):
    async def set_processed_data(self, data: dict) -> None:
        await self.set_state({"processed_data": data})
Parameters:

initial_state (StateT) – The initial state of the store.

property id: str

Returns the unique identifier of this store instance.

async get_state() StateT[source]

Return a detached deep copy of the current state.

This method follows the immutability principle for read access: callers receive a deep snapshot that can be inspected freely without mutating the live store. Store updates must still flow through store actions and set_state().

async get_state_json() str[source]

Return the current state as a JSON string.

This is useful for logging, tracing, or serializing the current state without manually calling model_dump_json on a snapshot.

async set_state(update: dict) None[source]

Update the store’s state with a dictionary of changes.

The update is merged with the current locked state, validated against the store’s Pydantic model, and committed atomically if it changes the state. This method also emits OpenTelemetry set_state events and lifecycle state-changed hooks when a commit occurs.

Parameters:

update (dict) – A dictionary of updates to apply to the state.

class MessageWorkflowState(BaseState):
    received_message: Message

class MessageWorkflowStore(BaseStore[MessageWorkflowState]):
    async def set_received_message(self, payload: Message) -> None:
        await self.set_state({"received_message": payload})

payload = Message(...)
await store.set_received_message(payload)

Note

Validation happens while the store lock is held, so every committed update is validated against the exact state version it will be applied to.

Note

If the resulting state is unchanged, the store remains untouched and an empty patch is recorded in telemetry.

class junjo.Edge(tail: Node | _NestableWorkflow, head: Node | _NestableWorkflow, condition: Condition[StateT] | None = None)[source]

Bases: object

Represents a directed edge in the workflow graph.

An edge connects a tail node to a head node, optionally with a condition that determines whether the transition from tail to head should occur.

Parameters:
  • tail (Node | _NestableWorkflow) – The source node of the edge where the transition originates.

  • head (Node | _NestableWorkflow) – The destination node of the edge where the transition leads.

  • condition (Condition[StateT] | None) – An optional condition that determines whether the transition from tail to head should occur. If None, the transition is always valid.

async next_node(store: BaseStore) Node | _NestableWorkflow | None[source]

Determine the next node in the workflow based on this edge’s condition.

Parameters:

store (BaseStore) – The store instance to use when resolving the next node.

Returns:

The next node if the transition is valid, otherwise None.

Return type:

Node | _NestableWorkflow | None

Telemetry API

class junjo.telemetry.junjo_otel_exporter.JunjoOtelExporter(host: str, port: str, api_key: str, insecure: bool = False)[source]

Bases: object

Configure Junjo AI Studio OTLP components for an existing OpenTelemetry setup.

Junjo is designed to be compatible with existing OpenTelemetry configurations, by adding to an existing configuration instead of creating a new one.

In normal applications, the tracer provider and meter provider remain the top-level owners of shutdown. Call TracerProvider.shutdown() and MeterProvider.shutdown() when the process is terminating.

flush() is available for manual immediate export when you truly need it, such as in short-lived scripts or tests. shutdown() is a wrapper-local helper that shuts down only the Junjo-owned span processor and metric reader.

Parameters:
  • host (str) – The hostname of the Junjo AI Studio.

  • port (str) – The port of the Junjo AI Studio.

  • api_key (str) – The API key for the Junjo AI Studio.

  • insecure (bool) – Whether to allow insecure connections to the Junjo AI Studio. Defaults to False.

Local Development

To send telemetry to a local Junjo AI Studio instance, such as one running through Docker Compose:

import os
from junjo.telemetry.junjo_otel_exporter import JunjoOtelExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry import trace

# Retrieve API key from environment
JUNJO_AI_STUDIO_API_KEY = os.getenv("JUNJO_AI_STUDIO_API_KEY")

# Option 1: Using localhost
junjo_exporter_local = JunjoOtelExporter(
    host="localhost",
    port="50051",
    api_key=JUNJO_AI_STUDIO_API_KEY,
    insecure=True,
)

# Option 2: Using Docker service name (if running in same Docker network)
junjo_exporter_docker = JunjoOtelExporter(
    host="junjo-ai-studio-ingestion",  # Docker service name
    port="50051",
    api_key=JUNJO_AI_STUDIO_API_KEY,
    insecure=True,
)

# Add to your tracer provider
provider = TracerProvider()
provider.add_span_processor(junjo_exporter_local.span_processor)
trace.set_tracer_provider(provider)

Production Deployment

For a production environment with TLS enabled, such as one behind a reverse proxy like Caddy:

import os
from junjo.telemetry.junjo_otel_exporter import JunjoOtelExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry import trace

# Retrieve API key from environment
JUNJO_AI_STUDIO_API_KEY = os.getenv("JUNJO_AI_STUDIO_API_KEY")

junjo_exporter_prod = JunjoOtelExporter(
    host="ingestion.junjo.example.com",  # Your domain
    port="443",  # HTTPS port
    api_key=JUNJO_AI_STUDIO_API_KEY,
    insecure=False,  # TLS enabled
)

# Add to your tracer provider
provider = TracerProvider()
provider.add_span_processor(junjo_exporter_prod.span_processor)
trace.set_tracer_provider(provider)

Initializes the JunjoOtelExporter.

property span_processor: BatchSpanProcessor

Returns the configured span processor.

property metric_reader: PeriodicExportingMetricReader

Returns the configured metric reader.

shutdown(timeout_millis: float = 30000) bool[source]

Shut down the Junjo-owned telemetry components.

In most applications, the preferred terminal lifecycle is to shut down the owning TracerProvider and MeterProvider. This helper is provided for cases where you need to shut down only the Junjo-owned span processor and metric reader directly.

Parameters:

timeout_millis (float) – Maximum time to wait for metric reader shutdown in milliseconds. Defaults to 30000.

Returns:

True if both components shut down cleanly, False if either shutdown path raises. Failures are logged through the junjo.telemetry logger.

Return type:

bool

flush(timeout_millis: float = 120000) bool[source]

Force a manual drain of pending telemetry.

This method blocks until pending telemetry is exported or the timeout is reached. Use it only when you need an immediate manual export, such as in tests, short-lived scripts, or other environments where the normal provider shutdown lifecycle is not the right fit.

In normal applications, shut down the owning TracerProvider and MeterProvider instead of using flush() as the standard exit path.

Parameters:

timeout_millis (float) – Maximum time to wait for flush in milliseconds. Defaults to 120000ms (120 seconds) to match the exporter timeout and allow for retries.

Returns:

True if all telemetry was flushed successfully, False otherwise. Failures are logged through the junjo.telemetry logger.

Return type:

bool

class junjo.workflow._NestableWorkflow(graph_factory: GraphFactory[Graph], store_factory: StoreFactory[StoreT], max_iterations: int = 100, hooks: Hooks | None = None, name: str | None = None)[source]

Bases: Generic[StateT, StoreT, ParentStateT, ParentStoreT]

Shared execution implementation for Workflow and Subflow.

This type should not be used directly. Consumers should instantiate Workflow or subclass Subflow.

property id: str

Returns the stable definition identifier for this workflow object.

property name: str

Returns the configured workflow name or the class name.

property span_type: JunjoOtelSpanTypes

Returns the OpenTelemetry span type for this executable.

async execute(parent_store: ParentStoreT | None = None, parent_id: str | None = None, *, validate_graph: bool = True) ExecutionResult[StateT][source]

Execute the workflow or subflow and return the final execution snapshot.

Each call creates a fresh graph, a fresh store, a fresh run id, and a fresh lifecycle dispatcher. This keeps the workflow definition itself immutable and safe to reuse across concurrent runs.

Parameters:
  • parent_store (ParentStoreT | None) – The parent store when executing a subflow. Top-level workflows should omit this argument.

  • parent_id (str | None) – The parent workflow or subflow identifier when nested.

  • validate_graph (bool) – Whether to run Graph.validate() on the fresh graph before execution starts. Defaults to True.

Returns:

A detached snapshot of the completed execution, including the final state and current-scope execution counts.

Return type:

ExecutionResult[StateT]