Chapter 7: Interrupt/Resume (Human-in-the-Loop)

Goal of this chapter: understand the Interrupt/Resume mechanism and implement a Tool approval flow so users can confirm before sensitive operations.

Code Location

Prerequisites

Same as Chapter 1: you need a configured and available ChatModel (OpenAI or Ark). Also, like Chapter 4, set PROJECT_ROOT:

export PROJECT_ROOT=/path/to/eino  # Eino core library root directory (defaults to current directory if not set)

Running

In the examples/quickstart/chatwitheino directory:

# Set project root directory
export PROJECT_ROOT=/path/to/your/project

go run ./cmd/ch07

Example output:

you> Please execute the command echo hello

⚠️  Approval Required ⚠️
Tool: execute
Arguments: {"command":"echo hello"}

Approve this action? (y/n): y
[tool result] hello

hello

From Automatic Execution to Human Approval: Why We Need Interrupt

In previous chapters, the Agent automatically executed all Tool calls, but in certain scenarios this is dangerous:

Risks of automatic execution:

  • Deleting files: accidentally deleting important data
  • Sending emails: sending incorrect content
  • Executing commands: running dangerous operations
  • Modifying config: breaking system settings

Interrupt’s role:

  • Interrupt is the Agent’s pause mechanism: pauses before critical operations, waiting for user confirmation
  • Interrupt carries information: shows the user the operation about to be executed
  • Interrupt is resumable: continues after user confirmation, or returns an error on rejection

Simple analogy:

  • Automatic execution = “autopilot” (fully trusting the system)
  • Interrupt = “manual override” (critical decisions made by humans)

Key Concepts

Interrupt Mechanism

Interrupt is the core mechanism in Eino for implementing human-agent collaboration.

Core idea: pause before executing critical operations, wait for user confirmation, then continue.

A Tool that requires approval has its execution split into two phases:

  1. First call (triggers interrupt): the Tool saves the current arguments, then returns an interrupt signal. Runner pauses execution and returns an Interrupt event to the caller.
  2. Resume after user approval: Runner re-invokes the Tool; this time the Tool detects it was “previously interrupted”, reads the user’s approval result, and executes (or rejects).

Simplified pseudocode:

func myTool(ctx, args):
    if first_call:
        save args
        return interrupt_signal  // Runner pauses, shows approval prompt
    else:  // Second call after Resume
        if user_approved:
            return execute_operation(saved_args)
        else:
            return "Operation rejected by user"

Full code with key field explanations:

// Trigger interrupt in a Tool
func myTool(ctx context.Context, args string) (string, error) {
    // wasInterrupted: whether this is the second call after Resume (false on first call, true after Resume)
    // storedArgs: arguments saved via StatefulInterrupt on first call, retrievable after Resume
    wasInterrupted, _, storedArgs := tool.GetInterruptState[string](ctx)

    if !wasInterrupted {
        // First call: trigger interrupt, saving args for use after Resume
        return "", tool.StatefulInterrupt(ctx, &ApprovalInfo{
            ToolName:        "my_tool",
            ArgumentsInJSON: args,
        }, args)  // Third parameter is the state to save (retrieved via storedArgs after Resume)
    }

    // Second call after Resume: read user's approval result
    // isTarget: whether this Resume targets the current Tool (each Resume targets only one Tool)
    // hasData:  whether Resume carried approval result data
    // data:     the user's approval result
    isTarget, hasData, data := tool.GetResumeContext[*ApprovalResult](ctx)
    if isTarget && hasData {
        if data.Approved {
            return doSomething(storedArgs)  // Execute actual operation with saved args
        }
        return "Operation rejected by user", nil
    }

    // Other cases (isTarget=false means this Resume's target is not the current Tool): re-interrupt
    return "", tool.StatefulInterrupt(ctx, &ApprovalInfo{
        ToolName:        "my_tool",
        ArgumentsInJSON: storedArgs,
    }, storedArgs)
}

ApprovalMiddleware

ApprovalMiddleware is a generic approval middleware that intercepts specific Tool calls:

type approvalMiddleware struct {
    *adk.BaseChatModelAgentMiddleware
}

func (m *approvalMiddleware) WrapInvokableToolCall(
    _ context.Context,
    endpoint adk.InvokableToolCallEndpoint,
    tCtx *adk.ToolContext,
) (adk.InvokableToolCallEndpoint, error) {
    // Only intercept Tools that require approval
    if tCtx.Name != "execute" {
        return endpoint, nil
    }
    
    return func(ctx context.Context, args string, opts ...tool.Option) (string, error) {
        wasInterrupted, _, storedArgs := tool.GetInterruptState[string](ctx)
        
        if !wasInterrupted {
            return "", tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
                ToolName:        tCtx.Name,
                ArgumentsInJSON: args,
            }, args)
        }
        
        isTarget, hasData, data := tool.GetResumeContext[*commontool.ApprovalResult](ctx)
        if isTarget && hasData {
            if data.Approved {
                return endpoint(ctx, storedArgs, opts...)
            }
            if data.DisapproveReason != nil {
                return fmt.Sprintf("tool '%s' disapproved: %s", tCtx.Name, *data.DisapproveReason), nil
            }
            return fmt.Sprintf("tool '%s' disapproved", tCtx.Name), nil
        }
        
        isTarget, _, _ = tool.GetResumeContext[any](ctx)
        if !isTarget {
            return "", tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
                ToolName:        tCtx.Name,
                ArgumentsInJSON: storedArgs,
            }, storedArgs)
        }

        return endpoint(ctx, storedArgs, opts...)
    }, nil
}

