Skip to content

[Question] ReactAgent 流式中断恢复机制不完善:stream() 不支持恢复,invokeAndGetOutput() 非流式 #4287

@junxwang

Description

@junxwang

Question

正文

问题描述

在使用 Spring AI Alibaba 的 ReactAgent 实现**人工介入(Human-in-the-Loop)**功能时,发现流式输出与中断恢复机制存在以下问题:

  1. stream() 方法无法用于中断恢复 - 会抛出阻塞线程异常
  2. invokeAndGetOutput() 支持中断恢复,但非流式 - 需要手动包装为 Flux

环境信息

  • Spring AI Alibaba 版本: 1.1.2.0
  • JDK 版本: 21
  • Spring Boot 版本: 3.5.x

复现步骤

场景1:使用 stream() 进行中断恢复(失败)

// 首次调用,正常触发中断
String threadId = "session-001";
RunnableConfig config = RunnableConfig.builder()
    .threadId(threadId)
    .build();

// 假设已触发中断,现在尝试恢复执行
InterruptionMetadata approval = buildApprovalMetadata(...);

RunnableConfig resumeConfig = RunnableConfig.builder()
    .threadId(threadId)
    .addMetadata(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY, approval)
    .build();

// ❌ 抛出异常:阻塞操作不支持在 reactive 线程执行
reactAgent.stream("", resumeConfig)  // 空消息,期望从 checkpoint 恢复
    .subscribe(...);

异常堆栈:

org.springframework.ai.tool.execution.ToolExecutionException: 
block()/blockFirst()/blockLast() are blocking, which is not supported in thread 

场景2:使用 invokeAndGetOutput() 恢复(成功但非流式)

// ✅ 可以恢复,但返回的是 NodeOutput,需要手动提取内容
Optional<NodeOutput> result = reactAgent.invokeAndGetOutput("", resumeConfig);

// 需要手动从 OverAllState 中提取消息,无法直接流式输出给前端
State state = result.get().getState();
Object data = state.getData(); // 需要自行解析结构

期望行为

以下任一方案均可接受:

方案 描述
A stream() 方法支持中断恢复,内部正确处理阻塞操作
B invokeAndGetOutput() 提供流式版本,或返回结构更友好的输出
C 提供官方推荐的"人工介入+流式输出"最佳实践示例

当前临时解决方案(请求确认是否合理)

目前我只能采用以下 workaround,请维护者确认这是否是推荐做法:

// 手动包装为 Flux,主动提取 OverAllState 中的消息
public Flux<String> resumeStreamingWithHack(
        String threadId, 
        InterruptionMetadata approval) {
    
    return Mono.fromCallable(() -> {
        RunnableConfig config = RunnableConfig.builder()
            .threadId(threadId)
            .addMetadata(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY, approval)
            .build();
        
        // 使用 invokeAndGetOutput 恢复(同步阻塞)
        Optional<NodeOutput> result = reactAgent.invokeAndGetOutput("", config);
        
        if (result.isPresent()) {
            // 从 OverAllState 中提取实际消息内容
            OverAllState state = result.get().getState();
            // 假设消息存储在特定 key 中,需要硬编码解析
            return extractMessageFromState(state);
        }
        return "";
    })
    .subscribeOn(Schedulers.boundedElastic())  // 转移到弹性线程池
    .flux()
    .flatMapMany(msg -> Flux.just(msg.split("(?<=\\n)"))); // 模拟流式分行
}

private String extractMessageFromState(OverAllState state) {
    // ❓ 这里需要硬编码 key,是否有更优雅的方式?
    Object messages = state.data().get("messages");
    // 解析 messages 列表获取最后一条 AI 消息...
    return parseMessages(messages);
}

该方案的问题:

  1. 需要 subscribeOn(Schedulers.boundedElastic()) 规避线程阻塞限制
  2. 需要硬编码 OverAllState 的 key 结构(如 "messages"),耦合实现细节
  3. 失去了真正的流式效果(SSE 逐字输出),只是模拟分行

具体问题

  1. stream() 不支持恢复是 by design 还是 bug?
  2. 如果是 by design,是否有计划支持 真正的流式中断恢复

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions