Skip to main content
AI Kit workflows are built from typed steps (createStep, createMapStep, …) chained through createWorkflow. Every run is observable, cancellable, and integrates with OpenTelemetry.

1. Install dependencies

pnpm add @ai_kit/core zod
# or
npm install @ai_kit/core zod
zod is optional but recommended to strongly type your inputs and outputs.

2. Declare a step

import { createStep } from "@ai_kit/core";
import { z } from "zod";

export const fetchWeather = createStep({
  id: "fetch-weather",
  description: "Retrieve the current weather",
  inputSchema: z.object({ city: z.string().min(1) }),
  outputSchema: z.object({ forecast: z.string() }),
  handler: async ({ input, signal }) => {
    if (signal.aborted) {
      throw new Error("Request cancelled");
    }

    // Replace with your real API call
    return { forecast: `Sunny in ${input.city}` };
  },
});
  • Aucun generic nécessaire : dès que vous fournissez inputSchema / outputSchema, TypeScript infère automatiquement les types du handler.
  • Les steps restent compatibles avec tout workflow, même si celui-ci définit un ctx ou un metadata typé. Vous ne rajoutez des generics que si vous voulez du typage avancé (Meta, RootInput, Ctx) dans la step elle-même.
  • Si vous n’avez pas de schéma, vous pouvez toujours annoter manuellement (createStep<MyInput, MyOutput>(...)), comme dans les versions précédentes.

3. Assemble a workflow

import { createWorkflow } from "@ai_kit/core";
import { z } from "zod";
import { fetchWeather } from "./steps/fetchWeather";

export const weatherWorkflow = createWorkflow({
  id: "weather-line",
  description: "Simple weather workflow",
  inputSchema: z.object({ city: z.string() }),
  outputSchema: z.object({ forecast: z.string() }),
})
  .then(fetchWeather)
  .commit();
commit() returns an immutable Workflow. The output schema is applied to the value returned by the last step (or finalize when defined).

4. Run and inspect

const result = await weatherWorkflow.run({
  inputData: { city: "Paris" },
});

if (result.status === "success") {
  console.log(result.result.forecast);
} else {
  console.error("Failed", result.error);
}

Control execution

  • workflow.createRun() returns a reusable WorkflowRun.
  • run.watch(listener) fires on every event (workflow:start, step:success, step:event, …).
  • run.stream() exposes an async iterator so you can consume events in real time while awaiting completion.
  • run.cancel() aborts the execution via an AbortSignal.
const run = weatherWorkflow.createRun();

const unwatch = run.watch(event => {
  console.log(`[${event.type}]`, event);
});

const { stream, final } = await run.stream({ inputData: { city: "Lyon" } });

for await (const evt of stream) {
  // Feed a live UI or log pipeline progress
}

const outcome = await final;
unwatch();

Shared metadata

Initialise shared metadata through metadata when starting the run. Access it inside a step with context.getMetadata() and update it via context.updateMetadata(). context.store exposes a shared Map to keep temporary references.
const notifyTeam = createStep({
  id: "notify-team",
  handler: async ({ context }) => {
    context.emit({ type: "notification", data: { channel: "slack" } });
    return { status: "sent" };
  },
});

Execution context (ctx)

Carry a typed execution context between steps:
type OrderCtx = {
  orgId: string;
  total: number;
  userId?: string;
};

const orderWorkflow = createWorkflow({
  id: "order-processing",
  ctx: { orgId: "default-org", total: 0 } as OrderCtx,
})
  .then(
    createStep({
      id: "apply-amount",
      handler: ({ input, ctx, stepRuntime }) => {
        stepRuntime.updateCtx(current => ({
          ...current,
          total: current.total + input.amount,
          userId: input.userId,
        }));
        return `Recorded for ${ctx.orgId}`;
      },
    }),
  )
  .then(
    createStep({
      id: "format-summary",
      handler: ({ ctx }) => `Organisation ${ctx.orgId} — total ${ctx.total}`,
    }),
  )
  .commit();

const run = await orderWorkflow.run({
  inputData: { amount: 120, userId: "user_42" },
  ctx: { orgId: "acme-co" },
});

console.log(run.ctx);
// { orgId: "acme-co", total: 120, userId: "user_42" }

Choose your typing level

  • Workflow without generics – Quick to write; TypeScript accepts any inputData/ctx, so you can move fast without annotations:
