.NET+AI | Workflow | 核心概念速通(1)

news/2026/1/20 20:37:32/文章来源:https://www.cnblogs.com/sheng-jie/p/19508743

MAF Workflow 核心概念详解

📚 本课概览

Microsoft Agent Framework (MAF) 提供了一套强大的 Workflow(工作流) 框架,用于编排和协调多个智能体(Agent)或处理组件的执行流程。

本课将以通俗易懂的方式,帮助你理解 MAF Workflow 的核心概念:

概念 说明 类比
Workflow 工作流定义 📋 流程图
Executor 执行器/处理节点 🔧 工人
Edge 连接边 ➡️ 传送带
SuperStep 超步/批量处理 ⏱️ 处理周期
WorkflowContext 工作流上下文 📦 工作台
WorkflowEvent 工作流事件 📣 通知
Run 运行实例 🏃 一次执行
Checkpoint 检查点 💾 存档点

让我们逐一深入了解这些概念!

🎯 为什么需要 Workflow?

在构建 AI 智能体应用时,我们经常需要:

  1. 多步骤处理:用户请求需要经过多个处理环节
  2. 多智能体协作:不同的 Agent 各司其职,协同完成任务
  3. 条件分支:根据处理结果决定下一步走向
  4. 并行处理:某些任务可以同时进行以提高效率
  5. 状态管理:需要在处理过程中保存和恢复状态

传统方式的问题:

// ❌ 硬编码的流程,难以维护和扩展
var result1 = await agent1.ProcessAsync(input);
if (result1.Success)
{var result2 = await agent2.ProcessAsync(result1.Data);// ... 越来越复杂
}

使用 Workflow 的优势:

// ✅ 声明式定义,清晰可维护
var workflow = new WorkflowBuilder(startExecutor).AddEdge(executor1, executor2).AddEdge(executor2, executor3, condition: x => x.Success).Build();


一、核心概念:Executor(执行器)

🔧 什么是 Executor?

Executor(执行器) 是 Workflow 中的最小工作单元,类似于:

类比 说明
🧑‍🏭 工厂里的工人 每个工人负责一道工序
🧱 乐高积木块 每个积木有特定功能,组合成整体
🔌 电路中的元件 接收输入信号,输出处理结果

📌 Executor 的核心特征

  1. 唯一标识(Id):每个 Executor 有一个唯一的 ID,用于在 Workflow 中引用
  2. 消息处理:接收特定类型的输入消息,处理后产生输出消息
  3. 路由配置:通过 ConfigureRoutes 方法定义能处理哪些类型的消息
  4. 状态感知:可以通过 IWorkflowContext 访问和修改工作流状态

🏗️ Executor 的类型层次

MAF 提供了多种 Executor 类型,满足不同场景需求:

类型 用途 示例场景
Executor<TInput> 处理消息,无返回值 日志记录、通知发送
Executor<TInput, TOutput> 处理消息,有返回值 数据转换、AI 调用
FunctionExecutor<T> 用委托函数快速创建 简单处理逻辑
StatefulExecutor<TState> 需要维护状态的执行器 会话管理、计数器

💡 Executor 源码解析

从源码中可以看到 Executor 的核心设计:

// 来自 Executor.cs
public abstract class Executor : IIdentified
{// 唯一标识符public string Id { get; }// 配置消息路由(子类必须实现)protected abstract RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder);// 执行消息处理public async ValueTask<object?> ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, CancellationToken cancellationToken = default){// 记录调用事件await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message));// 路由消息到正确的处理器CallResult? result = await this.Router.RouteMessageAsync(message, context, ...);// 记录完成或失败事件ExecutorEvent executionResult = result?.IsSuccess is not false? new ExecutorCompletedEvent(this.Id, result?.Result): new ExecutorFailedEvent(this.Id, result.Exception);await context.AddEventAsync(executionResult);return result.Result;}
}

