createParallelStep and createForEachStep let you run multiple sub-steps concurrently. Combine them to transform lists or aggregate analytical workloads efficiently.
Need to parallelise full workflow branches (each with multiple steps)? Use the builder-level branchParallel helper instead of createParallelStep. The two features target different layers.
Chunking pipeline
import {
  Chunk,
  TChunkDocument,
  createForEachStep,
  createParallelStep,
  createStep,
  createWorkflow,
} from "@ai_kit/core";
import { z } from "zod";

const chunkText = createStep<{ text: string }, Chunk[]>({
  id: "chunk-text",
  description: "Split the source text into homogeneous segments",
  handler: async ({ input }) => {
    const document = TChunkDocument.fromText(input.text);
    return document.chunk({
      chunkSize: 200,
      chunkOverlap: 20,
      metadata: { source: "raw-text" },
    });
  },
});

const embedChunk = createStep<Chunk, number[]>({
  id: "embed-chunk",
  description: "Compute an embedding for a chunk",
  handler: async ({ input }) => {
    return Array.from({ length: 3 }, (_, i) => input.content.length * (i + 1));
  },
});

const tagChunk = createStep<Chunk, string[]>({
  id: "tag-chunk",
  description: "Extract keywords from the chunk",
  handler: async ({ input }) => {
    return input.content
      .split(/[^a-zA-ZÀ-ÿ]+/)
      .filter(word => word.length > 4)
      .slice(0, 5);
  },
});

const processChunk = createParallelStep({
  id: "process-chunk",
  description: "Run analytical tasks in parallel for a chunk",
  steps: {
    embedding: embedChunk,
    tags: tagChunk,
  },
});

const foreachChunk = createForEachStep({
  id: "foreach-chunk",
  description: "Process each chunk reusing the parallel step",
  items: ({ input }) => input,
  itemStep: processChunk,
  concurrency: 4,
});

export const chunkingWorkflow = createWorkflow({
  id: "chunking-parallel-pipeline",
  description: "Chunking + parallel processing for each segment",
  inputSchema: z.object({ text: z.string().min(1) }),
  outputSchema: z.array(
    z.object({
      embedding: z.array(z.number()),
      tags: z.array(z.string()),
    }),
  ),
})
  .then(chunkText)
  .then(foreachChunk)
  .commit();
createForEachStep returns an array by default. Set collect when you need to merge the results (for example, to concatenate embeddings). TChunkDocument ensures consistent chunking and propagates metadata (source: "raw-text").
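As an illustration of the kind of merge collect is meant for, here is a minimal sketch of a reducer that concatenates the per-chunk embeddings and deduplicates the tags. The exact signature that collect expects is not shown on this page, so mergeChunkResults and the ChunkResult / MergedResult types are hypothetical, standalone names; check the API reference before wiring anything like this into createForEachStep.

```typescript
// Hypothetical helper: NOT part of @ai_kit/core.
// Shape of one per-chunk result, matching the workflow's outputSchema above.
type ChunkResult = { embedding: number[]; tags: string[] };

// Assumed merged shape, chosen for this illustration only.
type MergedResult = { embeddings: number[]; tags: string[] };

function mergeChunkResults(results: readonly ChunkResult[]): MergedResult {
  return {
    // Concatenate every chunk's embedding into one flat vector.
    embeddings: results.flatMap((result) => result.embedding),
    // Keep each tag once, in order of first appearance.
    tags: Array.from(new Set(results.flatMap((result) => result.tags))),
  };
}
```

Keeping the merge as a pure function makes it easy to unit test independently of the workflow.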
Manage concurrency
- concurrency in createForEachStep limits how many items run in parallel (defaults to 1). Increase it when handlers are I/O bound.
- Steps declared in createParallelStep run simultaneously and their outputs are grouped into an object.
- Mix parallel and foreach to optimise pipelines without sacrificing readability.
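To make the two behaviours above concrete, the following is a conceptual model in plain TypeScript, not the library's implementation. mapWithConcurrency and runAllKeyed are hypothetical names: the first caps how many items are in flight at once (the role concurrency plays for foreach), the second starts every named task together and groups the outputs under the same keys (the role of a parallel step).

```typescript
// Conceptual model only: NOT the @ai_kit/core implementation.

// Run `worker` over `items` with at most `limit` calls in flight at once.
// Results keep the order of the input array.
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  limit: number,
  worker: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;

  // Each runner claims the next unprocessed index until the list is exhausted.
  async function runner(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index], index);
    }
  }

  const runnerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));
  return results;
}

// Start every named task at the same time and group the outputs by key.
async function runAllKeyed<T extends Record<string, () => Promise<unknown>>>(
  tasks: T,
): Promise<{ [K in keyof T]: Awaited<ReturnType<T[K]>> }> {
  const entries = await Promise.all(
    Object.entries(tasks).map(async ([key, task]) => [key, await task()] as const),
  );
  return Object.fromEntries(entries) as { [K in keyof T]: Awaited<ReturnType<T[K]>> };
}

// Usage: a keyed "parallel" task group applied to each item, two items at a time.
async function demo(): Promise<void> {
  const perItem = await mapWithConcurrency(["alpha", "beta", "gamma"], 2, (text) =>
    runAllKeyed({
      length: async () => text.length,
      upper: async () => text.toUpperCase(),
    }),
  );
  console.log(perItem);
}
```

With a limit of 1 the items run strictly one after another. Raising the limit mainly helps when the worker spends its time waiting on I/O rather than on CPU.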
Observability
Every nested execution emits events (step:parallel:start, step:parallel:success, …). Watch them with run.watch() to monitor which branches are active and how long they take.
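One way to turn those events into timings is a small tracker like the sketch below. Only the event names step:parallel:start and step:parallel:success come from the text above. The payload fields (stepId, timestamp), the WorkflowEvent type, and createDurationTracker are assumptions made for illustration, and how run.watch() actually delivers events is not specified here, so adapt the handler to the real payload.

```typescript
// Hypothetical event shape: only the `type` strings are taken from the docs.
type WorkflowEvent = { type: string; stepId: string; timestamp: number };

function createDurationTracker() {
  const startedAt = new Map<string, number>();
  const durations = new Map<string, number>();

  // Feed every observed event into this handler.
  function handle(event: WorkflowEvent): void {
    if (event.type === "step:parallel:start") {
      startedAt.set(event.stepId, event.timestamp);
    } else if (event.type === "step:parallel:success") {
      const start = startedAt.get(event.stepId);
      if (start !== undefined) {
        durations.set(event.stepId, event.timestamp - start);
        startedAt.delete(event.stepId);
      }
    }
  }

  return {
    handle,
    // Steps that have started but not yet succeeded.
    active: (): string[] => Array.from(startedAt.keys()),
    // Elapsed time per completed step, in the unit of `timestamp`.
    durations: (): Map<string, number> => new Map(durations),
  };
}
```

The tracker ignores every other event type, so failure events would need their own branch once their names are known.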
Need dynamic decisions? Read Conditional branches. Want to loop until a predicate is met? Head to While loops.