Workflows, inspired by Mastra, orchestrate typed sequences of steps. Each step can validate its input and output against schemas, share context with other steps, and emit real-time events so you can observe execution as it happens.

Why workflows?

  • Structure business processes into testable, reusable steps.
  • Enrich AI scenarios with strict validation, shared metadata, and cancellation through AbortSignal.
  • Monitor execution with run.watch() or run.stream() to build live dashboards.
  • Introduce human collaboration or controlled parallelism with minimal effort.

Must-read guides

  • Introduction – create your first steps and run a full workflow.
  • Conditional branches – route execution according to business rules.
  • While loops – repeat a step until a predicate is satisfied.
  • Parallel & foreach – process collections and run tasks concurrently.
  • Human steps – put a human in the loop and manage the request → resume cycle.
  • Telemetry – instrument runs with OpenTelemetry and Langfuse.

Core concepts

Typed steps

import { createStep } from "@ai_kit/core";
import { z } from "zod";

export const fetchWeather = createStep({
  id: "fetch-weather",
  description: "Fetch current weather",
  inputSchema: z.object({ city: z.string().min(1) }),
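  handler: async ({ input, signal }) => {
    // Stop early if the run was cancelled before this step started.
    if (signal.aborted) throw new Error("Request aborted");
    // `input` has already been validated against inputSchema.
    return { forecast: `Sunny in ${input.city}` };
  },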
});
Clone a step with cloneStep when you want to reuse the same handler:
import { cloneStep } from "@ai_kit/core";

export const fetchWeatherCopy = cloneStep(fetchWeather, {
  id: "fetch-weather-copy",
  description: "Weather for another city",
});
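
The fetchWeather handler checks signal.aborted once, at the start. A handler that waits on something slow may want to react to cancellation while it is waiting. The following is a minimal sketch using only the standard AbortSignal API. sleepWithAbort and slowForecast are hypothetical names for illustration, not part of @ai_kit/core, and slowForecast only mirrors the ({ input, signal }) handler shape shown above.

```typescript
// Hypothetical helper (not part of @ai_kit/core): resolves after `ms`
// milliseconds, or rejects as soon as the given AbortSignal fires.
export function sleepWithAbort(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error("Request aborted"));
      return;
    }
    // Function declaration so the timer callback below can reference it.
    function onAbort(): void {
      clearTimeout(timer);
      reject(new Error("Request aborted"));
    }
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal.addEventListener("abort", onAbort, { once: true });
  });
}

// Stand-in for a step handler body with the same ({ input, signal }) shape.
export async function slowForecast(args: {
  input: { city: string };
  signal: AbortSignal;
}): Promise<{ forecast: string }> {
  await sleepWithAbort(50, args.signal); // simulated slow upstream call
  return { forecast: `Sunny in ${args.input.city}` };
}
```

Passing the body of slowForecast as the handler of a step would let a cancelled run reject promptly instead of waiting for the delay to finish.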

Context & events

Each run shares metadata that steps can read with context.getMetadata() and change with context.updateMetadata(). The context.store map lets you keep temporary references for the duration of the run. A step can also publish custom events with context.emit():
const notifyTeam = createStep({
  id: "notify-team",
  handler: async ({ context }) => {
    context.emit({ type: "notification", data: { channel: "slack" } });
    return { status: "sent" };
  },
});
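
To make the metadata flow concrete, here is a rough sketch of two steps sharing state through a run context. SketchContext is a local stand-in written for this example, not the @ai_kit/core implementation. In particular, the updater-function signature of updateMetadata and the Meta shape are assumptions; check the API reference for the real signatures.

```typescript
// Hypothetical metadata shape for this example only.
type Meta = { requestId: string; notified: string[] };

// Local stand-in for a run context; NOT the @ai_kit/core implementation.
// Assumption: updateMetadata takes an updater returning the next metadata.
class SketchContext<M> {
  readonly store = new Map<string, unknown>();
  private metadata: M;

  constructor(initial: M) {
    this.metadata = initial;
  }

  getMetadata(): M {
    return this.metadata;
  }

  updateMetadata(updater: (current: M) => M): void {
    this.metadata = updater(this.metadata);
  }
}

const context = new SketchContext<Meta>({ requestId: "req-1", notified: [] });

// Earlier step: records which channel was notified.
function notifyStep(ctx: SketchContext<Meta>): void {
  ctx.updateMetadata((m) => ({ ...m, notified: [...m.notified, "slack"] }));
  ctx.store.set("lastNotifiedAt", new Date()); // temporary reference
}

// Later step: reads what the earlier step left behind.
function auditStep(ctx: SketchContext<Meta>): string {
  const { requestId, notified } = ctx.getMetadata();
  return `${requestId}: ${notified.join(",")}`;
}

notifyStep(context);
const summary = auditStep(context);
```

Returning a new object from the updater, rather than mutating in place, keeps each step's change easy to test in isolation.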

Instrumentation

  • workflow.createRun() instantiates a reusable run.
  • run.watch(listener) listens to events (workflow:start, step:success, step:event, …).
  • run.stream() returns an async iterator to consume events live.
  • run.cancel() aborts execution through the underlying AbortSignal.
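
Since run.stream() is described as returning an async iterator, events can be consumed with a for await loop. The sketch below shows that pattern against a stand-in generator so it runs without the library. The RunEvent shape, fakeStream, and countByType are assumptions made for illustration; only the event type names come from the list above.

```typescript
// Assumed event shape for illustration; real payloads may differ.
type RunEvent = { type: string; stepId?: string; data?: unknown };

// Stand-in for run.stream(): yields a fixed sequence of events.
async function* fakeStream(): AsyncGenerator<RunEvent> {
  yield { type: "workflow:start" };
  yield { type: "step:event", stepId: "notify-team", data: { channel: "slack" } };
  yield { type: "step:success", stepId: "notify-team" };
}

// Counts events by type, the kind of aggregate a live dashboard might show.
export async function countByType(
  events: AsyncIterable<RunEvent>,
): Promise<Map<string, number>> {
  const counts = new Map<string, number>();
  for await (const event of events) {
    counts.set(event.type, (counts.get(event.type) ?? 0) + 1);
  }
  return counts;
}
```

If the real stream yields compatible events, countByType(run.stream()) would work unchanged, because the function depends only on the AsyncIterable protocol.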

Best practices

  • Declare explicit schemas to catch inconsistencies early.
  • Compose short, testable steps with clear descriptions.
  • Emit business events to feed your monitoring and audit trails.
  • Combine workflows with agents to model richer decision-making loops.