关键点

  • ✅ 每个 Executor 有唯一 ID
  • ✅ 通过 ConfigureRoutes 声明能处理的消息类型
  • ✅ 执行过程会产生事件(ExecutorInvokedEventExecutorCompletedEvent 等)

二、核心概念:Edge(边)

➡️ 什么是 Edge?

Edge(边) 是连接两个 Executor 的消息通道,类似于:

类比 说明
🏭 工厂传送带 把上一道工序的产品传送到下一道工序
🔗 水管 把水从一个容器引导到另一个容器
📨 邮路 把信件从发件人送到收件人

📌 Edge 的三种类型

MAF 支持三种类型的 Edge,适用于不同的流程模式:

类型 说明 使用场景
Direct(直连) 一对一连接 顺序处理流程
FanOut(扇出) 一对多连接 并行分发任务
FanIn(扇入) 多对一连接 汇聚多个结果

💡 Edge 源码解析

从源码可以看到 Edge 的核心结构:

// 来自 Edge.cs
public enum EdgeKind
{Direct,   // 直连:一对一FanOut,   // 扇出:一对多FanIn     // 扇入:多对一
}public sealed class Edge
{public EdgeKind Kind { get; init; }    // 边的类型public EdgeData Data { get; init; }    // 边的具体数据
}// 来自 DirectEdgeData.cs - 直连边的数据
public sealed class DirectEdgeData : EdgeData
{public string SourceId { get; }        // 源 Executor IDpublic string SinkId { get; }          // 目标 Executor IDpublic Func<object?, bool>? Condition; // 可选的条件判断
}

Direct Edge 示意图

关键点

  • ✅ Edge 定义了消息从哪里来、到哪里去
  • ✅ Direct Edge 支持条件路由(只有满足条件的消息才传递)
  • ✅ FanOut Edge 可以实现广播分区逻辑

三、核心概念:Workflow(工作流)

📋 什么是 Workflow?

Workflow(工作流) 是将多个 Executor 通过 Edge 连接起来的完整流程定义,类似于:

类比 说明
📋 流程图 定义了从开始到结束的完整流程
🏭 生产线 多个工位通过传送带连接成完整生产线
🎼 乐谱 规定了演奏的顺序和节奏

📌 Workflow 的核心属性

从源码中可以看到 Workflow 的核心结构:

// 来自 Workflow.cs
public class Workflow
{// 起始 Executor 的 IDpublic string StartExecutorId { get; }// 工作流名称(可选)public string? Name { get; internal init; }// 工作流描述(可选)public string? Description { get; internal init; }// Executor 绑定字典internal Dictionary<string, ExecutorBinding> ExecutorBindings { get; init; }// 边的集合(按源节点分组)internal Dictionary<string, HashSet<Edge>> Edges { get; init; }// 输出 Executor 集合internal HashSet<string> OutputExecutors { get; init; }
}

🔨 使用 WorkflowBuilder 构建工作流

MAF 采用 建造者模式(Builder Pattern) 来构建 Workflow,这使得工作流的定义更加直观:

// 创建工作流示例
var workflow = new WorkflowBuilder(startExecutor)  // 指定起点.WithName("订单处理工作流")                      // 设置名称.WithDescription("处理电商订单的完整流程")       // 设置描述.AddEdge(receiveOrder, validateOrder)           // 添加边.AddEdge(validateOrder, processPayment, condition: x => x.IsValid)             // 条件边.AddEdge(processPayment, sendNotification).WithOutputFrom(sendNotification)               // 指定输出节点.Build();                                       // 构建工作流

关键方法

方法 说明
AddEdge(source, sink) 添加直连边
AddEdge(..., condition) 添加条件边
AddFanOut(source, sinks) 添加扇出边
AddFanIn(sources, sink) 添加扇入边
WithOutputFrom(executor) 指定输出节点
BindExecutor(executor) 绑定占位符执行器
Build() 构建最终的 Workflow

