Dify异步接口调用优化实践:解决长时任务处理与网络超时疑问

news/2025/11/21 21:52:13/文章来源:https://www.cnblogs.com/ljbguanli/p/19254604

Dify异步接口调用优化实践:解决长时任务处理与网络超时疑问

Dify异步接口调用优化实践:解决长时任务处理与网络超时问题

项目背景

在当前负责的AI研发试验落地项目中,我们面临一个关键的业务需求:用户需要对商品物料信息进行知识库匹配,以找到最相似的目标物料数据。该功能基于Dify平台构建工作流实现,技术实现难度适中。

技术挑战

初始方案及局限性

初期实现采用单条匹配模式,用户反馈处理效率较低。为此,我们升级为Excel批量导入并发匹配方案,但随之暴露出多个技术瓶颈:
  1. Dify平台限制
  • 接口调用超时限制
  • 循环执行次数限制
  • 网页请求超时限制
  1. 系统稳定性问题
  • 网络波动导致连接中断
  • 大数据量匹配时连接稳定性下降
  • 用户无法可靠获取处理结果

解决方案探索

虽然可以通过调整Dify配置参数缓解部分问题,但无法从根本上解决网络不稳定导致的连接中断。开放API日志查询权限让用户自行获取结果,又会带来安全管控风险。

技术实现方案

异步接口架构设计

经过技术调研,我们参考了GitHub社区的相关讨论,发现官方暂未正式支持异步接口。基于一个社区贡献的PR思路,我们在Dify 1.8.1版本上实现了异步接口功能。

核心代码实现

1. 接口参数扩展
Service API 配置 (api/controllers/service_api/__init__.py):
from flask import Blueprint
from flask_restx import Namespace
from libs.external_api import ExternalApi
bp = Blueprint("service_api", __name__, url_prefix="/v1")
api = ExternalApi(bp,version="1.0",title="Service API",description="API for application services",doc="/docs",  # Enable Swagger UI at /v1/docs
)
service_api_ns = Namespace("service_api", description="Service operations", path="/")
from . import index
from .app import annotation, app, audio, completion, conversation, file, file_preview, message, site, workflow, workflow_run
from .dataset import dataset, document, hit_testing, metadata, segment, upload_file
from .workspace import models
api.add_namespace(service_api_ns)
Completion API 异步参数 (api/controllers/service_api/app/completion.py):
chat_parser = reqparse.RequestParser()
chat_parser.add_argument("inputs", type=dict, required=True, location="json", help="Input parameters for chat")
chat_parser.add_argument("query", type=str, required=True, location="json", help="The chat query")
chat_parser.add_argument("files", type=list, required=False, location="json", help="List of file attachments")
chat_parser.add_argument("response_mode", type=str, choices=["blocking", "streaming"], location="json", help="Response mode"
)
chat_parser.add_argument("conversation_id", type=uuid_value, location="json", help="Existing conversation ID")
chat_parser.add_argument("retriever_from", type=str, required=False, default="dev", location="json", help="Retriever source"
)
chat_parser.add_argument("auto_generate_name",type=bool,required=False,default=True,location="json",help="Auto generate conversation name",
)
# 新增异步调用参数
chat_parser.add_argument("is_async", type=bool, required=False, default=False, location="json")
chat_parser.add_argument("workflow_id", type=str, required=False, location="json", help="Workflow ID for advanced chat")
Workflow API 异步参数 (api/controllers/service_api/app/workflow.py):
workflow_run_parser = reqparse.RequestParser()
workflow_run_parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json")
workflow_run_parser.add_argument("files", type=list, required=False, location="json")
workflow_run_parser.add_argument("response_mode", type=str, choices=["blocking", "streaming"], location="json")
# 新增异步调用参数
workflow_run_parser.add_argument("is_async", type=bool, required=False, default=False, location="json")
2. 应用生成实体配置
Advanced Chat 应用生成器 (api/core/app/apps/advanced_chat/app_generator.py):
def generate(self, args: dict) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]:# ... 原有代码 ...extras = {"auto_generate_conversation_name": args.get("auto_generate_name", False),# 新增异步配置"is_async": args.get("is_async", False),}# ... 后续处理逻辑 ...
Workflow 应用生成器 (api/core/app/apps/workflow/app_generator.py):
def generate(self, args: dict) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:# ... 原有代码 ...workflow_run_id = str(uuid.uuid4())application_generate_entity = WorkflowAppGenerateEntity(task_id=str(uuid.uuid4()),app_config=app_config,file_upload_config=file_extra_config,inputs=self._prepare_user_inputs(user_inputs=inputs,variables=app_config.variables,tenant_id=app_model.tenant_id,strict_type_validation=True if invoke_from == InvokeFrom.SERVICE_API else False,),files=list(system_files),user_id=user.id,stream=streaming,invoke_from=invoke_from,call_depth=call_depth,trace_manager=trace_manager,workflow_execution_id=workflow_run_id,# 新增异步配置extras={"is_async": args.get("is_async", False)},)
3. 任务管道处理核心逻辑
Advanced Chat 任务管道 (api/core/app/apps/advanced_chat/generate_task_pipeline.py):
def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]) -> ChatbotAppBlockingResponse:"""处理阻塞响应"""is_async = self._application_generate_entity.extras.get("is_async", False)for stream_response in generator:if is_async:# 异步模式立即返回任务IDreturn ChatbotAppBlockingResponse(task_id=stream_response.task_id,data=ChatbotAppBlockingResponse.Data(id=self._message_id,mode=self._conversation_mode,conversation_id=self._conversation_id,message_id=self._message_id,answer=str(self._application_generate_entity.workflow_run_id),created_at=int(self._message_created_at),),)if isinstance(stream_response, ErrorStreamResponse):raise stream_response.errelif isinstance(stream_response, MessageEndStreamResponse):# 同步模式正常处理完成响应extras = {}if stream_response.metadata:extras["metadata"] = stream_response.metadatareturn ChatbotAppBlockingResponse(task_id=stream_response.task_id,data=ChatbotAppBlockingResponse.Data(id=self._message_id,mode=self._conversation_mode,conversation_id=self._conversation_id,message_id=self._message_id,answer=self._task_state.answer,created_at=self._message_created_at,**extras,),)else:continueraise ValueError("queue listening stopped unexpectedly.")
def _async_process_stream_response(self, publisher):"""异步处理流响应 - 核心异步逻辑"""# 初始化队列管理器queue_manager = MessageBasedAppQueueManager(task_id=self._application_generate_entity.task_id,user_id=self._application_generate_entity.user_id,invoke_from=self._application_generate_entity.invoke_from,conversation_id=self._conversation_id,app_mode=self._conversation_mode,message_id=self._message_id,)# 启动工作线程处理实际任务worker_thread = threading.Thread(target=self._generate_worker,kwargs={"flask_app": current_app._get_current_object(),"queue_manager": queue_manager,"context": contextvars.copy_context(),"publisher": publisher,},)worker_thread.start()# 返回消费者工作器的结果yield from self._consumer_worker(queue_manager)
def _generate_worker(self, flask_app: Flask, queue_manager: AppQueueManager, context: contextvars.Context, publisher) -> None:"""在工作线程中执行生成任务"""for var, val in context.items():var.set(val)with flask_app.app_context():response_generator = self._sync_process_stream_response(publisher)for generator in response_generator:if generator is None:continue# 将流响应映射为队列消息并发布message = ForwardQueueMessage(event=advance_chat_queue_task_map[generator.event],response=generator)queue_manager.publish(message, PublishFrom.TASK_PIPELINE)
def _consumer_worker(self, queue_manager: AppQueueManager) -> Generator[StreamResponse, None, None]:"""消费队列中的消息"""for message in queue_manager.listen():event = message.eventif isinstance(event, ForwardQueueMessage):yield event.response
Workflow 任务管道 (api/core/app/apps/workflow/generate_task_pipeline.py):
def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]) -> WorkflowAppBlockingResponse:"""转换为阻塞响应"""is_async = self._application_generate_entity.extras.get("is_async", False)for stream_response in generator:if is_async:# 异步模式立即返回工作流运行IDreturn WorkflowAppBlockingResponse(task_id=self._application_generate_entity.task_id,workflow_run_id=self._application_generate_entity.workflow_execution_id,data=WorkflowAppBlockingResponse.Data(id=self._application_generate_entity.app_config.app_id,workflow_id=self._workflow_id,status="processing",elapsed_time=0,total_tokens=0,total_steps=0,created_at=int(time.time()),finished_at=int(time.time()),),)if isinstance(stream_response, ErrorStreamResponse):raise stream_response.errelif isinstance(stream_response, WorkflowFinishStreamResponse):# 同步模式返回完整结果response = WorkflowAppBlockingResponse(task_id=self._application_generate_entity.task_id,workflow_run_id=stream_response.data.id,data=WorkflowAppBlockingResponse.Data(id=stream_response.data.id,workflow_id=stream_response.data.workflow_id,status=stream_response.data.status,outputs=stream_response.data.outputs,error=stream_response.data.error,elapsed_time=stream_response.data.elapsed_time,total_tokens=stream_response.data.total_tokens,total_steps=stream_response.data.total_steps,created_at=int(stream_response.data.created_at),finished_at=int(stream_response.data.finished_at),),)return responseelse:continueraise ValueError("queue listening stopped unexpectedly.")
4. 事件映射系统
队列任务桥接 (api/core/app/entities/queue_task_bridge.py):
from core.app.entities.queue_entities import AppQueueEvent, QueueEvent
from core.app.entities.task_entities import StreamEvent, StreamResponse
# 工作流事件映射
workflow_queue_task_map = {StreamEvent.PING: QueueEvent.PING,StreamEvent.ERROR: QueueEvent.ERROR,StreamEvent.MESSAGE: QueueEvent.TEXT_CHUNK,StreamEvent.MESSAGE_END: QueueEvent.MESSAGE_END,StreamEvent.TTS_MESSAGE: QueueEvent.TEXT_CHUNK,StreamEvent.TTS_MESSAGE_END: QueueEvent.MESSAGE_END,StreamEvent.MESSAGE_FILE: QueueEvent.MESSAGE_FILE,StreamEvent.MESSAGE_REPLACE: QueueEvent.MESSAGE_REPLACE,StreamEvent.AGENT_THOUGHT: QueueEvent.AGENT_THOUGHT,StreamEvent.AGENT_MESSAGE: QueueEvent.AGENT_MESSAGE,StreamEvent.WORKFLOW_STARTED: QueueEvent.WORKFLOW_STARTED,StreamEvent.WORKFLOW_FINISHED: QueueEvent.WORKFLOW_SUCCEEDED,StreamEvent.NODE_STARTED: QueueEvent.NODE_STARTED,StreamEvent.NODE_FINISHED: QueueEvent.NODE_SUCCEEDED,StreamEvent.NODE_RETRY: QueueEvent.RETRY,StreamEvent.PARALLEL_BRANCH_STARTED: QueueEvent.PARALLEL_BRANCH_RUN_STARTED,StreamEvent.PARALLEL_BRANCH_FINISHED: QueueEvent.PARALLEL_BRANCH_RUN_SUCCEEDED,StreamEvent.ITERATION_STARTED: QueueEvent.ITERATION_START,StreamEvent.ITERATION_NEXT: QueueEvent.ITERATION_NEXT,StreamEvent.ITERATION_COMPLETED: QueueEvent.ITERATION_COMPLETED,StreamEvent.LOOP_STARTED: QueueEvent.LOOP_START,StreamEvent.LOOP_NEXT: QueueEvent.LOOP_NEXT,StreamEvent.LOOP_COMPLETED: QueueEvent.LOOP_COMPLETED,StreamEvent.TEXT_CHUNK: QueueEvent.TEXT_CHUNK,StreamEvent.TEXT_REPLACE: QueueEvent.MESSAGE_REPLACE,StreamEvent.AGENT_LOG: QueueEvent.AGENT_LOG,
}
# 高级聊天事件映射
advance_chat_queue_task_map = {StreamEvent.PING: QueueEvent.PING,StreamEvent.ERROR: QueueEvent.ERROR,StreamEvent.MESSAGE: QueueEvent.TEXT_CHUNK,StreamEvent.MESSAGE_END: QueueEvent.ADVANCED_CHAT_MESSAGE_END,StreamEvent.TTS_MESSAGE: QueueEvent.TEXT_CHUNK,StreamEvent.TTS_MESSAGE_END: QueueEvent.MESSAGE_END,StreamEvent.MESSAGE_FILE: QueueEvent.MESSAGE_FILE,StreamEvent.MESSAGE_REPLACE: QueueEvent.MESSAGE_REPLACE,StreamEvent.AGENT_THOUGHT: QueueEvent.AGENT_THOUGHT,StreamEvent.AGENT_MESSAGE: QueueEvent.AGENT_MESSAGE,StreamEvent.WORKFLOW_STARTED: QueueEvent.WORKFLOW_STARTED,StreamEvent.WORKFLOW_FINISHED: QueueEvent.WORKFLOW_SUCCEEDED,StreamEvent.NODE_STARTED: QueueEvent.NODE_STARTED,StreamEvent.NODE_FINISHED: QueueEvent.NODE_SUCCEEDED,StreamEvent.NODE_RETRY: QueueEvent.RETRY,StreamEvent.PARALLEL_BRANCH_STARTED: QueueEvent.PARALLEL_BRANCH_RUN_STARTED,StreamEvent.PARALLEL_BRANCH_FINISHED: QueueEvent.PARALLEL_BRANCH_RUN_SUCCEEDED,StreamEvent.ITERATION_STARTED: QueueEvent.ITERATION_START,StreamEvent.ITERATION_NEXT: QueueEvent.ITERATION_NEXT,StreamEvent.ITERATION_COMPLETED: QueueEvent.ITERATION_COMPLETED,StreamEvent.LOOP_STARTED: QueueEvent.LOOP_START,StreamEvent.LOOP_NEXT: QueueEvent.LOOP_NEXT,StreamEvent.LOOP_COMPLETED: QueueEvent.LOOP_COMPLETED,StreamEvent.TEXT_CHUNK: QueueEvent.TEXT_CHUNK,StreamEvent.TEXT_REPLACE: QueueEvent.MESSAGE_REPLACE,StreamEvent.AGENT_LOG: QueueEvent.AGENT_LOG,
}
class ForwardQueueMessage(AppQueueEvent):"""转发队列消息实体"""event: QueueEvent = QueueEvent.PINGresponse: StreamResponse
5. 结果查询接口
消息查询API (api/controllers/web/message.py):
class MessageApi(WebApiResource):@marshal_with(message_fields)def get(self, app_model: App, end_user: EndUser, message_id):message_id = str(message_id)message = db.session.query(Message).filter(Message.id == message_id,Message.app_id == app_model.id).first()if not message:raise NotFound("Message Not Exists.")return message
# API路由注册
api.add_resource(MessageListApi, "/messages")
api.add_resource(MessageFeedbackApi, "/messages//feedbacks")
api.add_resource(MessageMoreLikeThisApi, "/messages//more-like-this")
api.add_resource(MessageSuggestedQuestionApi, "/messages//suggested-questions")
api.add_resource(MessageApi, "/apps/messages/")

最终效果

技术总结

本次优化通过引入异步处理机制,有效解决了Dify平台在处理批量数据匹配时的技术瓶颈。关键成功因素包括:
  1. 非侵入式设计:在保持原有接口兼容性的基础上扩展功能
  2. 队列管理:基于消息队列实现任务状态管理
  3. 线程安全:多线程环境下的数据一致性保障
  4. 错误恢复:完善的异常处理和任务恢复机制
该方案为类似AI应用场景中的长时任务处理提供了可复用的技术范式,在提升系统稳定性的同时,显著改善了用户体验。

注:本文基于Dify 1.8.1版本实现,具体实现细节可能随版本更新而调整。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/972573.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32系统时钟与SysTick定时器

一、系统嘀嗒定时器(SysTick)全面解析 1. SysTick定时器基本概念 定时器是STM32 中常用的外设,一般定时器的基本功能就是定时,而在Cortex M3/M4 内核中也包含一个简单的定时器,就是系统嘀嗒定时器(Systick),它是属…

【251121】CF2171 Div.3 vp 总结

老师说让我尝试体验快乐 AK,但是我失败了。 还是太菜了喵! 题目梗概题目编号 题目名称 题目链接A Shizuku Hoshikawa and Farm Legs LinkB Yuu Koito and Minimum Absolute Sum LinkC1 / C2 Renako Amaori and XOR G…

OI 笑传 #32

なんてったって春今天是 bct Day2,赛时 \(40+60+10+0=110\),rk 70。 挂分原因是被 vector 卡常了/fn。然后 T4 捆绑 Sbt#1 T 了一个于是又没了 20pts。 评价是 ok 场,练习了对拍的使用。 发现 hm2ns 总是会随口否掉…

PyOpenGL实现Bresenham算法

Bresenham直线算法 Bresenham画圆算法 中点Bresenham画椭圆算法1. Bresenham直线生成算法 1.1 理论基础 绘制直线的最直观想法是使用直线方程 y = mx + b,并对x的每个整数值计算y 这涉及到大量的浮点数乘法和舍入运算…

【Linux】教你在 Linux 上搭建 Web 服务器,步骤清晰无门槛 - 详解

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

【第7章 I/O编程与异常】\r\n 和 \n\r是一回事吗?

\r\n 和 \n\r 不是一回事,它们是两种完全不同的字节序列,在语义和实际效果上也完全不同。 一、含义对比 序列 字节(十六进制) 含义\r\n 0x0D 0x0A 回车(Carriage Return) + 换行(Line Feed)✅ Windows 标准换行…

2025-11-21

CF Problem - 1234C - Codeforces(贪心) #include <bits/stdc++.h> using namespace std; #define LL long long const LL mod = 998244353; const int N=2e5+10; string s[2];void solve() {int n;cin >&g…

