Core Concepts¶
This page breaks down the fundamental building blocks of the Junjo library. Understanding these concepts is key to effectively designing, building, and debugging your workflows.
State¶
What is it? A BaseState is a Pydantic model that defines the data structure for your workflow’s state. It acts as a centralized, type-safe container for all the data that your workflow will operate on.
Key Characteristics:
- Pydantic-Based: Leverages Pydantic for data validation and type hinting.
- Immutable in Practice: While the state object itself can be replaced, it is treated as immutable within the workflow. Nodes do not modify the state directly; they request changes through the store.
```python
from junjo import BaseState

class MyWorkflowState(BaseState):
    user_input: str
    processed_data: dict | None = None
    is_complete: bool = False
```
Store¶
What is it? A BaseStore is a class that manages the state of a workflow. It holds the BaseState and provides methods (often called “actions”) to update the state in a controlled and predictable manner.
Key Characteristics:
- State Management: The single source of truth for the workflow’s state.
- Redux-Inspired: Follows a pattern where state is updated by dispatching actions, ensuring that state changes are explicit and traceable.
- Concurrency Safe: Uses an asyncio.Lock to ensure that state updates are atomic, preventing race conditions.
```python
from junjo import BaseStore

class MyWorkflowStore(BaseStore[MyWorkflowState]):
    async def set_processed_data(self, data: dict) -> None:
        await self.set_state({"processed_data": data})

    async def mark_as_complete(self) -> None:
        await self.set_state({"is_complete": True})
```
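Conceptually, each action merges an update into the current state under an `asyncio.Lock`, replacing the state rather than mutating it. The following is a rough stdlib-only sketch of that pattern, not the real `BaseStore` implementation (whose internals and exact API may differ):

```python
import asyncio

class SketchStore:
    """Minimal lock-protected store: state is replaced, never mutated."""

    def __init__(self, initial_state: dict) -> None:
        self._state = dict(initial_state)
        self._lock = asyncio.Lock()

    async def get_state(self) -> dict:
        async with self._lock:
            return dict(self._state)  # hand out a copy

    async def set_state(self, update: dict) -> None:
        async with self._lock:
            # Replace the state with a merged copy, Redux-style.
            self._state = {**self._state, **update}

async def demo() -> dict:
    store = SketchStore({"is_complete": False})
    # Concurrent updates are serialized by the lock, so none are lost.
    await asyncio.gather(*(store.set_state({"n": i}) for i in range(5)))
    await store.set_state({"is_complete": True})
    return await store.get_state()

final = asyncio.run(demo())
print(final["is_complete"])  # True
```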
Node¶
What is it? A Node represents a single unit of work in your workflow. It’s where your business logic, API calls, or any other operations are executed.
Key Characteristics:
- Atomic Unit of Work: Each node should have a single, well-defined responsibility.
- Interacts with the Store: Nodes receive the workflow’s store as an argument to their service method, allowing them to read the current state and dispatch actions to update it.
- Asynchronous: The service method is an async function, allowing for non-blocking I/O operations.
```python
from junjo import Node

class ProcessDataNode(Node[MyWorkflowStore]):
    async def service(self, store: MyWorkflowStore) -> None:
        state = await store.get_state()

        # Perform some processing on state.user_input
        processed_data = {"result": "some_value"}

        await store.set_processed_data(processed_data)
```
Edge¶
What is it? An Edge defines a directed connection between two nodes in a workflow graph. It represents a potential path of execution.
Key Characteristics:
- Defines Flow: Edges connect a tail node to a head node, establishing the sequence of operations.
- Can be Conditional: An edge can have an associated Condition that determines whether the transition from the tail to the head should occur.
```python
from junjo import Edge

edge = Edge(tail=node1, head=node2)
```
Condition¶
What is it? A Condition is a class that contains logic to determine whether an Edge should be traversed.
Key Characteristics:
- Pure Function of State: A condition’s evaluate method should only depend on the current state of the workflow. It should not have any side effects.
- Enables Branching: Conditions are the primary mechanism for creating branching logic in your workflows.
```python
from junjo import Condition

class DataIsProcessed(Condition[MyWorkflowState]):
    def evaluate(self, state: MyWorkflowState) -> bool:
        return state.processed_data is not None

edge = Edge(tail=node1, head=node2, condition=DataIsProcessed())
```
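To see how conditions gate traversal, here is a rough sketch of choosing the next node: the first outgoing edge whose condition passes (or that has no condition) wins. This is purely illustrative; Junjo's actual traversal logic is not shown here, and `SketchEdge`/`next_node` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class SketchEdge:
    tail: str
    head: str
    condition: Optional[Callable[[dict], bool]] = None  # None = unconditional

def next_node(current: str, edges: list[SketchEdge], state: dict) -> Optional[str]:
    """Return the head of the first matching outgoing edge, if any."""
    for edge in edges:
        if edge.tail != current:
            continue
        if edge.condition is None or edge.condition(state):
            return edge.head
    return None

edges = [
    SketchEdge("process", "done",
               condition=lambda s: s.get("processed_data") is not None),
    SketchEdge("process", "retry"),  # fallback when data is not ready
]

print(next_node("process", edges, {"processed_data": None}))      # retry
print(next_node("process", edges, {"processed_data": {"r": 1}}))  # done
```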
Graph¶
What is it? A Graph is a collection of nodes and edges that defines the complete structure of your workflow.
Key Characteristics:
- Source and Sinks: A graph has a single entry point (source) and one or more explicit terminal nodes (sinks).
- Defines the Workflow Structure: The graph is a complete representation of all possible paths of execution in your workflow.
- Compiles To A Structural Snapshot: Graph.compile() returns a canonical compiled view that Junjo uses for validation, serialization, and rendering-oriented graph operations.
- Immutable Definition Object: After construction, Junjo treats the graph shape as immutable so compile-time caching, validation, traversal, and rendering all stay aligned with the same graph definition.
```python
from junjo import Graph

workflow_graph = Graph(
    source=start_node,
    sinks=[end_node],
    edges=[
        Edge(tail=start_node, head=process_node),
        Edge(tail=process_node, head=end_node, condition=DataIsProcessed()),
    ],
)

# Inspect the normalized structural snapshot for debugging or tooling.
compiled = workflow_graph.compile()
print(compiled.source_node_runtime_id)
print(compiled.sink_node_runtime_ids)
```
Compiled Graph Snapshot¶
What is it? A compiled graph snapshot is the canonical structural view that Junjo derives from a single Graph instance. It is exposed through Graph.compile() as a CompiledGraph and is the one graph representation Junjo uses for validation, ordered traversal lookups, serialization, and rendering-oriented tooling.
Why it exists:
- One Structural Source of Truth: Junjo no longer walks raw runtime graph objects differently for validation, serialization, and traversal.
- Stable Graph Identity: The compiled snapshot assigns deterministic graph_structural_id, node_structural_id, and edge_structural_id values so identical graph shapes can be correlated across runs.
- Explicit Runtime vs Structural Identity: Compiled nodes and edges retain runtime ids for exact execution correlation while also carrying structural ids for graph-shape analysis and telemetry aggregation.
What you get:
- CompiledGraph: the full compiled snapshot for one graph instance
- CompiledNode: normalized node metadata, including both runtime and structural identity
- CompiledEdge: normalized edge metadata, including declared edge order and structural identity
```python
compiled = workflow_graph.compile()
print(compiled.graph_structural_id)

for compiled_node in compiled.compiled_nodes:
    print(
        compiled_node.node_runtime_id,
        compiled_node.node_structural_id,
        compiled_node.node_type_name,
    )

for compiled_edge in compiled.compiled_edges:
    print(
        compiled_edge.edge_structural_id,
        compiled_edge.tail_node_runtime_id,
        compiled_edge.head_node_runtime_id,
    )
```
Runtime Identity vs Structural Identity¶
Junjo exposes two distinct identity layers for graph executables:
- Runtime identity: identifies the exact executable instance used in one execution. This is what lets telemetry and hooks talk about the specific node, subflow, or concurrent executable that actually ran.
- Structural identity: identifies the same conceptual graph position across repeated runs of the same graph shape.
This distinction matters because a fresh workflow execution creates a fresh graph. Runtime identities should rotate from run to run, while structural identities should remain stable when the graph shape does not change.
For telemetry and hooks, the most important fields are:
- executable_runtime_id: the runtime identity of the executable that fired the span or hook
- executable_structural_id: the stable structural identity of that same executable
- enclosing_graph_structural_id: the stable structural identity of the compiled graph that contains the executable
This lets Junjo support both:
- single-run debugging using runtime ids
- cross-run aggregation using structural ids
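The two layers can be sketched in a few lines: a runtime id is minted fresh per execution, while a structural id is a deterministic hash of the graph shape. This is an illustration of the concept only; Junjo's actual id derivation is not specified here.

```python
import hashlib
import uuid

def runtime_id() -> str:
    """Fresh per execution: rotates on every run."""
    return uuid.uuid4().hex

def structural_id(edges: list[tuple[str, str]]) -> str:
    """Deterministic over the graph shape: stable across runs."""
    canonical = "|".join(f"{tail}->{head}" for tail, head in sorted(edges))
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]

shape = [("start", "process"), ("process", "end")]

# Two "runs" of the same shape: runtime ids differ, structural ids match.
run_a = (runtime_id(), structural_id(shape))
run_b = (runtime_id(), structural_id(shape))
print(run_a[0] != run_b[0])  # True: runtime identity rotates
print(run_a[1] == run_b[1])  # True: structural identity is stable
```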
Workflow¶
What is it? A Workflow is the main executable component that takes a graph_factory and a store_factory and runs the defined process.
Key Characteristics:
- Executable: The Workflow class has an execute method that starts the workflow.
- Manages Execution: It traverses the graph, executing nodes and evaluating conditions, until one of the declared sinks is reached.
- Isolated Execution: Each call to execute uses the provided factories to create a fresh Graph and Store, ensuring that each execution is isolated and concurrency-safe.
- Returns a Snapshot: execute returns an ExecutionResult containing the final state and workflow-local execution counts for that run.
```python
from junjo import Workflow

def create_graph() -> Graph:
    # ... (graph creation logic)
    return workflow_graph

def create_workflow() -> Workflow[MyWorkflowState, MyWorkflowStore]:
    """Factory function to create a new instance of the workflow."""
    return Workflow[MyWorkflowState, MyWorkflowStore](
        name="My First Workflow",
        graph_factory=create_graph,
        store_factory=lambda: MyWorkflowStore(
            initial_state=MyWorkflowState(user_input="hello")
        ),
    )

# Create and execute the workflow
sample_workflow = create_workflow()
result = await sample_workflow.execute()
print(result.state.model_dump())
```
By default, Workflow.execute() validates the freshly created graph before execution begins. This catches invalid terminal topology, unreachable sinks, and reachable non-sink dead ends before any nodes run. For targeted testing or advanced debugging, validation can be disabled per run:
```python
result = await sample_workflow.execute(validate_graph=False)
```
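The kinds of problems validation catches can be sketched with a simple reachability check over the edge list. The sketch below flags unreachable sinks and reachable non-sink dead ends; it is illustrative only and not Junjo's implementation.

```python
from collections import deque

def validate_shape(source: str, sinks: set[str],
                   edges: list[tuple[str, str]]) -> list[str]:
    """Return a list of structural problems found in the graph shape."""
    out: dict[str, list[str]] = {}
    for tail, head in edges:
        out.setdefault(tail, []).append(head)

    # BFS from the source to find every reachable node.
    reachable, queue = {source}, deque([source])
    while queue:
        node = queue.popleft()
        for head in out.get(node, []):
            if head not in reachable:
                reachable.add(head)
                queue.append(head)

    problems = [f"unreachable sink: {s}" for s in sorted(sinks - reachable)]
    problems += [
        f"reachable dead end that is not a sink: {n}"
        for n in sorted(reachable - sinks)
        if not out.get(n)  # no outgoing edges
    ]
    return problems

# A dead-end "orphan" node and an unreachable sink are both reported.
print(validate_shape("start", {"end"}, [("start", "process"), ("process", "orphan")]))
```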
Passing Parameters to Factories
To provide parameters to your graph_factory or store_factory when you create a Workflow, you can wrap your factory function call in a lambda. This creates a new, argument-less factory that calls your function with the desired parameters when executed.
This is useful for injecting dependencies like configuration objects or API clients into your graph at instantiation time, while preserving concurrency safety.
```python
# Your factory function that requires a dependency
def create_graph_with_dependency(api_key: str) -> Graph:
    # ... setup graph using the api_key
    return Graph(...)

def create_workflow() -> Workflow[MyState, MyStore]:
    # Instantiate the workflow, using a lambda to create the factory
    return Workflow[MyState, MyStore](
        name="configured_workflow",
        graph_factory=lambda: create_graph_with_dependency(
            api_key="your-secret-key"
        ),
        store_factory=lambda: MyStore(initial_state=MyState()),
    )

# The workflow can now be executed normally
workflow = create_workflow()
result = await workflow.execute()
print(result.state.model_dump())
```
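If you prefer to avoid lambdas, `functools.partial` builds the same zero-argument factory by binding parameters up front. The sketch below substitutes a plain function for a real graph-building factory so the mechanics stand alone:

```python
from functools import partial

def create_graph_with_dependency(api_key: str) -> dict:
    # Hypothetical stand-in for a real Graph-building factory.
    return {"api_key": api_key, "nodes": ["start", "end"]}

# partial(...) produces a zero-argument callable, just like the lambda.
graph_factory = partial(create_graph_with_dependency, api_key="your-secret-key")

graph = graph_factory()  # called with no arguments at execution time
print(graph["api_key"])  # your-secret-key
```

Both forms defer the factory call until execution time, so each run still gets a fresh object; the only difference is how the parameters are bound.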