四、核心概念:SuperStep(超步)

⏱️ 什么是 SuperStep?

SuperStep(超步) 是 Workflow 执行的基本处理周期。可以类比为:

类比 说明
🎮 游戏中的"回合" 每个回合内所有玩家同时行动
⏰ 工厂的"班次" 每个班次内完成一批任务
🌊 海浪的"一波" 一波消息被处理,然后产生下一波

📌 SuperStep 的执行流程

每个 SuperStep 内部执行的步骤:

关键事件

  • SuperStepStartedEvent:超步开始
  • SuperStepCompletedEvent:超步完成
// SuperStep 事件定义
public class SuperStepEvent(int stepNumber, object? data = null) : WorkflowEvent(data)
{// 超步的序号(从 0 开始)public int StepNumber => stepNumber;
}

五、核心概念:WorkflowContext(工作流上下文)

📦 什么是 WorkflowContext?

WorkflowContext(工作流上下文) 是 Executor 执行时的运行环境,类似于:

类比 说明
🛠️ 工人的工作台 提供工具、材料和通信渠道
📞 通信枢纽 允许各个工位之间传递信息
💾 共享内存 存储和读取状态数据

📌 IWorkflowContext 核心接口

// 来自 IWorkflowContext.cs
public interface IWorkflowContext
{// 添加工作流事件(在当前 SuperStep 结束时触发)ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default);// 发送消息给下游 Executor(在下一个 SuperStep 处理)ValueTask SendMessageAsync(object message, string? targetId, CancellationToken cancellationToken = default);// 输出工作流结果ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default);// 请求在当前 SuperStep 结束时停止工作流ValueTask RequestHaltAsync();// 读取状态ValueTask<T?> ReadStateAsync<T>(string key, string? scopeName = null, CancellationToken cancellationToken = default);// 读取或初始化状态ValueTask<T> ReadOrInitStateAsync<T>(string key, Func<T> initialStateFactory, string? scopeName = null, CancellationToken cancellationToken = default);// 更新状态(排队更新,在 SuperStep 结束时应用)ValueTask QueueStateUpdateAsync<T>(string key, T value, string? scopeName = null, CancellationToken cancellationToken = default);
}

关键点

  • 消息传递:通过 SendMessageAsync 在 Executor 之间传递消息
  • 状态管理:支持读取、初始化和更新状态
  • 事件通知:通过 AddEventAsync 发出事件
  • 流程控制:通过 RequestHaltAsync 停止工作流

六、核心概念:WorkflowEvent(工作流事件)

📣 什么是 WorkflowEvent?

WorkflowEvent(工作流事件) 是工作流执行过程中产生的通知消息,类似于:

类比 说明
📢 广播通知 向所有人广播系统状态变化
📝 日志记录 记录系统执行过程中的关键节点
🔔 事件订阅 允许外部监听并响应特定事件

📌 事件分类

事件层级 事件类型 说明
工作流级别 WorkflowStartedEvent 工作流开始执行
WorkflowOutputEvent 工作流产生输出
WorkflowErrorEvent 工作流发生错误
WorkflowWarningEvent 工作流产生警告
超步级别 SuperStepStartedEvent 超步开始
SuperStepCompletedEvent 超步完成
执行器级别 ExecutorInvokedEvent Executor 被调用
ExecutorCompletedEvent Executor 完成处理
ExecutorFailedEvent Executor 处理失败

七、核心概念:Run(运行实例)

🏃 什么是 Run?

Run(运行实例) 是 Workflow 的一次具体执行,类似于:

类比 说明
🎬 电影的一场放映 同一部电影可以放映多场
🏭 生产线的一个批次 同一条生产线可以生产多个批次
🎮 游戏的一局 同一个游戏可以玩多局

📌 Run 的核心特性

