Documentation Index
Fetch the complete documentation index at: https://ai.aidalinfo.fr/llms.txt
Use this file to discover all available pages before exploring further.
createParallelStep et createForEachStep permettent de lancer plusieurs sous-étapes en concurrence. Cette combinaison est idéale pour transformer des listes ou agréger des tâches analytiques.
Vous cherchez à paralléliser des branches entières (plusieurs steps successifs) dans un workflow builder ? Consultez plutôt branchParallel, conçu pour ce cas d’usage différent.
Pipeline de chunking
import {
Chunk,
TChunkDocument,
createForEachStep,
createParallelStep,
createStep,
createWorkflow,
} from "@ai_kit/core";
import { z } from "zod";
const chunkText = createStep<{ text: string }, Chunk[]>({
id: "chunk-text",
description: "Découpe le texte source en segments homogènes",
handler: async ({ input }) => {
const document = TChunkDocument.fromText(input.text);
return document.chunk({
chunkSize: 200,
chunkOverlap: 20,
metadata: { source: "raw-text" },
});
},
});
const embedChunk = createStep<Chunk, number[]>({
id: "embed-chunk",
description: "Calcule un embedding pour un chunk",
handler: async ({ input }) => {
return Array.from({ length: 3 }, (_, i) => input.content.length * (i + 1));
},
});
const tagChunk = createStep<Chunk, string[]>({
id: "tag-chunk",
description: "Extrait des tags clés du chunk",
handler: async ({ input }) => {
return input.content
.split(/[^a-zA-ZÀ-ÿ]+/)
.filter(word => word.length > 4)
.slice(0, 5);
},
});
const processChunk = createParallelStep({
id: "process-chunk",
description: "Lance les tâches analytiques en parallèle pour un chunk",
steps: {
embedding: embedChunk,
tags: tagChunk,
},
});
const foreachChunk = createForEachStep({
id: "foreach-chunk",
description: "Traite chaque chunk en réutilisant le step parallèle",
items: ({ input }) => input,
itemStep: processChunk,
concurrency: 4,
});
export const chunkingWorkflow = createWorkflow({
id: "chunking-parallel-pipeline",
description: "Chunking + traitement parallèle de chaque segment",
inputSchema: z.object({ text: z.string().min(1) }),
outputSchema: z.array(
z.object({
embedding: z.array(z.number()),
tags: z.array(z.string()),
}),
),
})
.then(chunkText)
.then(foreachChunk)
.commit();
createForEachStep renvoie un tableau par défaut : utilisez l’option collect pour fusionner les résultats (ex. concaténation d’embeddings). TChunkDocument assure un chunking homogène et propage la metadata (source: "raw-text").
Gérer la concurrence
concurrency dans createForEachStep limite le nombre d’items traités en parallèle (1 par défaut). Augmentez-le lorsque vos handlers sont I/O-bound.
- Les steps définis dans
createParallelStep s’exécutent simultanément et leurs sorties sont regroupées dans un objet.
- Combinez
parallel et foreach pour optimiser vos pipelines sans sacrifier la lisibilité.
Observabilité
Chaque sous-exécution émet des événements (step:parallel:start, step:parallel:success, etc.). Surveillez-les avec run.watch() pour suivre les branches actives et les temps de traitement.
Besoin de décisions dynamiques ? Consultez les branches conditionnelles. Pour répéter une étape jusqu’à un prédicat, rendez-vous sur les boucles while.