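createParallelStep and createForEachStep let you run multiple sub-steps concurrently. createParallelStep runs several steps on the same input, and createForEachStep runs one step on every item of a list. Combine them to transform lists and run several analyses per item.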
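Need to parallelise full workflow branches (each with multiple steps)? Use the builder-level branchParallel helper instead of createParallelStep. The two features target different layers: createParallelStep runs sub-steps inside a single workflow step, while branchParallel runs entire branches from the workflow builder.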

Chunking pipeline

import {
  Chunk,
  TChunkDocument,
  createForEachStep,
  createParallelStep,
  createStep,
  createWorkflow,
} from "@ai_kit/core";
import { z } from "zod";

const chunkText = createStep<{ text: string }, Chunk[]>({
  id: "chunk-text",
  description: "Split the source text into overlapping segments of similar size",
  handler: async ({ input }) => {
    const document = TChunkDocument.fromText(input.text);
    return document.chunk({
      chunkSize: 200,
      chunkOverlap: 20,
      metadata: { source: "raw-text" },
    });
  },
});

const embedChunk = createStep<Chunk, number[]>({
  id: "embed-chunk",
  description: "Compute an embedding for a chunk",
  handler: async ({ input }) => {
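    // Placeholder: derives a 3-dimensional vector from the chunk length.
    // Replace it with a call to your embedding model.
    return Array.from({ length: 3 }, (_, i) => input.content.length * (i + 1));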
  },
});

const tagChunk = createStep<Chunk, string[]>({
  id: "tag-chunk",
  description: "Extract keywords from the chunk",
  handler: async ({ input }) => {
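    // Naive keyword extraction: split on anything that is not a Latin letter
    // (accented letters included), keep words longer than 4 characters, take the first 5.
    return input.content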
      .split(/[^a-zA-ZÀ-ÿ]+/)
      .filter(word => word.length > 4)
      .slice(0, 5);
  },
});

const processChunk = createParallelStep({
  id: "process-chunk",
  description: "Run analytical tasks in parallel for a chunk",
  steps: {
    embedding: embedChunk,
    tags: tagChunk,
  },
});

const foreachChunk = createForEachStep({
  id: "foreach-chunk",
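  description: "Run the parallel step on each chunk",
  // chunk-text returns Chunk[], so the step input is already the list to iterate.
  items: ({ input }) => input,
  itemStep: processChunk,
  // Process up to 4 chunks at the same time.
  concurrency: 4,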
});

export const chunkingWorkflow = createWorkflow({
  id: "chunking-parallel-pipeline",
  description: "Chunking + parallel processing for each segment",
  inputSchema: z.object({ text: z.string().min(1) }),
  outputSchema: z.array(
    z.object({
      embedding: z.array(z.number()),
      tags: z.array(z.string()),
    }),
  ),
})
  .then(chunkText)
  .then(foreachChunk)
  .commit();
By default, createForEachStep returns an array with one result per item. Set collect when you need to merge those results into a single value (for example, to concatenate embeddings). TChunkDocument keeps chunking consistent and attaches the given metadata (source: "raw-text") to every chunk.
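The sketch below does not show how collect itself is configured; it only illustrates the kind of merge the paragraph above mentions, as plain TypeScript. It assumes the per-chunk result shape from the workflow's outputSchema, and mergeChunkResults is a hypothetical helper, not part of @ai_kit/core. Deduplicating tags is an extra choice made here for illustration.

```typescript
// Per-chunk result, mirroring the workflow's outputSchema.
interface ChunkResult {
  embedding: number[];
  tags: string[];
}

// Hypothetical merge: concatenate the embeddings into one flat vector and
// keep each tag once, in the order it first appears.
function mergeChunkResults(results: ChunkResult[]): ChunkResult {
  return {
    embedding: results.flatMap(result => result.embedding),
    tags: Array.from(new Set(results.flatMap(result => result.tags))),
  };
}

const merged = mergeChunkResults([
  { embedding: [1, 2, 3], tags: ["alpha", "bravo"] },
  { embedding: [4, 5, 6], tags: ["bravo", "charlie"] },
]);
console.log(merged.embedding); // 1, 2, 3, 4, 5, 6
console.log(merged.tags); // alpha, bravo, charlie
```

Concatenating embeddings like this only makes sense if downstream code expects one flat vector; averaging or keeping them per chunk are common alternatives.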

Manage concurrency

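  • concurrency in createForEachStep caps how many items are processed at the same time (defaults to 1, i.e. sequential). Increase it when handlers are I/O-bound, such as calls to an embedding API.
  • Steps declared in createParallelStep run simultaneously, and their outputs are grouped into an object keyed by the names used in steps (here { embedding, tags }).
  • Nest a parallel step inside a foreach, as the chunking pipeline does, to run several analyses per item while still processing several items at once, with each step kept small and readable.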
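To make these concurrency rules concrete, here is a small, library-free model in plain TypeScript. runParallel and runForEach are hypothetical helpers written for illustration, not part of @ai_kit/core: runParallel starts every handler on the same input and groups the outputs by key, and runForEach keeps at most concurrency items in flight while preserving input order. The real steps also deal with schemas, events and errors, which this sketch leaves out.

```typescript
// Hypothetical model of the semantics described above; not the @ai_kit/core API.
type Handler<I, O> = (input: I) => Promise<O>;

// Output of a parallel step: one entry per key of the steps map.
type Outputs<S> = { [K in keyof S]: S[K] extends Handler<any, infer O> ? O : never };

// Parallel model: start every handler at once on the same input,
// then group the outputs into an object keyed like `steps`.
async function runParallel<I, S extends Record<string, Handler<I, unknown>>>(
  steps: S,
  input: I,
): Promise<Outputs<S>> {
  const entries = await Promise.all(
    Object.entries(steps).map(async ([key, handler]) => [key, await handler(input)] as const),
  );
  return Object.fromEntries(entries) as unknown as Outputs<S>;
}

// Foreach model: run `worker` on every item with at most `concurrency`
// calls in flight, storing each result at its item's index.
async function runForEach<T, R>(items: T[], worker: Handler<T, R>, concurrency = 1): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;
  // Each lane claims the next unprocessed index until none are left.
  // Claiming is synchronous, so two lanes never take the same index.
  const lane = async (): Promise<void> => {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index]);
    }
  };
  const laneCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}

// Usage: two analyses per chunk, at most two chunks at a time.
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));
let inFlight = 0;
let maxInFlight = 0;

const analyseChunk = async (chunk: string) => {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  try {
    await sleep(10); // simulate I/O-bound work
    return await runParallel(
      {
        length: async (text: string) => text.length,
        words: async (text: string) => text.split(" "),
      },
      chunk,
    );
  } finally {
    inFlight--;
  }
};

const demoRun = runForEach(["a b", "c d e", "f"], analyseChunk, 2);
demoRun.then(results => console.log(results, "max in flight:", maxInFlight));
```

With concurrency set to 2, no more than two chunks are processed at once, while both analyses for a chunk run side by side. Because each lane pulls the next unclaimed item, one slow item does not stall the rest of the list.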

Observability

Every nested execution emits events (step:parallel:start, step:parallel:success, …). Watch them with run.watch() to see which branches are active and how long each one takes.

Need dynamic decisions? Read Conditional branches. Want to loop until a predicate is met? Head to While loops.
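The step:parallel:start and step:parallel:success events described under Observability can be paired to time each branch. This sketch assumes each event carries a type, a stepId and a timestamp in milliseconds; those field names and the BranchTimer class are hypothetical, so map them to whatever run.watch() actually passes to your callback. Other event types (the list above is truncated) are simply ignored here.

```typescript
// Hypothetical event shape; adapt the field names to what run.watch() emits.
interface StepEvent {
  type: string; // e.g. "step:parallel:start" or "step:parallel:success"
  stepId: string;
  timestamp: number; // milliseconds
}

// Pairs start/success events to report active branches and durations.
class BranchTimer {
  private readonly started = new Map<string, number>();
  readonly durations = new Map<string, number>();

  handle(event: StepEvent): void {
    if (event.type.endsWith(":start")) {
      this.started.set(event.stepId, event.timestamp);
    } else if (event.type.endsWith(":success")) {
      const start = this.started.get(event.stepId);
      if (start !== undefined) {
        this.durations.set(event.stepId, event.timestamp - start);
        this.started.delete(event.stepId);
      }
    }
    // Any other event type is ignored in this sketch.
  }

  // Branches that have started but not yet succeeded.
  active(): string[] {
    return Array.from(this.started.keys());
  }
}

// Usage with hand-written events; in practice, call timer.handle from your watcher.
const timer = new BranchTimer();
const events: StepEvent[] = [
  { type: "step:parallel:start", stepId: "embedding", timestamp: 0 },
  { type: "step:parallel:start", stepId: "tags", timestamp: 5 },
  { type: "step:parallel:success", stepId: "tags", timestamp: 12 },
];
events.forEach(event => timer.handle(event));
console.log(timer.active()); // still running: embedding
console.log(timer.durations.get("tags")); // 7 (ms)
```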