解析 ‘Progressive Revelation’:如何在图执行过程中,分阶段向用户展示 Agent 的思考进度?

尊敬的各位同仁、技术爱好者们:

大家好!

今天,我们聚焦一个在构建智能体(Agent)系统时至关重要,却又常被忽视的议题——渐进式揭示(Progressive Revelation)。特别是在复杂的图执行(Graph Execution)过程中,如何分阶段地向用户展示 Agent 的思考进度,这不仅是技术挑战,更是用户体验设计的艺术。

想象一下,你正在与一个 AI 助手交互,它需要完成一个复杂的多步骤任务:搜索信息、分析数据、调用工具、生成报告。如果它在处理过程中没有任何反馈,只是长时间的“思考中……”,你会感到焦虑、困惑,甚至开始怀疑它是否真的在工作,或者它卡住了。这就是“黑盒”问题。而渐进式揭示,正是为了打破这个黑盒,让 Agent 的内部运作变得透明、可追踪、可理解。

本次讲座,我将作为一名编程专家,带领大家深入探讨如何在图执行框架下,设计并实现一套高效、实时的渐进式揭示系统。我们将从核心原理出发,逐步深入到架构设计、通信机制、代码实现,并探讨其在提升用户体验、增强系统可信度、简化调试流程方面的巨大价值。

1. Agent 的思考流程与图执行:揭开复杂性的面纱

在深入渐进式揭示之前,我们首先需要理解 Agent 的“思考”是如何发生的,以及图执行在其中扮演的角色。

1.1 什么是 Agent 的“思考”?

对于一个基于大型语言模型(LLM)的 Agent 而言,其“思考”并非人类意义上的意识活动,而是一系列结构化的决策、规划、执行和反思过程。它通常包含以下核心阶段:

  1. 目标理解与分解(Goal Understanding & Planning):Agent 接收用户请求,理解其意图,并将其分解为一系列可执行的子任务。
  2. 工具选择(Tool Selection):根据当前子任务的需求,从可用的工具集中选择最合适的工具。工具可以是搜索、代码解释器、API 调用等。
  3. 参数生成与工具调用(Parameter Generation & Tool Invocation):为选定的工具生成正确的输入参数,并执行工具。
  4. 结果观察与分析(Observation & Analysis):接收工具执行结果,并对其进行分析,判断是否达到预期,或是否需要进一步行动。
  5. 反思与修正(Reflection & Refinement):基于观察结果,评估当前进度,调整后续计划,或修正错误。
  6. 最终答案生成(Final Answer Generation):综合所有中间结果,生成最终的用户响应。

这些阶段并非线性,而是可能形成复杂的循环和分支。

1.2 图执行:结构化 Agent 复杂逻辑的利器

为了管理 Agent 复杂的、非线性的思考流程,图执行(Graph Execution)成为了一种非常有效且流行的范式。它将 Agent 的每一个思考步骤或状态建模为图中的一个节点(Node),而节点之间的依赖关系和数据流则通过边(Edge)来表示。

一个典型的 Agent 图执行流程可能包含以下节点类型:

节点类型描述示例输入示例输出
Start任务的起始点,接收用户初始请求用户请求文本初始 Agent 状态,包含目标
PlanAgent 基于目标制定初步计划目标,历史对话规划的步骤列表
Tool_Select根据当前计划和上下文,选择合适的工具当前步骤,可用工具列表选定的工具名称
Tool_Execute执行选定的工具,并获取结果工具名称,工具参数工具执行的原始输出
Observe解析工具执行结果,提取关键信息工具原始输出结构化的观察结果
Reflect对观察结果进行反思,决定下一步行动(继续、修正、完成)观察结果,当前计划,历史记录下一步决策(如“继续规划”、“调用工具”、“完成”)
Generate_Answer综合所有信息,生成最终答案所有中间结果,历史对话最终用户响应
End任务的结束点最终用户响应完成状态

通过图执行,Agent 的复杂逻辑被解构为一系列离散、可管理的步骤。这为我们提供了天然的观测点,正是渐进式揭示的基础。

2. 渐进式揭示的核心原理与挑战

渐进式揭示并非简单地打印日志,它要求我们深思熟虑,以一种结构化、实时、可理解的方式呈现信息。

2.1 核心原则
  1. 粒度适中(Appropriate Granularity):信息的揭示应有不同的粒度层级。太粗糙,用户依然茫然;太细致,则信息过载。我们需要找到一个平衡点,通常是节点级别、关键思考步骤级别。
  2. 实时性(Real-time):用户期望在 Agent 思考的同时看到进度,而不是等待所有任务完成后一次性展示。这意味着需要使用实时通信技术。
  3. 可理解性(Understandability):揭示的信息必须是用户能够理解的,避免过多的技术细节或内部状态。可以适当进行抽象和解释。
  4. 可追溯性(Traceability):用户应该能够看到 Agent 的决策路径,理解它为何采取某个行动,以及在何处可能出现问题。
  5. 可控性(Controllability):高级的渐进式揭示甚至可以允许用户在特定阶段介入,提供反馈或修正。
