Passer au contenu principal
createParallelStep et createForEachStep permettent de lancer plusieurs sous-étapes en concurrence. Cette combinaison est idéale pour transformer des listes ou agréger des tâches analytiques.
Vous cherchez à paralléliser des branches entières (plusieurs steps successifs) dans un workflow builder ? Consultez plutôt branchParallel, conçu pour ce cas d’usage différent.

Pipeline de chunking

import {
  Chunk,
  TChunkDocument,
  createForEachStep,
  createParallelStep,
  createStep,
  createWorkflow,
} from "@ai_kit/core";
import { z } from "zod";

const chunkText = createStep<{ text: string }, Chunk[]>({
  id: "chunk-text",
  description: "Découpe le texte source en segments homogènes",
  handler: async ({ input }) => {
    const document = TChunkDocument.fromText(input.text);
    return document.chunk({
      chunkSize: 200,
      chunkOverlap: 20,
      metadata: { source: "raw-text" },
    });
  },
});

const embedChunk = createStep<Chunk, number[]>({
  id: "embed-chunk",
  description: "Calcule un embedding pour un chunk",
  handler: async ({ input }) => {
    return Array.from({ length: 3 }, (_, i) => input.content.length * (i + 1));
  },
});

const tagChunk = createStep<Chunk, string[]>({
  id: "tag-chunk",
  description: "Extrait des tags clés du chunk",
  handler: async ({ input }) => {
    return input.content
      .split(/[^a-zA-ZÀ-ÿ]+/)
      .filter(word => word.length > 4)
      .slice(0, 5);
  },
});

const processChunk = createParallelStep({
  id: "process-chunk",
  description: "Lance les tâches analytiques en parallèle pour un chunk",
  steps: {
    embedding: embedChunk,
    tags: tagChunk,
  },
});

const foreachChunk = createForEachStep({
  id: "foreach-chunk",
  description: "Traite chaque chunk en réutilisant le step parallèle",
  items: ({ input }) => input,
  itemStep: processChunk,
  concurrency: 4,
});

export const chunkingWorkflow = createWorkflow({
  id: "chunking-parallel-pipeline",
  description: "Chunking + traitement parallèle de chaque segment",
  inputSchema: z.object({ text: z.string().min(1) }),
  outputSchema: z.array(
    z.object({
      embedding: z.array(z.number()),
      tags: z.array(z.string()),
    }),
  ),
})
  .then(chunkText)
  .then(foreachChunk)
  .commit();
createForEachStep renvoie un tableau par défaut : utilisez l’option collect pour fusionner les résultats (ex. concaténation d’embeddings). TChunkDocument assure un chunking homogène et propage la metadata (source: "raw-text").

Gérer la concurrence

  • concurrency dans createForEachStep limite le nombre d’items traités en parallèle (1 par défaut). Augmentez-le lorsque vos handlers sont I/O-bound.
  • Les steps définis dans createParallelStep s’exécutent simultanément et leurs sorties sont regroupées dans un objet.
  • Combinez parallel et foreach pour optimiser vos pipelines sans sacrifier la lisibilité.

Observabilité

Chaque sous-exécution émet des événements (step:parallel:start, step:parallel:success, etc.). Surveillez-les avec run.watch() pour suivre les branches actives et les temps de traitement. Besoin de décisions dynamiques ? Consultez les branches conditionnelles. Pour répéter une étape jusqu’à un prédicat, rendez-vous sur les boucles while.