Dify异步接口调用优化实践：解决长时任务处理与网络超时问题
项目背景
在当前负责的AI研发试验落地项目中,我们面临一个关键的业务需求:用户需要对商品物料信息进行知识库匹配,以找到最相似的目标物料数据。该功能基于Dify平台构建工作流实现,技术实现难度适中。
技术挑战
初始方案及局限性
初期实现采用单条匹配模式,用户反馈处理效率较低。为此,我们升级为Excel批量导入并发匹配方案,但随之暴露出多个技术瓶颈:
- Dify平台限制
- 接口调用超时限制
- 循环执行次数限制
- 网页请求超时限制
- 系统稳定性问题
- 网络波动导致连接中断
- 大数据量匹配时连接稳定性下降
- 用户无法可靠获取处理结果
解决方案探索
虽然可以通过调整Dify配置参数缓解部分问题,但无法从根本上解决网络不稳定导致的连接中断。开放API日志查询权限让用户自行获取结果,又会带来安全管控风险。
技术实现方案
异步接口架构设计
经过技术调研,我们参考了GitHub社区的相关讨论,发现官方暂未正式支持异步接口。基于一个社区贡献的PR思路,我们在Dify 1.8.1版本上实现了异步接口功能。
核心代码实现
1. 接口参数扩展
Service API 配置 (api/controllers/service_api/__init__.py):
from flask import Blueprint
from flask_restx import Namespace
from libs.external_api import ExternalApi
# Blueprint hosting the Service API; every route is mounted under the /v1 prefix.
bp = Blueprint("service_api", __name__, url_prefix="/v1")
# API object (ExternalApi from libs.external_api) bound to the blueprint above.
api = ExternalApi(
    bp,
    version="1.0",
    title="Service API",
    description="API for application services",
    doc="/docs",  # Enable Swagger UI at /v1/docs
)
# Namespace mounted at the blueprint root ("/").
service_api_ns = Namespace("service_api", description="Service operations", path="/")

# Controller modules are imported only for their side effects (presumably
# resource registration). They come AFTER `api` / `service_api_ns` are defined —
# presumably because these modules import those names from this package; keep
# this ordering. TODO(review): confirm against the imported modules.
from . import index
from .app import annotation, app, audio, completion, conversation, file, file_preview, message, site, workflow, workflow_run
from .dataset import dataset, document, hit_testing, metadata, segment, upload_file
from .workspace import models
# Attach the namespace to the API object.
api.add_namespace(service_api_ns)
Completion API 异步参数 (api/controllers/service_api/app/completion.py):
# Request parser for the Service API chat endpoint (JSON body).
chat_parser = reqparse.RequestParser()
chat_parser.add_argument("inputs", type=dict, required=True, location="json", help="Input parameters for chat")
chat_parser.add_argument("query", type=str, required=True, location="json", help="The chat query")
chat_parser.add_argument("files", type=list, required=False, location="json", help="List of file attachments")
chat_parser.add_argument(
    "response_mode", type=str, choices=["blocking", "streaming"], location="json", help="Response mode"
)
chat_parser.add_argument("conversation_id", type=uuid_value, location="json", help="Existing conversation ID")
chat_parser.add_argument(
    "retriever_from", type=str, required=False, default="dev", location="json", help="Retriever source"
)
chat_parser.add_argument(
    "auto_generate_name",
    type=bool,
    required=False,
    default=True,
    location="json",
    help="Auto generate conversation name",
)
# Added for async invocation. The app generator copies this flag into the
# generate entity's extras as "is_async"; in that mode a blocking call returns
# identifiers right away instead of the final answer.
# NOTE(review): with type=bool, reqparse falls back to bool(value), so a JSON
# *string* such as "false" becomes True — clients must send a JSON boolean.
chat_parser.add_argument(
    "is_async",
    type=bool,
    required=False,
    default=False,
    location="json",
    help="Return immediately with task identifiers instead of waiting for the result",
)
chat_parser.add_argument("workflow_id", type=str, required=False, location="json", help="Workflow ID for advanced chat")
Workflow API 异步参数 (api/controllers/service_api/app/workflow.py):
# Request parser for the Service API workflow-run endpoint (JSON body).
workflow_run_parser = reqparse.RequestParser()

