Workflows emit a root span for every run and child spans for each step (including human interactions). Enable telemetry globally, per instance, or per run.

Enable at build time

import { createWorkflow } from "@ai_kit/core";
import { normalizeInput, fetchForecast } from "./steps";

export const weatherWorkflow = createWorkflow({
  id: "weather-run",
  telemetry: true,
})
  .then(normalizeInput)
  .then(fetchForecast)
  .commit();
With telemetry: true, AI Kit:
  • creates a root span named after the workflow id;
  • automatically records input, output, and metadata (recordInputs/recordOutputs);
  • attaches ai_kit.workflow.* attributes for easier filtering in Langfuse or OTEL.
Provide an object to customise the trace:
import { createStep, createWorkflow } from "@ai_kit/core";
import { z } from "zod";

const normalizeInput = createStep({
  id: "normalize",
  inputSchema: z.object({ city: z.string() }),
  outputSchema: z.object({ city: z.string() }),
  handler: ({ input }) => ({ city: input.city.trim() }),
});

export const weatherWorkflow = createWorkflow({
  id: "weather-run",
  description: "Fetch weather data and generate a summary",
  inputSchema: z.object({ city: z.string() }),
  outputSchema: z.object({ forecast: z.string() }),
  telemetry: {
    traceName: "workflow.weather-run",
    recordInputs: true,
    recordOutputs: true,
    metadata: {
      domain: "weather",
    },
    userId: "anonymous",
  },
})
  .then(normalizeInput)
  .commit();
When userId is provided, it is exported as langfuse.user.id, user.id, and ai_kit.workflow.user_id.
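The mapping from userId to trace attributes can be pictured with a small sketch. The helper name userIdAttributes is hypothetical and not part of AI Kit; only the three attribute keys come from the sentence above.

```typescript
// Hypothetical helper (not an AI Kit API): illustrates which attribute keys
// carry the user ID once `userId` is set in the telemetry options.
function userIdAttributes(userId: string | undefined): Record<string, string> {
  // Assumption: no user attributes are attached when userId is missing or empty.
  if (!userId) return {};
  return {
    "langfuse.user.id": userId,
    "user.id": userId,
    "ai_kit.workflow.user_id": userId,
  };
}

const attrs = userIdAttributes("anonymous");
console.log(Object.keys(attrs));
```

Any of the three keys can then serve as a filter in Langfuse or in an OTEL backend.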

Update an existing instance

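import { createWorkflow, withTelemetry } from "@ai_kit/core";
import { z } from "zod";
// Steps are assumed to live in ./steps, as in the first example.
import { classifyTicket, generateAnswer } from "./steps";

const workflow = createWorkflow({
  id: "tickets",
  inputSchema: z.object({ topic: z.string() }),
  outputSchema: z.object({ summary: z.string() }),
})
  .then(classifyTicket)
  .then(generateAnswer)
  .commit();

// Method form: configure telemetry on the workflow instance.
workflow.withTelemetry({
  traceName: "workflow.tickets",
  recordOutputs: true,
});

// Helper form: pass the workflow and the options to withTelemetry.
const instrumented = withTelemetry(workflow, {
  metadata: { team: "support" },
});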

Per-run overrides

// Override the user and add metadata for this run only.
await weatherWorkflow.run({
  inputData: { city: "Paris" },
  telemetry: {
    userId: session.user.id,
    metadata: { requestId: "req_42" },
  },
});

// Disable telemetry for this run only.
await weatherWorkflow.run({
  inputData: { city: "Lyon" },
  telemetry: false,
});
Overrides merge with the global configuration (recordInputs, recordOutputs, metadata, …).
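One way to picture the merge is the sketch below. It is not AI Kit's implementation: the function resolveTelemetry and its types are hypothetical, and the key-by-key merge of metadata is an assumption. The only behaviours taken from this page are that a per-run object is combined with the workflow-level configuration and that telemetry: false turns tracing off for the run.

```typescript
// Hypothetical types mirroring the option names used on this page.
type TelemetryConfig = {
  traceName?: string;
  recordInputs?: boolean;
  recordOutputs?: boolean;
  userId?: string;
  metadata?: Record<string, unknown>;
};
type TelemetryOption = boolean | TelemetryConfig;

// Hypothetical resolver: returns the effective config, or null when disabled.
function resolveTelemetry(
  base: TelemetryOption | undefined,
  override: TelemetryOption | undefined,
): TelemetryConfig | null {
  // A per-run `false` always wins.
  if (override === false) return null;
  // Nothing enabled at either level.
  if (override === undefined && (base === undefined || base === false)) return null;

  const baseConfig: TelemetryConfig = typeof base === "object" ? base : {};
  const runConfig: TelemetryConfig = typeof override === "object" ? override : {};

  return {
    ...baseConfig,
    ...runConfig,
    // Assumption: metadata is merged key by key, with per-run values taking precedence.
    metadata: { ...baseConfig.metadata, ...runConfig.metadata },
  };
}

const effective = resolveTelemetry(
  { userId: "anonymous", recordInputs: true, metadata: { domain: "weather" } },
  { userId: "user_123", metadata: { requestId: "req_42" } },
);
console.log(effective);
```

Under these assumptions the run keeps recordInputs from the workflow, replaces userId, and carries both metadata keys.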

Associate a user

const productWorkflow = createWorkflow({
  id: "product-search",
  telemetry: { userId: "anonymous" },
})
  .then(generateProductDataStep)
  .commit();

await productWorkflow.run({
  inputData: { prompt },
  telemetry: {
    userId: session.user.id,
    metadata: { tenantId: session.tenantId },
  },
});
The user ID surfaces in Langfuse and in trace attributes. Combine it with extra metadata (tenantId, plan, …) for richer filters.

Plug into Langfuse

// instrumentation.ts
import { ensureLangfuseTelemetry } from "@ai_kit/core";

export const telemetry = ensureLangfuseTelemetry({
  autoFlush: "process",
});
// entrypoint.ts
import { telemetry } from "./instrumentation";
import { weatherWorkflow } from "./workflows/weather";

await telemetry;

await weatherWorkflow.run({
  inputData: { city: "Marseille" },
  telemetry: {
    metadata: { environment: process.env.NODE_ENV },
  },
});
Each step records:
  • ai_kit.workflow.step.kind (automatic or human);
  • ai_kit.workflow.step.occurrence (iteration counter for loops);
  • ai_kit.workflow.step.branch_id / next_step_id when conditions are involved.
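The step attributes above lend themselves to simple filtering once spans have been exported. The sketch below assumes spans are available as plain objects with an attributes record; the SpanLike type and both helper functions are hypothetical, and only the attribute key and its two values (automatic, human) come from the list above.

```typescript
// Hypothetical minimal shape for an exported span.
type SpanLike = {
  name: string;
  attributes: Record<string, string | number | boolean | undefined>;
};

const STEP_KIND = "ai_kit.workflow.step.kind";

// Keep only the spans recorded for human steps.
function humanSteps(spans: SpanLike[]): SpanLike[] {
  return spans.filter((span) => span.attributes[STEP_KIND] === "human");
}

// Count spans per step kind; spans without the attribute are ignored.
function countByKind(spans: SpanLike[]): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const span of spans) {
    const kind = span.attributes[STEP_KIND];
    if (typeof kind === "string") {
      counts[kind] = (counts[kind] ?? 0) + 1;
    }
  }
  return counts;
}

// Illustrative spans; the names are made up for this example.
const sample: SpanLike[] = [
  { name: "normalize", attributes: { [STEP_KIND]: "automatic" } },
  { name: "approve", attributes: { [STEP_KIND]: "human" } },
];
console.log(humanSteps(sample).map((span) => span.name), countByKind(sample));
```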
Human interactions reuse the same span between request and resume. human.requested / human.completed events are emitted for easier tracking.
If you don’t use autoFlush: "process", call provider.forceFlush() / provider.shutdown() when your app shuts down.
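A manual shutdown could look like the sketch below. It assumes the provider exposes forceFlush() and shutdown() that both return promises, as OpenTelemetry tracer providers do; the FlushableProvider interface and the shutdownTelemetry helper are hypothetical names, and how you obtain the provider depends on your setup.

```typescript
// Hypothetical minimal interface covering the two calls named above.
interface FlushableProvider {
  forceFlush(): Promise<void>;
  shutdown(): Promise<void>;
}

// Flush pending spans, then shut the provider down.
// shutdown() still runs if forceFlush() rejects.
async function shutdownTelemetry(provider: FlushableProvider): Promise<void> {
  try {
    await provider.forceFlush();
  } finally {
    await provider.shutdown();
  }
}

// Usage idea (assumption: a Node.js process with a `provider` in scope):
//   process.once("SIGTERM", () => {
//     shutdownTelemetry(provider).finally(() => process.exit(0));
//   });
```

Flushing before shutdown keeps the last spans of a run from being dropped when the process exits.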