StateGraph
Build complex workflows with typed state, human-in-the-loop interrupts, and checkpoint persistence.
Why StateGraph?
StateGraph is an advanced graph-based workflow system inspired by LangGraph. Unlike the basic GraphWorkflow, StateGraph provides:
Typed State Engine
Security: strict schema validation through TypeScript. Guarantees data integrity across node transitions.
Human Intervention
Control: strategic interrupts for validation. Suspend execution for manual review before acting.
Checkpoints & Persistence
Resilience: fault-tolerant memory. Saves state automatically so execution can resume after an interruption.
Event Streaming
UX / Feedback: reactive execution logs. Stream tokens and node statuses directly to your interface.
Basic Usage

```typescript
import { StateGraph, createStateAnnotation, END } from '@orka-js/graph';

// 1. Define your state type
interface AgentState {
  messages: string[];
  currentStep: string;
  result: string;
}

// 2. Create state annotation with defaults
const stateAnnotation = createStateAnnotation<AgentState>(() => ({
  messages: [],
  currentStep: '',
  result: '',
}));

// 3. Build the graph
const graph = new StateGraph<AgentState>({ stateAnnotation })
  .addNode('analyze', async (state) => ({
    currentStep: 'analyzing',
    messages: [...state.messages, 'Analyzing input...'],
  }))
  .addNode('process', async (state) => ({
    currentStep: 'processing',
    result: 'Processed: ' + state.messages.join(', '),
  }))
  .setEntryPoint('analyze')
  .addEdge('analyze', 'process')
  .setFinishPoint('process');

// 4. Compile and run
const compiled = graph.compile();
const result = await compiled.invoke({
  messages: ['Hello world'],
});

console.log(result.state.result);
// "Processed: Hello world, Analyzing input..."
```

State Reducers
By default, state updates replace the existing values. Use reducers to accumulate or transform state instead:

```typescript
import { createStateAnnotation, Reducers } from '@orka-js/graph';

interface ChatState {
  messages: string[];
  tokenCount: number;
  metadata: Record<string, unknown>;
}

const stateAnnotation = createStateAnnotation<ChatState>(
  // Default values
  () => ({
    messages: [],
    tokenCount: 0,
    metadata: {},
  }),
  // Reducers (optional)
  {
    messages: Reducers.appendList,   // Append new messages to array
    tokenCount: Reducers.increment,  // Add to existing count
    metadata: Reducers.mergeObject,  // Merge objects
  }
);

// Now when nodes return partial state, reducers are applied:
// Node returns: { messages: ['New message'], tokenCount: 50 }
// Result: messages = [...oldMessages, 'New message'], tokenCount = oldCount + 50
```

Built-in Reducers
| Reducer | Category | Logic | Applies to |
|---|---|---|---|
| `appendList` | Collection | `[...current, ...update]` | Array |
| `mergeObject` | Structure | `{...current, ...update}` | Object |
| `addToSet` | Unique | `Array.from(new Set())` | Array |
| `keepLastN(n)` | Window | `slice(-n)` | Array |
| `increment` | Counter | `current + update` | Number |
| `max` / `min` | Limit | `Math.max` / `Math.min` | Number |
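
To make the table concrete, here is a minimal, self-contained sketch of what each reducer's logic column describes and how a reducer map could be applied to a partial update. It does not import `@orka-js/graph` and is not the library's source: `Reducer`, `ReducerMap`, and `applyUpdate` are hypothetical names, and the assumption that `addToSet` and `keepLastN` first combine `current` with `update` is mine.

```typescript
// Hypothetical re-implementations of the logic in the table above.
// These are NOT taken from @orka-js/graph.
type Reducer<T> = (current: T, update: T) => T;
type ReducerMap<S> = { [K in keyof S]?: Reducer<S[K]> };

const appendList = <T>(current: T[], update: T[]): T[] => [...current, ...update];
const mergeObject = <T extends object>(current: T, update: Partial<T>): T => ({ ...current, ...update });
// Assumption: values are combined first, then deduplicated.
const addToSet = <T>(current: T[], update: T[]): T[] => Array.from(new Set([...current, ...update]));
// Assumption: new items are appended, then only the last n are kept.
const keepLastN = (n: number) => <T>(current: T[], update: T[]): T[] => [...current, ...update].slice(-n);
const increment = (current: number, update: number): number => current + update;
const max = (current: number, update: number): number => Math.max(current, update);
const min = (current: number, update: number): number => Math.min(current, update);

// Apply a partial update: keys with a reducer are combined with the old value,
// keys without one are simply replaced (the default described above).
function applyUpdate<S extends object>(state: S, update: Partial<S>, reducers: ReducerMap<S> = {}): S {
  const next = { ...state };
  for (const key of Object.keys(update) as (keyof S)[]) {
    const value = update[key];
    if (value === undefined) continue;
    const reducer = reducers[key] as Reducer<S[keyof S]> | undefined;
    next[key] = reducer ? reducer(state[key], value as S[keyof S]) : (value as S[keyof S]);
  }
  return next;
}

interface ChatState {
  messages: string[];
  tokenCount: number;
  metadata: Record<string, unknown>;
}

const reducers: ReducerMap<ChatState> = {
  messages: appendList,
  tokenCount: increment,
  metadata: mergeObject,
};

const before: ChatState = { messages: ['Hi'], tokenCount: 10, metadata: { lang: 'en' } };
const after = applyUpdate(before, { messages: ['New message'], tokenCount: 50 }, reducers);
// after.messages is ['Hi', 'New message'], after.tokenCount is 60, metadata is unchanged
console.log(after);
```

Keeping reducers as plain `(current, update) => next` functions means a custom one can be written for any field without touching the graph itself.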
Conditional Edges
Route execution based on state values:

```typescript
import { StateGraph, END } from '@orka-js/graph';

// Assumes AgentState has `input`, `category`, and `result` fields, and that
// classifyInput, handleTechnical, and handleGeneral are defined elsewhere.
const graph = new StateGraph<AgentState>({ stateAnnotation })
  .addNode('classifier', async (state) => ({
    category: classifyInput(state.input),
  }))
  .addNode('technical_support', async (state) => ({
    result: await handleTechnical(state),
  }))
  .addNode('general_support', async (state) => ({
    result: await handleGeneral(state),
  }))
  .setEntryPoint('classifier')
  .addConditionalEdges(
    'classifier',
    // Condition function - returns route key
    (state) => state.category === 'technical' ? 'tech' : 'general',
    // Path map - route key -> node name
    {
      tech: 'technical_support',
      general: 'general_support',
    }
  )
  .setFinishPoint('technical_support')
  .setFinishPoint('general_support');
```

```typescript
// Can also route to END directly (fragment of a builder chain):
.addConditionalEdges(
  'check_done',
  (state) => state.isDone ? END : 'continue',
  { continue: 'next_step' }
)
```

Interrupts (Human-in-the-Loop)
Pause execution for human review or input. This suits approval workflows, content moderation, and interactive agents.

```typescript
import { StateGraph, GraphCheckpointStore } from '@orka-js/graph';

// Assumes AgentState has `request`, `draft`, `sent`, and `result` fields,
// and that generateEmail is defined elsewhere.
const graph = new StateGraph<AgentState>({ stateAnnotation })
  .addNode('draft_email', async (state) => ({
    draft: await generateEmail(state.request),
  }))
  .addNode('send_email', async (state) => ({
    sent: true,
    result: 'Email sent successfully',
  }))
  .setEntryPoint('draft_email')
  .addEdge('draft_email', 'send_email')
  .setFinishPoint('send_email');

const compiled = graph.compile();
const checkpointer = new GraphCheckpointStore<AgentState>();

// First run - interrupt BEFORE send_email for human approval
const result1 = await compiled.invoke(
  { request: 'Send meeting invite' },
  {
    checkpointer,
    threadId: 'email-workflow-123',
    interrupt: { before: ['send_email'] },
  }
);

console.log(result1.interrupted);    // true
console.log(result1.state.draft);    // Generated email draft
console.log(result1.checkpoint?.id); // Checkpoint ID for resume

// Human reviews the draft...
// Then resume execution:
const result2 = await compiled.resume(
  result1.checkpoint!.id,
  { checkpointer, threadId: 'email-workflow-123' }
);

console.log(result2.state.sent); // true
```

Resume with State Update
Modify the state when resuming, which is useful for applying human corrections:

```typescript
// Human edited the draft before approving
const result = await compiled.resumeWithState(
  checkpoint.id,
  { draft: 'Human-edited email content...' }, // State update
  { checkpointer, threadId: 'email-workflow-123' }
);
```

Checkpoint Persistence
Checkpoints save the complete execution state, letting you resume, replay, or branch workflows.

```typescript
import { GraphCheckpointStore } from '@orka-js/graph';

const checkpointer = new GraphCheckpointStore<AgentState>();

// Run with checkpointing
const result = await compiled.invoke(initialState, {
  checkpointer,
  threadId: 'my-workflow-session',
});

// List all checkpoints for a thread
const checkpoints = await checkpointer.list('my-workflow-session');

// Load a specific checkpoint
const checkpoint = await checkpointer.load(checkpointId);

// Load the latest checkpoint
const latest = await checkpointer.loadLatest('my-workflow-session');

// Delete checkpoints
await checkpointer.delete(checkpointId);
await checkpointer.deleteThread('my-workflow-session');
```

Note: GraphCheckpointStore is an in-memory store intended for development. For production, implement the CheckpointStore interface on top of Redis, PostgreSQL, or another persistent storage backend.
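
As a rough idea of what a custom store involves, the sketch below implements the operations used in the example above (`list`, `load`, `loadLatest`, `delete`, `deleteThread`). The actual `CheckpointStore` interface is not shown in this document, so `CheckpointStoreLike`, the `Checkpoint` shape, and the `save` method are all assumptions; check the library's type definitions before implementing the real one. A `Map` of JSON strings stands in for the external database.

```typescript
// ASSUMPTION: hypothetical shapes inferred from the calls shown above.
// The real CheckpointStore interface in @orka-js/graph may differ.
interface Checkpoint<S> {
  id: string;
  threadId: string;
  state: S;
  createdAt: number;
}

interface CheckpointStoreLike<S> {
  save(checkpoint: Checkpoint<S>): Promise<void>; // assumed, not shown in the docs
  load(id: string): Promise<Checkpoint<S> | undefined>;
  loadLatest(threadId: string): Promise<Checkpoint<S> | undefined>;
  list(threadId: string): Promise<Checkpoint<S>[]>;
  delete(id: string): Promise<void>;
  deleteThread(threadId: string): Promise<void>;
}

class JsonCheckpointStore<S> implements CheckpointStoreLike<S> {
  // Stand-in for Redis/PostgreSQL: values are stored as JSON strings so the
  // serialization boundary of a real backend is visible.
  private rows = new Map<string, string>();

  async save(checkpoint: Checkpoint<S>): Promise<void> {
    this.rows.set(checkpoint.id, JSON.stringify(checkpoint));
  }

  async load(id: string): Promise<Checkpoint<S> | undefined> {
    const raw = this.rows.get(id);
    return raw === undefined ? undefined : (JSON.parse(raw) as Checkpoint<S>);
  }

  // Returns the thread's checkpoints, oldest first.
  async list(threadId: string): Promise<Checkpoint<S>[]> {
    const found: Checkpoint<S>[] = [];
    this.rows.forEach((raw) => {
      const checkpoint = JSON.parse(raw) as Checkpoint<S>;
      if (checkpoint.threadId === threadId) found.push(checkpoint);
    });
    return found.sort((a, b) => a.createdAt - b.createdAt);
  }

  async loadLatest(threadId: string): Promise<Checkpoint<S> | undefined> {
    const all = await this.list(threadId);
    return all[all.length - 1];
  }

  async delete(id: string): Promise<void> {
    this.rows.delete(id);
  }

  async deleteThread(threadId: string): Promise<void> {
    for (const checkpoint of await this.list(threadId)) {
      this.rows.delete(checkpoint.id);
    }
  }
}
```

With a real backend, the state must survive a JSON round trip (or whatever encoding the backend uses), so values such as `Date`, `Map`, or class instances in the state would need explicit conversion.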
Streaming Execution
Stream execution events for real-time UI updates:

```typescript
const compiled = graph.compile();

for await (const event of compiled.stream(initialState)) {
  switch (event.type) {
    case 'node_start':
      console.log(`Starting node: ${event.nodeId}`);
      break;
    case 'node_end':
      console.log(`Finished node: ${event.nodeId}`);
      break;
    case 'state_update':
      console.log('State updated:', event.stateUpdate);
      break;
    case 'interrupt':
      console.log('Interrupted at:', event.nodeId);
      console.log('Checkpoint:', event.checkpoint?.id);
      break;
    case 'checkpoint':
      console.log('Checkpoint saved:', event.checkpoint?.id);
      break;
    case 'error':
      console.error('Error:', event.error);
      break;
    case 'done':
      console.log('Execution complete');
      break;
  }
}
```

Automatic Observability with Tracer
When configured with a Tracer, StateGraph can automatically emit trace events for every node execution. This is useful for debugging and for monitoring workflows in production.

```typescript
import { StateGraph, createStateAnnotation } from '@orka-js/graph';
import { Tracer } from '@orka-js/observability';
import { createDevToolsHook } from '@orka-js/devtools';

// Create a tracer with DevTools integration
const tracer = new Tracer({
  logLevel: 'info',
  hooks: [createDevToolsHook()],
});

// Pass tracer to StateGraph config
const graph = new StateGraph<AgentState>({
  stateAnnotation,
  name: 'MyWorkflow',
  tracer, // Automatic tracing enabled
})
  .addNode('step1', async (state) => ({ result: 'Step 1 done' }))
  .addNode('step2', async (state) => ({ result: 'Step 2 done' }))
  .setEntryPoint('step1')
  .addEdge('step1', 'step2')
  .setFinishPoint('step2');

// All node executions are automatically traced
const compiled = graph.compile();
const result = await compiled.invoke({ messages: [] });

// View traces in DevTools dashboard at http://localhost:3001
```

Automatic Tracing: When a tracer is configured, StateGraph automatically emits trace events for node start, node completion, errors, and the graph lifecycle. Each event includes timing, metadata, and state updates.
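
For intuition about what per-node tracing amounts to, the sketch below wraps a node function so that start, completion, and error events are reported with timing. It is only an illustration: `withTracing`, `TraceEvent`, and the event names are hypothetical and are not the `@orka-js/observability` Tracer API, which handles this internally once `tracer` is passed to the graph config.

```typescript
// Hypothetical illustration only; not the @orka-js/observability API.
type TraceEvent =
  | { type: 'node_start'; node: string; at: number }
  | { type: 'node_end'; node: string; at: number; durationMs: number; update: unknown }
  | { type: 'node_error'; node: string; at: number; durationMs: number; error: unknown };

type NodeFn<S> = (state: S) => Promise<Partial<S>>;

// Wrap a node so every call reports start, end (with the state update), or error.
function withTracing<S>(node: string, fn: NodeFn<S>, emit: (event: TraceEvent) => void): NodeFn<S> {
  return async (state: S) => {
    const startedAt = Date.now();
    emit({ type: 'node_start', node, at: startedAt });
    try {
      const update = await fn(state);
      const endedAt = Date.now();
      emit({ type: 'node_end', node, at: endedAt, durationMs: endedAt - startedAt, update });
      return update;
    } catch (error) {
      const endedAt = Date.now();
      emit({ type: 'node_error', node, at: endedAt, durationMs: endedAt - startedAt, error });
      throw error; // tracing must not swallow the failure
    }
  };
}
```

A wrapped node behaves exactly like the original one from the caller's point of view; the only difference is the stream of events handed to `emit`, which could be an array in tests or a logger in production.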
Mermaid Diagram Export

```typescript
const compiled = graph.compile();
const mermaid = compiled.toMermaid();

console.log(mermaid);
// graph TD
//   __start__((START))
//   __start__ --> analyze
//   analyze[analyze]
//   process[process]
//   __end__((END))
//   analyze --> process
//   process --> __end__
```

Complete Example: Approval Workflow
```typescript
import { StateGraph, createStateAnnotation, GraphCheckpointStore, END } from '@orka-js/graph';
import { OpenAIAdapter } from '@orka-js/openai';

// State definition
interface ApprovalState {
  request: string;
  analysis: string;
  decision: 'approved' | 'rejected' | 'pending';
  reason: string;
}

const stateAnnotation = createStateAnnotation<ApprovalState>(() => ({
  request: '',
  analysis: '',
  decision: 'pending',
  reason: '',
}));

const llm = new OpenAIAdapter({ apiKey: process.env.OPENAI_API_KEY! });

// Build the graph
const graph = new StateGraph<ApprovalState>({ stateAnnotation })
  .addNode('analyze', async (state) => {
    const result = await llm.generate(
      `Analyze this request and provide risk assessment: ${state.request}`
    );
    return { analysis: result.content };
  })
  .addNode('auto_approve', async (state) => ({
    decision: 'approved',
    reason: 'Auto-approved: Low risk request',
  }))
  .addNode('human_review', async (state) => ({
    // This node is a placeholder - human will update state on resume
  }))
  .addNode('finalize', async (state) => ({
    reason: state.decision === 'approved'
      ? 'Request approved and processed'
      : 'Request rejected: ' + state.reason,
  }))
  .setEntryPoint('analyze')
  .addConditionalEdges(
    'analyze',
    (state) => state.analysis.includes('low risk') ? 'auto' : 'review',
    { auto: 'auto_approve', review: 'human_review' }
  )
  .addEdge('auto_approve', 'finalize')
  .addEdge('human_review', 'finalize')
  .setFinishPoint('finalize');

// Run with human-in-the-loop
const compiled = graph.compile();
const checkpointer = new GraphCheckpointStore<ApprovalState>();

const result = await compiled.invoke(
  { request: 'Access to production database' },
  {
    checkpointer,
    threadId: 'approval-123',
    interrupt: { after: ['human_review'] },
  }
);

if (result.interrupted) {
  // Human reviews and makes decision
  const finalResult = await compiled.resumeWithState(
    result.checkpoint!.id,
    { decision: 'approved', reason: 'Verified by security team' },
    { checkpointer, threadId: 'approval-123' }
  );
  console.log(finalResult.state.reason);
}
```

API Reference
| Method | Phase | Description |
|---|---|---|
| `addNode(name, fn)` | Structure | Add a node backed by an async function |
| `addEdge(from, to)` | Structure | Add an edge between two nodes |
| `addConditionalEdges(from, fn, map)` | Structure | Add conditional routing |
| `setEntryPoint(name)` | Structure | Set the starting node |
| `setFinishPoint(name)` | Structure | Add an edge to END |
| `compile()` | Build | Compile the graph for execution |
| `invoke(state, config)` | Runtime | Execute the graph |
| `resume(checkpointId, config)` | Recovery | Resume from a checkpoint |
| `resumeWithState(id, update, config)` | Recovery | Resume with a state update |
| `stream(state, config)` | Runtime | Stream execution events |
| `toMermaid()` | Audit | Export a Mermaid diagram |
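
To show how the structure and runtime phases in the table relate, here is a toy, self-contained model of a state graph. The method names mirror the table, but everything else is an assumption: `MiniStateGraph` is hypothetical, it skips `compile()`, reducers, and checkpoints, and its `interruptBefore` and `startAt` options are simplified stand-ins for the library's interrupt and resume mechanisms.

```typescript
// Toy model only. Names mirror the API table, but the behavior is an
// assumption and not the @orka-js/graph implementation.
const END = '__end__';

type NodeFn<S> = (state: S) => Promise<Partial<S>>;
type Router<S> = (state: S) => string;

interface RunResult<S> {
  state: S;
  interrupted: boolean;
  next: string; // node that would run next, or END
}

class MiniStateGraph<S extends object> {
  private nodes = new Map<string, NodeFn<S>>();
  private routers = new Map<string, Router<S>>();
  private entry = '';

  addNode(name: string, fn: NodeFn<S>): this {
    this.nodes.set(name, fn);
    return this;
  }

  addEdge(from: string, to: string): this {
    this.routers.set(from, () => to);
    return this;
  }

  addConditionalEdges(from: string, route: Router<S>, map: Record<string, string>): this {
    // A route key missing from the map is used as the target itself (e.g. END).
    this.routers.set(from, (state) => {
      const key = route(state);
      const target = map[key];
      return target === undefined ? key : target;
    });
    return this;
  }

  setEntryPoint(name: string): this {
    this.entry = name;
    return this;
  }

  setFinishPoint(name: string): this {
    return this.addEdge(name, END);
  }

  // Runs nodes one at a time, merging each partial update into the state.
  // Stops early when the next node is listed in interruptBefore.
  async invoke(initial: S, opts: { interruptBefore?: string[]; startAt?: string } = {}): Promise<RunResult<S>> {
    let state = { ...initial };
    let current = opts.startAt ?? this.entry;
    while (current !== END) {
      if (opts.interruptBefore?.includes(current)) {
        return { state, interrupted: true, next: current };
      }
      const fn = this.nodes.get(current);
      if (!fn) throw new Error(`Unknown node: ${current}`);
      state = { ...state, ...(await fn(state)) };
      const router = this.routers.get(current);
      if (!router) throw new Error(`No outgoing edge from: ${current}`);
      current = router(state);
    }
    return { state, interrupted: false, next: END };
  }
}
```

In this model, resuming is simply a second `invoke` call that passes the paused state and `startAt: paused.next`; the real library instead resumes from a stored checkpoint ID.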