# Workflow input variables: required, and an explicit null is rejected.
workflow_run_parser.add_argument(
    "inputs",
    type=dict,
    required=True,
    nullable=False,
    location="json",
)

# Optional file attachments.
workflow_run_parser.add_argument(
    "files",
    type=list,
    required=False,
    location="json",
)

# Either "blocking" or "streaming".
workflow_run_parser.add_argument(
    "response_mode",
    type=str,
    choices=["blocking", "streaming"],
    location="json",
)

# Added for async invocation; the workflow app generator forwards it as
# extras["is_async"] on the generate entity.
workflow_run_parser.add_argument(
    "is_async",
    type=bool,
    required=False,
    default=False,
    location="json",
)
2. 应用生成实体配置
Advanced Chat 应用生成器 (api/core/app/apps/advanced_chat/app_generator.py):
def generate(self, args: dict) -> Union[ChatbotAppBlockingResponse, Generator[ChatbotAppStreamResponse, None, None]]:
    # ... existing implementation elided ...

    # Per-request options; presumably handed on to the application generate
    # entity's `extras`, where the task pipeline reads "is_async".
    extras = dict(
        auto_generate_conversation_name=args.get("auto_generate_name", False),
        # New: async invocation flag (defaults to the normal synchronous behavior).
        is_async=args.get("is_async", False),
    )

    # ... subsequent processing elided ...
Workflow 应用生成器 (api/core/app/apps/workflow/app_generator.py):
def generate(self, args: dict) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
    # ... existing implementation elided ...

    workflow_run_id = str(uuid.uuid4())
    application_generate_entity = WorkflowAppGenerateEntity(
        task_id=str(uuid.uuid4()),
        app_config=app_config,
        file_upload_config=file_extra_config,
        inputs=self._prepare_user_inputs(
            user_inputs=inputs,
            variables=app_config.variables,
            tenant_id=app_model.tenant_id,
            # Strict input type validation only for Service API callers.
            strict_type_validation=invoke_from == InvokeFrom.SERVICE_API,
        ),
        files=list(system_files),
        user_id=user.id,
        stream=streaming,
        invoke_from=invoke_from,
        call_depth=call_depth,
        trace_manager=trace_manager,
        workflow_execution_id=workflow_run_id,
        # New: async invocation flag, read by the task pipeline's blocking-response path.
        extras={"is_async": args.get("is_async", False)},
    )
3. 任务管道处理核心逻辑
Advanced Chat 任务管道 (api/core/app/apps/advanced_chat/generate_task_pipeline.py):
def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]) -> ChatbotAppBlockingResponse:
    """Collapse the pipeline's stream into a single blocking response.

    When ``extras["is_async"]`` is set, the first non-error stream event ends
    the loop and a placeholder response is returned immediately. Its ``answer``
    field carries the workflow run id so the caller can look the run up later.
    Otherwise the stream is consumed until a ``MessageEndStreamResponse``
    arrives, and the accumulated answer is returned.

    :param generator: stream responses produced by the task pipeline
    :return: the blocking chat response
    :raises Exception: the ``err`` carried by the first ``ErrorStreamResponse``
    :raises ValueError: if the stream ends without a terminal event
    """
    is_async = self._application_generate_entity.extras.get("is_async", False)
    for stream_response in generator:
        # Check errors before the async short-circuit; otherwise a failure that
        # happens immediately would be reported to the caller as an accepted task.
        if isinstance(stream_response, ErrorStreamResponse):
            raise stream_response.err

        if is_async:
            # Async mode: return identifiers right away.
            return ChatbotAppBlockingResponse(
                task_id=stream_response.task_id,
                data=ChatbotAppBlockingResponse.Data(
                    id=self._message_id,
                    mode=self._conversation_mode,
                    conversation_id=self._conversation_id,
                    message_id=self._message_id,
                    # The workflow run id is returned in `answer` for later lookup.
                    answer=str(self._application_generate_entity.workflow_run_id),
                    created_at=int(self._message_created_at),
                ),
            )

        if isinstance(stream_response, MessageEndStreamResponse):
            # Sync mode: the message is complete; attach metadata when present.
            extras = {}
            if stream_response.metadata:
                extras["metadata"] = stream_response.metadata
            return ChatbotAppBlockingResponse(
                task_id=stream_response.task_id,
                data=ChatbotAppBlockingResponse.Data(
                    id=self._message_id,
                    mode=self._conversation_mode,
                    conversation_id=self._conversation_id,
                    message_id=self._message_id,
                    answer=self._task_state.answer,
                    created_at=self._message_created_at,
                    **extras,
                ),
            )

    raise ValueError("queue listening stopped unexpectedly.")