深入解析:windows显示驱动开发-CCD api的摘要及方案(一)

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

nju实验七 状态机及键盘输入

本实验的目的是学习状态机的工作原理,了解状态机的编码方式,并利用PS/2键盘输入实现简单状态机的设计。实验七 状态机及键盘输入 简单状态机 . ├── build ├── constr │ └── top.nxdc ├── csrc │ └…

2025-11-21 XQQ NOIP Round 1 hetao1733837的record

2025-11-21 XQQ NOIP Round 1 hetao1733837的record2025-11-21 XQQ NOIP Round 1 hetao1733837的record A.tree 提交链接:树 题面 题目描述 给定一棵 $n$ 个点的树和一个长度为 $n$ 的数组 $score[0], ..., score[n-1…

Gephi如何支持MySQL数据的复杂查询

Gephi是一个用于网络可视化的开源软件,它主要处理的是图数据结构。虽然Gephi本身并不直接支持MySQL数据库的复杂查询,但你可以通过以下步骤将MySQL中的数据导入到Gephi中,并在Gephi中进行进一步的处理和分析:数据提…

Mozilla CI日志中暴露微软x-apikey的安全事件分析

微软遥测API密钥在Mozilla持续集成公共日志中意外暴露。该密钥出现在自动化Firefox测试期间发送到微软遥测端点的HTTP POST请求中,通过mitmproxy日志捕获。尽管安全影响有限,但Mozilla已采取措施防止未来凭证泄露。报…

Gephi中MySQL数据的节点和边如何设置

在Gephi中,使用MySQL数据源时,首先需要导入数据到Gephi的工作空间。以下是设置节点(Node)和边(Edge)的一般步骤:导入MySQL数据:使用Gephi的MySQL插件或其他适当的方法连接到您的MySQL数据库。 执行SQL查询以提…

Gephi怎样优化MySQL数据的展示效果

Gephi是一个用于网络可视化的开源软件,它可以帮助用户创建网络图和数据可视化。然而,Gephi本身并不直接与MySQL数据库交互,它通常用于处理和分析已经存在于内存或文件中的数据集。如果你想要优化MySQL数据的展示效果…

Gephi对MySQL数据的导入导出有何支持

Gephi是一个用于网络可视化的开源软件,它主要处理的是图数据结构。关于Gephi对MySQL数据的导入导出支持,这通常取决于Gephi的版本以及用户是否进行了特定的配置或开发。在标准的Gephi版本中,可能并没有直接针对MySQ…

Fisrt Blog

音视频简介 这是单个代码,find() 这是一段代码 #include<stdio.h> int main() {code } 213213"213231" 这是斜体 这是加粗 ffmpeg find函数她有以下几个用法:第一个用法是 第二个用法我是十个机器人…

揭秘Java对象的内存占用量:从面试题到底层原理

你是否在面试中被问到过:“一个new Object()在JVM中占多少内存?” 这个问题看似简单,却考察了你对Java内存模型(JVM)、数据结构和性能优化的理解深度。今天,我们就来彻底搞懂它。 一、核心结论:一个Java对象的三…

nju实验六 移位寄存器及桶形移位器

本实验将学习常用的移位寄存器的设计,并实现在移位指令中需要用到的桶形移位器。实验六 移位寄存器及桶形移位器 算术移位和逻辑移位寄存器 module shift_register_8bit (input clk, // 时钟信号input r…

P6727 [COCI 2015/2016 #5] OOP

题目给出字符串按 * 断开得到前后两段 \(P,S\),即要求满足一下条件的模式串个数:具有前缀 \(P\),后缀 \(S\)。\(|P|+|S|\le siz\)。

c语言和python如何解决文本文件中“不同平台换行符不兼容”问题

在 C 语言和 Python 中解决不同平台换行符不兼容的核心思路一致:统一换行符标准(推荐使用 \n),或在读写时适配目标平台。以下是具体实现方案: 一、核心背景:不同平台的换行符差异平台 换行符 说明Unix/Linux \n …