const simpleWorkflow = createWorkflow({ id: "simple" })
  .then(fetchWeather) // schema-typed step, no generics
  .commit();
  • Workflow with explicit generics – Add <Input, Output, Meta, Ctx> when you want IDE hints on ctx, metadata or inputs. Steps declared with schemas remain plug-and-play; you never need to restate their generics.

5. Full example (agent + workflow)

import { Agent, createStep, createWorkflow, scaleway } from "@ai_kit/core";
import { z } from "zod";

const weatherSnapshotSchema = z.object({
  location: z.string(),
  temperature: z.number(),
  feelsLike: z.number(),
  humidity: z.number(),
  windSpeed: z.number(),
  windGust: z.number(),
  conditions: z.string(),
});

type WeatherSnapshot = z.infer<typeof weatherSnapshotSchema>;

const weatherCodeLabels: Record<number, string> = {
  0: "Clear sky",
  1: "Mostly clear",
  2: "Partly cloudy",
  3: "Overcast",
  45: "Fog",
  48: "Freezing fog",
  51: "Light drizzle",
  53: "Moderate drizzle",
  55: "Dense drizzle",
  56: "Light freezing drizzle",
  57: "Heavy freezing drizzle",
  61: "Light rain",
  63: "Moderate rain",
  65: "Heavy rain",
  66: "Light freezing rain",
  67: "Heavy freezing rain",
  71: "Light snow",
  73: "Moderate snow",
  75: "Heavy snow",
  77: "Snow grains",
  80: "Light rain showers",
  81: "Moderate rain showers",
  82: "Violent rain showers",
  85: "Light snow showers",
  86: "Heavy snow showers",
  95: "Thunderstorm",
  96: "Thunderstorm with light hail",
  99: "Thunderstorm with heavy hail",
};

const fetchWeather = createStep({
  id: "fetch-weather",
  description: "Fetch weather data from an external service",
  inputSchema: z.object({ city: z.string().min(1) }),
  outputSchema: z.object({ forecast: weatherSnapshotSchema }),
  handler: async ({ input, signal }) => {
    const response = await fetch(
      `https://api.open-meteo.com/v1/forecast?current=temperature_2m,apparent_temperature,relative_humidity_2m,wind_speed_10m,wind_gusts_10m,weather_code&timezone=Europe/Paris&latitude=48.8566&longitude=2.3522`,
      { signal },
    );

    if (!response.ok) {
      throw new Error(`Weather API error (${response.status})`);
    }

    const body = await response.json();
    const current = body.current;

    const snapshot: WeatherSnapshot = {
      location: input.city,
      temperature: current.temperature_2m,
      feelsLike: current.apparent_temperature,
      humidity: current.relative_humidity_2m,
      windSpeed: current.wind_speed_10m,
      windGust: current.wind_gusts_10m,
      conditions:
        weatherCodeLabels[current.weather_code as keyof typeof weatherCodeLabels] ??
        "Unknown conditions",
    };

    return { forecast: snapshot };
  },
});

const generateAdvice = createStep({
  id: "generate-advice",
  description: "Use an agent to craft weather advice",
  inputSchema: z.object({ forecast: weatherSnapshotSchema }),
  outputSchema: z.object({ text: z.string() }),
  handler: async ({ input, context }) => {
    context.emit({
      type: "forecast",
      data: input.forecast,
    });

    const agent = new Agent({
      name: "weather-advisor",
      instructions:
        "You are a friendly weather assistant that provides practical advice.",
      model: scaleway("gpt-oss-120b"),
    });

    const advice = await agent.generate({
      prompt: `City: ${input.forecast.location}
Feels like: ${input.forecast.feelsLike}°C
Conditions: ${input.forecast.conditions}
Humidity: ${input.forecast.humidity}%
Wind: ${input.forecast.windSpeed} km/h (gusts ${input.forecast.windGust} km/h)

Write a short message in French that summarises the weather and gives a concrete tip.`,
    });

    return { text: advice.text };
  },
});

export const weatherAdvisorWorkflow = createWorkflow({
  id: "weather-advisor",
  description: "Weather advisory workflow powered by an agent",
})
  .then(fetchWeather)
  .then(generateAdvice)
  .commit();
This workflow combines an automatic step (API call) and an AI step (agent generation). The context.emit events can feed a real-time interface while the run progresses.