def _async_process_stream_response(self, publisher):
    """Run the pipeline on a background thread and relay its output.

    A new ``MessageBasedAppQueueManager`` bound to this task's identifiers is
    created. ``_generate_worker`` is started on a fresh thread and publishes
    onto that queue, while this generator yields whatever ``_consumer_worker``
    reads back from it.
    """
    entity = self._application_generate_entity
    relay_queue = MessageBasedAppQueueManager(
        task_id=entity.task_id,
        user_id=entity.user_id,
        invoke_from=entity.invoke_from,
        conversation_id=self._conversation_id,
        app_mode=self._conversation_mode,
        message_id=self._message_id,
    )

    worker_kwargs = {
        # The concrete app object (not the proxy) so the thread can push its own app context.
        "flask_app": current_app._get_current_object(),
        "queue_manager": relay_queue,
        # Snapshot of the caller's context variables, restored inside the worker.
        "context": contextvars.copy_context(),
        "publisher": publisher,
    }
    threading.Thread(target=self._generate_worker, kwargs=worker_kwargs).start()

    yield from self._consumer_worker(relay_queue)
def _generate_worker(
    self, flask_app: Flask, queue_manager: AppQueueManager, context: contextvars.Context, publisher
) -> None:
    """Run the synchronous pipeline in a worker thread and forward its output.

    Each non-``None`` stream response from ``_sync_process_stream_response`` is
    wrapped in a ``ForwardQueueMessage`` and published on ``queue_manager``.
    The queue is always told to stop listening when the worker finishes,
    whether it succeeds or raises. Without that, the consumer reading from
    ``queue_manager.listen()`` has no end-of-stream signal.

    :param flask_app: concrete Flask app, used to push an app context in this thread
    :param queue_manager: queue the forwarded responses are published on
    :param context: context-variable snapshot taken in the calling thread
    :param publisher: passed through to ``_sync_process_stream_response``
    """
    # Restore the caller's context variables in this thread.
    for var, val in context.items():
        var.set(val)

    try:
        with flask_app.app_context():
            for stream_response in self._sync_process_stream_response(publisher):
                if stream_response is None:
                    continue
                # Map the stream event to its queue label and forward the response as-is.
                message = ForwardQueueMessage(
                    event=advance_chat_queue_task_map[stream_response.event],
                    response=stream_response,
                )
                queue_manager.publish(message, PublishFrom.TASK_PIPELINE)
    finally:
        # Signal end-of-stream so the consumer's listen() loop terminates,
        # including when the pipeline raised (the exception still propagates).
        queue_manager.stop_listen()
def _consumer_worker(self, queue_manager: AppQueueManager) -> Generator[StreamResponse, None, None]:
    """Yield the stream responses forwarded through ``queue_manager``.

    Only events of type ``ForwardQueueMessage`` are relayed; every other event
    coming out of ``queue_manager.listen()`` is skipped silently.
    NOTE(review): skipped events include whatever error/stop events the queue
    manager itself publishes — confirm that dropping them is intended.
    """
    forwarded_events = (queued.event for queued in queue_manager.listen())
    for event in forwarded_events:
        if not isinstance(event, ForwardQueueMessage):
            continue
        yield event.response
