Pulsar 特性在 AI 场景中的使用!

引言

没有意外,随着模型规模的持续增长和应用场景的日益复杂,AI Infra 也自然地从"单体架构" -> "分布式架构"进行演进,例如:

在大模型训练和推理阶段,随着模型规模的增长,需要通过多维度并行技术(数据并行、张量并行、流水线并行等)并发使用数百甚至数千个GPU才能满足训练需求;

在智能体应用阶段,从能对话、写文案的 Chatbot到如今能自主规划、工具调用、多 Agent 协作,工具越来越智能,调用链也越来越复杂;

再到各行业落地时,应用的业务主路径开始集成AI能力,也对部署架构本身的高可靠、高可用及高性能提出了更多的要求

然而这个从单体架构到分布式架构的升级,最核心的变化就是通过消息中间件让数据、模型、服务之间能够异步、可靠、松耦合地协同工作,从而构建可扩展、可维护、可演进的AI平台的基础设施。

Pulsar 作为消息中间件的中流砥柱,以其更鲜明的存算分离、云原生特性,可发挥着更大的价值。

场景一【多模态】:让 Pulsar 直接吞进 超大消息,多模态训练“零”切片

传统的单模态模型,如自然语言处理(NLP)模型仅处理文本,计算机视觉(CV)模型仅处理图像,自动语音识别(ASR)模型仅处理音频,它们彼此独立。多模态AI旨在让机器能够像人类一样,通过融合和理解来自多种感官通道(如视觉、听觉、语言)的信息来进行感知、推理和交互。这个给多模态训练增加了不小的难度;

多模态 AI 系统处理的数据类型远超传统文本,包含了图像、视频、音频、3D 点云等大体积的非结构化数据。这些数据单个文件的大小就可能从几 MB 到几 GB 不等。其他的消息队列系统往往对单条消息的大小有严格限制(例如,Kafka 默认单条消息上限约 1 MB,调参后虽可放大,但需权衡副本同步压力。),这迫使开发者在传输大文件时采用复杂的变通方案,如将文件存储在对象存储中,然后在消息中只传递文件的路径或 URL。

这种方式虽然可行,但增加了系统的复杂性和处理延迟,并且无法充分利用消息队列在数据流管理和处理方面的优势 。

然而 Pulsar 原生支持超大消息体,即 Pulsar 的 Chunk message,Pulsar 的 Chunk Message 是多模态训练的数据管道利器,它解决了大消息传输的完整性、顺序性、简化性三大问题,可显著降低多模态数据管道的工程负担,使开发者聚焦模型逻辑而非传输细节。

|Pulsar Chunk Message(分块消息)是 Apache Pulsar 提供的一种用于透明处理超大消息(>5MB)的机制。它允许生产者端自动将大消息拆分为多个小块传输,并在消费者端自动重组,业务层无需感知分块细节。

场景二【多模态】:用 Pulsar 把文本、图像、音频流绑定到一起

多模态AI需要处理和融合的数据类型极其多样化。系统需要同时处理文本(自然语言描述)、图像(像素矩阵)、音频(波形信号)、视频(图像序列和音频流的组合)等多种异构数据。

在许多场景中,不同模态的数据在时间上存在紧密的依赖关系。例如,在视频理解任务中,音频中的对话内容需要与视频中人物的口型、动作在时间上精确对齐;在自动驾驶场景中,激光雷达的点云数据、摄像头的图像数据和GPS的定位数据必须在同一时间点或时间窗口内进行融合,才能构建出对周围环境准确的感知。因此,消息中间件不仅要能传输数据,还需要提供机制来保证跨模态数据的时间同步和顺序性 。

利用 Pulsar 的 keyshare 消费模型,可以将同一key的数据总是被路由分配到同一实例完成聚合,方案如下:

时间同步:选定一个物理时钟源(PTP/NTP/帧时钟),所有模态 Producer 在本地打时间桶 ID(t-bucket),粒度 = 1 ms 或 1 帧间隔。

Produce 发送:每条消息把 t-bucket 放在 Pulsar 的事件时间(eventTime ,SDK 原生字段)里,同时作为路由 Key。

