OrkaJS
Orka.JS

StateGraph

Construisez des workflows complexes avec état typé, des interruptions human-in-the-loop et lapersistance des checkpoints.

Pourquoi StateGraph ?

StateGraph est un système de workflow en graphe avancé inspiré de LangGraph. Contrairement à GraphWorkflow basique, StateGraph fournit :

Moteur d'État Typé

Security

Validation stricte du schéma via TypeScript. Garantit l'intégrité des données entre les transitions de nœuds.

Intervention Humaine

Control

Interruptions stratégiques pour validation. Suspendez l'exécution pour une révision manuelle avant action.

Checkpoints & Persistance

Resilience

Mémoire tolérante aux pannes. Sauvegarde l'état automatiquement pour reprendre après une interruption.

Streaming d'Événements

UX / Feedback

Logs d'exécution réactifs. Diffusez les tokens et les statuts des nœuds directement vers votre interface.

Utilisation de Base

import { StateGraph, createStateAnnotation, END } from '@orka-js/graph';
 
// 1. Define your state type
interface AgentState {
messages: string[];
currentStep: string;
result: string;
}
 
// 2. Create state annotation with defaults
const stateAnnotation = createStateAnnotation<AgentState>(() => ({
messages: [],
currentStep: '',
result: '',
}));
 
// 3. Build the graph
const graph = new StateGraph<AgentState>({ stateAnnotation })
.addNode('analyze', async (state) => ({
currentStep: 'analyzing',
messages: [...state.messages, 'Analyzing input...'],
}))
.addNode('process', async (state) => ({
currentStep: 'processing',
result: 'Processed: ' + state.messages.join(', '),
}))
.setEntryPoint('analyze')
.addEdge('analyze', 'process')
.setFinishPoint('process');
 
// 4. Compile and run
const compiled = graph.compile();
const result = await compiled.invoke({
messages: ['Hello world'],
});
 
console.log(result.state.result);
// "Processed: Hello world, Analyzing input..."

Reducers d'État

Par défaut, les mises à jour d'état remplacent les valeurs. Utilisez des reducers pour accumuler ou transformer l'état :

import { createStateAnnotation, Reducers } from '@orka-js/graph';
 
interface ChatState {
messages: string[];
tokenCount: number;
metadata: Record<string, unknown>;
}
 
const stateAnnotation = createStateAnnotation<ChatState>(
// Default values
() => ({
messages: [],
tokenCount: 0,
metadata: {},
}),
// Reducers (optional)
{
messages: Reducers.appendList, // Append new messages to array
tokenCount: Reducers.increment, // Add to existing count
metadata: Reducers.mergeObject, // Merge objects
}
);
 
// Now when nodes return partial state, reducers are applied:
// Node returns: { messages: ['New message'], tokenCount: 50 }
// Result: messages = [...oldMessages, 'New message'], tokenCount = oldCount + 50

Reducers Intégrés

Primitive de ReducerLogique OpérationnelleCompatibilité
appendListCollection
[...current, ...update]Array
mergeObjectStructure
{...current, ...update}Object
addToSetUnique
Array.from(new Set())Array
keepLastN(n)Window
slice(-n)Array
incrementCounter
current + updateNumber
max / minLimit
Math.max / Math.minNumber

Arêtes Conditionnelles

Routez l'exécution en fonction des valeurs d'état :

import { StateGraph, END } from '@orka-js/graph';
 
const graph = new StateGraph<AgentState>({ stateAnnotation })
.addNode('classifier', async (state) => ({
category: classifyInput(state.input),
}))
.addNode('technical_support', async (state) => ({
result: await handleTechnical(state),
}))
.addNode('general_support', async (state) => ({
result: await handleGeneral(state),
}))
.setEntryPoint('classifier')
.addConditionalEdges(
'classifier',
// Condition function - returns route key
(state) => state.category === 'technical' ? 'tech' : 'general',
// Path map - route key -> node name
{
tech: 'technical_support',
general: 'general_support',
}
)
.setFinishPoint('technical_support')
.setFinishPoint('general_support');
 
// Can also route to END directly:
.addConditionalEdges(
'check_done',
(state) => state.isDone ? END: 'continue',
{ continue: 'next_step' }
)

