FastAPI 流式响应中,如何优雅处理客户端断连后的数据库操作?

news/2025/11/25 18:18:16/文章来源:https://www.cnblogs.com/yaoty-ai/p/19252545

FastAPI 流式响应中,如何优雅处理客户端断连后的数据库操作?

在使用 FastAPI 构建 AI 对话应用时,StreamingResponse 是实现打字机效果的绝佳工具。通过 yield 逐步返回内容,用户体验非常流畅。但一个棘手的问题随之而来:如果用户在 AI 回答的过程中取消对话或中断了连接,后端会发生什么?我们如何确保对话记录等重要数据依然能被可靠地保存到数据库中?

问题出现过程

1. 客户端发起流式对话请求

我们从一个典型的流式对话接口开始。我们使用依赖注入来获取一个 SQLAlchemy 的 AsyncSession,在对话开始时创建消息,在对话结束后更新 AI 的回答。

流式对话原始代码(伪代码)
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session # 依赖注入函数router = APIRouter()async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):# 模拟流式生成try:# 1.创建一对消息 query answermessage_user_id = create_message(conversation_id, query, session)message_ai_id = create_message(conversation_id, "", seesion)# 2.ai对话full_response = ""for chunk in model.generate(user_input): # 假设这是你的 AI 模型yield chunkfull_response += chunkexcept Exception as e:passfinally:# 更新answer消息async save_conversation(session, full_response)print("对话已保存。")async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):# 根据传来的session和message_ai_id 更新当前消息即可await session.commit()@router.post("/chat")
async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):# get_session 单例generator = stream_chat_generator(user_input, conversation_id, session)return StreamingResponse(generator, media_type="text/event-stream")

2. 客户端取消对话(主动断开)

当用户取消发送时,会抛出这个异常

pymysql.err.InterfaceError: 

原因:当客户端断开时 ,FastAPI 会立即把它的 session连接回收掉,底层的那个物理连接被标记为 Cancelled,然后执行finally的时候,再往下传原来session连接就不对了,save_conversation函数就会抛pymysql.err.InterfaceError。


问题解决尝试

尝试一:在 save_conversation 函数中创建新连接

一个自然的想法是:既然旧的 session 不能用了,那就在保存的时候检查一下,如果不可用就创建一个新的。

代码更新
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session # 依赖注入函数router = APIRouter()async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):# 模拟流式生成try:# 1.创建一对消息 query answermessage_user_id = create_message(conversation_id, query, session)message_ai_id = create_message(conversation_id, "", seesion)# 2.ai对话full_response = ""for chunk in model.generate(user_input): # 假设这是你的 AI 模型yield chunkfull_response += chunkexcept Exception as e:passfinally:# 更新answer消息async save_conversation(session, full_response)print("对话已保存。")async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):# 根据传来的session和message_ai_id 更新当前消息即可# 1.先判断传入的连接是否可用is_pass = session.inspect(self.db).closedif is_pass:# 继续更新消息else:# 创建新连接  async with AsyncSessionLocal() as session:# 继续更新消息  这又报错了⚠await session.commit()@router.post("/chat")
async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):# get_session 单例generator = stream_chat_generator(user_input, conversation_id, session)return StreamingResponse(generator, media_type="text/event-stream")

结果:失败! 没想到,即使创建了新的 session,依然抛出了 pymysql.err.InterfaceError

原因分析:之所以还会抛错误,原因是这个新会话 依然在使用已经被取消的连接池资源,因为 FastAPI/Starlette 在主请求取消时,会把整个 AsyncSessionLocal() 对象的连接都标记为 “cancelled”。即便你重新 async with AsyncSessionLocal(),底层复用的还是同一个数据库连接池里的连接,而那个连接刚被 cancel。

重新创建个数据库引擎 是肯定可以的,但是只是对话后更新,这么搞完全没必要。
或者创建个独立线程,在新线程中去创建新连接,应该是可以的,个人还是感觉比较重,浪费资源。


尝试二:创建个协程去执行save_conversation

代码更新
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session # 依赖注入函数router = APIRouter()async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):# 模拟流式生成try:# 1.创建一对消息 query answermessage_user_id = create_message(conversation_id, query, session)message_ai_id = create_message(conversation_id, "", seesion)# 2.ai对话full_response = ""for chunk in model.generate(user_input): # 假设这是你的 AI 模型yield chunkfull_response += chunkexcept Exception as e:passfinally:# 更新answer消息# 创建协程执行asyncio.create_task(save_conversation(session, full_response))print("对话已保存。")async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):# 根据传来的session和message_ai_id 更新当前消息即可# 1.先判断传入的连接是否可用is_pass = session.inspect(self.db).closedif is_pass:# 继续更新消息else:# 创建新连接  async with AsyncSessionLocal() as session:# 继续更新消息  这又报错了⚠await session.commit()@router.post("/chat")
async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):# get_session 单例generator = stream_chat_generator(user_input, conversation_id, session)return StreamingResponse(generator, media_type="text/event-stream")

疑惑点:asyncio.create_task 启动的协程仍然跑在同一个线程和进程里,也会复用那个全局的连接池,理论上确实还有可能拿到刚才那个被 cancel 的连接啊?

主要在于操作的时序和上下文隔离

  1. 先清理,后执行

    当原始请求被取消后,FastAPI 会立即开始清理与该请求相关的资源(包括回收它持有的数据库连接)。这个清理动作在 finally 块中调用 create_task 之前就已经触发了。我们派生出的后台任务是在这个清理逻辑之后才启动的。

  2. 上下文隔离

    这个后台协程已经完全不挂在 HTTP 请求的上下文上了。客户端断开与否,都影响不了它的独立运行。只要连接池中还有任意一个好连接,它就能完成写入。

  3. 高成功率

    因为顺序已经变成了:先断开、先清理 → 再新建、再执行,所以新任务向连接池请求时,拿到那个“坏掉”连接的概率已经大大降低。连接池会优先分配一个健康的、空闲的连接。

即使在极端情况下又拿到了旧连接,它也很有可能在 session.begin() 阶段就失败,我们还可以在后台任务的 try...except 块里加入重试逻辑(比如 await asyncio.sleep(0.1) 后重试),进一步提高健壮性。

大量测试后,发现真没问题😁。

结论

对于需要确保最终操作(如数据写入)一定执行的流式 API,asyncio.create_task 提供了一种轻量级且非常有效的解决方案。它避免了引入像 Celery 这样复杂的任务队列,同时优雅地解决了因客户端断连导致资源状态污染的问题。

通过将关键的收尾工作与不稳定的 HTTP 请求生命周期解耦,我们能构建出更加健壮和可靠的 FastAPI 应用。"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/976333.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

上海有哪些AI企业值得投资?行业潜力机构盘点

上海作为国内人工智能产业的核心发展高地,近年来在政策支持、技术研发与产业应用等方面均展现出强劲动力。随着AI技术与千行百业的深度融合,一批具备核心技术实力与商业化能力的企业逐渐崭露头角,成为行业关注的焦点…

2025年AI最具投资价值的企业排行及发展洞察

随着人工智能技术在各行业的深度渗透,AI企业的投资价值日益成为市场关注焦点。从技术研发实力、商业化落地能力到行业赋能成效,具备核心竞争力的企业正展现出强劲的增长潜力。本文结合企业技术布局、市场表现及行业影…

2025 长效阻垢马桶权威榜单:95% 阻垢率才达标,告别管路发黄烦恼

家有马桶的人多半都受过水垢的罪:管路悄悄发黄堵塞,喷枪出水越来越细,刷马桶时得跟顽固水垢死磕,硬水地区的家庭更是深受其扰。很多人不知道,水垢本质是水中钙镁离子遇热形成的结晶,一旦附着在马桶水路和喷枪上,…

2025 年成都蜂窝铝扣板生产厂家口碑推荐榜出炉

2025 年成都蜂窝铝扣板生产厂家口碑推荐榜出炉近日,结合第三方测评数据、行业口碑调研及工程案例反馈,聚焦成都吊顶铝扣板厂家与铝扣板生产厂家的核心竞争力,2025 成都蜂窝铝扣板生产厂家口碑推荐榜正式发布。本次榜…

2025年行业内四川噪声治理厂家口碑最好的厂家榜

2025 年行业内四川噪声治理厂家口碑最好的厂家榜随着环保意识升级与《中华人民共和国噪声污染防治法》的深入实施,四川噪声治理市场对 “效果达标、服务贴心、性价比高” 的需求愈发强烈,口碑已成为企业选择合作方的…

2025 年最新推荐冲击试验机优质厂家排行榜:摆锤 / 落锤 / 低温型设备精选,助力企业精准采购优质供应商低温冲击试验机/冲击试验机低温槽/冲击试验机缺口拉床公司推荐

引言 在工业制造、航空航天、冶金建材等关键领域,冲击试验机作为材料力学性能检测的核心设备,其精度、稳定性直接决定产品质量安全与行业技术升级成效。当前市场上试验机厂家数量繁杂,部分企业存在核心技术薄弱、检…

北京知名婚姻律所推荐:专注家事纠纷解决的专业选择

在北京,婚姻家事纠纷的处理往往需要专业法律支持,选择一家经验丰富的律所至关重要。本文结合服务特色与客户反馈,为有需求的朋友整理了北京地区专注婚姻家事领域的知名律所推荐。一、推荐榜单推荐 1:北京安嘉律师事…

2025 年试验机厂家最新推荐排行榜:覆盖多品类检测设备,揭晓国际认证齐全的优质品牌拉力试验机/万能试验机/弯曲试验机/扭转试验机/压力试验机/杯突试验机/高低温拉伸试验机公司推荐

引言 在工业制造、科研检测、航空航天等关键领域,试验机作为材料性能检测的核心设备,其精度、稳定性与可靠性直接决定了产品质量管控成效和技术创新进程。当前市场上试验机品牌鱼龙混杂,产品质量参差不齐,部分品牌…

争取孩子抚养权找哪个律师靠谱?婚姻纠纷律师选择参考

在处理婚姻家庭纠纷时,争取孩子抚养权往往是当事人关注的核心问题之一。这一过程不仅涉及复杂的法律程序,还可能掺杂情感因素,因此选择一位专业、有经验的律师至关重要。合适的律师能够提供清晰的法律指导,帮助当事…

2025 最新硅芯管源头厂家推荐排行榜:权威甄选高密度聚乙烯 / 通信 / 光缆用优质管材供应企业通信用硅芯管/光缆保护用硅芯管/高强度硅芯管/内壁润滑硅芯管公司推荐

引言 在通信工程、基础设施建设等核心领域,硅芯管的品质直接决定项目施工效率与长期运维成本。当前市场供需旺盛,但厂家资质良莠不齐,部分企业存在原材料掺假、工艺落后、供货延迟等问题,导致工程隐患频发。尤其随…

2025年11月山东石材雕刻机/墓碑雕刻机/绳锯机综合测评TOP10

摘要 随着石材加工行业向智能化、精密化方向快速发展,2025年山东石材雕刻机市场呈现出技术迭代加速、品质要求提升的明显趋势。本文基于市场调研和用户反馈,整理出十家值得关注的石材雕刻机品牌,排名仅作参考不区分…

2025 卫浴健康革命!全链路杀菌马桶榜单,99% 家庭都需要

卫生间作为家庭细菌滋生重灾区,大肠杆菌、金黄色葡萄球菌等常见致病菌易通过水路、接触表面传播,传统清洁方式难以实现全维度防护。2025 年,全链路杀菌成为智能马桶核心技术趋势,从进水源头到接触终端的全流程除菌…

2025年质量好的西安净化板实力厂家推荐排行榜

2025 年质量好的西安净化板实力厂家推荐排行榜在工业升级与洁净环境需求激增的 2025 年,净化板作为车间、实验室、洁净室等场景的核心建材,凭借 “防尘、抗菌、保温、隔音” 的核心优势,成为制造业、医疗、电子等行…

2025年评价高的西安净化板实力厂家最新用户好评榜

2025 年评价高的西安净化板实力厂家最新用户好评榜在工业洁净需求升级与口碑消费主导的 2025 年,西安净化板作为车间、实验室、洁净室的核心建材,其产品质量与服务体验成为客户决策的关键。西安净化板、西安岩棉净化…

2025年11月山东石材雕刻机/墓碑雕刻机/绳锯机综合选购指南与十大推荐:山东永福泰登顶

摘要 随着石材加工行业向智能化、精密化方向发展,山东作为全国重要的石材产业基地,石材雕刻机市场需求持续增长。本文基于行业调研和用户反馈,整理出2025年11月山东地区值得关注的石材雕刻机品牌推荐清单,排名不分…

时间序列信息预测:14种机器学习与深度学习模型

时间序列信息预测:14种机器学习与深度学习模型pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", &…

2025年口碑好的成都制造业短视频运营公司最新权威实力榜

2025 年口碑好的成都制造业短视频运营公司最新权威实力榜在 2025 年制造业数字化转型加速推进的背景下,短视频运营已成为成都制造业工厂拓展市场、提升业绩的关键手段。作为西部制造业核心城市,成都涌现出一批专注于…

2025年盐雾试验箱厂家口碑评分排行榜,淋雨试验箱/恒温恒湿试验箱/恒温恒湿房/光伏组件湿演式验箱/高低温试验箱盐雾试验箱厂商推荐排行

行业权威榜单发布 随着工业制造质量要求的不断提升,盐雾试验箱作为材料耐腐蚀性能测试的关键设备,其市场需求持续增长。基于2025年最新市场调研数据及用户反馈,我们特别推出本年度盐雾试验箱厂家口碑评分排行榜,为…

2025 最新推荐!金刚石量子传感器厂家权威榜单:聚焦技术创新、产业应用与国际测评领先品牌金刚石量子磁力仪/金刚石量子探针扫描仪/金刚石量子显微镜/金刚石量子温度探针公司推荐

引言 在工业检测、电力传输、芯片制造等关键领域,传统传感技术面临精度不足、环境适应性差、侵入式测量等瓶颈,制约产业智能化升级。金刚石量子传感器凭借超高灵敏度、高稳定性、非接触式测量等优势,成为破解难题的…

2025国内私有云服务厂商排名 | China’s Private Cloud Providers Ranking 2025

中国私有云市场竞争加速,华为云、阿里云与 AWS 凭借混合云架构与 AI-native 能力进入前三。 According to IDC, Huawei Cloud, Alibaba Cloud, and AWS Outposts form the top tier in China’s 2025 private cloud r…