消费者使用 Key_Shared 订阅,Key = t-bucket,Pulsar 可保证相同 Key 的消息只会发给同一消费者实例

收到模态 A、B、C 的同一桶消息后,再打包成一条 MultiModalFrame 喂给模型;

|Key_Shared(键共享)是 Pulsar 的一种订阅模式,它在 Shared 模式的基础上增加了按消息 Key 的路由规则:相同 Key 的消息始终被分配到同一个消费者,而不同 Key 的消息可分布在多个消费者上并行处理,实现Key 级别的有序性与负载均衡。

场景三【模型训练】:用好 Pulsar 压缩 Topic,实现 Checkpoint 秒级断点续训

模型训练周期长、数据量大、集群规模大,出现中断的概率显著提高,且重启代价高昂;

所以通常会使用Checkpoint机制来加速恢复的过程,但保存 Checkpoint 耗时较高,若存储服务瞬时故障,写入请求直接丢失,导致训练状态丢失。

引入 Pulsar 作为中间层后,可以将异常数据跳过、Checkpoint 异步缓存、任务级重试等操作都交给 Pulsar 的特性来解决,方案如下:

Checkpoint 数据具有明显的历史消息无效的特性,如果发生积压时,只有最新的一条 checkpoint 才有价值,这时可以使用 Pulsar 的压缩 Topic(Compaction Topic),压缩 topic 将 Checkpoint Topic 从日志流变为 KV 存储,仅保留每个 Key 的最新消息,自动清理历史版本,这样对比传统方案(扫描 S3 文件列表 → 排序 → 下载)需要耗时 3-5 分钟到直接接收最新 Key 的方案,耗时<1S;

Compaction Topic 是 Apache Pulsar 提供的一种基于消息 Key 的日志压缩机制,它会自动清理主题中每个 Key 的旧版本消息,仅保留最新版本,从而显著减少主题体积、加速消费回溯,适用于"只关心最终状态"的场景。

场景四【模型训练】:以 Pulsar 为“输油管”:优化模型训练中的 GPU 饥饿

在大规模模型训练中,数据是驱动整个训练过程的“燃料”,特别是针对拥有数十亿甚至万亿级参数的深度学习模型,能高效且稳定的确保“燃料”能够持续、稳定地供应给计算引擎(如 GPU 集群)是关键所在。

训练这些庞然大物需要海量的训练数据,这些数据通常以 TB 甚至 PB 计。数据加载和预处理的速度直接决定了 GPU 这一昂贵计算资源的利用率。有数据表明 I/O 延迟使 GPU 每步等待数百毫秒,空闲率可达30-50%。为了充分利用昂贵的计算资源,必须确保数据能够以足够快的速度被加载到每个计算节点的内存中,如果数据供给速度跟不上模型消耗数据的速度,就会有大量时间浪费在等待数据上,即所谓的“数据饥饿”问题。

历史的架构中,数据预处理模块与训练模块存在耦合的情况,然而耦合的模块可能相互影响从而降低了 GPU 的读取效率;

这种架构中,非常适合引入 Pulsar 在其中作为缓冲层,在数据平面预处理服务独立扩展,训练节点只专注消费,利用 Pulsar 的高吞吐特性,“喂养”GPU 的数据高速且稳定;

并且当 GPU 消费慢时,还可以利用 Pulsar 的背压机制,预处理消费时自动降低预取速率,避免 OOM,从而让整个链路更加健壮;

不止如此,还可以继续针对 Topic 的消费进行积压监控,如果出现积压,辅以 K8S 的KEDA机制+ Pulsar 的Share消费类型可以整个扩缩容过程更加平滑和稳定;

|背压(Backpressure)是Pulsar中用于防止生产者过载消费者的流量控制机制。当消费者处理速度跟不上生产者发送速度时,系统通过多级反馈控制主动减缓上游生产速率,避免内存溢出、数据丢失和系统崩溃。

|KEDA(Kubernetes Event-driven Autoscaling),是一种基于事件驱动的自动扩容解决方案,支持通过外部事件源动态调整Pod副本数;

