[Feature Request] Add streaming chunk events to WorkforceCallback for real-time agent output monitoring #3379

@Wendong-Fan

Description

Motivation

Workforce internally calls agent.step() and waits for the final result, never iterating through streaming chunks. As a result, users cannot monitor real-time streaming output from agents when using Workforce, even with custom callbacks.

Solution

  1. Add new event type
  from dataclasses import dataclass

  from camel.responses import ChatAgentResponse

  @dataclass
  class TaskStreamingChunkEvent:
      task_id: str
      worker_id: str
      chunk: ChatAgentResponse
      chunk_index: int
  2. Add callback method
  class WorkforceCallback:
      def log_task_streaming_chunk(self, event: TaskStreamingChunkEvent) -> None:
          """Called when a task produces a streaming output chunk."""
          pass
  3. Auto-detect and conditionally enable streaming
  class Workforce:
      def __init__(self, name, callbacks=None):
          self.callbacks = callbacks or []
          # Auto-detect if any callback needs streaming
          self._streaming_enabled = any(
              callable(getattr(cb, 'log_task_streaming_chunk', None))
              for cb in self.callbacks
          )

      async def _execute_task(self, task, worker):
          self._emit(TaskStartedEvent(...))

          if self._streaming_enabled:
              # Iterate through streaming chunks, emitting one event per chunk
              result = None
              chunk_index = 0
              async for chunk in worker.agent.step(task.content):
                  self._emit(TaskStreamingChunkEvent(
                      task_id=task.id,
                      worker_id=worker.id,
                      chunk=chunk,
                      chunk_index=chunk_index,
                  ))
                  # Keep the latest chunk (assumed here to carry the final response)
                  result = chunk
                  chunk_index += 1
          else:
              # Current behavior: wait for final result
              result = await worker.agent.step(task.content)

          self._emit(TaskCompletedEvent(...))
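
One detail of the auto-detection step may need care. If the base WorkforceCallback defines log_task_streaming_chunk as a no-op, then callable(getattr(cb, 'log_task_streaming_chunk', None)) is true for every subclass, and streaming would always be enabled. The following is a minimal, self-contained sketch of one possible alternative: enable streaming only when a callback actually overrides the hook. It is not the CAMEL implementation. The names wants_streaming, CollectingCallback, SilentCallback, FakeStreamingAgent, stream and execute_task are hypothetical, and the chunk is a plain str standing in for ChatAgentResponse.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List


@dataclass
class TaskStreamingChunkEvent:
    task_id: str
    worker_id: str
    chunk: str  # assumption: a plain string stands in for ChatAgentResponse
    chunk_index: int


class WorkforceCallback:
    def log_task_streaming_chunk(self, event: TaskStreamingChunkEvent) -> None:
        """Called when a task produces a streaming output chunk."""


class CollectingCallback(WorkforceCallback):
    """Hypothetical callback that overrides the hook and stores events."""

    def __init__(self) -> None:
        self.seen: List[TaskStreamingChunkEvent] = []

    def log_task_streaming_chunk(self, event: TaskStreamingChunkEvent) -> None:
        self.seen.append(event)


class SilentCallback(WorkforceCallback):
    """Hypothetical callback that inherits the no-op hook unchanged."""


def wants_streaming(cb: object) -> bool:
    """Return True only if cb's class overrides the base no-op hook."""
    method = getattr(type(cb), "log_task_streaming_chunk", None)
    base = WorkforceCallback.log_task_streaming_chunk
    return callable(method) and method is not base


class FakeStreamingAgent:
    """Hypothetical stand-in for an agent that yields output incrementally."""

    async def stream(self, prompt: str) -> AsyncIterator[str]:
        for word in prompt.split():
            await asyncio.sleep(0)  # yield control, as a real model call would
            yield word


async def execute_task(task_id, worker_id, agent, prompt, callbacks) -> str:
    """Forward each chunk to interested callbacks and return the joined text."""
    listeners = [cb for cb in callbacks if wants_streaming(cb)]
    pieces: List[str] = []
    async for chunk in agent.stream(prompt):
        event = TaskStreamingChunkEvent(task_id, worker_id, chunk, len(pieces))
        for cb in listeners:
            cb.log_task_streaming_chunk(event)
        pieces.append(chunk)
    return " ".join(pieces)


if __name__ == "__main__":
    collector = CollectingCallback()
    text = asyncio.run(
        execute_task("t1", "w1", FakeStreamingAgent(), "hello streaming world",
                     [collector, SilentCallback()])
    )
    print(text, [e.chunk_index for e in collector.seen])
```

Comparing against the base method keeps the non-streaming path unchanged for existing callbacks, while duck-typed callbacks that define the hook without inheriting from WorkforceCallback are still treated as streaming listeners.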

Alternatives

No response

Additional context

No response

Labels: P1 (Task with middle level priority)