2.2 挑战

实现渐进式揭示并非没有挑战:

  1. 性能开销:频繁地收集、处理和传输状态更新会带来额外的计算和网络开销。
  2. 数据量管理:复杂的 Agent 流程可能产生大量中间数据和事件,如何有效地过滤、聚合和传输这些数据是关键。
  3. 并发与异步:图执行通常是异步的,可能涉及并行处理。如何准确地追踪和报告并发执行的进度需要精巧的设计。
  4. 错误处理:当 Agent 流程中出现错误时,如何清晰地报告错误发生的位置、原因,并提示可能的解决方案,是提升用户体验的重要一环。
  5. 前端展示复杂性:后端发送的数据需要前端能够有效地渲染成用户友好的界面,这本身也是一个复杂的前端工程问题。
  6. 安全与隐私:某些中间数据可能包含敏感信息,在揭示过程中需要进行过滤或脱敏。

3. 构建可观测的图执行框架

为了实现渐进式揭示,我们首先需要一个能够发出(emit)状态更新的图执行框架。这通常通过事件驱动的架构来实现。

3.1 事件驱动架构的核心思想

在图的每个关键节点执行前、执行中、执行后,以及发生重要状态转换时,Agent 应该发出一个事件(Event)。这些事件包含了当前 Agent 状态的快照或关键信息。一个中央的事件发射器(Event Emitter)负责收集这些事件,并将其转发给所有注册的事件监听器(Event Listener)

3.2 定义事件数据结构

首先,我们定义一个通用的数据结构来封装所有状态更新事件。

import time from enum import Enum from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field # 定义事件类型 class GraphEventType(str, Enum): NODE_START = "node_start" NODE_PROGRESS = "node_progress" # 例如,LLM流式输出 NODE_END = "node_end" GRAPH_START = "graph_start" GRAPH_END = "graph_end" TOOL_CALL = "tool_call" TOOL_RESULT = "tool_result" LLM_INPUT = "llm_input" LLM_OUTPUT = "llm_output" ERROR = "error" # 定义事件的通用载荷 class EventPayload(BaseModel): # 可以在这里定义一个通用的字段,例如,一个字典来存储动态数据 data: Dict[str, Any] = Field(default_factory=dict) # 核心事件模型 class GraphExecutionEvent(BaseModel): event_id: str = Field(default_factory=lambda: str(time.time_ns())) timestamp: float = Field(default_factory=time.time) event_type: GraphEventType graph_id: str node_id: Optional[str] = None # 如果是节点相关的事件,则有节点ID node_name: Optional[str] = None # 节点名称,更友好 status: Optional[str] = None # 例如 'running', 'success', 'failed' message: Optional[str] = None # 简短的用户友好信息 payload: Optional[EventPayload] = None # 详细的事件数据 class Config: use_enum_values = True
3.3 实现一个简单的事件发射器

我们将创建一个EventEmitter类,它能够注册监听器并广播事件。

import asyncio from typing import Callable, List, Coroutine class EventEmitter: def __init__(self): self._listeners: List[Callable[[GraphExecutionEvent], Coroutine[Any, Any, None]]] = [] def on(self, listener: Callable[[GraphExecutionEvent], Coroutine[Any, Any, None]]): """注册一个异步监听器""" self._listeners.append(listener) async def emit(self, event: GraphExecutionEvent): """异步广播事件给所有监听器""" # 注意:这里我们使用 asyncio.gather 来并发执行所有监听器 # 如果任何一个监听器失败,它不会阻止其他监听器 await asyncio.gather(*[listener(event) for listener in self._listeners], return_exceptions=True) # 全局或单例事件发射器 global_event_emitter = EventEmitter()
3.4 可观测的 Agent 节点和图执行器

现在,我们来设计一个可观测的BaseNodeGraphExecutor