|Share消费类型(Shared Subscription)是Apache Pulsar的一种订阅模式,允许多个消费者绑定到同一个订阅名上,消息通过轮询机制分发给不同的消费者,每个消息仅会被分发给一个消费者,不保证消息顺序,适合高吞吐、无需顺序消费的场景。

场景五【智能体】:利用 Pulsar 轻量化主题(non-persistent)解决 AI 应用的异步通信难题

模型迭代日新月异,企业正在积极把 AI 能力嵌入业务流程。然而,企业应用从调用传统微服务应用 API 接口 到 调用大模型“生成式”的 API 接口过程中,一个显著的特征是任务处理时耗变的很长,传统微服务应用通常能实现毫秒级响应,而 AI 应用的处理周期跨度极大——从几分钟到数小时不等;

这就意味着原本微服务间的同步调用就不再适用,可将同步调用改为异步通知来解决长耗时的阻塞;改为异步通知后,那又如何能实现同步调用的即时通信呐,可以采取以下模型:

Agent1 在启动时注册一个专属于自己的用于接收回包的非持久化 Topic(non-persistent Topic),非持久化 Topic 非常轻量化,数据不落盘存储,生命周期可由 TTL 自动或人工回收,Agent1 可使用独占消费模型进行消费该 Topic

当 Agent1 有长耗时的调用模型请求时,向正常 Topic 发送请求,并由模型处理模块处理;该 Topic 为常规 Topic,具备消息持久化、消息回放、海量积压等队列特性

当 LLM 处理模块完成后,根据请求包中的回包地址进行回包投递

基于此模型,可以利用 Pulsar 的 Persistent-topic,将长时耗任务进行异步化处理,利用 Pulsar 的高可用、低延时的特性来保障请求任务的可靠、解耦和削峰填谷;又可以利用 Pulsar的 Non-Persistent-topic 的轻量化,实现百万级创建,快速回收等能力。

|Non-persistent Topic:是 Pulsar 的一种 Topic 类型,是“不落盘、纯内存” 的消息通道——数据不会写入磁盘、不会做副本复制,Broker 宕机或进程重启即丢失,因此极致轻量、低延迟,适合“可丢、可重试、要快、要大量”的短时消息场景。

场景六【智能体】:Pulsar 可为事件驱动的智能体提供“新基建”

AI Agent 的概念正在经历一场深刻的变革,从简单的对话式 AI(Chatbot)向复杂的独立实体转变。AI Agent 就是将一个大模型(大脑)和一系列工具(感官与四肢)组装起来,形成的一个能够感知和改变外部环境的智能程序。

以创建一个营销 Agent 为例,采用ReAct的模型,Agent 可能首先从 CRM 中提取客户数据,使用 API 收集市场趋势,并在新信息出现时不断调整策略。通过通过记忆保留上下文并迭代查询,Agent 能够生成更准确、更相关的输出。


当外部接口越来越丰富,Agent 需要不断的扩展收集信息来源,包括其他 Agent、工具和外部系统等等,以便做出更精准的决策。


而这,从系统架构设计的角度上讲,就是一个分布式系统问题。这和微服务时代面临的挑战相似,因为在微服务中,各个组件必须高效地进行通信,而不产生瓶颈或僵化的依赖关系。也和微服务架构系统一样,它们的输出不仅仅应该回流到 AI 应用程序中,它们还应该流入其他关键系统,如数据仓库、CRM、CDP 和客户成功平台。所以完全可以将 Agent 理解为:有“大脑”的微服务;

从微服务的架构演进来看,Agent的未来是事件驱动的,事件驱动的架构需要一个高效的消息中间件作为“基建”,因为消息中间的特性可以很好的匹配事件驱动需要的横向扩展性、低延迟、松耦合、事件持久化等诉求。

Pulsar 除了以上在消息中间件的优势外,还提供了 Function Mesh 的能力,利用 Function 的能力可以更进一步简化 AI Agent 的架构:

|ReAct模式:ReAct(Reasoning and Action)是目前应用最广泛、最经典的 AI Agent 运行模式之一 。其核心思想是模拟人类解决复杂问题的过程,通过一个 “思考(Thought)→ 行动(Action)→ 观察(Observation)”的循环来逐步推进任务 。