func (m *approvalMiddleware) WrapStreamableToolCall(
    _ context.Context,
    endpoint adk.StreamableToolCallEndpoint,
    tCtx *adk.ToolContext,
) (adk.StreamableToolCallEndpoint, error) {
    // If the agent is configured with StreamingShell, execute goes through streaming calls;
    // this method must be implemented to intercept it
    if tCtx.Name != "execute" {
        return endpoint, nil
    }
    return func(ctx context.Context, args string, opts ...tool.Option) (*schema.StreamReader[string], error) {
        wasInterrupted, _, storedArgs := tool.GetInterruptState[string](ctx)
        if !wasInterrupted {
            return nil, tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
                ToolName:        tCtx.Name,
                ArgumentsInJSON: args,
            }, args)
        }

        isTarget, hasData, data := tool.GetResumeContext[*commontool.ApprovalResult](ctx)
        if isTarget && hasData {
            if data.Approved {
                return endpoint(ctx, storedArgs, opts...)
            }
            if data.DisapproveReason != nil {
                return singleChunkReader(fmt.Sprintf("tool '%s' disapproved: %s", tCtx.Name, *data.DisapproveReason)), nil
            }
            return singleChunkReader(fmt.Sprintf("tool '%s' disapproved", tCtx.Name)), nil
        }

        isTarget, _, _ = tool.GetResumeContext[any](ctx)
        if !isTarget {
            return nil, tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
                ToolName:        tCtx.Name,
                ArgumentsInJSON: storedArgs,
            }, storedArgs)
        }

        return endpoint(ctx, storedArgs, opts...)
    }, nil
}

CheckPointStore

CheckPointStore is a key component for implementing interrupt/resume:

type CheckPointStore interface {
    // Save checkpoint
    Put(ctx context.Context, key string, checkpoint *Checkpoint) error
    
    // Get checkpoint
    Get(ctx context.Context, key string) (*Checkpoint, error)
}

Why do we need CheckPointStore?

  • Save state on interrupt: Tool arguments, execution position, etc.
  • Load state on resume: continue execution from the interrupt point
  • Support cross-process resume: can still resume after process restart

Interrupt/Resume Implementation

1. Configure Runner with CheckPointStore

runner := adk.NewTypedRunner[M](adk.TypedRunnerConfig[M]{
    Agent:           agent,
    EnableStreaming: true,
    CheckPointStore: adkstore.NewInMemoryStore(),  // In-memory storage
})

2. Configure Agent with ApprovalMiddleware

agent, err := deep.NewTyped[M](ctx, &deep.TypedConfig[M]{
    // ... other config
    Handlers: []adk.TypedChatModelAgentMiddleware[M]{
        newApprovalMiddleware[M](),  // Add approval middleware
        newSafeToolMiddleware[M](),  // Convert Tool errors to strings (interrupt errors propagate upward)
    },
})

3. Handle Interrupt Events

checkPointID := sessionID

events := runner.Run(ctx, msgops.NormalizeMessagesForModelInput(history), adk.WithCheckPointID(checkPointID))
result, err := helpers.PrintAndCollect[M](events, helpers.PrintOptions{
    ShowToolCalls:    true,
    ShowToolResults:  true,
    CaptureInterrupt: true,
})
if err != nil {
    return err
}

assistantText := result.AssistantText
if result.InterruptInfo != nil {
    // Note: it's recommended to use the same stdin reader for both "user input" and "approval y/n"
    // to avoid approval input being treated as the next round's you> message
    assistantText, err = handleInterrupt[M](ctx, runner, checkPointID, result.InterruptInfo, reader)
    if err != nil {
        return err
    }
}

_ = session.Append(msgops.NewAssistant[M](assistantText, nil))

Interrupt/Resume Execution Flow

┌─────────────────────────────────────────┐
│  User: execute command echo hello        │
└─────────────────────────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  Agent analyzes intent│
        │  Decides to call      │
        │  execute              │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  ApprovalMiddleware  │
        │  Intercepts Tool call │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  Trigger Interrupt    │
        │  Save state to Store  │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  Return Interrupt     │
        │  event; await user    │
        │  approval             │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  User inputs y/n      │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  runner.ResumeWith... │
        │  Resume execution     │
        └──────────────────────┘
                   ↓
        ┌──────────────────────┐
        │  Execute command      │
        │  or return rejection  │
        └──────────────────────┘

Chapter Summary

  • Interrupt: the Agent’s pause mechanism, pausing before critical operations to await confirmation
  • Resume: resume execution — continue after user confirmation, or return an error on rejection
  • ApprovalMiddleware: a generic approval middleware that intercepts specific Tool calls
  • CheckPointStore: saves interrupt state, supporting cross-process resume
  • Human-agent collaboration: critical decisions are confirmed by humans, improving safety

Extended Thinking

Other Interrupt scenarios:

  • Multi-option approval: user selects one of several options
  • Parameter completion: user provides missing parameters
  • Conditional branching: user decides the execution path

Approval strategies:

  • Allowlist: only approve sensitive operations
  • Blocklist: approve all operations except safe ones
  • Dynamic rules: decide whether to require approval based on argument content