Spring AI Alibaba 项目源码学习(三)-Graph 执行流程分析

news/2025/11/12 22:32:14/文章来源:https://www.cnblogs.com/wasp520/p/19215689

Graph 执行流程分析

概述

本文档分析 spring-ai-alibaba-graph-core 模块中 Graph 的执行流程,包括执行器(Executor)、调度机制、Checkpoint 机制和状态管理。

入口类说明

GraphRunner - 执行入口

GraphRunner 是基于 Project Reactor 的响应式图执行引擎,是执行 Graph 的主要入口。

核心职责

  • 封装编译后的图和执行配置
  • 提供响应式执行接口(返回 Flux)
  • 委托给 MainGraphExecutor 执行实际流程

关键代码

public class GraphRunner {private final CompiledGraph compiledGraph;private final RunnableConfig config;private final AtomicReference<Object> resultValue = new AtomicReference<>();// Handler for main execution flow - demonstrates encapsulationprivate final MainGraphExecutor mainGraphExecutor;public GraphRunner(CompiledGraph compiledGraph, RunnableConfig config) {this.compiledGraph = compiledGraph;this.config = config;// Initialize the main execution handler - demonstrates encapsulationthis.mainGraphExecutor = new MainGraphExecutor();}public Flux<GraphResponse<NodeOutput>> run(OverAllState initialState) {return Flux.defer(() -> {try {GraphRunnerContext context = new GraphRunnerContext(initialState, config, compiledGraph);// Delegate to the main execution handler - demonstrates polymorphismreturn mainGraphExecutor.execute(context, resultValue);}catch (Exception e) {return Flux.error(e);}});}

MainGraphExecutor - 主执行器

MainGraphExecutor 是主执行流程处理器,继承自 BaseGraphExecutor,负责协调整个图的执行。

核心职责

  • 处理开始节点和结束节点
  • 管理迭代次数和中断逻辑
  • 协调节点执行和边路由
  • 处理 Checkpoint 和恢复

关键代码

public class MainGraphExecutor extends BaseGraphExecutor {private final NodeExecutor nodeExecutor;public MainGraphExecutor() {this.nodeExecutor = new NodeExecutor(this);}/*** Implementation of the execute method. This demonstrates polymorphism as it provides* a specific implementation for main execution flow.* @param context the graph runner context* @param resultValue the atomic reference to store the result value* @return Flux of GraphResponse with execution result*/@Overridepublic Flux<GraphResponse<NodeOutput>> execute(GraphRunnerContext context, AtomicReference<Object> resultValue) {try {if (context.shouldStop() || context.isMaxIterationsReached()) {return handleCompletion(context, resultValue);}final var returnFromEmbed = context.getReturnFromEmbedAndReset();if (returnFromEmbed.isPresent()) {var interruption = returnFromEmbed.get().value(new TypeRef<InterruptionMetadata>() {});if (interruption.isPresent()) {return Flux.just(GraphResponse.done(interruption.get()));}return Flux.just(GraphResponse.done(context.buildCurrentNodeOutput()));}if (context.getCurrentNodeId() != null && context.getConfig().isInterrupted(context.getCurrentNodeId())) {context.getConfig().withNodeResumed(context.getCurrentNodeId());return Flux.just(GraphResponse.done(GraphResponse.done(context.getCurrentStateData())));}if (context.isStartNode()) {return handleStartNode(context);}if (context.isEndNode()) {return handleEndNode(context, resultValue);}final var resumeFrom = context.getResumeFromAndReset();if (resumeFrom.isPresent()) {if (context.getCompiledGraph().compileConfig.interruptBeforeEdge()&& java.util.Objects.equals(context.getNextNodeId(), INTERRUPT_AFTER)) {var nextNodeCommand = context.nextNodeId(resumeFrom.get(), context.getCurrentStateData());context.setNextNodeId(nextNodeCommand.gotoNode());context.setCurrentNodeId(null);}}if (context.shouldInterrupt()) {try {InterruptionMetadata metadata = InterruptionMetadata.builder(context.getCurrentNodeId(), context.cloneState(context.getCurrentStateData())).build();return Flux.just(GraphResponse.done(metadata));

NodeExecutor - 节点执行器

NodeExecutor 负责执行单个节点,处理节点动作的执行和结果处理。

核心职责

