Passer au contenu principal
La méthode fluide .branchParallel() sur WorkflowBuilder permet d’ouvrir plusieurs séquences (chacune avec ses propres then, while, conditions, etc.) tout en conservant un workflow linéaire. Elle crée un ParallelWorkflowStep synthétique qui pilote les branches, agrège leurs sorties, puis enchaîne avec l’étape suivante.
import { createStep, createWorkflow } from "@ai_kit/core";

const workflow = createWorkflow({
  id: "create-workflow",
})
  .then(prepareEnvironmentStep)
  .branchParallel("prepare-infra", parallel =>
    parallel
      .branch("provisioning", branch =>
        branch
          .then(createClusterStep)
          .then(configureIngressStep),
      )
      .branch("observability", branch =>
        branch
          .then(setupGrafanaStep)
          .then(configureAlertsStep),
      )
      .onError("wait-all"),
  )
  .then(summarizeStep)
  .commit();

Surface API

  • .branchParallel(id, configure, options?) ajoute un bloc parallèle. options accepte description, inputSchema et outputSchema pour l’étape synthétique.
  • configure reçoit un ParallelWorkflowBuilder :
    • .branch(nom, build => build.then(step).then(...)) définit chaque séquence. Les branches partagent l’entrée courante et voient ctx en lecture seule.
    • .aggregate(fn) transforme le résultat final. Par défaut, la sortie est un objet { [nomDeBranche]: résultat }.
    • .onError("fail-fast" | "wait-all") choisit la stratégie d’erreur. En fail-fast, les branches s’annulent mutuellement dès qu’une échoue ; en wait-all, toutes les branches se terminent et un WorkflowExecutionError unique expose parallelErrors.

Contraintes & comportement

  • L’imbrication de .branchParallel() est refusée dans cette version pour garder le graphe explicite.
  • Les étapes humaines ne sont pas autorisées dans les branches. Orchestrer l’automatique dans le bloc, puis ajouter une étape humaine ensuite si nécessaire.
  • Les branches ne peuvent pas modifier le contexte d’exécution (stepRuntime.updateCtx() lève une erreur). Utilisez les sorties ou stepRuntime.emit() pour partager des informations.
  • Chaque branche publie les événements step:* et les snapshots avec parallelGroupId / parallelBranchId, facilitant le suivi via run.watch() ou la télémétrie.

Observabilité

  • Les watchers reçoivent les événements step:start, step:success, step:error et step:branch enrichis des identifiants parallèles pour tracer les durées.
  • Les spans OpenTelemetry exposent également ai_kit.workflow.step.parallel_group_id et ai_kit.workflow.step.parallel_branch_id.
  • Avec "wait-all", inspectez parallelErrors dans l’erreur renvoyée pour lister précisément les branches défaillantes et enrichir vos rapports.

Différence avec createParallelStep

  • createParallelStep construit un step unique composé de sous-steps partageant le même input — idéal dans un createForEachStep ou pour exécuter quelques tâches analytiques ensemble.
  • .branchParallel() agit au niveau du builder : chaque branche peut contenir plusieurs steps (y compris while, conditions, etc.) et le bloc renvoie une valeur agrégée au workflow principal.
  • Combinez-les selon le besoin : gardez createParallelStep pour la concurrence locale et utilisez .branchParallel() pour déployer des pans entiers de workflow en parallèle.