Use the fluent .branchParallel() helper on WorkflowBuilder whenever you want to fan out into multiple branch sequences (each with their own then, while, or conditions) while keeping a single linear definition. It creates a synthetic ParallelWorkflowStep that orchestrates the branches, aggregates their outputs, and forwards the result to the next step.
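import { createStep, createWorkflow } from "@ai_kit/core";

// The steps referenced below (prepareEnvironmentStep, createClusterStep, and so on)
// are assumed to be defined elsewhere with createStep.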

const workflow = createWorkflow({
  id: "create-workflow",
})
  .then(prepareEnvironmentStep)
  .branchParallel("prepare-infra", parallel =>
    parallel
      .branch("provisioning", branch =>
        branch
          .then(createClusterStep)
          .then(configureIngressStep),
      )
      .branch("observability", branch =>
        branch
          .then(setupGrafanaStep)
          .then(configureAlertsStep),
      )
      .onError("wait-all"),
  )
  .then(summarizeStep)
  .commit();

API surface

  • .branchParallel(id, configure, options?) registers a new parallel block. options lets you provide description, inputSchema, or outputSchema for the synthetic step.
  • configure receives a ParallelWorkflowBuilder instance:
    • .branch(name, build => build.then(step).then(...)) defines each branch sequence. Branches reuse the workflow input and have read-only access to ctx.
    • .aggregate(fn) changes the resulting value. Without it, the block returns { [branchName]: branchOutput }.
    • .onError("fail-fast" | "wait-all") controls how branch failures propagate. Fail-fast aborts siblings immediately; wait-all waits for every branch and surfaces a combined WorkflowExecutionError containing parallelErrors.
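
The default result shape and a custom aggregator can be pictured with plain TypeScript. This is a minimal sketch that does not call @ai_kit/core: defaultAggregate, summarizeInfra, and the branch output shapes are hypothetical, and it assumes the function passed to .aggregate() receives the branch outputs keyed by branch name, which the list above does not confirm.

```typescript
// Hypothetical model of the value a parallel block produces.
// This is not the @ai_kit/core implementation.
type BranchOutputs = Record<string, unknown>;

// Default behaviour described above: { [branchName]: branchOutput }.
function defaultAggregate(outputs: BranchOutputs): BranchOutputs {
  return { ...outputs };
}

// Assumed output shapes for the two branches of the example workflow.
type InfraOutputs = {
  provisioning: { clusterId: string };
  observability: { dashboardUrl: string };
};

// A custom aggregator of the kind you might pass to .aggregate(fn),
// assuming fn receives the outputs keyed by branch name.
function summarizeInfra(outputs: InfraOutputs): { clusterId: string; dashboardUrl: string } {
  return {
    clusterId: outputs.provisioning.clusterId,
    dashboardUrl: outputs.observability.dashboardUrl,
  };
}

const branchOutputs: InfraOutputs = {
  provisioning: { clusterId: "cluster-1" },
  observability: { dashboardUrl: "https://grafana.example/d/1" },
};

// Without an aggregator the next step sees one entry per branch name.
const defaultResult = defaultAggregate(branchOutputs);

// With an aggregator the next step sees whatever shape the function returns.
const customResult = summarizeInfra(branchOutputs);
```

Flattening the branch outputs this way keeps the step that follows the block, summarizeStep in the example, independent of branch names.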

Constraints & behaviour

  • Nested .branchParallel() blocks are rejected in the current release to keep the graph simple.
  • Human steps are not allowed inside branches. Use .branchParallel() to orchestrate automatic work, then chain a human step afterwards if needed.
  • Branches cannot mutate the workflow runtime context; any stepRuntime.updateCtx() call from a branch throws. Emit events or return outputs instead.
  • Each branch emits the standard step:* events and snapshots decorated with parallelGroupId / parallelBranchId. This makes watchers, telemetry traces, and history dumps easy to filter.
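
Filtering on the parallel identifiers might look like the sketch below. Only the parallelGroupId and parallelBranchId property names come from the text above; the TaggedEvent shape, the stepId field, the eventsByBranch helper, and the assumption that parallelGroupId equals the block id passed to .branchParallel() are all hypothetical.

```typescript
// Hypothetical event shape. Only parallelGroupId and parallelBranchId
// are named in the documentation; the other fields are assumptions.
interface TaggedEvent {
  type: string; // for example "step:start" or "step:success"
  stepId: string;
  parallelGroupId?: string;
  parallelBranchId?: string;
}

// Collect the events of one parallel block, grouped by branch.
function eventsByBranch(
  events: TaggedEvent[],
  groupId: string,
): Record<string, TaggedEvent[]> {
  const result: Record<string, TaggedEvent[]> = {};
  for (const event of events) {
    // Skip events from outside the block or without a branch tag.
    if (event.parallelGroupId !== groupId || event.parallelBranchId === undefined) {
      continue;
    }
    const bucket = result[event.parallelBranchId] ?? [];
    bucket.push(event);
    result[event.parallelBranchId] = bucket;
  }
  return result;
}

// Sample history: one untagged event and three tagged ones.
const events: TaggedEvent[] = [
  { type: "step:start", stepId: "prepare-environment" },
  { type: "step:start", stepId: "create-cluster", parallelGroupId: "prepare-infra", parallelBranchId: "provisioning" },
  { type: "step:start", stepId: "setup-grafana", parallelGroupId: "prepare-infra", parallelBranchId: "observability" },
  { type: "step:success", stepId: "create-cluster", parallelGroupId: "prepare-infra", parallelBranchId: "provisioning" },
];

const grouped = eventsByBranch(events, "prepare-infra");
```

Here grouped holds two events under provisioning and one under observability, and the untagged prepare-environment event is dropped.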

Observability tips

  • run.watch() will surface step:start, step:success, step:error, and step:branch events tagged with the parallel identifiers so you can chart branch-level timings.
  • OpenTelemetry spans inherit the same attributes: ai_kit.workflow.step.parallel_group_id and ai_kit.workflow.step.parallel_branch_id.
  • When using "wait-all", inspect the parallelErrors array on the thrown error to list failing branches and bubble up richer status reports.
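
Turning parallelErrors into a status report could be sketched as follows. The entry shape (branchId, error) and the helpers hasParallelErrors and listFailingBranches are assumptions made for illustration; check the actual WorkflowExecutionError type before relying on these field names.

```typescript
// Assumed shape of one entry in parallelErrors; the real field names may differ.
interface ParallelBranchError {
  branchId: string;
  error: unknown;
}

// Type guard: does the caught value carry a parallelErrors array?
function hasParallelErrors(
  err: unknown,
): err is { parallelErrors: ParallelBranchError[] } {
  return (
    typeof err === "object" &&
    err !== null &&
    Array.isArray((err as { parallelErrors?: unknown }).parallelErrors)
  );
}

// Produce one "branch: message" line per failing branch.
function listFailingBranches(err: unknown): string[] {
  if (!hasParallelErrors(err)) {
    return [];
  }
  return err.parallelErrors.map(({ branchId, error }) => {
    const message = error instanceof Error ? error.message : String(error);
    return `${branchId}: ${message}`;
  });
}

// Stand-in for the error a "wait-all" block might throw.
const simulated = Object.assign(new Error("parallel block failed"), {
  parallelErrors: [
    { branchId: "provisioning", error: new Error("quota exceeded") },
    { branchId: "observability", error: "alert rules invalid" },
  ],
});

const report = listFailingBranches(simulated);
// report: ["provisioning: quota exceeded", "observability: alert rules invalid"]
```

In a real workflow you would call listFailingBranches inside the catch block that surrounds the run. Errors without parallelErrors, such as those from a "fail-fast" block, yield an empty list.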

Relation to createParallelStep

  • createParallelStep is a single step composed of substeps that all consume the same input, ideal for chunk-level analytics or when used inside createForEachStep.
  • .branchParallel() works at the workflow-builder level: each branch can hold full sequences (including other builders like while or conditions), and the block returns a single aggregated value to the next workflow step.
  • Keep using createParallelStep for localised concurrent work, and reach for .branchParallel() when you need to fan out entire sections of your workflow graph.