  • 执行节点动作(NodeAction)
  • 处理中断逻辑(InterruptableAction)
  • 处理流式输出(StreamingOutput)
  • 更新状态并确定下一个节点

关键代码

public class NodeExecutor extends BaseGraphExecutor {private final MainGraphExecutor mainGraphExecutor;public NodeExecutor(MainGraphExecutor mainGraphExecutor) {this.mainGraphExecutor = mainGraphExecutor;}/*** Implementation of the execute method. This demonstrates polymorphism as it provides* a specific implementation for node execution.* @param context the graph runner context* @param resultValue the atomic reference to store the result value* @return Flux of GraphResponse with execution result*/@Overridepublic Flux<GraphResponse<NodeOutput>> execute(GraphRunnerContext context, AtomicReference<Object> resultValue) {return executeNode(context, resultValue);}/*** Executes a node and handles its result.* @param context the graph runner context* @param resultValue the atomic reference to store the result value* @return Flux of GraphResponse with node execution result*/private Flux<GraphResponse<NodeOutput>> executeNode(GraphRunnerContext context,AtomicReference<Object> resultValue) {try {context.setCurrentNodeId(context.getNextNodeId());String currentNodeId = context.getCurrentNodeId();AsyncNodeActionWithConfig action = context.getNodeAction(currentNodeId);if (action == null) {return Flux.just(GraphResponse.error(RunnableErrors.missingNode.exception(currentNodeId)));}if (action instanceof InterruptableAction) {context.getConfig().metadata(RunnableConfig.STATE_UPDATE_METADATA_KEY).ifPresent(updateFromFeedback -> {if (updateFromFeedback instanceof Map<?, ?>) {context.mergeIntoCurrentState((Map<String, Object>) updateFromFeedback);} else {throw new RuntimeException();}});Optional<InterruptionMetadata> interruptMetadata = ((InterruptableAction) action).interrupt(currentNodeId, context.cloneState(context.getCurrentStateData()), context.getConfig());if (interruptMetadata.isPresent()) {

GraphRunnerContext - 执行上下文

GraphRunnerContext 管理图执行过程中的状态和上下文信息。

核心职责

  • 管理当前节点和下一个节点
  • 管理迭代次数
  • 处理 Checkpoint 和恢复
  • 管理状态更新

关键代码

public class GraphRunnerContext {public static final String INTERRUPT_AFTER = "__INTERRUPTED__";private static final Logger log = LoggerFactory.getLogger(GraphRunner.class);final CompiledGraph compiledGraph;final AtomicInteger iteration = new AtomicInteger(0);OverAllState overallState;RunnableConfig config;String currentNodeId;String nextNodeId;String resumeFrom;ReturnFromEmbed returnFromEmbed;public GraphRunnerContext(OverAllState initialState, RunnableConfig config, CompiledGraph compiledGraph)throws Exception {this.compiledGraph = compiledGraph;this.config = config;if (config.metadata(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY).isPresent()) {initializeFromResume(initialState, config);} else {initializeFromStart(initialState, config);}}private void initializeFromResume(OverAllState initialState, RunnableConfig config) {log.trace("RESUME REQUEST");var saver = compiledGraph.compileConfig.checkpointSaver().orElseThrow(() -> new IllegalStateException("Resume request without a configured checkpoint saver!"));var checkpoint = saver.get(config).orElseThrow(() -> new IllegalStateException("Resume request without a valid checkpoint!"));

执行流程时序图

以下 PlantUML 时序图展示了 Graph 的完整执行流程:
image

Checkpoint 机制

BaseCheckpointSaver - Checkpoint 保存器接口

BaseCheckpointSaver 定义了 Checkpoint 保存和恢复的接口。

关键代码

public interface BaseCheckpointSaver {String THREAD_ID_DEFAULT = "$default";record Tag(String threadId, Collection<Checkpoint> checkpoints) {public Tag(String threadId, Collection<Checkpoint> checkpoints) {this.threadId = threadId;this.checkpoints = ofNullable(checkpoints).map(List::copyOf).orElseGet(List::of);}}default Tag release(RunnableConfig config) throws Exception {return null;}Collection<Checkpoint> list(RunnableConfig config);Optional<Checkpoint> get(RunnableConfig config);RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;boolean clear(RunnableConfig config);default Optional<Checkpoint> getLast(LinkedList<Checkpoint> checkpoints, RunnableConfig config) {return (checkpoints == null || checkpoints.isEmpty()) ? Optional.empty() : ofNullable(checkpoints.peek());}default LinkedList<Checkpoint> getLinkedList(List<Checkpoint> checkpoints) {return Objects.nonNull(checkpoints) ? new LinkedList<>(checkpoints) : new LinkedList<>();}}

Checkpoint 流程