Interruptions (Human-in-the-Loop)

Pausez l'exécution pour révision ou saisie humaine. Parfait pour les workflows d'approbation, la modération de contenu, ou les agents interactifs.

import { StateGraph, GraphCheckpointStore } from '@orka-js/graph';
 
const graph = new StateGraph<AgentState>({ stateAnnotation })
.addNode('draft_email', async (state) => ({
draft: await generateEmail(state.request),
}))
.addNode('send_email', async (state) => ({
sent: true,
result: 'Email sent successfully',
}))
.setEntryPoint('draft_email')
.addEdge('draft_email', 'send_email')
.setFinishPoint('send_email');
 
const compiled = graph.compile();
const checkpointer = new GraphCheckpointStore<AgentState>();
 
// First run - interrupt BEFORE send_email for human approval
const result1 = await compiled.invoke(
{ request: 'Send meeting invite' },
{
checkpointer,
threadId: 'email-workflow-123',
interrupt: { before: ['send_email'] },
}
);
 
console.log(result1.interrupted); // true
console.log(result1.state.draft); // Generated email draft
console.log(result1.checkpoint?.id); // Checkpoint ID for resume
 
// Human reviews the draft...
// Then resume execution:
const result2 = await compiled.resume(
result1.checkpoint!.id,
{ checkpointer, threadId: 'email-workflow-123' }
);
 
console.log(result2.state.sent); // true

Reprendre avec Mise à Jour d'État

Modifiez l'état lors de la reprise - parfait pour les corrections humaines :

// Human edited the draft before approving
const result = await compiled.resumeWithState(
checkpoint.id,
{ draft: 'Human-edited email content...' }, // State update
{ checkpointer, threadId: 'email-workflow-123' }
);

Persistance des Checkpoints

Les checkpoints sauvegardent l'état complet d'exécution, vous permettant de reprendre, rejouer, ou brancher des workflows.

import { GraphCheckpointStore } from '@orka-js/graph';
 
const checkpointer = new GraphCheckpointStore<AgentState>();
 
// Run with checkpointing
const result = await compiled.invoke(initialState, {
checkpointer,
threadId: 'my-workflow-session',
});
 
// List all checkpoints for a thread
const checkpoints = await checkpointer.list('my-workflow-session');
 
// Load a specific checkpoint
const checkpoint = await checkpointer.load(checkpointId);
 
// Load the latest checkpoint
const latest = await checkpointer.loadLatest('my-workflow-session');
 
// Delete checkpoints
await checkpointer.delete(checkpointId);
await checkpointer.deleteThread('my-workflow-session');

Note : GraphCheckpointStore est un store en mémoire pour le développement. Pour la production, implémentez l'interface CheckpointStore avec Redis, PostgreSQL, ou un autre stockage persistant.

Exécution en Streaming

Streamez les événements d'exécution pour des mises à jour UI en temps réel :

const compiled = graph.compile();
 
for await (const event of compiled.stream(initialState)) {
switch (event.type) {
case 'node_start':
console.log(`Starting node: ${event.nodeId}`);
break;
case 'node_end':
console.log(`Finished node: ${event.nodeId}`);
break;
case 'state_update':
console.log('State updated:', event.stateUpdate);
break;
case 'interrupt':
console.log('Interrupted at:', event.nodeId);
console.log('Checkpoint:', event.checkpoint?.id);
break;
case 'checkpoint':
console.log('Checkpoint saved:', event.checkpoint?.id);
break;
case 'error':
console.error('Error:', event.error);
break;
case 'done':
console.log('Execution complete');
break;
}
}

Observabilité Automatique avec Tracer

StateGraph peut automatiquement émettre des événements de trace pour chaque exécution de nœud lorsqu'il est configuré avec un Tracer. Parfait pour le débogage et la surveillance des workflows en production.

import { StateGraph, createStateAnnotation } from '@orka-js/graph';
import { Tracer } from '@orka-js/observability';
import { createDevToolsHook } from '@orka-js/devtools';
 
// Create a tracer with DevTools integration
const tracer = new Tracer({
logLevel: 'info',
hooks: [createDevToolsHook()],
});
 
