Spring AI Alibaba 项目源码学习(二)-Graph 定义与描述分析

news/2025/11/10 22:24:59/文章来源:https://www.cnblogs.com/wasp520/p/19208457

Graph 定义与描述分析

请关注公众号:阿呆-bot

概述

本文档分析 spring-ai-alibaba-graph-core 模块中 Graph 的定义和描述机制,包括入口类、关键类关系、核心实现代码和设计模式。

入口类说明

StateGraph - Graph 定义入口

StateGraph 是定义工作流图的主要入口类,用于构建包含节点(Node)和边(Edge)的状态图。

核心职责

  • 管理图中的节点集合(Nodes)
  • 管理图中的边集合(Edges)
  • 提供节点和边的添加方法
  • 支持条件路由和子图
  • 编译为可执行的 CompiledGraph

关键代码

public class StateGraph {/*** Constant representing the END of the graph.*/public static final String END = "__END__";/*** Constant representing the START of the graph.*/public static final String START = "__START__";/*** Constant representing the ERROR of the graph.*/public static final String ERROR = "__ERROR__";/*** Constant representing the NODE_BEFORE of the graph.*/public static final String NODE_BEFORE = "__NODE_BEFORE__";/*** Constant representing the NODE_AFTER of the graph.*/public static final String NODE_AFTER = "__NODE_AFTER__";/*** Collection of nodes in the graph.*/final Nodes nodes = new Nodes();/*** Collection of edges in the graph.*/final Edges edges = new Edges();/*** Factory for providing key strategies.*/private KeyStrategyFactory keyStrategyFactory;/*** Name of the graph.*/private String name;/*** Serializer for the state.*/private final StateSerializer stateSerializer;

构造函数

public StateGraph(String name, KeyStrategyFactory keyStrategyFactory, PlainTextStateSerializer stateSerializer) {this.name = name;this.keyStrategyFactory = keyStrategyFactory;this.stateSerializer = stateSerializer;}public StateGraph(KeyStrategyFactory keyStrategyFactory, PlainTextStateSerializer stateSerializer) {this.keyStrategyFactory = keyStrategyFactory;this.stateSerializer = stateSerializer;}/*** Constructs a StateGraph with the specified name, key strategy factory, and SpringAI* state serializer.* @param name the name of the graph* @param keyStrategyFactory the factory for providing key strategies* @param stateSerializer the SpringAI state serializer to use*/public StateGraph(String name, KeyStrategyFactory keyStrategyFactory, SpringAIStateSerializer stateSerializer) {this.name = name;this.keyStrategyFactory = keyStrategyFactory;this.stateSerializer = stateSerializer;}/*** Constructs a StateGraph with the specified key strategy factory and SpringAI state* serializer.* @param keyStrategyFactory the factory for providing key strategies* @param stateSerializer the SpringAI state serializer to use*/public StateGraph(KeyStrategyFactory keyStrategyFactory, SpringAIStateSerializer stateSerializer) {this.keyStrategyFactory = keyStrategyFactory;this.stateSerializer = stateSerializer;}public StateGraph(String name, KeyStrategyFactory keyStrategyFactory) {this.name = name;this.keyStrategyFactory = keyStrategyFactory;this.stateSerializer = new JacksonSerializer();}/*** Constructs a StateGraph with the provided key strategy factory.* @param keyStrategyFactory the factory for providing key strategies*/public StateGraph(KeyStrategyFactory keyStrategyFactory) {this.keyStrategyFactory = keyStrategyFactory;this.stateSerializer = new JacksonSerializer();}/*** Default constructor that initializes a StateGraph with a Gson-based state* serializer.*/public StateGraph() {this.stateSerializer = new JacksonSerializer();this.keyStrategyFactory = HashMap::new;}

CompiledGraph - 编译后的可执行图

CompiledGraphStateGraph 编译后的可执行形式,包含了优化后的节点工厂和边映射。

核心职责