|Pulsar Function :Pulsar 提供的轻量级、Serverless 流处理框架,定位是“用写普通函数的代码量,完成 ETL、过滤、聚合、打标签等实时计算”。它把“消息 → 计算 → 消息”的闭环直接跑在 Pulsar 集群内部,简单场景不需要额外部署 Flink、Storm 等重型流处理引擎。

场景七【智能体】:具身智能需要“传感器流+任务队列”

在具身智能的场景中,既需要处理传感器读数流(连续、有序的数据),也需要处理独立的指令或任务(这些任务需要独立处理);

例如:一个机器人 Agent 在处理任务时,首先机器人的视觉或遥测传感器持续发布事件流,这些事件需要按顺序来处理或者来理解当前所处环境的变化;然后当机器人的 AI 决定采取行动,例如“拾取物体”或“导航到位置”时,这些任务会被添加到工作队列中。这些行动消息可能需要多个执行器模块(消费者)会分担这些任务。每个任务消息会被分配给一个执行器,执行器在完成任务后会进行确认。如果任务失败,执行器可以发送负向确认(表示失败),然后另一个实例可以重试。

我们回顾上述的过程,虽然都是利用消息管道进行消息传递,但是这是两种不同的数据类型:

类似传感器流,生产者将数据追加到一个无界、有序的日志(即流)中。消费者随后按顺序从这个日志中读取数据,并维护流中的偏移量(offset)。每个分区内的顺序是有保障的,消息在消费时不会被移除,这就是 kafka 专注的 stream(流)场景,它提供了高吞吐量和分区的严格排序,这使得它非常适合处理有序的事件流。

类似任务消息,生产者将消息发送到队列,每条消息只由一个消费者处理(即使有多个消费者在监听)。消费者从队列中拉取消息,并在处理完成后确认每条消息,消息随后会从队列中移除。队列擅长分发可以并行处理且无需全局排序要求的任务或工作。这就是 rabbitmq、rocketmq 专注的 queues(消息)场景,专注于每个消费者只处理一条消息,并具备消息重试和死信队列等能力。

然后,在越来越多的 AI 场景,需要两者兼具,因为 AI 代理在实时环境中观测,同时必须执行可靠的操作。这正是Pulsar 持续专注的方向,将流+消息进行融合,Pulsar 原生支持多种消息语义。其灵活的订阅模式(独占、共享、故障转移、键共享)让你能在同一平台上为不同任务选择合适的工具。这意味着系统扩展更少,组件间集成更简单——这对复杂的 AI 代理架构来说是一个很大的优势。

|Pulsar 的流(Streaming)和消息(Messaging)场景结合,通过 Key-shard(键共享)、Failover(故障转移)、Exclusive(独占)、Shared(共享)四种订阅模式来实现。

参 考

参 考

https://huggingface.co/spaces/nanotron/ultrascale-playbook

https://seanfalconer.medium.com/the-future-of-ai-agents-is-event-driven-9e25124060d6

https://www.linkedin.com/pulse/kafkas-role-powering-next-wave-event-driven-agentic-ai-jeyaraman-xq0kc

https://mp.weixin.qq.com/s/4pIAZqH01Ib_OGGGD9OWQg

https://streamnative.io/blog/streams-vs-queues-why-your-agents-need-both–and-why-pulsar-protocol-delivers

https://dzone.com/articles/agentic-ai-using-apache-kafka-as-event-broker-with-agent2agent-protocol

https://mp.weixin.qq.com/s?__biz=MjM5MDE0Mjc4MA==&mid=2651248787&idx=2&sn=b2bc09cebce5296ba7d7b5cab1b4c76a&poc_token=HMD7OmmjPWSU8S4Wv17TfFVZvOZepoGlcSeCHT0I

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1155774.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

运用AI提升论文撰写生产力,7个推荐资源涵盖格式标准化和LaTeX排版功能

工具快速对比排名&#xff08;前7推荐&#xff09; 工具名称 核心功能亮点 处理时间 适配平台 aibiye 学生/编辑双模式降AIGC 1分钟 知网、万方等 aicheck AI痕迹精准弱化查重一体 ~20分钟 知网、格子达、维普 askpaper AIGC率个位数优化 ~20分钟 高校检测规则通…

电商行业的数据分析工具推荐

电商行业的数据分析工具推荐 关键词:电商行业、数据分析工具、数据挖掘、可视化、数据洞察 摘要:本文聚焦于电商行业,深入探讨了适用于该领域的各类数据分析工具。从工具的背景介绍出发,阐述其目的、适用读者和文档结构,详细解释相关术语。接着介绍核心概念与联系,通过文…

利用人工智能自动化论文生成,7个实用网站包含学术格式规范与LaTeX解决方案

工具快速对比排名&#xff08;前7推荐&#xff09; 工具名称 核心功能亮点 处理时间 适配平台 aibiye 学生/编辑双模式降AIGC 1分钟 知网、万方等 aicheck AI痕迹精准弱化查重一体 ~20分钟 知网、格子达、维普 askpaper AIGC率个位数优化 ~20分钟 高校检测规则通…

通过AI技术快速产出合规论文,7个工具网站提供LaTeX适配及格式校对服务

工具快速对比排名&#xff08;前7推荐&#xff09; 工具名称 核心功能亮点 处理时间 适配平台 aibiye 学生/编辑双模式降AIGC 1分钟 知网、万方等 aicheck AI痕迹精准弱化查重一体 ~20分钟 知网、格子达、维普 askpaper AIGC率个位数优化 ~20分钟 高校检测规则通…

什么是RPKI

文章目录为什么需要RPKIRPKI是如何工作的RPKI功能扩展RPKI&#xff08;Resource Public Key Infrastructure&#xff0c;资源公钥基础设施&#xff09;是一种基于PKI&#xff08;Public Key Infrastructure&#xff0c;公钥基础设施&#xff09;的技术&#xff0c;专门用于验证…

什么是RR

文章目录为什么使用RRRR的工作原理RR的使用场景在大规模的BGP网络中加入RR&#xff08;Route Reflector&#xff09;&#xff0c;是用来解决IBGP全连接网络的一种方案。 RR&#xff08;Route Reflector&#xff09;作为一种特殊的IBGP路由器&#xff0c;可以作为全网路由条目存…

什么是热插拔

文章目录 什么场景下需要进行热插拔如何进行热插拔 热插拔又称为带电插拔或热替换&#xff0c;是指在不切断设备电源的情况下&#xff0c;将主控板、接口板、光模块等部件插入或拔出设备。执行热插拔操作前需要详细阅读部件说明文档&#xff0c;避免误操作造成人身伤害或设备损…

亲测广东等离子处理机厂家

等离子表面处理技术&#xff1a;如何甄选真正可靠的设备制造商&#xff1f;在制造业向绿色、精密化转型的浪潮中&#xff0c;等离子表面处理机作为一种高效、环保的预处理技术&#xff0c;正日益成为提升产品性能与合格率的关键装备。然而&#xff0c;面对市场上琳琅满目的设备…

智能表格识别技术突破传统OCR局限,实现复杂纸质表格的精准数字化转换

当你翻出一张泛黄的纸质电费单、一份密密麻麻的医院化验报告&#xff0c;或是一本夹着复杂统计表格的老年鉴&#xff0c;是否曾想过&#xff1a;这些看似“沉默”的纸面信息&#xff0c;能不能一键变成电脑里整齐可查的电子表格&#xff1f;过去&#xff0c;答案往往是“难”—…

这几款iPhone“邪修”软件,好用到逆天!

在日常使用iPhone的过程中&#xff0c;不知道屏幕前的你是否和我一样&#xff0c;总希望有一些“神奇”的APP&#xff0c;能悄无声息地提升效率、丰富生活&#xff0c;甚至帮你完成一些看似不可能的任务&#xff1f;本篇内容&#xff0c;小编就为大家精心推荐7款被不少用户称为…

【开题答辩全过程】以 基于小程序的精品衣柜系统的设计与实现为例,包含答辩的问题和答案