import uuid class BaseNode: def __init__(self, node_id: str, node_name: str, emitter: EventEmitter): self.node_id = node_id self.node_name = node_name self.emitter = emitter async def _emit_event(self, event_type: GraphEventType, graph_id: str, status: Optional[str] = None, message: Optional[str] = None, payload_data: Optional[Dict[str, Any]] = None): payload = EventPayload(data=payload_data) if payload_data else None event = GraphExecutionEvent( event_type=event_type, graph_id=graph_id, node_id=self.node_id, node_name=self.node_name, status=status, message=message, payload=payload ) await self.emitter.emit(event) async def execute(self, graph_id: str, input_data: Any) -> Any: """ 抽象方法,子类需实现具体的节点逻辑。 此方法将作为事件发出点。 """ raise NotImplementedError # 示例:一个简单的规划节点 class PlanningNode(BaseNode): def __init__(self, emitter: EventEmitter): super().__init__(node_id="node_plan", node_name="规划任务", emitter=emitter) async def execute(self, graph_id: str, input_data: Dict[str, Any]) -> Dict[str, Any]: user_query = input_data.get("user_query", "") await self._emit_event(GraphEventType.NODE_START, graph_id, status="running", message=f"开始规划:'{user_query}'") # 模拟LLM思考时间 await asyncio.sleep(1) plan = f"根据查询 '{user_query}',我将执行以下步骤:1. 搜索相关信息;2. 总结信息;3. 生成最终报告。" await self._emit_event(GraphEventType.LLM_INPUT, graph_id, payload_data={"prompt": f"规划任务:{user_query}"}) await self._emit_event(GraphEventType.LLM_OUTPUT, graph_id, payload_data={"response": plan}) await self._emit_event(GraphEventType.NODE_END, graph_id, status="success", message="规划完成") return {"plan": plan, "next_step": "search_info"} # 示例:一个简单的工具执行节点 class ToolExecutionNode(BaseNode): def __init__(self, emitter: EventEmitter): super().__init__(node_id="node_tool_exec", node_name="执行工具", emitter=emitter) async def execute(self, graph_id: str, input_data: Dict[str, Any]) -> Dict[str, Any]: tool_name = input_data.get("tool_name", "unknown_tool") tool_params = input_data.get("tool_params", {}) await self._emit_event(GraphEventType.NODE_START, graph_id, status="running", message=f"开始执行工具:'{tool_name}'") await self._emit_event(GraphEventType.TOOL_CALL, graph_id, payload_data={"tool": tool_name, "params": tool_params}) # 模拟工具执行 if tool_name == "search_web": await asyncio.sleep(2) # 模拟网络延迟 result = f"搜索 '{tool_params.get('query', '')}' 的结果:发现3篇文章..." else: result = f"未知工具 '{tool_name}'" await self._emit_event(GraphEventType.TOOL_RESULT, graph_id, payload_data={"tool": tool_name, "result": result}) await self._emit_event(GraphEventType.NODE_END, graph_id, status="success", message="工具执行完成") return {"tool_output": result, "next_step": "summarize_result"} # 示例:一个最终答案生成节点 class GenerateAnswerNode(BaseNode): def __init__(self, emitter: EventEmitter): super().__init__(node_id="node_answer", node_name="生成最终答案", emitter=emitter) async def execute(self, graph_id: str, input_data: Dict[str, Any]) -> Dict[str, Any]: summary = input_data.get("summary", "") await self._emit_event(GraphEventType.NODE_START, graph_id, status="running", message="开始生成最终答案") final_answer_prompt = f"根据以下信息生成最终答案:{summary}" await self._emit_event(GraphEventType.LLM_INPUT, graph_id, payload_data={"prompt": final_answer_prompt}) await asyncio.sleep(1.5) final_answer = f"根据您的查询,Agent 经过搜索和总结,最终得出结论:{summary[:50]}..." # 简化处理 await self._emit_event(GraphEventType.LLM_OUTPUT, graph_id, payload_data={"response": final_answer}) await self._emit_event(GraphEventType.NODE_END, graph_id, status="success", message="最终答案生成完成") return {"final_answer": final_answer} # 简单的图执行器 class SimpleGraphExecutor: def __init__(self, emitter: EventEmitter): self.emitter = emitter self.nodes: Dict[str, BaseNode] = { "plan": PlanningNode(emitter), "tool_search": ToolExecutionNode(emitter), "generate_answer": GenerateAnswerNode(emitter), # 可以添加更多节点 } # 简化图结构:直接定义执行顺序 self.workflow = [ "plan", "tool_search", "generate_answer" ] async def execute_graph(self, user_query: str) -> Dict[str, Any]: graph_id = str(uuid.uuid4()) current_data = {"user_query": user_query} await self.emitter._emit_event(GraphExecutionEvent( event_type=GraphEventType.GRAPH_START, graph_id=graph_id, message=f"Agent 任务开始,目标:'{user_query}'" )) try: for node_name in self.workflow: node = self.nodes.get(node_name) if not node: raise ValueError(f"Unknown node: {node_name}") # 模拟数据传递和决策 if node_name == "tool_search": # 从plan节点获取的plan,这里简化为直接使用查询 current_data["tool_name"] = "search_web" current_data["tool_params"] = {"query": user_query} elif node_name == "generate_answer": # 从tool_search节点获取的output current_data["summary"] = current_data.get("tool_output", "未获取到搜索结果") node_output = await node.execute(graph_id, current_data) current_data.update(node_output) # 更新当前数据流 await self.emitter._emit_event(GraphExecutionEvent( event_type=GraphEventType.GRAPH_END, graph_id=graph_id, status="success", message=f"Agent 任务完成,最终答案:{current_data.get('final_answer', '无')}" )) return current_data except Exception as e: await self.emitter._emit_event(GraphExecutionEvent( event_type=GraphEventType.ERROR, graph_id=graph_id, message=f"Agent 任务执行失败:{str(e)}", status="failed", payload_data={"error_type": type(e).__name__, "error_message": str(e)} )) raise

通过这种方式,我们的 Agent 框架就具备了发出实时进度更新的能力。下一步是讨论如何将这些事件实时传输给用户。

4. 实时通信机制的选择与实现

将后端生成的事件实时推送到前端是渐进式揭示的关键。有几种主流的实时通信技术,每种都有其适用场景。

4.1 通信机制对比
特性HTTP Polling (轮询)Server-Sent Events (SSE)WebSockets
连接方式短连接,多次请求长连接,单向(服务器->客户端)长连接,双向
协议HTTP/1.1HTTP/1.1 (基于 HTTP)WebSocket Protocol
实时性较低(取决于轮询间隔)较高极高(全双工)
实现难度简单中等中等偏上
浏览器支持普遍支持良好良好
使用场景简单、低实时性要求纯粹的服务器推送,如通知、日志流实时聊天、游戏、协同编辑、Agent 进度
开销每次请求/响应都有 HTTP 头开销保持连接有少量开销协议升级和保持连接有少量开销

对于 Agent 思考进度的渐进式揭示,我们通常需要高实时性且纯粹的服务器推送(如果用户不需要主动发送控制命令),或者双向通信(如果用户可能需要暂停、干预 Agent)。因此,SSEWebSockets是最佳选择。

4.2 使用 FastAPI 实现 SSE

Server-Sent Events 允许服务器通过一个持久的 HTTP 连接,向客户端发送事件流。它的实现相对简单,非常适合纯粹的单向进度推送。

# main.py (FastAPI 应用) from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse import asyncio import json app = FastAPI() # 导入之前定义的 EventEmitter 和 GraphExecutionEvent # from your_module import global_event_emitter, GraphExecutionEvent, GraphEventType, SimpleGraphExecutor # 初始化 Agent 执行器 agent_executor = SimpleGraphExecutor(global_event_emitter) @app.get("/events") async def sse_events(request: Request): """ SSE 端点,用于向客户端推送 Agent 进度事件。 """ async def event_generator(): # 用于存储待发送事件的队列 event_queue = asyncio.Queue() # 注册一个监听器,将事件放入队列 async def listener(event: GraphExecutionEvent): await event_queue.put(event.json()) # 将事件转换为 JSON 字符串 global_event_emitter.on(listener) try: while True: # 检查客户端是否断开连接 if await request.is_disconnected(): print("Client disconnected from SSE.") break event_data = await event_queue.get() # SSE 规范要求数据格式为 "data: [json_string]nn" yield f"data: {event_data}nn" await asyncio.sleep(0.01) # 小幅延迟,避免CPU空转 except asyncio.CancelledError: print("SSE generator cancelled.") finally: # 在连接断开或取消时,移除监听器 # 注意:如果有很多客户端,每个客户端都需要一个独立的监听器 # 这里的实现是简化的,实际生产中可能需要更复杂的管理 print("SSE listener cleaned up.") # global_event_emitter._listeners.remove(listener) # 移除监听器,这里需要考虑并发和唯一性 return StreamingResponse(event_generator(), media_type="text/event-stream") @app.post("/run_agent") async def run_agent(query: str): """ 启动 Agent 任务的 HTTP 端点。 """ try: # 在后台启动 Agent 任务,不阻塞当前请求 asyncio.create_task(agent_executor.execute_graph(query)) return {"message": "Agent 任务已启动,请连接 /events 端点获取进度。"} except Exception as e: return {"error": str(e)}, 500 # 启动 FastAPI: uvicorn main:app --reload

前端可以使用 JavaScript 的EventSourceAPI 连接/events端点:

// 简单的前端示例(HTML/JS) const eventSource = new EventSource('/events'); const logDiv = document.getElementById('log'); eventSource.onmessage = function(event) { const eventData = JSON.parse(event.data); // 在这里处理并展示 eventData logDiv.innerHTML += `<p><strong>[${eventData.node_name || eventData.event_type}]</strong> ${eventData.message || ''} - ${JSON.stringify(eventData.payload?.data || {})}</p>`; logDiv.scrollTop = logDiv.scrollHeight; // 滚动到底部 }; eventSource.onerror = function(err) { console.error("EventSource failed:", err); eventSource.close(); }; function runAgent() { const query = document.getElementById('queryInput').value; fetch(`/run_agent?query=${encodeURIComponent(query)}`, { method: 'POST' }) .then(response => response.json()) .then(data => console.log(data)) .catch(error => console.error('Error:', error)); }
4.3 使用 FastAPI 实现 WebSockets

WebSockets 提供了全双工的通信通道,这意味着客户端和服务器可以同时发送和接收消息。这对于需要用户干预 Agent 流程的场景非常有用,或者仅仅是为了更强大的实时性。

# main.py (FastAPI 应用 - WebSockets 部分) from fastapi import WebSocket, WebSocketDisconnect # ... (之前定义的 EventEmitter, GraphExecutionEvent, SimpleGraphExecutor) ... # 存储所有活跃的 WebSocket 连接 active_websocket_connections: List[WebSocket] = [] async def websocket_listener(event: GraphExecutionEvent): """ WebSocket 专用的事件监听器,将事件广播给所有连接的客户端。 """ # 广播事件给所有活跃的 WebSocket 连接 for connection in active_websocket_connections: try: await connection.send_text(event.json()) except RuntimeError as e: # 如果连接已经关闭,可能会抛出 RuntimeError print(f"Failed to send to WebSocket, connection might be closed: {e}") # 可以在这里移除失效的连接,但需要注意并发问题 except Exception as e: print(f"Error sending to WebSocket: {e}") global_event_emitter.on(websocket_listener) # 注册 WebSocket 监听器 @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() active_websocket_connections.append(websocket) print(f"WebSocket client connected: {websocket.client}") try: while True: # 客户端可以发送消息来控制 Agent,例如暂停、取消 message = await websocket.receive_text() print(f"Received message from WebSocket client: {message}") # 这里可以解析 message,并触发 Agent 的控制逻辑 # 例如:if message == "cancel": agent_executor.cancel_current_task() await websocket.send_text(f"Server received: {message}") # 简单回复 except WebSocketDisconnect: print(f"WebSocket client disconnected: {websocket.client}") except Exception as e: print(f"WebSocket error: {e}") finally: active_websocket_connections.remove(websocket) print(f"WebSocket client removed: {websocket.client}") # run_agent 保持不变,它会触发事件,然后由 websocket_listener 广播出去

前端可以使用 JavaScript 的WebSocketAPI 连接/ws端点:

// 简单的前端示例(HTML/JS) const websocket = new WebSocket('ws://localhost:8000/ws'); // 替换为你的服务器地址 const wsLogDiv = document.getElementById('ws-log'); websocket.onopen = function(event) { console.log("WebSocket connection opened."); wsLogDiv.innerHTML += `<p><em>WebSocket 连接已建立。</em></p>`; }; websocket.onmessage = function(event) { const eventData = JSON.parse(event.data); wsLogDiv.innerHTML += `<p><strong>[${eventData.node_name || eventData.event_type}]</strong> ${eventData.message || ''} - ${JSON.stringify(eventData.payload?.data || {})}</p>`; wsLogDiv.scrollTop = wsLogDiv.scrollHeight; }; websocket.onclose = function(event) { console.log("WebSocket connection closed:", event); wsLogDiv.innerHTML += `<p><em>WebSocket 连接已关闭。</em></p>`; }; websocket.onerror = function(error) { console.error("WebSocket Error:", error); wsLogDiv.innerHTML += `<p style="color:red;"><em>WebSocket 错误: ${error.message}</em></p>`; }; function runAgentWS() { const query = document.getElementById('queryInputWS').value; fetch(`/run_agent?query=${encodeURIComponent(query)}`, { method: 'POST' }) .then(response => response.json()) .then(data => console.log(data)) .catch(error => console.error('Error:', error)); } // 客户端也可以发送消息给服务器 function sendToAgent(message) { if (websocket.readyState === WebSocket.OPEN) { websocket.send(message); } else { console.warn("WebSocket is not open."); } }

在实际生产环境中,WebSocket 的连接管理会更加复杂,需要处理重连、心跳包、认证授权等问题。

5. Agent 思考进度的粒度控制与数据呈现

渐进式揭示的质量,很大程度上取决于我们如何控制事件的粒度以及事件中包含的数据。

5.1 粒度控制
  1. 粗粒度(Node-level):

    • 何时发出:节点开始执行(NODE_START)、节点完成执行(NODE_END)、节点失败(ERROR)。
    • 内容:节点名称、状态、耗时、简要结果或错误信息。
    • 优点:信息量适中,易于理解整体流程。
    • 适用场景:概览 Agent 进度条,高亮当前活动节点。
  2. 中粒度(Tool-level / LLM Interaction-level):

    • 何时发出:Agent 决定调用某个工具(TOOL_CALL)、工具执行完成并返回结果(TOOL_RESULT)、LLM 收到输入(LLM_INPUT)、LLM 生成输出(LLM_OUTPUT)。
    • 内容:工具名称、参数、返回结果;LLM 提示词、响应。
    • 优点:揭示 Agent 的具体行动和思考过程。
    • 适用场景:展示 Agent 的“思考气泡”,工具调用日志。
  3. 细粒度(Token-level LLM Streaming):

    • 何时发出:LLM 在生成响应时,每生成一个 token 就发出一个事件(NODE_PROGRESS或专门的LLM_TOKEN)。
    • 内容:当前生成的 token。
    • 优点:提供极致的实时性,用户几乎可以同步看到 Agent 的“打字”过程。
    • 适用场景:聊天界面中 Agent 响应的流式显示。

在我们的GraphExecutionEvent中,event_typepayload字段就是为了支持这种多粒度设计。例如,LLM_INPUTLLM_OUTPUT提供了 LLM 交互的中间粒度信息。

实现 LLM Token 流式输出的示例:

假设你正在使用一个支持流式输出的 LLM 客户端(如 OpenAI 的client.chat.completions.create(stream=True))。你可以在LLMNode内部这样处理:

# LLMNode 示例,假设 LLM 客户端支持流式输出 class LLMGenerationNode(BaseNode): def __init__(self, emitter: EventEmitter, llm_client): super().__init__(node_id="node_llm_gen", node_name="LLM生成响应", emitter=emitter) self.llm_client = llm_client async def execute(self, graph_id: str, input_data: Dict[str, Any]) -> Dict[str, Any]: prompt = input_data.get("prompt", "请生成一个通用响应。") await self._emit_event(GraphEventType.NODE_START, graph_id, status="running", message="开始调用LLM生成响应") await self._emit_event(GraphEventType.LLM_INPUT, graph_id, payload_data={"prompt": prompt}) full_response_content = [] try: # 假设 llm_client.chat_stream 是一个异步生成器,返回 token async for chunk in self.llm_client.chat_stream(prompt): # 模拟一个流式LLM客户端 token = chunk.get("token", "") # 假设 chunk 包含 token if token: full_response_content.append(token) await self._emit_event( GraphEventType.NODE_PROGRESS, # 或者定义一个 LLM_TOKEN 事件类型 graph_id, message="LLM 正在生成...", payload_data={"token": token} ) # 模拟 LLM 思考 await asyncio.sleep(0.05) final_response = "".join(full_response_content) await self._emit_event(GraphEventType.LLM_OUTPUT, graph_id, payload_data={"response": final_response}) await self._emit_event(GraphEventType.NODE_END, graph_id, status="success", message="LLM响应生成完成") return {"llm_response": final_response} except Exception as e: await self._emit_event(GraphEventType.ERROR, graph_id, status="failed", message=f"LLM调用失败: {str(e)}") raise
5.2 数据呈现策略 (前端视角)

虽然这是后端讲座,但了解数据如何被消费有助于我们设计更好的事件。

  1. 实时日志流:最简单直观的方式,按照时间顺序显示所有事件。用户可以看到每个步骤的详细信息。
  2. 动态图可视化:可以在前端渲染一个 Agent 图,并根据NODE_STARTNODE_END事件高亮显示当前正在执行的节点,或者用不同颜色表示节点状态(进行中、成功、失败)。
  3. “思考气泡”/“草稿纸”:专门的区域显示 Agent 的 LLM 提示词、内部思考过程、工具调用及结果,模拟 Agent 的“内心独白”。
  4. 进度条/状态指示:整体任务的进度条,或者每个节点旁边的加载动画,提供即时视觉反馈。
  5. 结构化输出区:当 Agent 最终生成答案时,将其清晰地展示在指定区域。

6. 高级主题与最佳实践

6.1 错误处理与重试策略

在渐进式揭示中,错误是不可避免的。当节点执行失败时:

  • 发出ERROR事件:包含详细的错误信息、堆栈追踪(如果安全允许)、发生错误的节点 ID。
  • 图的健壮性:可以设计图执行器,在某些节点失败时,尝试回退、重试,或跳过,并发出相应的事件。
  • 用户反馈:前端应清晰地展示错误信息,并引导用户进行修正或重试。
6.2 持久化与回放

为了调试、审计和用户体验,将事件流持久化非常重要。

  • 存储:GraphExecutionEvent存储到数据库(如 PostgreSQL, MongoDB)或日志系统(如 Elasticsearch)。
  • 回放:允许用户或开发者加载历史事件流,并在前端“回放”Agent 的思考过程,这对于理解复杂 Bug 或展示 Agent 能力非常有帮助。
6.3 安全性与敏感信息过滤

Agent 的中间思考过程可能包含用户数据、API 密钥片段(如果处理不当)、内部系统路径等敏感信息。

  • 数据过滤:在事件发出前,对payload中的数据进行严格过滤和脱敏。
  • 权限控制:只有授权用户才能查看完整的、未经脱敏的事件流。
  • 传输加密:使用 HTTPS 和 WSS 确保数据在传输过程中的安全。
6.4 可扩展性与事件总线

在大型分布式 Agent 系统中,单个EventEmitter可能会成为瓶颈。

  • 消息队列:引入 Kafka、RabbitMQ 等消息队列作为事件总线。Agent 的各个服务将事件发布到队列,监听服务从队列消费事件并推送给客户端。这提供了高度解耦和可伸缩性。
  • 服务注册与发现:动态管理事件监听器和 Agent 服务。
6.5 用户干预与交互

渐进式揭示的终极目标之一是允许用户在 Agent 思考的关键节点进行干预。

  • 暂停/继续:用户可以通过前端发送命令,暂停 Agent 的执行,检查当前状态,然后决定继续或修改。
  • 修改计划:在 Agent 规划阶段,用户可以审查计划并提出修改建议。
  • 工具选择确认:在 Agent 决定调用某个工具前,征求用户确认。
  • 错误纠正:当 Agent 遇到错误时,用户可以直接提供修正方案。

这需要双向通信(如 WebSocket)和 Agent 内部状态管理机制,使其能够响应外部指令并调整行为。

总结

渐进式揭示不仅仅是一种技术实现,它更是一种设计哲学,旨在将智能体从“黑盒”转变为“透明盒”。通过在 Agent 图执行的关键节点注入可观测性,并利用实时通信机制将这些洞察传递给用户,我们不仅极大地提升了用户体验,减少了等待焦虑,更建立了用户对 Agent 的信任。同时,它也为开发者提供了强大的调试和审计工具,让复杂 Agent 系统的开发和维护变得更加可控。掌握并实践渐进式揭示,是构建下一代智能、透明、用户友好型 AI 系统的必由之路。

感谢大家的聆听!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1119671.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

直接上干货!这个通信信号调制识别数据集生成工具能让你摆脱数据荒,咱们从核心代码开始拆解。先看信号生成器的核心逻辑

通信信号调制识别所用数据集生成代码 Matlab自动生成数据集&#xff0c;打标签&#xff0c;绘制不同训练策略和不同训练样本数量的对比曲线图&#xff0c;可以绘制模型在测试集上的虚警率&#xff0c;精确率和平均误差。 可以绘制不同信噪比下测试集各个参数的直方图。 注释非常…

深入 ‘Steering the Agent’:利用输入反馈实时改变正在运行中的 Graph 权重,实现‘人机共驾’

尊敬的各位技术同仁&#xff0c;大家好&#xff01;今天&#xff0c;我们将深入探讨一个激动人心的主题——“Steering the Agent”&#xff0c;特别是如何利用实时输入反馈来动态调整正在运行中的图&#xff08;Graph&#xff09;的权重&#xff0c;最终实现真正意义上的“人机…

基于SpringBoot和Vue的公司文档档案借阅管理系统设计与开发应用和研究

文章目录摘要项目简介大数据系统开发流程主要运用技术介绍爬虫核心代码展示结论源码文档获取定制开发/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;摘要 公司文档档案借阅管理系统基于SpringBoot和Vue技术栈开发&#xff0c;旨在提升企业文档管理…

LangGraph之State的定义

在 LangGraph&#xff08;LangChain 生态中的一个用于构建状态机和有向无环图工作流的库&#xff09;中&#xff0c;State&#xff08;状态&#xff09; 是整个工作流的核心数据结构。它用于在节点&#xff08;Node&#xff09;之间传递信息、维护上下文&#xff0c;并驱动整个…

【后端开发面试高频场景题设计题】深度解析(万字干货)| 面试通关必备

文章目录目录一、 前言&#xff1a;场景题&设计题的面试考察逻辑二、 高频场景题深度解析2.1 缓存三大问题&#xff1a;穿透、击穿、雪崩&#xff08;面试最高频&#xff09;问题描述分析思路参考答案面试考察点面试追问2.2 分布式事务的解决方案及适用场景问题描述分析思路…

基于MATLAB Simulink Simscape的倒立摆仿真控制器文档详解

MATLAB倒立摆仿真 simulink simscape 控制器 有文档刚上手倒立摆仿真时总觉得这玩意儿像在钢丝上跳舞——明明物理模型不复杂&#xff0c;但控制器稍微不听话整个系统就翻车。好在MATLAB的SimulinkSimScape组合给咱们配了把瑞士军刀&#xff0c;今天咱们边拆解边实操。先打开Si…

c盘红了怎么清理垃圾而不误删,教您一套安全又效率的清理方法!

“这是怎么回事啊&#xff1f;我的电脑C盘怎么爆满了&#xff1f;我记得自己没往C盘放过东西啊&#xff1f;怎么自己就红了啊&#xff1f;我想自己清理一下C盘&#xff0c;但是又不知道该从哪里入手&#xff0c;害怕删错了东西&#xff0c;那可就完犊子了&#xff0c;有谁知道C…

web自动化测试窗口框架与验证码登录处理

前言 selenium的作用域切换 selenium在处理元素时遇见新窗口、网页嵌套网页、网页的原生弹窗&#xff0c;无法进行直接处理作用域里元素的内容&#xff0c;需要通过切换作用域来处理此类问题。 selenium三种作用域切换&#xff1a; ①、window窗口切换 ②、iframe切换 ③、al…

探秘AI应用架构师的智能营销AI决策系统数据分析能力

探秘AI应用架构师的智能营销AI决策系统数据分析能力 1. 引入与连接:智能营销的变革与数据分析的核心作用 1.1 开场故事:营销困境与AI破局 场景: 2023年,某快消品牌市场总监李明正面临一个典型的营销困境——公司投入了数百万营销预算,却无法准确追踪哪些渠道带来了实际…

编程语言最核心的方面是什么?

编程语言最核心的区分要素及原理 编程语言的核心区别主要体现在以下几个方面&#xff0c;每个方面都有其独特的机制和原理&#xff1a; 一、核心区分要素 1. 编程范式 这是最根本的区别&#xff0c;决定语言如何组织和表达逻辑。 实例对比&#xff1a; # Python&#xff08;多范…

rdd的持久化

在Apache Spark中&#xff0c;RDD&#xff08;弹性分布式数据集&#xff09;的持久化&#xff08;Persistence&#xff09;是一种优化技术&#xff0c;用于将RDD的计算结果存储在内存或磁盘中&#xff0c;避免重复计算。以下是关键要点&#xff1a;核心作用避免重复计算&#x…

[Windows] 局域网共享精灵v2025.11.10绿色版

[Windows] 局域网共享精灵v2025.11.10绿色版 链接&#xff1a;https://pan.xunlei.com/s/VOiI2bKifFbU2d-SbBTjWrfPA1?pwdpsbj# 局域网共享精灵是一款Windows环境下助力于局域网环境文件共享和打印机共享,帮助您快捷高效的在局域网内实现文件共享和打印机共享的操作&#xf…

强烈安利!继续教育必用TOP10 AI论文工具测评

强烈安利&#xff01;继续教育必用TOP10 AI论文工具测评 2026年继续教育AI论文工具测评&#xff1a;为何需要这份权威榜单 在当前学术研究日益数字化的背景下&#xff0c;继续教育群体面临着前所未有的挑战。无论是撰写高质量论文&#xff0c;还是高效完成科研任务&#xff0c;…

介电强度试验仪解决材料在高压环境下的绝缘性能评估问题

介电强度试验仪主要解决材料在高压环境下的绝缘性能评估问题&#xff0c;具体包括以下几个方面&#xff1a;1. ‌评估材料的绝缘性能‌核心功能‌&#xff1a;通过施加直流或交流电压&#xff0c;模拟高压环境&#xff0c;测试材料在电场作用下的击穿电压&#xff0c;从而评估其…

UTS API插件,助力uniapp开发者快速实现人脸识别活体检测

HelloKitty-FaceAIFaceAI人脸识别&#xff0c;活体检测UTS API插件&#xff0c;支持iOS&#xff0c;Android 双端&#xff0c;助力uniapp开发者快速实现人脸识别活体检测。 后面我们会支持主题色定制等功能&#xff0c;更多可根据原生工程项目修改升级插件原生工程&#xff1a;…

【Linux命令大全】003.文档编辑之nl命令(实操篇)

【Linux命令大全】003.文档编辑之nl命令&#xff08;实操篇&#xff09; ✨ 本文为Linux系统文档编辑与文本处理命令的全面汇总与深度优化&#xff0c;结合图标、结构化排版与实用技巧&#xff0c;专为高级用户和系统管理员打造。 (关注不迷路哈&#xff01;&#xff01;&#…

【Linux命令大全】003.文档编辑之od命令(实操篇)

【Linux命令大全】003.文档编辑之od命令&#xff08;实操篇&#xff09; ✨ 本文为Linux系统文档编辑与文本处理命令的全面汇总与深度优化&#xff0c;结合图标、结构化排版与实用技巧&#xff0c;专为高级用户和系统管理员打造。 (关注不迷路哈&#xff01;&#xff01;&#…

小迪安全2023-2024|第12天-扩展整理:信息打点-Web应用企业产权指纹识别域名资产网络空间威胁情报_笔记|web安全|渗透测试|网络安全_2023-2024

小迪安全2023-2024|第12天&#xff1a;信息打点-Web应用&企业产权&指纹识别&域名资产&网络空间&威胁情报_笔记&#xff5c;web安全&#xff5c;渗透测试&#xff5c;网络安全_2023-2024 一、信息打点概述 在渗透测试和安全评估中&#xff0c;信息收集是整个…

【用友U8cloud】修改Server和Data Source 访问IP地址

访问路径 C:\U8CloudCERP5.1\U8CERP\bin 运行u8SysConfig配置修改完成后&#xff0c;点击保存 运行启动U8cloud这块启动时间稍微长一些&#xff0c;配置好的电脑可能1-3分钟左右&#xff0c;配置不好的可能更长

ITSM 现代化实践与智能化趋势:从传统运维到数智化服务的演进

摘要如今&#xff0c;IT 服务管理&#xff08;ITSM&#xff09;已不再仅仅是 IT 部门的内部流程规范&#xff0c;而是企业构建高效、稳定、可持续服务体系的核心支撑。随着云计算、大模型及自动化技术的爆发&#xff0c;传统的“流程驱动”模式正在向“价值驱动”与“智能驱动”…