// 来自 Run.cs
public sealed class Run : IAsyncDisposable
{// 运行实例的唯一标识符public string RunId => this._runHandle.RunId;// 获取当前执行状态public ValueTask<RunStatus> GetStatusAsync(CancellationToken cancellationToken = default);// 获取所有产生的事件public IEnumerable<WorkflowEvent> OutgoingEvents => this._eventSink;// 获取自上次访问后的新事件public IEnumerable<WorkflowEvent> NewEvents { get; }// 恢复执行(带外部响应)public async ValueTask<bool> ResumeAsync(IEnumerable<ExternalResponse> responses, CancellationToken cancellationToken = default);
}

📌 RunStatus(运行状态)

public enum RunStatus
{NotStarted,      // 尚未开始Idle,            // 空闲(已暂停,无待处理请求)PendingRequests, // 等待外部响应Ended,           // 已结束Running          // 正在运行
}


八、核心概念:Checkpoint(检查点)

💾 什么是 Checkpoint?

Checkpoint(检查点) 是工作流在某个时刻的完整状态快照,类似于:

类比 说明
🎮 游戏存档 保存游戏进度,随时可以读档继续
📸 照片 记录某一时刻的完整状态
🔖 书签 标记阅读进度,下次从这里继续

📌 Checkpoint 的核心信息

// 来自 CheckpointInfo.cs
public sealed class CheckpointInfo
{// 运行实例的唯一标识符public string RunId { get; }// 检查点的唯一标识符public string CheckpointId { get; }
}// 检查点的完整数据(来自 Checkpoint.cs)
internal sealed class Checkpoint
{public int StepNumber { get; }                                    // 超步编号public WorkflowInfo Workflow { get; }                             // 工作流信息public RunnerStateData RunnerData { get; }                        // 运行器状态public Dictionary<ScopeKey, PortableValue> StateData { get; }     // 状态数据public Dictionary<EdgeId, PortableValue> EdgeStateData { get; }   // 边状态数据public CheckpointInfo? Parent { get; }                            // 父检查点
}

📌 Checkpoint 的使用场景

场景 说明
故障恢复 系统崩溃后从最近的检查点恢复
长时间运行 分段执行,每段结束保存进度
人机交互 等待用户输入时保存状态
调试回放 从任意检查点重新执行
版本分支 从同一个检查点创建多个分支执行


九、核心概念关系图

🔗 概念之间的关系

让我们把所有核心概念联系起来,看看它们是如何协作的:

📌 生命周期视角

从工作流的完整生命周期来看:

📌 消息流视角

从消息在工作流中的流动来看:

关键理解

  1. 消息驱动:Executor 之间通过消息传递数据
  2. 异步批处理:同一 SuperStep 内的 Executor 可以并行执行
  3. 边控制流向:Edge 决定消息从哪里到哪里
  4. 状态隔离:每个 SuperStep 结束时应用状态更新

十、实际应用示例

🛒 场景:电商订单处理工作流

让我们通过一个实际场景来理解这些概念的应用:

📌 概念对应关系

概念 在此场景中的体现
Workflow 整个订单处理流程
Executor 每个处理步骤(接收、验证、支付等)
Edge 步骤之间的连接(含条件判断)
SuperStep 每一轮处理(如:超步1处理接收,超步2处理验证...)
WorkflowContext 提供订单状态读写、消息发送能力
WorkflowEvent 每个步骤的开始/完成/失败事件
Run 一个具体订单的处理过程
Checkpoint 处理中途保存的状态(如支付完成后保存)

🤖 场景:多智能体协作工作流

在 AI Agent 场景中,Workflow 可以用来编排多个 Agent 的协作:

Workflow 的优势

优势 说明
🔄 灵活编排 可以轻松调整 Agent 之间的协作关系
💾 状态管理 自动管理各 Agent 的状态和上下文
⏸️ 可中断/恢复 支持人机交互,随时暂停和恢复
🔍 可观测性 通过事件追踪整个执行过程
🛡️ 容错能力 通过检查点支持故障恢复

十一、概念总结

📚 核心概念速查表

概念 定义 关键类 核心职责
Executor 执行器/处理节点 Executor, Executor<T>, FunctionExecutor<T> 处理消息,产生输出
Edge 连接边 Edge, EdgeData, DirectEdgeData 定义消息流向和条件
Workflow 工作流定义 Workflow, WorkflowBuilder 组织 Executor 和 Edge
SuperStep 超步/批量处理周期 SuperStepEvent, SuperStepStartedEvent 批量处理消息
WorkflowContext 工作流上下文 IWorkflowContext 提供运行时服务
WorkflowEvent 工作流事件 WorkflowEvent, ExecutorEvent 通知执行状态
Run 运行实例 Run, RunStatus 管理一次执行
Checkpoint 检查点 CheckpointInfo, ICheckpointStore 保存和恢复状态

📌 记忆口诀

"工作流程边连接,执行器来做处理"
"超步批量往前走,上下文中读写态"
"事件通知全过程,运行实例跑一次"
"检查点来存进度,随时恢复不怕挂"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1190695.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MyBatis的二级缓存

二级缓存是SqlSessionFactory级别&#xff0c;通过同一个SqlSessionFactory创建的SqlSession查询得的结果会被缓存&#xff0c;此后若再次执行相同的查询语句&#xff0c;结果就会从缓存中获取 二级缓存开启的条件 在核心配置文件中&#xff0c;设置全局配置属性cacheEnabled&q…

为什么 IO 流通常只能被读取一次

IO 流只能读取一次,是 精心设计的,贴合操作系统文件 / 网络 IO 的 "顺序消费" 特性,保持和底层系统的一致性。今天我们来一起探讨下 为什么 IO 流通常只能被读取一次? 我为什么会发出这个疑问呢?是因为…

【总结】说课的语言风格

根据所提供的多份高中信息技术说课逐字稿内容,可以归纳出其在语言表达上具有以下鲜明特点。这些特点既体现了教师专业表达的规范性,也反映了当前基础教育领域对“教学设计可视化”“素养导向”和“学生中心”理念的语…

第六天|454.四数相加II 383. 赎金信 15. 三数之和 18. 四数之和

第六天|454.四数相加II 383. 赎金信 15. 三数之和 18. 四数之和 454.四数相加II 第454题.四数相加II | 代码随想录 学透哈希表,map使用有技巧!LeetCode:454.四数相加II_哔哩哔哩_bilibili 笔记 通过单独遍历两个…

2026年比较好的酶解海藻液,纯酶解海藻,高浓度酶解海藻厂家选购选型手册 - 品牌鉴赏师

引言在农业现代化进程中,酶解海藻液凭借其独特的优势,在提高农作物产量、改善农产品品质等方面发挥着重要作用。为了帮助广大用户在众多的酶解海藻厂家中做出更优选择,我们依据国内相关行业协会公开的数据以及权威白…

天然蛋白vs重组蛋白:核心差异、应用选择与质量控制全解析

天然蛋白与重组蛋白是现代生命科学研究与生物技术应用中的两大核心物质基础。它们虽然在最终功能上可能相似,但在来源、制备路径、分子特性及应用指向性上存在根本性差异。明确理解二者的区别,对于科研实验设计、数据…

1/17考试总结

前言 哼啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊 T1 没考虑完所有情况。 没想到正解,括号匹配是复习了的。 时间安排不是很合理,调了1h15min。 多练练思维。 T2 我用记忆化补的,dp[i][j]表示当剩下的礼物区间是 [L,…

scATAC Transformer 输入的token是什么,句子是什么?

对于 scATAC-seq(单细胞染色质可及性测序)数据,将其输入 Transformer 模型时,其 Token 和 句子 的定义与 scRNA-seq(如 scBERT)既有相似之处,也有显著的生物学差异。 在 scATAC-seq Transformer 模型(如 scATA…

HBase在大数据领域金融数据处理中的应用

HBase在大数据领域金融数据处理中的应用 关键词&#xff1a;HBase、大数据、金融数据处理、分布式存储、实时读写 摘要&#xff1a;本文主要探讨了HBase在大数据领域金融数据处理中的应用。首先介绍了相关背景知识&#xff0c;包括HBase的基本概念、金融数据处理的特点和需求。…

本人入住博客园啦 原CSDN昵称大Mod_abfun是本人

本人入住博客园啦 原CSDN昵称大Mod_abfun是本人这是我的CSDN主页接下来的内容会将大部分的博客迁移过来,如有之前搬运的文章,不算做侵权,但后续(2026年1月20日 20点31分后)出现文章的搬运将追究责任,搬运需要经过…

2026年诚信的立式混料机,连续螺带混料机,混料机厂家行业优选榜单 - 品牌鉴赏师

引言在2026年的工业领域,立式混料机、连续螺带混料机的市场竞争愈发激烈,众多混料机厂家如雨后春笋般涌现。为了给行业内的从业者、采购商等提供一个客观、公正、权威的选择参考,我们依据国内权威行业协会公开数据形…

上海智推时代对接指南:官方认证联系方式汇总 - 速递信息

在生成式 AI 重塑商业生态的今天,“被 AI 看见、被 AI 推荐” 已经成为企业生存与发展的核心命题。曾经,企业通过线下渠道拓展、传统广告投放就能打开市场;而如今,消费者的信息获取方式发生了根本性转变 —— 从主…

动态SQL(七)sql标签

sql标签 可以将常用的sql片段进行记录 需要用的时候直接引入即可 设置sql片段引用sql片段测试

上海智推时代官方联系方式:企业合作必备指南 - 速递信息

在生成式 AI 重塑商业生态的今天,“被 AI 看见、被 AI 推荐” 已经成为企业生存与发展的核心命题。曾经,企业通过线下渠道拓展、传统广告投放就能打开市场;而如今,消费者的信息获取方式发生了根本性转变 —— 从主…

2026年口碑好的高纯度壳寡糖,壳寡糖水溶肥,酶解壳寡糖厂家采购推荐指南 - 品牌鉴赏师

引言在2026年,随着农业现代化进程的加速以及对高品质农产品需求的不断增长,高纯度壳寡糖、壳寡糖水溶肥和酶解壳寡糖作为新型生物刺激素,在农业领域的应用愈发广泛。为了帮助广大采购商能够挑选到口碑好、质量优的相…

MyBatis的一级缓存

什么是缓存&#xff1f; 把当前查询出来的数据进行记录&#xff0c;下一次查询相同数据时&#xff0c;从缓存中去取&#xff0c;就不会重新访问数据库了 MyBatis的缓存分为一级缓存和二级缓存 一级缓存默认是开启的 缓存只针对查询功能有效 CacheMapperCacheMapper.xml测试 pac…

twonkyserver 目录遍历 (CVE-2018-7171)

get请求构造payload:/rpc/dir?path=查看010

MBA必看!10个高效降aigc工具推荐,轻松应对AI检测

MBA必看&#xff01;10个高效降aigc工具推荐&#xff0c;轻松应对AI检测 AI降重工具&#xff1a;高效应对论文查重难题 在当前的学术环境中&#xff0c;随着AI技术的广泛应用&#xff0c;论文的AIGC率逐渐成为高校和研究机构关注的重点。对于MBA学生而言&#xff0c;如何在保证…

技术面:MySQL篇(InnoDB的锁机制)

共享锁、排他锁、意向锁、记录锁、间隙锁、临键锁(Next Key Lock)、插入意向锁、AUTO-INC、悲观锁、乐观锁MySQL的InnoDB的锁机制 MySQL的InnoDB引擎下,在锁的级别上一般分为两种:共享锁(S锁)、排他锁(X锁) 共…