AI Kit workflows are built from typed steps (createStep, createMapStep, …) chained through createWorkflow. Every run is observable and cancellable, and integrates with OpenTelemetry.

1. Install dependencies

pnpm add @ai_kit/core zod
# or
npm install @ai_kit/core zod
zod is optional, but recommended for validating and strongly typing your inputs and outputs.

2. Declare a step

import { createStep } from "@ai_kit/core";
import { z } from "zod";

type WeatherInput = { city: string };
type WeatherOutput = { forecast: string };

export const fetchWeather = createStep<WeatherInput, WeatherOutput>({
  id: "fetch-weather",
  description: "Retrieve the current weather",
  inputSchema: z.object({ city: z.string().min(1) }),
  handler: async ({ input, signal }) => {
    if (signal.aborted) {
      throw new Error("Request cancelled");
    }

    // Replace with your real API call
    return { forecast: `Sunny in ${input.city}` };
  },
});
  • Schemas are optional: any value exposing parse or safeParse works.
  • Duplicate a step with cloneStep(step, overrides) when you want to reuse the handler.
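
Because the schema contract is structural, a hand-written validator can stand in for zod. The sketch below is a minimal illustration of that idea, not AI Kit's own typing: SchemaLike and citySchema are hypothetical names, and only the parse method mentioned above is implemented.

```typescript
// Hypothetical structural type: anything exposing parse (an assumption, not an AI Kit export).
type SchemaLike<T> = {
  parse(value: unknown): T;
};

type WeatherInput = { city: string };

// Hand-written validator mirroring z.object({ city: z.string().min(1) }).
const citySchema: SchemaLike<WeatherInput> = {
  parse(value: unknown): WeatherInput {
    if (typeof value !== "object" || value === null) {
      throw new Error("Expected an object");
    }
    const city = (value as { city?: unknown }).city;
    if (typeof city !== "string" || city.length === 0) {
      throw new Error("city must be a non-empty string");
    }
    return { city };
  },
};

const parsed = citySchema.parse({ city: "Paris" });
console.log(parsed.city);
```

An object like citySchema could then be passed wherever a step expects inputSchema, assuming the library only calls parse on it.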

3. Assemble a workflow

import { createWorkflow } from "@ai_kit/core";
import { z } from "zod";
import { fetchWeather } from "./steps/fetchWeather";

export const weatherWorkflow = createWorkflow({
  id: "weather-line",
  description: "Simple weather workflow",
  inputSchema: z.object({ city: z.string() }),
  outputSchema: z.object({ forecast: z.string() }),
})
  .then(fetchWeather)
  .commit();
commit() returns an immutable Workflow. The output schema is applied to the value returned by the last step (or finalize when defined).

4. Run and inspect

const result = await weatherWorkflow.run({
  inputData: { city: "Paris" },
});

if (result.status === "success") {
  console.log(result.result.forecast);
} else {
  console.error("Failed", result.error);
}

Control execution

  • workflow.createRun() returns a reusable WorkflowRun.
  • run.watch(listener) fires on every event (workflow:start, step:success, step:event, …).
  • run.stream() exposes an async iterator so you can consume events in real time while awaiting completion.
  • run.cancel() aborts the execution via an AbortSignal.
const run = weatherWorkflow.createRun();

const unwatch = run.watch(event => {
  console.log(`[${event.type}]`, event);
});

const { stream, final } = await run.stream({ inputData: { city: "Lyon" } });

for await (const evt of stream) {
  // Feed a live UI or log pipeline progress
}

const outcome = await final;
unwatch();
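
Cancellation relies on the standard AbortSignal, so a long-running handler should check the signal between units of work (or pass it to APIs such as fetch). The sketch below shows that pattern using only the platform AbortController. processItems is a hypothetical helper and not part of AI Kit; controller.abort() merely plays the role that run.cancel() has in a real run.

```typescript
// Hypothetical long-running task that cooperates with an AbortSignal.
async function processItems(items: string[], signal: AbortSignal): Promise<string[]> {
  const done: string[] = [];
  for (const item of items) {
    // Stop between units of work as soon as cancellation has been requested.
    if (signal.aborted) {
      throw new Error("Request cancelled");
    }
    // Simulate asynchronous work.
    await new Promise<void>(resolve => setTimeout(resolve, 10));
    done.push(item.toUpperCase());
  }
  return done;
}

const controller = new AbortController();
const pending = processItems(["a", "b", "c"], controller.signal);

// Stand-in for run.cancel(): flips the signal the task is watching.
controller.abort();

pending.catch(error => console.error((error as Error).message));
```

The first item still completes because the abort arrives while it is in flight; the task stops at the next check instead of processing the remaining items.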

Shared metadata

Pass metadata when starting the run to initialise shared metadata. Inside a step, read it with context.getMetadata() and update it with context.updateMetadata(). context.store exposes a shared Map for temporary references. Steps can also publish custom events with context.emit(), as the step below does:
const notifyTeam = createStep({
  id: "notify-team",
  handler: async ({ context }) => {
    context.emit({ type: "notification", data: { channel: "slack" } });
    return { status: "sent" };
  },
});

Execution context (ctx)

Carry a typed execution context between steps:
type OrderCtx = {
  orgId: string;
  total: number;
  userId?: string;
};

const orderWorkflow = createWorkflow<
  { amount: number; userId: string },
  string,
  Record<string, never>,
  OrderCtx
>({
  id: "order-processing",
  ctx: { orgId: "default-org", total: 0 },
})
  .then(
    createStep({
      id: "apply-amount",
      handler: ({ input, ctx, stepRuntime }) => {
        stepRuntime.updateCtx(current => ({
          ...current,
          total: current.total + input.amount,
          userId: input.userId,
        }));
        return `Recorded for ${ctx.orgId}`;
      },
    }),
  )
  .then(
    createStep({
      id: "format-summary",
      handler: ({ ctx }) => `Organisation ${ctx.orgId} — total ${ctx.total}`,
    }),
  )
  .commit();

const result = await orderWorkflow.run({
  inputData: { amount: 120, userId: "user_42" },
  ctx: { orgId: "acme-co" },
});

console.log(result.ctx);
// { orgId: "acme-co", total: 120, userId: "user_42" }
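
The final ctx printed above follows from how the context evolves: the workflow default first, then the per-run override, then each updateCtx call. The sketch below replays that sequence with plain object spreads. It assumes the run-level ctx is shallow-merged over the workflow default, which is consistent with the output shown but is not confirmed as the library's internal behaviour.

```typescript
type OrderCtx = { orgId: string; total: number; userId?: string };

// Workflow-level default, as declared in createWorkflow({ ctx }).
const defaults: OrderCtx = { orgId: "default-org", total: 0 };

// Run-level override, as passed to run({ ctx }). Assumed to be shallow-merged.
const override = { orgId: "acme-co" };
let ctx: OrderCtx = { ...defaults, ...override };

// Equivalent of the updateCtx call inside the "apply-amount" step.
const input = { amount: 120, userId: "user_42" };
ctx = { ...ctx, total: ctx.total + input.amount, userId: input.userId };

console.log(ctx);
// ctx is now { orgId: "acme-co", total: 120, userId: "user_42" }
```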

5. Full example (agent + workflow)

import { Agent, createStep, createWorkflow, scaleway } from "@ai_kit/core";
import { z } from "zod";

type WeatherInput = { city: string };
type WeatherSnapshot = {
  location: string;
  temperature: number;
  feelsLike: number;
  humidity: number;
  windSpeed: number;
  windGust: number;
  conditions: string;
};
type AdviceOutput = { text: string };
type WorkflowMeta = Record<string, unknown>;

const weatherCodeLabels: Record<number, string> = {
  0: "Clear sky",
  1: "Mostly clear",
  2: "Partly cloudy",
  3: "Overcast",
  45: "Fog",
  48: "Freezing fog",
  51: "Light drizzle",
  53: "Moderate drizzle",
  55: "Dense drizzle",
  56: "Light freezing drizzle",
  57: "Heavy freezing drizzle",
  61: "Light rain",
  63: "Moderate rain",
  65: "Heavy rain",
  66: "Light freezing rain",
  67: "Heavy freezing rain",
  71: "Light snow",
  73: "Moderate snow",
  75: "Heavy snow",
  77: "Snow grains",
  80: "Light rain showers",
  81: "Moderate rain showers",
  82: "Violent rain showers",
  85: "Light snow showers",
  86: "Heavy snow showers",
  95: "Thunderstorm",
  96: "Thunderstorm with light hail",
  99: "Thunderstorm with heavy hail",
};

const fetchWeather = createStep<WeatherInput, { forecast: WeatherSnapshot }>({
  id: "fetch-weather",
  description: "Fetch weather data from an external service",
  inputSchema: z.object({ city: z.string().min(1) }),
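  // The handler returns { forecast: snapshot }, so the schema wraps the snapshot shape.
  outputSchema: z.object({
    forecast: z.object({
      location: z.string(),
      temperature: z.number(),
      feelsLike: z.number(),
      humidity: z.number(),
      windSpeed: z.number(),
      windGust: z.number(),
      conditions: z.string(),
    }),
  }),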
  handler: async ({ input, signal }) => {
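    // The coordinates are hard-coded to Paris for brevity; input.city is only used as a label.
    // Passing signal lets a cancelled run abort the HTTP request.
    const response = await fetch(
      `https://api.open-meteo.com/v1/forecast?current=temperature_2m,apparent_temperature,relative_humidity_2m,wind_speed_10m,wind_gusts_10m,weather_code&timezone=Europe/Paris&latitude=48.8566&longitude=2.3522`,
      { signal },
    );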

    if (!response.ok) {
      throw new Error(`Weather API error (${response.status})`);
    }

    const body = await response.json();
    const current = body.current;

    const snapshot: WeatherSnapshot = {
      location: input.city,
      temperature: current.temperature_2m,
      feelsLike: current.apparent_temperature,
      humidity: current.relative_humidity_2m,
      windSpeed: current.wind_speed_10m,
      windGust: current.wind_gusts_10m,
      conditions:
        weatherCodeLabels[current.weather_code as keyof typeof weatherCodeLabels] ??
        "Unknown conditions",
    };

    return { forecast: snapshot };
  },
});

const generateAdvice = createStep<
  { forecast: WeatherSnapshot },
  AdviceOutput,
  WorkflowMeta
>({
  id: "generate-advice",
  description: "Use an agent to craft weather advice",
  handler: async ({ input, context }) => {
    context.emit({
      type: "forecast",
      data: input.forecast,
    });

    const agent = new Agent({
      name: "weather-advisor",
      instructions:
        "You are a friendly weather assistant that provides practical advice.",
      model: scaleway("gpt-oss-120b"),
    });

    const advice = await agent.generate({
      prompt: `City: ${input.forecast.location}
Temperature: ${input.forecast.temperature}°C
Feels like: ${input.forecast.feelsLike}°C
Conditions: ${input.forecast.conditions}
Humidity: ${input.forecast.humidity}%
Wind: ${input.forecast.windSpeed} km/h (gusts ${input.forecast.windGust} km/h)

Write a short message in French that summarises the weather and gives a concrete tip.`,
    });

    return { text: advice.text };
  },
});

export const weatherAdvisorWorkflow = createWorkflow<
  WeatherInput,
  AdviceOutput,
  WorkflowMeta
>({
  id: "weather-advisor",
  description: "Weather advisory workflow powered by an agent",
})
  .then(fetchWeather)
  .then(generateAdvice)
  .commit();

This workflow combines a deterministic step (the API call) with an AI step (agent generation). Events published through context.emit can feed a real-time interface while the run progresses.
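
The fetch-weather step in this example always queries the coordinates of Paris, whatever city is passed in. One way to remove the hard-coded query string is to build the URL from parameters. The sketch below is a hypothetical helper (buildForecastUrl and Coordinates are not part of AI Kit); resolving a city name to coordinates is left out and would need a geocoding service.

```typescript
type Coordinates = { latitude: number; longitude: number };

// Same "current" fields as the request made by the fetch-weather step.
const CURRENT_FIELDS = [
  "temperature_2m",
  "apparent_temperature",
  "relative_humidity_2m",
  "wind_speed_10m",
  "wind_gusts_10m",
  "weather_code",
];

// Hypothetical helper: builds the Open-Meteo forecast URL for any location.
function buildForecastUrl(coords: Coordinates, timezone = "Europe/Paris"): string {
  const url = new URL("https://api.open-meteo.com/v1/forecast");
  url.searchParams.set("current", CURRENT_FIELDS.join(","));
  url.searchParams.set("timezone", timezone);
  url.searchParams.set("latitude", String(coords.latitude));
  url.searchParams.set("longitude", String(coords.longitude));
  return url.toString();
}

// Example: coordinates for Lyon instead of the hard-coded Paris values.
console.log(buildForecastUrl({ latitude: 45.764, longitude: 4.8357 }));
```

Inside the handler, the result would replace the template literal passed to fetch, with { signal } kept as the second argument.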