  • 存储编译后的节点工厂映射
  • 管理边映射关系
  • 提供节点执行入口
  • 支持中断和恢复

关键代码

public class CompiledGraph {private static final Logger log = LoggerFactory.getLogger(CompiledGraph.class);private static String INTERRUPT_AFTER = "__INTERRUPTED__";/*** The State graph.*/public final StateGraph stateGraph;/*** The Compile config.*/public final CompileConfig compileConfig;/*** The Node Factories - stores factory functions instead of instances to ensure thread safety.*/final Map<String, Node.ActionFactory> nodeFactories = new LinkedHashMap<>();/*** The Edges.*/final Map<String, EdgeValue> edges = new LinkedHashMap<>();private final Map<String, KeyStrategy> keyStrategyMap;private final ProcessedNodesEdgesAndConfig processedData;private int maxIterations = 25;/*** Constructs a CompiledGraph with the given StateGraph.* @param stateGraph the StateGraph to be used in this CompiledGraph* @param compileConfig the compile config* @throws GraphStateException the graph state exception*/protected CompiledGraph(StateGraph stateGraph, CompileConfig compileConfig) throws GraphStateException {maxIterations = compileConfig.recursionLimit();

关键类关系

Node - 节点抽象

Node 表示图中的节点,包含唯一标识符和动作工厂。

关键代码

public class Node {public static final String PRIVATE_PREFIX = "__";public interface ActionFactory {AsyncNodeActionWithConfig apply(CompileConfig config) throws GraphStateException;}private final String id;private final ActionFactory actionFactory;public Node(String id, ActionFactory actionFactory) {this.id = id;this.actionFactory = actionFactory;}/*** Constructor that accepts only the `id` and sets `actionFactory` to null.* @param id the unique identifier for the node*/public Node(String id) {this(id, null);}public void validate() throws GraphStateException {if (Objects.equals(id, StateGraph.END) || Objects.equals(id, StateGraph.START)) {return;}if (id.isBlank()) {throw Errors.invalidNodeIdentifier.exception("blank node id");}if (id.startsWith(PRIVATE_PREFIX)) {throw Errors.invalidNodeIdentifier.exception("id that start with %s", PRIVATE_PREFIX);}}/*** id* @return the unique identifier for the node.*/public String id() {return id;}/*** actionFactory* @return a factory function that takes a {@link CompileConfig} and returns an* {@link AsyncNodeActionWithConfig} instance for the specified {@code State}.*/public ActionFactory actionFactory() {return actionFactory;}

Edge - 边抽象

Edge 表示图中的边,连接源节点和目标节点,支持条件路由。

关键代码

public record Edge(String sourceId, List<EdgeValue> targets) {public Edge(String sourceId, EdgeValue target) {this(sourceId, List.of(target));}public Edge(String id) {this(id, List.of());}public boolean isParallel() {return targets.size() > 1;}public EdgeValue target() {if (isParallel()) {throw new IllegalStateException(format("Edge '%s' is parallel", sourceId));}return targets.get(0);}public boolean anyMatchByTargetId(String targetId) {return targets().stream().anyMatch(v -> (v.id() != null) ? Objects.equals(v.id(), targetId): v.value().mappings().containsValue(targetId));}public Edge withSourceAndTargetIdsUpdated(Node node, Function<String, String> newSourceId,Function<String, EdgeValue> newTarget) {var newTargets = targets().stream().map(t -> t.withTargetIdsUpdated(newTarget)).toList();return new Edge(newSourceId.apply(sourceId), newTargets);}

OverAllState - 全局状态

OverAllState 是贯穿整个图执行过程的全局状态对象,用于在节点间传递数据。

关键代码

public final class OverAllState implements Serializable {public static final Object MARK_FOR_REMOVAL = new Object();/*** Internal map storing the actual state data. All get/set operations on state values* go through this map.*/private final Map<String, Object> data;/*** Mapping of keys to their respective update strategies. Determines how values for* each key should be merged or updated.*/private final Map<String, KeyStrategy> keyStrategies;/*** Store instance for long-term memory storage across different executions.*/private Store store;/*** The default key used for standard input injection into the state. Typically used* when initializing the state with user or external input.*/

关键类关系图

以下 PlantUML 类图展示了 Graph 定义相关的关键类及其关系:

@startuml
!theme plain
skinparam classAttributeIconSize 0package "Graph Definition" {class StateGraph {-Nodes nodes-Edges edges-KeyStrategyFactory keyStrategyFactory-String name-StateSerializer stateSerializer+addNode(String, NodeAction)+addEdge(String, String)+addConditionalEdges(...)+compile(CompileConfig): CompiledGraph}class CompiledGraph {+StateGraph stateGraph+CompileConfig compileConfig-Map<String, Node.ActionFactory> nodeFactories-Map<String, EdgeValue> edges-Map<String, KeyStrategy> keyStrategyMap}class Node {-String id-ActionFactory actionFactory+id(): String+actionFactory(): ActionFactory+validate(): void}class Edge {-String sourceId-List<EdgeValue> targets+isParallel(): boolean+target(): EdgeValue+validate(Nodes): void}class OverAllState {-Map<String, Object> data-Map<String, KeyStrategy> keyStrategies-Store store+get(String): Object+put(String, Object): void+registerKeyAndStrategy(String, KeyStrategy): void}interface NodeAction {+apply(OverAllState): Map<String, Object>}interface EdgeAction {+apply(OverAllState): String}class KeyStrategy {+merge(Object, Object): Object}class StateSerializer {+serialize(OverAllState): String+deserialize(String): OverAllState}
}StateGraph "1" --> "1" CompiledGraph : compiles to
StateGraph "1" --> "*" Node : contains
StateGraph "1" --> "*" Edge : contains
StateGraph --> KeyStrategyFactory : uses
StateGraph --> StateSerializer : uses
Node --> NodeAction : creates via ActionFactory
Edge --> EdgeAction : uses for conditional routing
CompiledGraph --> Node : stores factories
CompiledGraph --> Edge : stores mappings
OverAllState --> KeyStrategy : uses
OverAllState --> Store : usesnote right of StateGraph入口类:用于定义工作流图支持添加节点和边支持条件路由和子图
end notenote right of CompiledGraph编译后的可执行图包含优化的节点工厂线程安全的工厂函数
end notenote right of OverAllState全局状态对象在节点间传递数据支持键策略管理
end note@enduml

实现关键点说明

1. Builder 模式

StateGraph 使用链式调用模式构建图,支持流畅的 API:

StateGraph graph = new StateGraph("MyGraph", keyStrategyFactory).addNode("node1", nodeAction1).addNode("node2", nodeAction2).addEdge(START, "node1").addConditionalEdges("node1", edgeAction, Map.of("yes", "node2", "no", END)).addEdge("node2", END);

2. 工厂模式

Node 使用 ActionFactory 接口延迟创建节点动作,确保线程安全:

public interface ActionFactory {AsyncNodeActionWithConfig apply(CompileConfig config) throws GraphStateException;
}

这种设计允许在编译时创建工厂函数,在执行时根据配置创建实际的动作实例。

3. 策略模式

KeyStrategy 用于控制状态键的更新策略,支持不同的合并逻辑(Replace、Append、Reduce 等)。

4. 序列化支持

StateGraph 支持多种状态序列化器:

  • PlainTextStateSerializer:纯文本序列化
  • SpringAIStateSerializer:Spring AI 标准序列化
  • JacksonSerializer:Jackson JSON 序列化(默认)

5. 验证机制

NodeEdge 都实现了 validate() 方法,确保图的完整性:

  • 节点 ID 不能为空或使用保留前缀
  • 边引用的节点必须存在
  • 并行边不能有重复目标

总结说明

核心设计理念

  1. 声明式 API:通过 StateGraph 提供声明式的图定义方式,隐藏底层复杂性
  2. 编译时优化StateGraph 编译为 CompiledGraph,将定义转换为可执行形式
  3. 状态管理OverAllState 作为全局状态容器,支持键策略和序列化
  4. 类型安全:使用泛型和接口确保类型安全
  5. 可扩展性:通过接口和工厂模式支持自定义节点和边动作

关键优势

  • 灵活性:支持同步和异步节点、条件路由、并行执行
  • 可维护性:清晰的类层次结构和职责分离
  • 可测试性:接口抽象便于单元测试
  • 性能:编译时优化和工厂模式减少运行时开销

使用流程

  1. 定义图:使用 StateGraph 添加节点和边
  2. 编译图:调用 compile() 方法生成 CompiledGraph
  3. 执行图:通过 GraphRunner 执行编译后的图
  4. 状态传递:使用 OverAllState 在节点间传递数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/961799.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

20232422 2024-2025-1 《网络与系统攻防技术》实验四实验报告

20232312 2024-2025-1 《网络与系统攻防技术》实验四实验报告

SpringBoot热启动

引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional> </dependency>设置确…

SPI 设备与多从机冲突的解决之道:片选管理、CS 去抖与总线隔离策略 - 实践

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

2025 年 11 月超声波检测设备厂家推荐排行榜,超声波检测系统,相控阵/高频/水浸/液冷板/钎焊超声波检测,高频相控阵超声波检测设备厂家推荐

2025 年 11 月超声波检测设备厂家推荐排行榜,超声波检测系统,相控阵/高频/水浸/液冷板/钎焊超声波检测,高频相控阵超声波检测设备厂家推荐 行业技术发展背景 超声波检测技术作为现代工业无损检测的重要手段,在材料…

对于生成虚tree进行DP——CF1097G Vladislav and a Great Legend

对于生成虚tree进行DP——CF1097G Vladislav and a Great Legend 首先 \[\sum_Xf^k(X)=\sum_{i=1}^k{k \brace i}i!\sum_X{F(X)\choose i} \]考虑如何 \(dp\) \(\sum_X{F(X)\choose i}\)。 设 \(f_{x,i}\) 表示考虑 \(…

2025 年 11 月除蜡水厂家推荐排行榜,钢铁除蜡水,不锈钢除蜡水,金属除蜡水,工业除蜡水公司推荐

2025年11月除蜡水厂家推荐排行榜:专业解析钢铁除蜡水与金属表面处理技术 在当今精密制造与金属加工行业快速发展的背景下,除蜡水作为金属表面处理的关键制剂,其性能优劣直接影响着工件的清洁度与产品质量。钢铁除蜡…

使用napi-rs,通过node调用rust代码

0、node版本建议在16以上 1、安装napi-rs npm init -y npm install @napi-rs/cli --save-dev2、初始化一个 napi 项目 npx @napi-rs/cli new my-rust-addon # 如果这句不行就试试 npx napi new my-rust-addon3、进入my…

20232309 2025-2026-1 《网络与系统攻防技术》实验四实验报告

1.实验内容 1.1学习内容 1.恶意代码分析方法(静态分析、动态分析等) 2.PEiD、IDA Pro、ProcessExplorer等分析工具的使用方法 1.2实验任务恶意代码文件类型标识、脱壳与字符串提取 使用IDA Pro静态或动态分析crackme…

智语写作都有哪些功能?看这一篇就够了!智语写作全功能详解

作为AI写小说全能工具箱的智语写作,功能非常丰富,其AI生成能力可覆盖小说、短剧、公众号、视频等创作全流程。 本篇文章,就来给大家详细介绍一下这些功能。 一、AI辅助创作功能:已对接GPT、Gemini、claude、grok、…

pythontip 字符串转为字典

编写一个程序将字符串转换为字典。 定义函数convert_str_list_to_dict(),参数为str_list(输入的字符串)。 在函数内部,创建一个字典,其中每个字符串使用=进行分割,第一部分为键,第二部分为值。 返回字典。先用字符…

Microsoft Activation Scripts (MAS)

GitHub - massgravel/Microsoft-Activation-Scripts: Open-source Windows and Office activator featuring HWID, Ohook, TSforge, KMS38, and Online KMS activation methods, along with advanced troubleshooting.…

rufus.ini

; 基础显示设置 DISP W1920 H1080 B32 F60; 初始化变量(区分文件全路径和所在目录) SET FullPath=未选择文件 SET FileDir=未选择文件 SET KK=未选择文件; 核心文件选择子过程(同时提取文件所在路径) _SUB SelectF…

团队作业2

团队作业2团队作业2这个作业属于哪个课程 计科23级12班 这个作业要求在哪里 团队作业2-《需求规格说明书》 这个作业的目标 体育场馆预约系统github仓库:https://github.com/skymoon-13/Sports_Venue_Reservation_Sys…

Explorer++

Explorer++ - A small and fast file manager for Windows

Interpretability-Guided Test-Time Adversarial Defense

会议/期刊: ECCV 2024 作者:Akshay Kulkarni、Tsui-Wei Weng 代码仓库:https://github.com/Trustworthy-ML-Lab/Interpretability-Guided-Defense研究背景与动机 一次对抗训练成本远高于一次标准训练的成本,因此有…

JavaWeb04-JUnit

JavaWeb04-JUnit单元测试测试:是一种用来促进鉴定软件的正确性、完整性、安全性和质量的过程。阶段划分:单元测试(白盒)、集成测试(灰盒)、系统测试(黑盒)、验收测试(黑盒)测试方法:白盒测试、黑盒测试及灰…

2025 年 11 月开窗器厂家推荐排行榜,链条开窗器,机芯开窗器,配件开窗器,电动开窗器公司推荐

2025年11月开窗器厂家推荐排行榜:链条开窗器、机芯开窗器、配件开窗器、电动开窗器公司推荐 行业背景分析 随着建筑智能化水平的不断提升,开窗器作为建筑通风系统的重要组成部分,其市场需求呈现稳步增长态势。开窗器…

详细介绍:用户体验就是新SEO:如何同时提升搜索者满意度和搜索排名

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

P6688 可重集 笔记

\(\mathrm{base}^{x}\) 的哈希方式具有可减的性质。

哪款学习机适合小学生用?2025年11月多款主流品牌告诉你如何选

随着双十一购物潮临近,学习机市场的促销大战也愈演愈烈 —— 满减、赠品、限时折扣轮番上阵,叠加原本就处于红海竞争阶段的市场环境:新品牌扎堆涌现,老品牌高频推新,各类宣传噱头更是眼花缭乱,有的标榜 “全科提…