// Pass tracer to StateGraph config
const graph = new StateGraph<AgentState>({
stateAnnotation,
name: 'MyWorkflow',
tracer, // Automatic tracing enabled
})
.addNode('step1', async (state) => ({ result: 'Step 1 done' }))
.addNode('step2', async (state) => ({ result: 'Step 2 done' }))
.setEntryPoint('step1')
.addEdge('step1', 'step2')
.setFinishPoint('step2');
 
// All node executions are automatically traced
const compiled = graph.compile();
const result = await compiled.invoke({ messages: [] });
 
// View traces in DevTools dashboard at http://localhost:3001

Traçage Automatique : Lorsqu'un tracer est configuré, StateGraph émet automatiquement des événements de trace pour le démarrage des nœuds, leur complétion, les erreurs et le cycle de vie du graphe. Chaque événement inclut le timing, les métadonnées et les mises à jour d'état.

Export Diagramme Mermaid

const compiled = graph.compile();
const mermaid = compiled.toMermaid();
 
console.log(mermaid);
// graph TD
// __start__((START))
// __start__ --> analyze
// analyze[analyze]
// process[process]
// __end__((END))
// analyze --> process
// process --> __end__

Exemple Complet : Workflow d'Approbation

import {
StateGraph,
createStateAnnotation,
GraphCheckpointStore,
END
} from '@orka-js/graph';
import { OpenAIAdapter } from '@orka-js/openai';
 
// State definition
interface ApprovalState {
request: string;
analysis: string;
decision: 'approved' | 'rejected' | 'pending';
reason: string;
}
 
const stateAnnotation = createStateAnnotation<ApprovalState>(() => ({
request: '',
analysis: '',
decision: 'pending',
reason: '',
}));
 
const llm = new OpenAIAdapter({ apiKey: process.env.OPENAI_API_KEY! });
 
// Build the graph
const graph = new StateGraph<ApprovalState>({ stateAnnotation })
.addNode('analyze', async (state) => {
const result = await llm.generate(
`Analyze this request and provide risk assessment: ${state.request}`
);
return { analysis: result.content };
})
.addNode('auto_approve', async (state) => ({
decision: 'approved',
reason: 'Auto-approved: Low risk request',
}))
.addNode('human_review', async (state) => ({
// This node is a placeholder - human will update state on resume
}))
.addNode('finalize', async (state) => ({
reason: state.decision === 'approved'
? 'Request approved and processed'
: 'Request rejected: ' + state.reason,
}))
.setEntryPoint('analyze')
.addConditionalEdges(
'analyze',
(state) => state.analysis.includes('low risk') ? 'auto' : 'review',
{ auto: 'auto_approve', review: 'human_review' }
)
.addEdge('auto_approve', 'finalize')
.addEdge('human_review', 'finalize')
.setFinishPoint('finalize');
 
// Run with human-in-the-loop
const compiled = graph.compile();
const checkpointer = new GraphCheckpointStore<ApprovalState>();
 
const result = await compiled.invoke(
{ request: 'Access to production database' },
{
checkpointer,
threadId: 'approval-123',
interrupt: { after: ['human_review'] },
}
);
 
if (result.interrupted) {
// Human reviews and makes decision
const finalResult = await compiled.resumeWithState(
result.checkpoint!.id,
{ decision: 'approved', reason: 'Verified by security team' },
{ checkpointer, threadId: 'approval-123' }
);
console.log(finalResult.state.reason);
}

Référence API

Méthode d'InterfacePhase d'ExécutionCœur Fonctionnel
addNode(name, fn)StructureAjouter un nœud avec fonction async
addEdge(from, to)StructureAjouter une arête entre nœuds
addConditionalEdges(from, fn, map)StructureAjouter routage conditionnel
setEntryPoint(name)StructureDéfinir le nœud de départ
setFinishPoint(name)StructureAjouter arête vers END
compile()BuildCompiler le graphe pour exécution
invoke(state, config)RuntimeExécuter le graphe
resume(checkpointId, config)RecoveryReprendre depuis checkpoint
resumeWithState(id, update, config)RecoveryReprendre avec mise à jour d'état
stream(state, config)RuntimeStreamer les événements
toMermaid()AuditExporter diagramme Mermaid