import {
Chunk,
TChunkDocument,
createForEachStep,
createParallelStep,
createStep,
createWorkflow,
} from "@ai_kit/core";
import { z } from "zod";
/**
 * Splits the raw input text into fixed-size overlapping segments.
 * Each resulting chunk carries `{ source: "raw-text" }` metadata.
 */
const chunkText = createStep<{ text: string }, Chunk[]>({
  id: "chunk-text",
  description: "Split the source text into homogeneous segments",
  handler: async ({ input }) => {
    // 200-character windows with a 20-character overlap between neighbors.
    const chunkOptions = {
      chunkSize: 200,
      chunkOverlap: 20,
      metadata: { source: "raw-text" },
    };
    return TChunkDocument.fromText(input.text).chunk(chunkOptions);
  },
});
/**
 * Produces a placeholder 3-dimensional embedding for a chunk.
 * Each component is the chunk's character count scaled by its 1-based index.
 */
const embedChunk = createStep<Chunk, number[]>({
  id: "embed-chunk",
  description: "Compute an embedding for a chunk",
  handler: async ({ input }) => {
    const size = input.content.length;
    return [size, size * 2, size * 3];
  },
});
/**
 * Extracts up to five keyword candidates from a chunk: split the content
 * on runs of non-letter characters, keep words longer than four letters,
 * and return the first five.
 *
 * Fix: the previous delimiter class `[^a-zA-ZÀ-ÿ]+` treated the
 * multiplication (×, U+00D7) and division (÷, U+00F7) signs as letters,
 * and discarded every letter outside Latin-1 (e.g. "ē", Greek, Cyrillic).
 * `\P{L}` with the `u` flag splits on any run of non-letter code points.
 */
const tagChunk = createStep<Chunk, string[]>({
  id: "tag-chunk",
  description: "Extract keywords from the chunk",
  handler: async ({ input }) => {
    return input.content
      .split(/\P{L}+/u) // any run of non-letter Unicode code points
      .filter(word => word.length > 4)
      .slice(0, 5);
  },
});
// Fan-out step for a single chunk: runs the embedding and keyword
// extraction steps concurrently and gathers their results under the
// keys `embedding` and `tags`.
const processChunk = createParallelStep({
  id: "process-chunk",
  description: "Run analytical tasks in parallel for a chunk",
  steps: {
    embedding: embedChunk,
    tags: tagChunk,
  },
});
// Applies the per-chunk parallel processing to every chunk produced by
// the previous step, with at most 4 items processed concurrently.
const foreachChunk = createForEachStep({
  id: "foreach-chunk",
  description: "Process each chunk reusing the parallel step",
  // The step's whole input (the chunk array) is the iteration source.
  items: ({ input }) => input,
  itemStep: processChunk,
  concurrency: 4,
});
/**
 * End-to-end pipeline: chunk the input text, then process each chunk
 * (embedding + keyword tags) in parallel.
 *
 * Input:  `{ text: string }` — must be non-empty (zod `.min(1)`).
 * Output: one `{ embedding: number[], tags: string[] }` record per chunk.
 */
export const chunkingWorkflow = createWorkflow({
  id: "chunking-parallel-pipeline",
  description: "Chunking + parallel processing for each segment",
  inputSchema: z.object({ text: z.string().min(1) }),
  outputSchema: z.array(
    z.object({
      embedding: z.array(z.number()),
      tags: z.array(z.string()),
    }),
  ),
})
  .then(chunkText)      // text -> Chunk[]
  .then(foreachChunk)   // Chunk[] -> per-chunk parallel results
  .commit();