个人简介一名14年经验的资深毕设内行人&#xff0c;语言擅长Java、php、微信小程序、Python、Golang、安卓Android等开发项目包括大数据、深度学习、网站、小程序、安卓、算法。平常会做一些项目定制化开发、代码讲解、答辩教学、文档编写、也懂一些降重方面的技巧。感谢大家的…

[大模型架构] LangGraph AI 工作流编排(15)

一、LangGraph 核心概念拆解&#xff08;理解工作流的 “骨架”&#xff09; 视频首先系统讲解 LangGraph 的核心组件与设计思想&#xff0c;避免开发者因概念模糊导致后续开发踩坑&#xff0c;核心围绕 “状态&#xff08;State&#xff09;、节点&#xff08;Node&#xff0…

【开题答辩全过程】以 基于Java的健身俱乐部管理系统的设计与开发为例,包含答辩的问题和答案

个人简介一名14年经验的资深毕设内行人&#xff0c;语言擅长Java、php、微信小程序、Python、Golang、安卓Android等开发项目包括大数据、深度学习、网站、小程序、安卓、算法。平常会做一些项目定制化开发、代码讲解、答辩教学、文档编写、也懂一些降重方面的技巧。感谢大家的…

【大数据毕设推荐】基于Spark的大学排名数据可视化系统,Python+Hadoop技术栈详解 毕业设计 选题推荐 毕设选题 数据分析 机器学习

✍✍计算机毕设指导师** ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡有什么问题可以…

【开题答辩全过程】以 基于Web技术的知识付费平台为例,包含答辩的问题和答案

个人简介一名14年经验的资深毕设内行人&#xff0c;语言擅长Java、php、微信小程序、Python、Golang、安卓Android等开发项目包括大数据、深度学习、网站、小程序、安卓、算法。平常会做一些项目定制化开发、代码讲解、答辩教学、文档编写、也懂一些降重方面的技巧。感谢大家的…

深度解读.NET中ConcurrentDictionary:高效线程安全字典的原理与应用

深度解读.NET中ConcurrentDictionary&#xff1a;高效线程安全字典的原理与应用 在多线程编程场景下&#xff0c;数据的并发访问控制是确保程序正确性和性能的关键。.NET中的ConcurrentDictionary提供了一种线程安全的字典实现&#xff0c;允许在多个线程同时访问和修改字典时&…

在外如何用手机像翻相册一样查看其他设备里所有文件?

在外急需调取家里NAS、电脑的资料&#xff1f;别再折腾U盘和复杂的远程设置了。今天就教你用节点小宝&#xff0c;像翻看自己手机相册一样&#xff0c;随时随地、直观地访问你所有设备里的文件。一把钥匙 打开所有设备的“文件抽屉”节点小宝的“远程文件”功能就是一把钥匙。它…

智能电商客服:AI工具驱动的服务价值链重构与行业突围

一、行业核心矛盾&#xff1a;全渠道割裂与价值创造乏力的双重瓶颈当前电商服务场景已呈现“多触点、碎片化、高并发”特征&#xff0c;传统客服模式难以适配行业发展需求。一方面&#xff0c;消费者分散于抖音、小红书、电商平台等多渠道&#xff0c;人工客服需频繁切换操作界…

C++跨平台开发的核心挑战平台差异性处理操作系统AP

C跨平台开发的核心挑战平台差异性处理 硬件架构差异&#xff08;x86/ARM&#xff09;、操作系统API&#xff08;Windows/Linux/macOS&#xff09;、编译器行为&#xff08;GCC/Clang/MSVC&#xff09;带来的兼容性问题。需要抽象系统调用&#xff0c;处理字节序、内存对齐等底层…

Linux网络编程-UDP 广播原理与实战

一、UDP 广播核心概念 UDP 广播是指一台主机向所在子网&#xff08;同一局域网&#xff09;内的所有主机发送数据的通信方式&#xff0c;是 UDP 无连接特性的典型应用场景。 1.1 广播地址分类 类型格式 / 示例特点受限广播地址255.255.255.255① 不会被路由器转发&#xff1…