Workflow 任务管道 (api/core/app/apps/workflow/generate_task_pipeline.py):
def _to_blocking_response(self, generator: Generator[StreamResponse, None, None]) -> WorkflowAppBlockingResponse:
    """Collapse the workflow pipeline's stream into a single blocking response.

    When ``extras["is_async"]`` is set, the first non-error stream event ends
    the loop and a "processing" placeholder is returned immediately. It carries
    the task id and the workflow execution id for later lookup. Otherwise the
    stream is consumed until a ``WorkflowFinishStreamResponse`` arrives, and
    its run data is returned.

    :param generator: stream responses produced by the task pipeline
    :return: the blocking workflow response
    :raises Exception: the ``err`` carried by the first ``ErrorStreamResponse``
    :raises ValueError: if the stream ends without a terminal event
    """
    is_async = self._application_generate_entity.extras.get("is_async", False)
    for stream_response in generator:
        # Check errors before the async short-circuit; otherwise a failure that
        # happens immediately would be reported to the caller as "processing".
        if isinstance(stream_response, ErrorStreamResponse):
            raise stream_response.err

        if is_async:
            # One timestamp so created_at and finished_at cannot disagree.
            now = int(time.time())
            return WorkflowAppBlockingResponse(
                task_id=self._application_generate_entity.task_id,
                workflow_run_id=self._application_generate_entity.workflow_execution_id,
                data=WorkflowAppBlockingResponse.Data(
                    # NOTE(review): the sync branch reports the workflow run id as `id`;
                    # this branch reports the app id — confirm clients expect that.
                    id=self._application_generate_entity.app_config.app_id,
                    workflow_id=self._workflow_id,
                    status="processing",
                    elapsed_time=0,
                    total_tokens=0,
                    total_steps=0,
                    created_at=now,
                    # Placeholder only: the run has not finished yet.
                    finished_at=now,
                ),
            )

        if isinstance(stream_response, WorkflowFinishStreamResponse):
            # Sync mode: return the completed run's data.
            return WorkflowAppBlockingResponse(
                task_id=self._application_generate_entity.task_id,
                workflow_run_id=stream_response.data.id,
                data=WorkflowAppBlockingResponse.Data(
                    id=stream_response.data.id,
                    workflow_id=stream_response.data.workflow_id,
                    status=stream_response.data.status,
                    outputs=stream_response.data.outputs,
                    error=stream_response.data.error,
                    elapsed_time=stream_response.data.elapsed_time,
                    total_tokens=stream_response.data.total_tokens,
                    total_steps=stream_response.data.total_steps,
                    created_at=int(stream_response.data.created_at),
                    finished_at=int(stream_response.data.finished_at),
                ),
            )

    raise ValueError("queue listening stopped unexpectedly.")