  1. 保存 Checkpoint:在执行节点后,通过 GraphRunnerContext.addCheckpoint() 保存当前状态
  2. 恢复 Checkpoint:通过 RunnableConfig 中的 checkpoint ID 恢复执行
  3. 释放 Checkpoint:执行完成后释放 Checkpoint 资源

状态管理

OverAllState 更新机制

状态更新通过 KeyStrategy 控制:

  1. ReplaceStrategy:替换策略,新值完全替换旧值
  2. AppendStrategy:追加策略,新值追加到列表
  3. Reducer:归约策略,使用自定义函数合并值

状态更新流程:

  1. 节点执行返回 Map<String, Object> 更新
  2. GraphRunnerContext 根据 KeyStrategy 合并更新
  3. 更新后的状态传递给下一个节点

实现关键点说明

1. 响应式编程

使用 Project Reactor 的 Flux 实现响应式执行:

  • 支持流式输出
  • 支持背压控制
  • 支持异步执行

2. 模板方法模式

BaseGraphExecutor 定义执行框架,子类实现具体逻辑:

  • MainGraphExecutor:主执行流程
  • NodeExecutor:节点执行流程

3. 上下文模式

GraphRunnerContext 封装执行上下文:

  • 管理当前执行状态
  • 提供状态访问接口
  • 处理 Checkpoint 和恢复

4. 中断和恢复机制

支持执行中断和恢复:

  • InterruptableAction:可中断的动作
  • InterruptionMetadata:中断元数据
  • Checkpoint 保存和恢复

5. 迭代控制

通过 maxIterations 控制最大迭代次数,防止无限循环。

总结说明

核心执行流程

  1. 初始化:创建 GraphRunnerContext,初始化状态
  2. 开始执行:从 START 节点开始
  3. 节点执行:执行当前节点,更新状态
  4. 边路由:根据 EdgeAction 确定下一个节点
  5. Checkpoint:保存执行状态(可选)
  6. 迭代:重复步骤 3-5,直到到达 END 节点
  7. 完成:返回最终结果

关键设计特点

  • 响应式:基于 Reactor 的响应式执行
  • 可中断:支持执行中断和恢复
  • 状态管理:灵活的状态更新策略
  • 可观测:支持监控和追踪
  • 容错性:错误处理和恢复机制

执行器层次结构

BaseGraphExecutor (抽象基类)├── MainGraphExecutor (主执行器)└── NodeExecutor (节点执行器)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/963832.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

逻辑回归原理与案例分析

一、逻辑回归基本概念 逻辑回归:用于解决二分类问题的统计学习方法,虽然名字中有"回归",但实际上解决的是分类问题。 核心思想:根据给定的输入特征,通过逻辑函数(Sigmoid函数)计算出样本属于某个特定…

杂题记录 4

NOIP 前咋还布置一堆数据结构 /yun,关键布置的有八成都是做过的。于是乱找了些 DS 做。 P14363 [CSP-S 2025] 谐音替换 / replace 发现是询问 \(p\in P,q\in Q\) 的 \((p,q)\) 的个数的形式,其中 \(p\in P\) 指的是 …

找唯一特征去重转移DP——CF1210F2 Marek and Matching

找唯一特征值去重转移DP——CF1210F2 Marek and Matching 匹配肯定利用霍尔定理,先写出:\(\forall S,|S|-|G(S)|\le 0\)。 图论计数往往考虑容斥,设 \(f_{S,T}\) 表示对于二分图 \((S,T)\),出现大小为 \(|S|\) 的匹…

UEFI Boot Manager

简介 UEFI boot manager是一个固件方针引擎,它可以通过修改global NVRAM 变量来配置;它会按照global NVRAM variable 定义的顺序来加载UEFI driver or UEFI application。当固件初始化完成,就会把控制权交给boot ma…

25年11月计数题做题记录

AGC073C 因为实数不好处理所以我们考虑将实数映射到整数上去做,相当于我们把值域在 \([-(n-1),1]\) 之间的实数映射到 \([-(n-1)m,m]\) 之间的整数上,其中 \(m\rightarrow\infty\),现在我们需要考虑的就是值域在 \(…

固体废物资源化处理简答题与论述题

固体废物处理核心内容简答题与论述题(含解说) 一、简答题(侧重基础概念与关键参数) 1. 简答题1:固体废物分类及“三化”“3R”原则分别是什么?答案:分类:工业固体废物、城市固体废物、农业固体废物、危险废物、…

详细介绍:Python基础语法与数据类型详解

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

noip6 多校1

11.1211.12 t1 \(O(nm^2)\)是简单的。 发挥人类智慧发现每次最优只在前面较少的状态。 于是可过。 其实人类智慧有证明的。 考虑若最大值越大,则选的次数越小,反之亦然。 平均一下就过了。 codet1 #include <bits…

CCPC2025哈尔滨站-H. 匹配

时停问题,考虑势能函数。设单个集合的势能函数为 \(f(x)\),其中 \(x\) 为集合大小,这是合法的。总的势能 \(\Phi = \sum\limits_{s\in S} f(|s|)\).考虑列出方程解出 \(f\)。 满足鞅的时停定理的势能 \(\Phi\) 满足…

通过开发环境部署工具安装qt相关c++开发环境

通过开发环境部署工具安装qt相关c++开发环境

第23天(简单题中等题 二分查找)

打卡第二十三天 1道简单题+2道中等题题目: 两数之和思路一: 双指针,题目说数组下标从1开始,先初始指针还是从0开始,返回时左右指针都加1就行 代码: class Solution { public:vector<int> twoSum(vector<int&g…

Cinema4D 2025保姆级下载安装教程|含安装包获取+新手入门指南

目录一、Cinema4D 2025 核心功能亮点二、Cinema4D 2025 正规下载渠道1.Cinema4D 2025 下载(推荐)2.其他下载注意事项三、Cinema4D 2025 详细安装激活步骤(Windows 系统)步骤 1:解压安装包(提前关闭杀毒软件)步骤…

CF2101D

给定一个长度为 \(n\) 的排列 \(a\),问其有多少个子串 \(b\),使得 \(LIS(b) + LDS(b) = |b| + 1\) \(n \le 2 \times 10^5\)考虑一下题目给的条件在说啥,其实就是每个元素都在 \(LIS/LDS\) 中,只有一个相交的地方(…

【做题记录】HZOJ 多校-数论

B. [POI2011] SEJ-Strongbox 2025noip模拟赛73 BCode #include<bits/stdc++.h> #define int long long #define il inline #define pb push_back using namespace std; namespace asbt{ const int maxn=2.5e5+5;…

2014 吉林省赛题解 | CCUT应用OJ题解——F[X] + X = N

题目简介题意:对于任意十进制整数 \(X\),定义 \(F[X]^\dagger\) 表示其循环左移一位所得到的数。给定非负整数 \(N\),求所有满足等式 \(X+F[X]=N\) 的 \(X\) 的个数。 \(\dagger\) 循环左移:设 \(X=a_0a_1\cdots a…

洛谷 P4859 已经没有什么好害怕的了 题解(DP,二项式反演)

给两个长为 \(n\) 的数组 \(a, b,\) 求将 \(a_i, b_j\) 两两匹配使得 \(a_i > b_j\) 的数量比 \(a_i < b_j\) 的数量多 \(k\)。数字不重复, \(k \le n \le 2000\)。注意到,其实 \(a_i>b_j\) 和 \(a_i<b_j…

01321:棋盘问题

|DFS|回溯| 难点1:DFS,对于dfs(h,t)表示的“即将在第h行进行摆放,已摆放的棋子数为t个”,即如何在dfs函数内部进行递归:若该棋可以放在第h行的第i个位置(标注take[i]=true),则对改行以下行中所有可行的点进行递归…

飞鱼uu单人防空4

飞鱼uu单人防空4盖亚特买40个-->超级防空100-->超级磁能40-->磁能巨无霸20.剩下的随便买.一直造坦克碉堡

C 变量的作用域与生存周期

弄清楚变量的作用域和生存周期,能够让我们不至于犯低级错误; 局部变量 在初学C语言变量的时候,我们通常将变量写在main函数内; 在函数内部,或者{}块内部的变量称为局部变量;它有以下特点:系统不会帮忙初始化,需…