python sse的是什么?

news/2025/10/16 21:15:45/文章来源:https://www.cnblogs.com/liqi77gg/p/19146638

1.sse是什么?

 

是一个通信协议。最主要的是, 服务器必须发送一个 Content-Type 为 text/event-stream 的响应头,这告诉客户端:“接下来我发送的不是普通的 HTML 或 JSON,而是一个事件流。”

=====================

是的,完全正确!您可以非常准确地将 SSE (Server-Sent Events) 理解为一个通信标准通信协议规范

它不是一个独立的协议(像 TCP 或 HTTP 那样),而是构建在 HTTP 协议之上的一套标准。

这个标准由 WHATWG(网页超文本应用技术工作组,负责制定 HTML、DOM 等现代 Web 标准的组织)在 HTML Living Standard 中进行了详细定义。

这个标准具体规定了以下几个核心方面:

  1. 服务器的角色和响应格式 (Server's Role & Response Format):

    • MIME 类型: 服务器必须发送一个 Content-Typetext/event-stream 的响应头,这告诉客户端:“接下来我发送的不是普通的 HTML 或 JSON,而是一个事件流。”
    • 连接保持: 服务器需要保持这个 HTTP 连接持续打开。
    • 消息结构: 发送的每一条消息都必须遵循特定的文本格式。最基本的就是 data: <message>\n\n。此外,标准还定义了 id (消息ID)、event (事件类型)、retry (重连时间) 等可选字段。
  2. 客户端的行为和 API (Client's Behavior & API):

    • EventSource 接口: 标准定义了浏览器必须提供一个 EventSource JavaScript API。开发者通过 new EventSource('/my-stream-url') 来创建连接。
    • 事件监听: 客户端通过 .onmessage.onerror 以及使用 .addEventListener('custom_event', ...) 来监听不同类型的消息。
    • 自动重连: 这是 SSE 标准一个非常重要的特性。如果网络连接意外断开,遵循标准的客户端(例如浏览器)会自动尝试重新连接到服务器。服务器可以通过 retry: 字段来建议客户端重连的间隔时间。

把它和类似技术做个对比,您会更清晰:

  • 对比 WebSocket: WebSocket 是一个完全独立的、双向的协议(协议头是 ws://wss://)。而 SSE 只是对 HTTP 协议的一种特殊用法,是单向的(服务器 -> 客户端)。因此,SSE 更轻量、实现更简单。
  • 对比长轮询 (Long Polling): 长轮询是一种“模拟”推送的技术,客户端发起请求,服务器“hold”住不回,直到有数据才返回,然后客户端立即再次发起新请求。这会产生大量的请求开销。而 SSE 只需要建立一次连接,效率高得多。

总结来说:

称 SSE 为一个通信标准非常准确。它为“服务器如何通过 HTTP 将数据流式地、单向地推送给客户端”这一特定场景,提供了一套完整、可靠且被各大浏览器广泛支持的规则和实现机制。

2.如何写一个sse的示例代码?

重点:

在于这个request库种的stream=True 这个字段打开。

 

=================

好的,我们来详细探讨一下如何使用 Python 处理服务器发送事件 (Server-Sent Events, SSE)。

SSE 是一种允许服务器向客户端单向推送数据的 Web 技术。它基于 HTTP 协议,非常适合实现服务端的流式数据传输,例如实时更新、消息通知、数据仪表盘等。

与 WebSocket 相比,SSE 更简单,因为它只是标准的 HTTP,不需要新的协议。但它是单向的(服务器 -> 客户端),而 WebSocket 是双向的。

处理 SSE 分为两部分:服务端实现客户端接收

1. 服务端实现 (Server)

在服务端,你需要创建一个 HTTP 端点,该端点的响应头 Content-Type 必须是 text/event-stream。然后,服务器保持该连接打开,并以特定格式持续发送数据。

最核心的部分是使用生成器 (Generator) 来流式地 yield 数据。

这里以最常见的 Web 框架 Flask 为例,展示如何创建一个 SSE 端点。

import time
from flask import Flask, Responseapp = Flask(__name__)@app.route('/stream')
def stream():def event_stream():count = 0while True:# 等待一秒time.sleep(1)# 发送数据,必须以 "data: " 开头,并以 "\n\n" 结尾yield f"data: 这是一条消息,ID: {count}\n\n"count += 1# 返回一个流式响应return Response(event_stream(), mimetype='text/event-stream')if __name__ == '__main__':app.run(threaded=True, port=5000)

代码解释:

  1. event_stream() 是一个生成器函数,它在一个无限循环中不断生成数据。
  2. yield f"data: ...\n\n" 是 SSE 的核心。
    • data: 是固定的前缀。
    • 后面跟着你的消息内容。
    • \n\n (两个换行符) 表示一条消息的结束。
  3. Response(event_stream(), mimetype='text/event-stream') 将生成器包装成一个 Flask 响应,并设置正确的 MIME 类型,告诉浏览器这是一个事件流。

2. 客户端接收 (Client)

在客户端,你需要连接到服务器的 /stream 端点,并循环读取响应流。

方法一:使用 requests

requests 库可以通过设置 stream=True 来处理流式响应。

import requests# 连接到 SSE 端点
with requests.get('http://127.0.0.1:5000/stream', stream=True) as r:# 检查响应头if 'text/event-stream' not in r.headers.get('Content-Type', ''):print("错误:服务器未使用 'text/event-stream'")exit()# 逐行迭代响应内容for line in r.iter_lines():# 过滤掉空行或注释行if line and line.startswith(b'data:'):# 解码并打印数据部分data = line.split(b':', 1)[1].strip()print(f"收到数据: {data.decode('utf-8')}")

代码解释:

  1. requests.get(..., stream=True) 是关键,它告诉 requests 不要立即下载所有内容,而是保持连接并允许你迭代响应。
  2. r.iter_lines() 会逐行读取响应流。
  3. 我们检查每一行是否以 data: 开头,然后提取并打印真实的数据。

方法二:使用 sseclient-py

这是一个专门为处理 SSE 设计的库,它能更好地处理重连、idevent 等 SSE 协议的细节,使用起来更方便。

首先,你需要安装它:pip install sseclient-py

import sseclient# 创建一个 SSE 客户端
client = sseclient.SSEClient('http://127.0.0.1:5000/stream')# 迭代事件
for event in client.events():print(f"收到数据: {event.data}")

这个库极大地简化了客户端代码。它会自动处理消息边界、解析 data 字段,并将其作为 event 对象提供给你。


为了让你更方便地测试,我将把一个完整的、可运行的服务端和客户端示例放在下面的代码文件中。你可以直接运行它来观察效果。

import time
import threading
import requests
from flask import Flask, Response# ==============================================================================
# 服务端 (Server)
# ==============================================================================
def create_sse_server():"""创建一个 Flask 应用作为 SSE 服务器"""app = Flask(__name__)@app.route('/stream')def stream():def event_stream():"""一个生成器函数,用于流式传输事件"""count = 0print("服务器:客户端已连接,开始发送事件...")try:while True:# 每隔一秒发送一次数据time.sleep(1)# SSE 消息格式: "data: <your_message>\n\n"message = f"这是第 {count} 条消息"yield f"data: {message}\n\n"print(f"服务器:已发送 -> '{message}'")count += 1except GeneratorExit:# 当客户端断开连接时,生成器会收到一个 GeneratorExit 异常print("服务器:客户端已断开连接。")# 返回一个流式响应,MIME 类型为 'text/event-stream'return Response(event_stream(), mimetype='text/event-stream')print("服务器:Flask SSE 服务器正在启动...")# 在单独的线程中运行 Flask 服务器,以避免阻塞主线程# 在生产环境中,应使用 Gunicorn 或 uWSGI 等 WSGI 服务器server_thread = threading.Thread(target=lambda: app.run(threaded=True, port=5000, host='0.0.0.0'))server_thread.daemon = Trueserver_thread.start()print("服务器:服务器已在 http://0.0.0.0:5000 上运行")# ==============================================================================
# 客户端 (Client)
# ==============================================================================
def run_sse_client():"""运行一个 SSE 客户端来接收事件"""sse_url = 'http://127.0.0.1:5000/stream'print(f"\n客户端:正在连接到 SSE 服务器 at {sse_url}...")try:# 使用 requests 库并设置 stream=True 来接收流式响应with requests.get(sse_url, stream=True, timeout=10) as response:# 确保服务器返回了正确的 Content-Typeif 'text/event-stream' not in response.headers.get('Content-Type', ''):print(f"客户端错误:无效的 Content-Type: {response.headers.get('Content-Type')}")returnprint("客户端:连接成功,开始监听事件...")# 使用 iter_lines 迭代来自服务器的每一行数据for line in response.iter_lines():# SSE 消息以 "data:" 开头if line and line.startswith(b'data:'):# 从 "data:" 前缀后提取消息内容# line 是 bytes 类型,需要解码为 utf-8 字符串data = line.split(b':', 1)[1].strip()print(f"客户端:收到数据 -> {data.decode('utf-8')}")except requests.exceptions.RequestException as e:print(f"\n客户端:连接错误: {e}")print("客户端:请确保服务器正在运行。")except KeyboardInterrupt:print("\n客户端:用户中断,正在关闭。")# ==============================================================================
# 主程序入口
# ==============================================================================
if __name__ == '__main__':# 1. 启动 SSE 服务器create_sse_server()# 等待服务器启动print("主程序:等待2秒让服务器完全启动...")time.sleep(2)# 2. 运行 SSE 客户端run_sse_client()print("\n主程序:客户端已停止。服务器仍在后台线程运行。")# 主线程将在这里结束,但后台的服务器线程会因为 daemon=True 而随之退出。

这个示例将启动一个 Flask 服务器,然后客户端会连接到它并实时打印出服务器发送的流式消息。你可以直接运行此文件来体验整个流程。

3.如果不设置stream=True会怎么样?

程序会卡住,默认模式,是等待内容体的结果来展示,因为是流式,等不到内容体的全部,就会一直在等待。

=============

是的,您的理解非常精准。

如果您不设置 stream=Truerequests 库会采用其默认的“缓冲”行为,这确实会导致连接通路在您能处理数据之前就已经被处理完毕,对于 SSE 这样的流式协议来说是行不通的。

我们来深入剖析一下这两种模式的区别:

默认行为 (不设置 stream=True) - 缓冲模式

当您执行一个普通的 requests.get('...') 时,会发生以下情况:

  1. 发送请求: requests 向服务器发送 HTTP 请求。
  2. 等待并下载所有内容: 库会一直等待,直到服务器响应了所有的数据。它会将整个响应体(response body)下载下来,并存储在内存中。
  3. 加载到内存: 所有数据被完整加载到 response 对象的 contenttext 属性里。
  4. 返回并关闭: 只有当所有内容都下载完毕后get() 函数才会返回 response 对象给您的代码。此时,底层的 TCP 连接通常会被释放回连接池或直接关闭,因为 requests 认为这次通信已经“完成”了。

这对于 SSE 为什么是致命的?

SSE 的服务器永远不会“完成”响应。它会一直保持连接打开,以便随时推送新的数据。

所以,如果客户端使用默认的 requests 模式去连接 SSE 端点,客户端会永远等待下去,因为它总是在等待那个永远不会到来的“响应结束”信号。您的 requests.get() 调用会一直被阻塞,程序就卡在那里了。

设置 stream=True - 流式模式

当您执行 requests.get('...', stream=True) 时,行为完全不同:

  1. 发送请求: requests 向服务器发送 HTTP 请求。
  2. 只下载响应头: 库会立即返回,只要它接收到了服务器的响应头(headers)。它不会去下载整个响应体。
  3. 连接保持打开: 此时,底层的 TCP 连接是保持活动的。response 对象内部保留了这个连接的“通路”。
  4. 按需读取: 您现在可以通过 response.iter_lines()response.iter_content() 这样的方法,主动地、一块一块地从这个打开的连接中读取数据。每当新数据从服务器传来,您就能通过迭代读到它。

这为什么适用于 SSE?

这个模式完美契合了 SSE 的工作方式。客户端不会傻等响应结束,而是建立连接后马上开始监听,服务器可以随时通过这个“通路”推送数据,客户端也能实时地一行一行接收和处理。

连接何时关闭?

stream=True 模式下,连接会在以下几种情况关闭:

  1. 您读取完所有数据: 当您通过迭代(例如 for 循环)把服务器发送的所有内容都读取完毕(对于 SSE 来说,这意味着服务器主动关闭了连接)。
  2. with 语句结束: 当包裹 requests.getwith 代码块执行完毕时,requests 会自动为您关闭连接。这是处理流式请求的最佳实践。
  3. 服务器关闭连接: 服务器可以单方面决定关闭连接。
  4. 网络错误或超时: 发生网络问题。

总结对比

特性 默认模式 (stream=False) 流式模式 (stream=True)
行为 下载所有内容到内存,然后才返回。 只下载响应头,立即返回。
response.content 立即可用,包含全部数据。 不可用,访问会抛出异常。必须迭代读取。
连接状态 请求完成后立即关闭/释放。 保持打开,直到您读取完毕或手动关闭。
适用场景 获取大小可预期的、完整的资源(JSON、HTML页面、小文件)。 处理大型文件、实时数据流 (SSE)、无法预知大小的响应。
对 SSE 的影响 程序会卡住,永远等待响应结束。 正确的工作方式,可以实时接收服务器推送的事件。

 

参考资料:

https://zhuanlan.zhihu.com/p/634581294

AI:gemini 2.5 pro

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/938516.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

万字长文详述单据引擎原理、流程、单据管理 - 智慧园区

本文将为你深入剖析单据引擎的原理、架构、设计以及管理方法,帮助你构建一个高效、灵活且可靠的单据处理系统。 前言 单据引擎是做什么的?为什么要了解单据引擎? B端产品,一不小心就会变成项目,一个个定制交付,…

windows 链接共享打印机出现错误0x00000709?打印机0x0000011b错误?0x0000bcd、0x00000709、0x00000011b

安装 点击这里获取:所有修复工具都放这里了 ​​ 图片 第一款:全能打印机问题修复工具 ​​功能都在图片上面了,如果还需要安装其它支持的程序,会自动提示安装第二款:打印机共享维护工具等4个文件 WIN10 2H22和WI…

解码Linux文件IO目录检索与文件属性

目录检索的核心需求 当需要批量访问某个路径下的多个文件时,手动调用open函数逐个处理效率极低。Linux 系统将目录视为特殊文件,提供了一套专门的目录操作接口,可高效实现目录的创建、删除、打开、读取,以及文件属…

p66实验题

""" CIFAR-10 图像分类简化版(使用 sklearn) 在安装 TensorFlow 之前可以先运行这个版本 """ import numpy as np import matplotlib.pyplot as plt from sklearn.datasets import fe…

20251016

10.16今天体育课体测,让我发现自己BMI指数居然是25.7,这令我非常不能接受。因此,我要开始为期两个半月的减脂,控制自己,打造健康强壮身体,于是我今天下午直接到操场跑步3公里,我感觉我现在状况非常好。未来我会…

C# - 串口助手

串口通信工具准备 (1)sscom5.13.1.exe: 串口调试工具 (2)VSPD: 是一种虚拟串口驱动程序,用于模拟和创建多个虚拟串口,以便在计算机间进行串口通信 VSPD 串口介绍Sent: 0 Bytes表示从该串口发送出去的数据字节数为…

虚拟线程的pinned问题终于被jdk25完美解决了

虚拟线程是一个非常有用的特征,但是JDK25以前,一直存在pinned问题,一些场景下会导致平台线程被占用无法释放。 比如下面的代码,在JDK 21下运行时,会卡住:import java.time.Duration; import java.util.concurren…

077_尚硅谷_单分支基本使用

077_尚硅谷_单分支基本使用1.单分支基本介绍2.例题判断是否18岁

【比赛记录】2025NOIP 冲刺模拟赛合集I

2025CSP-S模拟赛64A B C D Sum Rank50 0 0 - 50 7/7挂 155pts,挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂挂…

12 继承--instanceof和类型转换

12 继承--instanceof和类型转换多态 父类的引用指向子类的类型 Father f = new Son(); 而能执行的方法只看左边的类型. 父类可以指向子类,但不能调用子类独有的方法 如果非要调用,可以进行强制类型转换.注意事项:多态是…

C0214 拔树游戏 题解

C0214 拔树游戏 题解C0214 拔树游戏 题解 这道题挺有趣的。 不难发现每一次的拔树操作就是对一个根节点的所有子节点(只是下一层)取最小点权的那个节点取代本身。同时发现,因为每次取的节点都是最小的,所以在堆…

CSDN Markdown 编辑器快捷键大全 - 实践

CSDN Markdown 编辑器快捷键大全 - 实践pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Mo…

P4168 [Violet] 蒲公英题解

P4168 [Violet] 蒲公英题解洛谷题目链接:[Violet] 蒲公英 一道分块好题,调了整整一上午一句话题意:在线求区间众数 考虑到众数没有可加性,所以一般数据结构是不好维护的,这个时候就要用分块了,分块可以维护一些数…

Java了解

Java了解了解Java Java的特性与优势 简单性、面向对象、可移植性、高性能、分布式 动态性(反射机制)、多线程、安全性、健壮性 Java的三大版本与有关工具 Java SE Java ME Java EEJDK:Java Development Kit(包含JRE)…

VGG使用块的网络

VGG使用块的网络 一.手写VGG架构import torch from torch import nn import d2l #1.VGG块 def VGG_block(nums_conv,in_chanels,out_chanels):#卷积层数,输入通道数,输出通道数layers=[]#一个VGG里面的层for i in ra…

使用SpringBoot + Thymeleaf + MyBatisPlus实现一个简单的书籍管理系统

一 系统功能设计 采用SpringBoot + Thymeleaf + MyBatisPlus技术栈实现一个简单的书籍管理系统,包含以下功能:书籍列表展示 书籍添加 书籍编辑 书籍删除 书籍查询(按条件筛选)二 数据库设计 SET NAMES utf8mb4; SE…

创业思路

创业思路ai p图, 人工后期校验, 人工p, 然后把图片给手机壳制作厂, 卖给提供照片的人. diy手机壳. 比如他本人的照片加风景, 比如巴黎铁塔

详细介绍:Java-Spring入门指南(十九)thymeleaf基本概念

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …