<!-- %{
  title: "Parent-child agent hierarchies",
  description: "Spawn child agents, route signals between layers, and aggregate results.",
  category: :docs,
  order: 16,
  tags: [:docs, :learn, :hierarchies, :runtime, :livebook],
  draft: false,
  prerequisites: ["/docs/learn/state-machines-with-fsm"],
  livebook: %{
    runnable: true,
    required_env_vars: [],
    requires_network: false,
    setup_instructions: "No API keys required. Run the setup cell, then execute the examples in order."
  }
} -->

## Prerequisites

Complete [State machines with FSM](/docs/learn/state-machines-with-fsm) before starting this tutorial. You need familiarity with Signal routing, Directives, and running Agents in the Jido runtime.

## Setup

```elixir
Mix.install([
  {{mix_dep:jido}}
])

Logger.configure(level: :warning)

# Livebook imports can execute generated docs as doctests.
# Disable compiler docs until the current Jido Hex release drops the invalid signal_types/0 example.
Code.put_compiler_option(:docs, false)
```

A polling helper replaces the test-only `eventually` macro. You will use it later to wait for asynchronous results.

This tutorial runs entirely locally. No provider keys or network calls are required.

```elixir
defmodule Helpers do
  def wait_for(fun, timeout \\ 10_000, interval \\ 200) do
    deadline = System.monotonic_time(:millisecond) + timeout
    do_wait(fun, deadline, interval)
  end

  defp do_wait(fun, deadline, interval) do
    if System.monotonic_time(:millisecond) > deadline do
      raise "Timed out waiting for condition"
    end

    case fun.() do
      {:ok, result} -> result
      :retry -> Process.sleep(interval); do_wait(fun, deadline, interval)
    end
  end
end
```

## The hierarchy pattern

Some work decomposes naturally into layers. A job splits into tasks, and each task becomes an isolated unit of execution. This tutorial builds a three-layer processing system:

```
Orchestrator (layer 1)
    |
    |-- spawns --> Coordinator (layer 2)
    |                  |
    |                  |-- spawns --> Worker A (layer 3)
    |                  |-- spawns --> Worker B (layer 3)
    |                  |
    |                  |<-- task.result -- Worker A
    |                  |<-- task.result -- Worker B
    |                  |
    |<-- job.result -- Coordinator (aggregated)
```

Signals flow downward as work assignments (`job.assign`, `task.execute`) and upward as results (`task.result`, `job.result`). Each layer only talks to its direct parent or children. The runtime handles process lifecycle through the `SpawnAgent` Directive and the built-in `jido.agent.child.started` Signal.

## Define the Worker

The Worker Agent processes individual tasks. Its single Action computes a result and emits it to the parent Coordinator using `Directive.emit_to_parent/2`.

```elixir
defmodule MyApp.ExecuteTaskAction do
  use Jido.Action,
    name: "execute_task",
    schema: [
      task_id: [type: :string, required: true],
      job_id: [type: :string, required: true],
      operation: [type: :atom, required: true],
      value: [type: :integer, required: true]
    ]

  alias Jido.Agent.Directive

  def run(params, context) do
    result = execute(params.operation, params.value)

    result_signal =
      Jido.Signal.new!(
        "task.result",
        %{
          task_id: params.task_id,
          job_id: params.job_id,
          result: result,
          operation: params.operation
        },
        source: "/worker"
      )

    agent_like = %{state: context.state}
    emit_directive = Directive.emit_to_parent(agent_like, result_signal)

    {:ok,
     %{
       last_task: %{task_id: params.task_id, result: result},
       tasks_completed: Map.get(context.state, :tasks_completed, 0) + 1
     }, List.wrap(emit_directive)}
  end

  defp execute(:compute, v), do: v * 2 + 1
  defp execute(:validate, v), do: if(v > 0, do: :valid, else: :invalid)
  defp execute(:transform, v), do: "#{v}_processed"
  defp execute(_, v), do: v
end
```

`emit_to_parent/2` reads the `__parent__` reference from agent state (set automatically when the runtime spawns a child) and returns an `Emit` Directive targeting the parent PID. It returns `nil` when no parent exists, so `List.wrap/1` safely handles both cases.

The Worker Agent itself is minimal. It maps the `task.execute` Signal to the Action above.

```elixir
defmodule MyApp.WorkerAgent do
  use Jido.Agent,
    name: "worker_agent",
    schema: [
      last_task: [type: :map, default: nil],
      tasks_completed: [type: :integer, default: 0]
    ]

  def signal_routes(_ctx) do
    [{"task.execute", MyApp.ExecuteTaskAction}]
  end
end
```

## Define the Coordinator

The Coordinator receives a job assignment, spawns Worker children, dispatches tasks, and aggregates results. This requires three Actions.

**Handle job assignment.** When the Coordinator receives a `job.assign` Signal, it records the job metadata and spawns one Worker per task using `Directive.spawn_agent/3`.

```elixir
defmodule MyApp.HandleJobAssignAction do
  use Jido.Action,
    name: "handle_job_assign",
    schema: [
      job_id: [type: :string, required: true],
      tasks: [type: {:list, :map}, required: true]
    ]

  alias Jido.Agent.Directive

  def run(%{job_id: job_id, tasks: tasks}, context) do
    pending = Map.get(context.state, :pending_tasks, %{})

    job_info = %{
      job_id: job_id,
      total_tasks: length(tasks),
      completed_tasks: 0,
      results: [],
      started_at: DateTime.utc_now()
    }

    updated_pending = Map.put(pending, job_id, job_info)

    spawn_directives =
      Enum.map(tasks, fn task ->
        task_id = "#{job_id}-task-#{task.index}"

        Directive.spawn_agent(
          MyApp.WorkerAgent,
          String.to_atom(task_id),
          meta: %{
            task_id: task_id,
            job_id: job_id,
            operation: task.operation,
            value: task.value
          }
        )
      end)

    {:ok, %{pending_tasks: updated_pending, current_job: job_id},
     spawn_directives}
  end
end
```

The `meta` map on each `SpawnAgent` Directive carries task details. When the child starts, the runtime delivers a `jido.agent.child.started` Signal back to this Coordinator with that metadata attached.

**React to child startup.** When a Worker child starts, the Coordinator sends it a `task.execute` Signal using `Directive.emit_to_pid/2`.

```elixir
defmodule MyApp.CoordinatorChildStartedAction do
  use Jido.Action,
    name: "coordinator_child_started",
    schema: [
      parent_id: [type: :string, required: true],
      child_id: [type: :string, required: true],
      child_module: [type: :any, required: true],
      tag: [type: :any, required: true],
      pid: [type: :any, required: true],
      meta: [type: :map, default: %{}]
    ]

  alias Jido.Agent.Directive

  def run(%{pid: pid, meta: meta}, _context) do
    task_signal =
      Jido.Signal.new!(
        "task.execute",
        %{
          task_id: meta.task_id,
          job_id: meta.job_id,
          operation: meta.operation,
          value: meta.value
        },
        source: "/coordinator"
      )

    {:ok, %{}, [Directive.emit_to_pid(task_signal, pid)]}
  end
end
```

The `jido.agent.child.started` Signal payload includes the child's `pid` and the `meta` you passed in the `SpawnAgent` Directive. This is the hook that connects spawning to dispatching.

**Aggregate task results.** Each Worker emits a `task.result` Signal to its parent. The Coordinator collects them and, when the count matches the total, emits `job.result` up to the Orchestrator.

```elixir
defmodule MyApp.HandleTaskResultAction do
  use Jido.Action,
    name: "handle_task_result",
    schema: [
      task_id: [type: :string, required: true],
      job_id: [type: :string, required: true],
      result: [type: :any, required: true],
      operation: [type: :atom, required: true]
    ]

  alias Jido.Agent.Directive
  alias Jido.Agent.StateOp

  def run(params, context) do
    pending = Map.get(context.state, :pending_tasks, %{})

    job_info =
      Map.get(pending, params.job_id, %{
        results: [],
        completed_tasks: 0,
        total_tasks: 0
      })

    task_result = %{
      task_id: params.task_id,
      result: params.result,
      operation: params.operation
    }

    updated = %{
      job_info
      | results: [task_result | job_info.results],
        completed_tasks: job_info.completed_tasks + 1
    }

    updated_pending = Map.put(pending, params.job_id, updated)

    if updated.completed_tasks >= updated.total_tasks do
      emit_job_result(params, updated, updated_pending, context)
    else
      {:ok, %{pending_tasks: updated_pending}}
    end
  end

  defp emit_job_result(params, updated, pending, context) do
    signal =
      Jido.Signal.new!(
        "job.result",
        %{
          job_id: params.job_id,
          results: updated.results,
          total_tasks: updated.total_tasks
        },
        source: "/coordinator"
      )

    agent_like = %{state: context.state}
    emit = Directive.emit_to_parent(agent_like, signal)
    completed = Map.get(context.state, :completed_jobs, [])
    set_op = StateOp.set_path([:pending_tasks], Map.delete(pending, params.job_id))

    {:ok, %{completed_jobs: [params.job_id | completed]},
     [set_op | List.wrap(emit)]}
  end
end
```

`StateOp.set_path/2` directly overwrites a nested state key. This removes the completed job from `pending_tasks` without a merge conflict.

The Coordinator Agent wires these three Actions to their respective Signals.

```elixir
defmodule MyApp.CoordinatorAgent do
  use Jido.Agent,
    name: "coordinator_agent",
    schema: [
      pending_tasks: [type: :map, default: %{}],
      current_job: [type: :string, default: nil],
      completed_jobs: [type: {:list, :string}, default: []]
    ]

  def signal_routes(_ctx) do
    [
      {"job.assign", MyApp.HandleJobAssignAction},
      {"jido.agent.child.started", MyApp.CoordinatorChildStartedAction},
      {"task.result", MyApp.HandleTaskResultAction}
    ]
  end
end
```

## Define the Orchestrator

The Orchestrator is the entry point. It accepts job submissions, spawns a Coordinator per job, and collects final results.

**Submit a job.** Generate a unique job ID, record it as pending, and spawn a Coordinator with the job metadata.

```elixir
defmodule MyApp.SubmitJobAction do
  use Jido.Action,
    name: "submit_job",
    schema: [
      job_name: [type: :string, required: true],
      tasks: [type: {:list, :map}, required: true]
    ]

  alias Jido.Agent.Directive

  def run(%{job_name: job_name, tasks: tasks}, context) do
    job_id = "job-#{System.unique_integer([:positive])}"
    tag = String.to_atom("coordinator-#{job_id}")
    pending = Map.get(context.state, :pending_jobs, %{})

    job_info = %{
      job_id: job_id,
      job_name: job_name,
      tasks: tasks,
      coordinator_tag: tag,
      submitted_at: DateTime.utc_now()
    }

    spawn =
      Directive.spawn_agent(
        MyApp.CoordinatorAgent,
        tag,
        meta: %{job_id: job_id, job_name: job_name, tasks: tasks}
      )

    {:ok, %{pending_jobs: Map.put(pending, job_id, job_info),
            last_submitted: job_id}, [spawn]}
  end
end
```

**React to Coordinator startup.** When the Coordinator child starts, send it the `job.assign` Signal with the task list. Tasks get indexed here so the Coordinator can track them.

```elixir
defmodule MyApp.OrchestratorChildStartedAction do
  use Jido.Action,
    name: "orchestrator_child_started",
    schema: [
      parent_id: [type: :string, required: true],
      child_id: [type: :string, required: true],
      child_module: [type: :any, required: true],
      tag: [type: :any, required: true],
      pid: [type: :any, required: true],
      meta: [type: :map, default: %{}]
    ]

  alias Jido.Agent.Directive

  def run(%{pid: pid, meta: meta}, _context) do
    indexed_tasks =
      meta.tasks
      |> Enum.with_index(1)
      |> Enum.map(fn {task, idx} -> Map.put(task, :index, idx) end)

    signal =
      Jido.Signal.new!(
        "job.assign",
        %{job_id: meta.job_id, tasks: indexed_tasks},
        source: "/orchestrator"
      )

    {:ok, %{}, [Directive.emit_to_pid(signal, pid)]}
  end
end
```

**Aggregate job results.** When a Coordinator finishes, it emits `job.result` to the Orchestrator. This Action moves the job from pending to completed.

```elixir
defmodule MyApp.HandleJobResultAction do
  use Jido.Action,
    name: "handle_job_result",
    schema: [
      job_id: [type: :string, required: true],
      results: [type: {:list, :map}, required: true],
      total_tasks: [type: :integer, required: true]
    ]

  alias Jido.Agent.StateOp

  def run(params, context) do
    pending = Map.get(context.state, :pending_jobs, %{})
    completed = Map.get(context.state, :completed_jobs, [])
    job_info = Map.get(pending, params.job_id, %{})

    record = %{
      job_id: params.job_id,
      job_name: Map.get(job_info, :job_name, "unknown"),
      total_tasks: params.total_tasks,
      results: params.results,
      completed_at: DateTime.utc_now()
    }

    set_op =
      StateOp.set_path(
        [:pending_jobs],
        Map.delete(pending, params.job_id)
      )

    {:ok, %{completed_jobs: [record | completed]}, [set_op]}
  end
end
```

## Wire the Signal routes

The Orchestrator Agent maps three Signals: incoming job submissions, child lifecycle events, and final results from Coordinators.

```elixir
defmodule MyApp.OrchestratorAgent do
  use Jido.Agent,
    name: "orchestrator_agent",
    schema: [
      pending_jobs: [type: :map, default: %{}],
      completed_jobs: [type: {:list, :map}, default: []],
      last_submitted: [type: :string, default: nil]
    ]

  def signal_routes(_ctx) do
    [
      {"submit_job", MyApp.SubmitJobAction},
      {"jido.agent.child.started", MyApp.OrchestratorChildStartedAction},
      {"job.result", MyApp.HandleJobResultAction}
    ]
  end
end
```

Each layer handles exactly the Signals it needs. Workers never see `job.assign`. Coordinators never see `submit_job`. The `jido.agent.child.started` Signal is the glue between spawning and dispatching at every level.

## Run the hierarchy

Start the Jido runtime and launch the Orchestrator. Then submit a job with multiple tasks.

```elixir
runtime_name = :learn_hierarchy
{:ok, _runtime_pid} = Jido.start_link(name: runtime_name)

{:ok, orchestrator_pid} =
  Jido.start_agent(
    runtime_name,
    MyApp.OrchestratorAgent,
    id: "orchestrator-1"
  )
```

Build a `submit_job` Signal with two compute tasks and one validation task.

```elixir
signal =
  Jido.Signal.new!(
    "submit_job",
    %{
      job_name: "mixed_job",
      tasks: [
        %{operation: :compute, value: 5},
        %{operation: :compute, value: 10},
        %{operation: :validate, value: 42}
      ]
    },
    source: "/livebook"
  )

{:ok, _agent} = Jido.AgentServer.call(orchestrator_pid, signal)
```

The call returns immediately after the Orchestrator processes the `submit_job` Signal and emits the `SpawnAgent` Directive. The Coordinator and Workers start asynchronously. Poll for the completed results.

```elixir
result =
  Helpers.wait_for(fn ->
    case Jido.AgentServer.state(orchestrator_pid) do
      {:ok, %{agent: %{state: %{completed_jobs: [job | _]}}}} ->
        {:ok, job}

      _ ->
        :retry
    end
  end)

IO.inspect(result.job_name, label: "Job")
IO.inspect(result.total_tasks, label: "Tasks completed")

for r <- result.results do
  IO.inspect({r.operation, r.result}, label: "  result")
end
```

You should see three results: `:compute` yielding `11` and `21`, and `:validate` yielding `:valid`.

## Inspect state across layers

Use `AgentServer.state/1` to examine the hierarchy at each level. The Orchestrator's `children` map contains the Coordinator, and the Coordinator's `children` map contains the Workers.

```elixir
{:ok, orch_state} = Jido.AgentServer.state(orchestrator_pid)

IO.inspect(
  Map.keys(orch_state.children),
  label: "Orchestrator children"
)
```

Each entry in the `children` map holds the child's PID, module, and tag. You can walk further down the tree.

```elixir
[coordinator_info | _] = Map.values(orch_state.children)

{:ok, coord_state} =
  Jido.AgentServer.state(coordinator_info.pid)

IO.inspect(
  Map.keys(coord_state.children),
  label: "Coordinator children (workers)"
)

IO.inspect(
  coord_state.agent.state.completed_jobs,
  label: "Coordinator completed jobs"
)
```

The Orchestrator tracks Coordinators. Each Coordinator tracks its Workers. No layer knows about agents two levels away.

## Next steps

- [Sensors and real-time events](/docs/learn/sensors-and-real-time-events) connects external event sources to agent hierarchies
- [Directives](/docs/concepts/directives) covers the full Directive API including `stop_child/1` and `schedule/2`
- [Agent runtime](/docs/concepts/agent-runtime) explains supervision, process lifecycle, and the `children` map