4. 事件映射系统
队列任务桥接 (api/core/app/entities/queue_task_bridge.py):
from core.app.entities.queue_entities import AppQueueEvent, QueueEvent
from core.app.entities.task_entities import StreamEvent, StreamResponse
# Workflow apps: StreamEvent -> QueueEvent label attached to each stream
# response when it is forwarded through a queue manager.
workflow_queue_task_map = {
    # keep-alive / failure
    StreamEvent.PING: QueueEvent.PING,
    StreamEvent.ERROR: QueueEvent.ERROR,
    # message text, TTS and file events
    StreamEvent.MESSAGE: QueueEvent.TEXT_CHUNK,
    StreamEvent.MESSAGE_END: QueueEvent.MESSAGE_END,
    StreamEvent.TTS_MESSAGE: QueueEvent.TEXT_CHUNK,
    StreamEvent.TTS_MESSAGE_END: QueueEvent.MESSAGE_END,
    StreamEvent.MESSAGE_FILE: QueueEvent.MESSAGE_FILE,
    StreamEvent.MESSAGE_REPLACE: QueueEvent.MESSAGE_REPLACE,
    # agent events
    StreamEvent.AGENT_THOUGHT: QueueEvent.AGENT_THOUGHT,
    StreamEvent.AGENT_MESSAGE: QueueEvent.AGENT_MESSAGE,
    # workflow and node lifecycle
    StreamEvent.WORKFLOW_STARTED: QueueEvent.WORKFLOW_STARTED,
    StreamEvent.WORKFLOW_FINISHED: QueueEvent.WORKFLOW_SUCCEEDED,
    StreamEvent.NODE_STARTED: QueueEvent.NODE_STARTED,
    StreamEvent.NODE_FINISHED: QueueEvent.NODE_SUCCEEDED,
    StreamEvent.NODE_RETRY: QueueEvent.RETRY,
    # parallel branches, iterations and loops
    StreamEvent.PARALLEL_BRANCH_STARTED: QueueEvent.PARALLEL_BRANCH_RUN_STARTED,
    StreamEvent.PARALLEL_BRANCH_FINISHED: QueueEvent.PARALLEL_BRANCH_RUN_SUCCEEDED,
    StreamEvent.ITERATION_STARTED: QueueEvent.ITERATION_START,
    StreamEvent.ITERATION_NEXT: QueueEvent.ITERATION_NEXT,
    StreamEvent.ITERATION_COMPLETED: QueueEvent.ITERATION_COMPLETED,
    StreamEvent.LOOP_STARTED: QueueEvent.LOOP_START,
    StreamEvent.LOOP_NEXT: QueueEvent.LOOP_NEXT,
    StreamEvent.LOOP_COMPLETED: QueueEvent.LOOP_COMPLETED,
    # workflow text output and agent logs
    StreamEvent.TEXT_CHUNK: QueueEvent.TEXT_CHUNK,
    StreamEvent.TEXT_REPLACE: QueueEvent.MESSAGE_REPLACE,
    StreamEvent.AGENT_LOG: QueueEvent.AGENT_LOG,
}
# Advanced-chat apps: identical to the workflow mapping except that
# MESSAGE_END is labelled with the advanced-chat specific end event.
# Derived from workflow_queue_task_map so the two tables cannot drift apart.
advance_chat_queue_task_map = {
    **workflow_queue_task_map,
    StreamEvent.MESSAGE_END: QueueEvent.ADVANCED_CHAT_MESSAGE_END,
}
class ForwardQueueMessage(AppQueueEvent):
    """Queue event that wraps an already-built stream response.

    It lets a finished ``StreamResponse`` travel through an app queue manager
    unchanged, tagged with a ``QueueEvent`` label.
    """

    # Queue event label for the wrapped response; PING when not given.
    event: QueueEvent = QueueEvent.PING
    # The stream response being forwarded verbatim.
    response: StreamResponse
5. 结果查询接口
消息查询API (api/controllers/web/message.py):
class MessageApi(WebApiResource):
    """Web API resource returning a single message of the current app."""

    @marshal_with(message_fields)
    def get(self, app_model: App, end_user: EndUser, message_id):
        """Return the message with id ``message_id`` that belongs to ``app_model``.

        :raises NotFound: if the app has no message with that id
        """
        # NOTE(review): the lookup is scoped to the app only, not to `end_user`;
        # any end user of this app who knows a message id can read that message.
        # Confirm this is intended (e.g. results created via the Service API).
        found = (
            db.session.query(Message)
            .filter(Message.id == str(message_id), Message.app_id == app_model.id)
            .first()
        )
        if not found:
            raise NotFound("Message Not Exists.")
        return found
# Route registration. Every per-message route takes the message id as a UUID
# path parameter; the handlers receive it as `message_id`.
api.add_resource(MessageListApi, "/messages")
api.add_resource(MessageFeedbackApi, "/messages/<uuid:message_id>/feedbacks")
api.add_resource(MessageMoreLikeThisApi, "/messages/<uuid:message_id>/more-like-this")
api.add_resource(MessageSuggestedQuestionApi, "/messages/<uuid:message_id>/suggested-questions")
api.add_resource(MessageApi, "/apps/messages/<uuid:message_id>")
最终效果




技术总结
本次优化通过引入异步处理机制,有效解决了Dify平台在处理批量数据匹配时的技术瓶颈。关键成功因素包括:
- 非侵入式设计:在保持原有接口兼容性的基础上扩展功能
- 队列管理:基于消息队列实现任务状态管理
- 线程安全:多线程环境下的数据一致性保障
- 错误恢复:完善的异常处理和任务恢复机制
该方案为类似AI应用场景中的长时任务处理提供了可复用的技术范式,在提升系统稳定性的同时,显著改善了用户体验。
注:本文基于Dify 1.8.1版本实现,具体实现细节可能随版本更新而调整。