SSE通用详细配置指南

news/2026/1/21 10:40:36/文章来源:https://www.cnblogs.com/feiyusys/p/19510347

SSE (Server-Sent Events) 通用配置指南

目录

  1. SSE 简介
  2. SSE vs WebSocket vs 轮询
  3. 服务端配置
  4. 客户端使用
  5. 最佳实践
  6. 常见问题
  7. 性能优化
  8. 安全考虑
  9. 生产环境部署

SSE 简介

什么是 SSE?

Server-Sent Events (SSE) 是一种基于 HTTP 的服务器推送技术,允许服务器向客户端单向发送实时数据。

SSE 特点

特性 说明
单向通信 服务器 → 客户端(推送消息)
基于 HTTP 使用标准 HTTP/HTTPS 协议
自动重连 浏览器自动处理断线重连
文本格式 仅支持文本数据(UTF-8 编码)
EventSource API 浏览器原生支持,无需额外库

适用场景

适合

  • 实时通知(新订单、新用户注册等)
  • 股票价格、汇率实时更新
  • 社交媒体动态推送
  • 系统状态监控
  • 日志实时输出
  • 进度条更新

不适合

  • 双向实时通信(聊天、游戏等)→ 使用 WebSocket
  • 二进制数据传输 → 使用 WebSocket
  • 需要高频率双向通信 → 使用 WebSocket

SSE vs WebSocket vs 轮询

特性 SSE WebSocket HTTP 轮询
通信方向 单向(服务器→客户端) 双向 客户端→服务器
连接方式 长连接 长连接 短连接
协议 HTTP/HTTPS WS/WSS HTTP/HTTPS
自动重连 ✅ 原生支持 ❌ 需手动实现 -
数据格式 文本 文本/二进制 JSON/XML
浏览器支持 现代浏览器全支持 IE10+ 全支持
服务器资源
实现复杂度 简单 中等 简单
防火墙穿透 ✅ 容易 ⚠️ 可能受阻 ✅ 容易
跨域 ✅ 支持 CORS ✅ 支持 CORS ✅ 支持 CORS

选择建议

场景 → 技术选择需要实时推送,只需服务器→客户端↓使用 SSE ✅需要双向实时通信↓使用 WebSocket ✅不需要实时,偶尔更新即可↓使用 HTTP 轮询 ✅需要兼容旧浏览器(IE9 及以下)↓使用 HTTP 轮询 ✅

服务端配置

1. ASP.NET Core (.NET 6+)

SSE 服务接口

// ISseService.cs
using Microsoft.AspNetCore.Http;public interface ISseService
{string AddConnection(HttpResponse response);Task BroadcastAsync(string eventType, string data);Task SendToClientAsync(string connectionId, string eventType, string data);int GetConnectedClientCount();Task RemoveConnectionAsync(string connectionId);
}

SSE 服务实现

// SseService.cs
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;public class SseService : ISseService
{private readonly ConcurrentDictionary<string, HttpResponse> _connections = new();private readonly ILogger<SseService> _logger;public SseService(ILogger<SseService> logger){_logger = logger;}public string AddConnection(HttpResponse response){var connectionId = Guid.NewGuid().ToString();_connections.TryAdd(connectionId, response);_logger.LogInformation($"SSE 连接添加: {connectionId}");return connectionId;}public async Task BroadcastAsync(string eventType, string data){foreach (var connection in _connections){await SendMessageAsync(connection.Value, eventType, data);}_logger.LogInformation($"SSE 广播发送至 {_connections.Count} 个客户端: {eventType}");}public async Task SendToClientAsync(string connectionId, string eventType, string data){if (_connections.TryGetValue(connectionId, out var response)){await SendMessageAsync(response, eventType, data);}}private async Task SendMessageAsync(HttpResponse response, string eventType, string data){try{await response.WriteAsync($"event: {eventType}\n");await response.WriteAsync($"data: {data}\n\n");await response.Body.FlushAsync();}catch (Exception ex){_logger.LogError($"SSE 消息发送错误: {ex.Message}");}}public int GetConnectedClientCount(){return _connections.Count;}public async Task RemoveConnectionAsync(string connectionId){if (_connections.TryRemove(connectionId, out var response)){try{await response.WriteAsync("event: close\ndata: Connection closed\n\n");await response.Body.FlushAsync();}catch { }}}
}

注册服务

// Program.cs
builder.Services.AddSingleton<ISseService, SseService>();

SSE 控制器

// SseController.cs
using Microsoft.AspNetCore.Cors;
using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;[ApiController]
[Route("api/[controller]")]
[EnableCors("any")]
public class SseController : ControllerBase
{private readonly ISseService _sseService;public SseController(ISseService sseService){_sseService = sseService;}[HttpGet][Route("connect")]public async Task Connect(){// 设置 SSE 响应头Response.Headers.Add("Content-Type", "text/event-stream");Response.Headers.Add("Cache-Control", "no-cache");Response.Headers.Add("Connection", "keep-alive");Response.Headers.Add("X-Accel-Buffering", "no"); // Nginx 禁用缓冲var connectionId = _sseService.AddConnection(Response);try{// 发送连接确认消息await _sseService.SendToClientAsync(connectionId, "connected",Newtonsoft.Json.JsonConvert.SerializeObject(new{connectionId,message = "SSE connection established",timestamp = DateTime.Now}));// 保持连接活跃while (!HttpContext.RequestAborted.IsCancellationRequested){await Task.Delay(30000); // 每30秒发送心跳await _sseService.SendToClientAsync(connectionId, "heartbeat", DateTime.Now.ToString());}}catch{// 客户端断开}finally{await _sseService.RemoveConnectionAsync(connectionId);}}[HttpGet][Route("count")]public IActionResult GetConnectionCount(){return Ok(new { count = _sseService.GetConnectedClientCount() });}[HttpPost][Route("broadcast")]public async Task<IActionResult> Broadcast([FromBody] BroadcastRequest request){await _sseService.BroadcastAsync(request.EventType, request.Message);return Ok(new { success = true });}
}public class BroadcastRequest
{public string EventType { get; set; }public string Message { get; set; }
}

2. Node.js + Express

基本实现

// server.js
const express = require('express');
const cors = require('cors');
const app = express();app.use(cors());
app.use(express.json());// 存储所有客户端连接
const clients = new Set();// SSE 连接端点
app.get('/sse', (req, res) => {// 设置 SSE 响应头res.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');res.setHeader('X-Accel-Buffering', 'no'); // Nginx 禁用缓冲const clientId = Date.now();clients.add(res);console.log(`客户端连接: ${clientId}`);// 发送连接确认res.write(`event: connected\ndata: ${JSON.stringify({ clientId, message: 'Connected' })}\n\n`);// 发送心跳const heartbeat = setInterval(() => {res.write(`event: heartbeat\ndata: ${new Date().toISOString()}\n\n`);}, 30000);// 客户端断开连接req.on('close', () => {console.log(`客户端断开: ${clientId}`);clearInterval(heartbeat);clients.delete(res);});
});// 广播消息
app.post('/broadcast', (req, res) => {const { eventType, message } = req.body;clients.forEach(client => {client.write(`event: ${eventType}\ndata: ${message}\n\n`);});console.log(`广播消息发送至 ${clients.size} 个客户端`);res.json({ success: true, count: clients.size });
});// 获取连接数
app.get('/count', (req, res) => {res.json({ count: clients.size });
});app.listen(3000, () => {console.log('SSE 服务器运行在端口 3000');
});

3. Java + Spring Boot

SSE 控制器

// SseController.java
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@RestController
@RequestMapping("/api/sse")
@CrossOrigin(origins = "*")
public class SseController {// 存储所有 SSE 连接private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();private final ExecutorService executor = Executors.newCachedThreadPool();/*** 建立 SSE 连接*/@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter connect() {SseEmitter emitter = new SseEmitter(60 * 60 * 1000L); // 1小时超时emitter.onCompletion(() -> {emitters.remove(emitter);System.out.println("SSE 连接完成");});emitter.onTimeout(() -> {emitters.remove(emitter);System.out.println("SSE 连接超时");});emitter.onError((e) -> {emitters.remove(emitter);System.err.println("SSE 连接错误: " + e.getMessage());});emitters.add(emitter);// 发送连接确认executor.execute(() -> {try {emitter.send(SseEmitter.event().name("connected").data("{\"message\":\"SSE connection established\"}"));} catch (IOException e) {emitter.completeWithError(e);}});// 发送心跳executor.execute(() -> {while (emitters.contains(emitter)) {try {Thread.sleep(30000);emitter.send(SseEmitter.event().name("heartbeat").data(new Date().toString()));} catch (Exception e) {break;}}});return emitter;}/*** 广播消息*/@PostMapping("/broadcast")public ResponseEntity<?> broadcast(@RequestBody BroadcastRequest request) {int count = 0;for (SseEmitter emitter : emitters) {executor.execute(() -> {try {emitter.send(SseEmitter.event().name(request.getEventType()).data(request.getMessage()));} catch (IOException e) {emitter.completeWithError(e);}});count++;}return ResponseEntity.ok(new Response(true, "消息已发送至 " + count + " 个客户端"));}/*** 获取连接数*/@GetMapping("/count")public ResponseEntity<?> count() {return ResponseEntity.ok(new CountResponse(emitters.size()));}
}// 请求/响应类
class BroadcastRequest {private String eventType;private String message;// getters and setters
}class Response {private boolean success;private String message;// constructor, getters
}class CountResponse {private int count;// constructor, getters
}

4. Python + Flask

# app.py
from flask import Flask, Response, jsonify, request
import json
import time
import threading
from datetime import datetimeapp = Flask(__name__)# 存储所有客户端连接
clients = []@app.route('/sse')
def sse():def event_stream():# 生成唯一客户端IDclient_id = id(event_stream)clients.append(event_stream)try:# 发送连接确认yield f"event: connected\ndata: {json.dumps({'clientId': client_id})}\n\n"# 保持连接活跃,发送心跳while True:time.sleep(30)  # 每30秒发送心跳yield f"event: heartbeat\ndata: {datetime.now().isoformat()}\n\n"except GeneratorExit:clients.remove(event_stream)print(f"客户端断开: {client_id}")return Response(event_stream(), mimetype='text/event-stream',headers={'Cache-Control': 'no-cache','Connection': 'keep-alive','X-Accel-Buffering': 'no'})@app.route('/broadcast', methods=['POST'])
def broadcast():data = request.jsonevent_type = data.get('eventType', 'message')message = data.get('message', '')# 发送给所有客户端message_str = f"event: {event_type}\ndata: {message}\n\n"for client in clients[:]:  # 使用切片避免迭代时修改try:client.send(message_str)except:clients.remove(client)return jsonify({'success': True, 'count': len(clients)})@app.route('/count')
def count():return jsonify({'count': len(clients)})if __name__ == '__main__':app.run(host='0.0.0.0', port=5000, threaded=True)

5. Go + Gin

// main.go
package mainimport ("encoding/json""fmt""log""net/http""sync""time""github.com/gin-gonic/gin"
)// 客户端连接
type Client struct {Channel chan stringID      string
}// SSE 服务
type SSEService struct {clients map[*Client]boolmu      sync.RWMutex
}var sseService = &SSEService{clients: make(map[*Client]bool),
}func (s *SSEService) AddClient(client *Client) {s.mu.Lock()defer s.mu.Unlock()s.clients[client] = truelog.Printf("客户端连接: %s, 当前连接数: %d", client.ID, len(s.clients))
}func (s *SSEService) RemoveClient(client *Client) {s.mu.Lock()defer s.mu.Unlock()if _, ok := s.clients[client]; ok {delete(s.clients, client)close(client.Channel)log.Printf("客户端断开: %s, 当前连接数: %d", client.ID, len(s.clients))}
}func (s *SSEService) Broadcast(eventType, data string) {s.mu.RLock()defer s.mu.RUnlock()message := fmt.Sprintf("event: %s\ndata: %s\n\n", eventType, data)for client := range s.clients {select {case client.Channel <- message:default:// 客户端接收缓冲区满,移除go s.RemoveClient(client)}}
}func (s *SSEService) GetClientCount() int {s.mu.RLock()defer s.mu.RUnlock()return len(s.clients)
}// SSE 处理器
func sseHandler(c *gin.Context) {client := &Client{ID:      generateID(),Channel: make(chan string, 100),}sseService.AddClient(client)defer sseService.RemoveClient(client)// 设置响应头c.Header("Content-Type", "text/event-stream")c.Header("Cache-Control", "no-cache")c.Header("Connection", "keep-alive")c.Header("X-Accel-Buffering", "no")// 发送连接确认c.SSEvent("connected", "").JSON(gin.H{"clientId": client.ID,"message":  "Connected",})c.Writer.Flush()// 发送心跳ticker := time.NewTicker(30 * time.Second)defer ticker.Stop()for {select {case message := <-client.Channel:// 发送消息fmt.Fprint(c.Writer, message)c.Writer.Flush()case <-ticker.C:// 发送心跳c.SSEvent("heartbeat", "").Send(time.Now().Format(time.RFC3339))c.Writer.Flush()case <-c.Request.Context().Done():return}}
}// 广播处理器
func broadcastHandler(c *gin.Context) {var req struct {EventType string `json:"eventType"`Message  string `json:"message"`}if err := c.ShouldBindJSON(&req); err != nil {c.JSON(400, gin.H{"error": err.Error()})return}sseService.Broadcast(req.EventType, req.Message)c.JSON(200, gin.H{"success": true,"count":   sseService.GetClientCount(),})
}// 获取连接数
func countHandler(c *gin.Context) {c.JSON(200, gin.H{"count": sseService.GetClientCount()})
}func generateID() string {return fmt.Sprintf("%d", time.Now().UnixNano())
}func main() {r := gin.Default()// 允许跨域r.Use(func(c *gin.Context) {c.Writer.Header().Set("Access-Control-Allow-Origin", "*")c.Writer.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")if c.Request.Method == "OPTIONS" {c.AbortWithStatus(204)return}c.Next()})r.GET("/api/sse/connect", sseHandler)r.POST("/api/sse/broadcast", broadcastHandler)r.GET("/api/sse/count", countHandler)log.Println("SSE 服务器运行在端口 8080")r.Run(":8080")
}

客户端使用

1. 原生 JavaScript (EventSource API)

基本连接

// 建立 SSE 连接
const eventSource = new EventSource('http://your-domain.com/api/sse/connect');// 监听连接打开
eventSource.onopen = function(event) {console.log('SSE 连接已打开');
};// 监听所有消息
eventSource.onmessage = function(event) {console.log('收到消息:', event.data);
};// 监听特定事件类型
eventSource.addEventListener('connected', function(event) {const data = JSON.parse(event.data);console.log('连接确认:', data);
});eventSource.addEventListener('heartbeat', function(event) {console.log('心跳:', event.data);
});eventSource.addEventListener('notification', function(event) {const data = JSON.parse(event.data);showNotification(data);
});// 错误处理
eventSource.onerror = function(error) {console.error('SSE 连接错误:', error);// EventSource 会自动尝试重连
};// 关闭连接
function closeConnection() {eventSource.close();console.log('SSE 连接已关闭');
}

React 示例

// SseComponent.jsx
import React, { useState, useEffect } from 'react';const SseComponent = () => {const [messages, setMessages] = useState([]);const [isConnected, setIsConnected] = useState(false);const [connectionCount, setConnectionCount] = useState(0);useEffect(() => {let eventSource;function connect() {eventSource = new EventSource('http://your-domain.com/api/sse/connect');eventSource.onopen = () => {setIsConnected(true);console.log('SSE 连接成功');};eventSource.onerror = () => {setIsConnected(false);console.log('SSE 连接断开,尝试重连...');};// 监听注册事件eventSource.addEventListener('register', (event) => {const data = JSON.parse(event.data);setMessages(prev => [data, ...prev]);showNotification(`新用户 ${data.username} 已注册`);});// 监听心跳eventSource.addEventListener('heartbeat', (event) => {console.log('心跳:', event.data);});}connect();// 清理return () => {if (eventSource) {eventSource.close();}};}, []);const showNotification = (message) => {// 显示浏览器通知if ('Notification' in window && Notification.permission === 'granted') {new Notification('系统通知', { body: message });}};return (<div><h3>SSE 实时消息</h3><div className={`status ${isConnected ? 'connected' : 'disconnected'}`}>{isConnected ? '✅ 已连接' : '❌ 未连接'}</div><div className="messages">{messages.map((msg, index) => (<div key={index} className="message"><strong>{msg.message}</strong><time>{new Date(msg.timestamp).toLocaleString()}</time></div>))}</div></div>);
};export default SseComponent;

Vue 3 示例

<!-- SseComponent.vue -->
<template><div><h3>SSE 实时消息</h3><div :class="['status', isConnected ? 'connected' : 'disconnected']">{{ isConnected ? '✅ 已连接' : '❌ 未连接' }}</div><div class="messages"><div v-for="(msg, index) in messages" :key="index" class="message"><strong>{{ msg.message }}</strong><time>{{ formatTime(msg.timestamp) }}</time></div></div></div>
</template><script setup>
import { ref, onMounted, onUnmounted } from 'vue';const messages = ref([]);
const isConnected = ref(false);
let eventSource = null;onMounted(() => {connect();// 请求通知权限if ('Notification' in window && Notification.permission === 'default') {Notification.requestPermission();}
});onUnmounted(() => {if (eventSource) {eventSource.close();}
});function connect() {eventSource = new EventSource('http://your-domain.com/api/sse/connect');eventSource.onopen = () => {isConnected.value = true;console.log('SSE 连接成功');};eventSource.onerror = () => {isConnected.value = false;console.log('SSE 连接断开');};eventSource.addEventListener('register', (event) => {const data = JSON.parse(event.data);messages.value.unshift(data);showNotification(data.message);});eventSource.addEventListener('heartbeat', (event) => {console.log('心跳:', event.data);});
}function showNotification(message) {if ('Notification' in window && Notification.permission === 'granted') {new Notification('系统通知', { body: message });}
}function formatTime(timestamp) {return new Date(timestamp).toLocaleString();
}
</script><style scoped>
.status {padding: 10px;margin: 10px 0;border-radius: 4px;
}
.status.connected {background-color: #d4edda;color: #155724;
}
.status.disconnected {background-color: #f8d7da;color: #721c24;
}
.message {padding: 10px;border-left: 3px solid #007bff;margin: 10px 0;background-color: #f8f9fa;
}
</style>

Angular 示例

// sse.service.ts
import { Injectable, NgZone } from '@angular/core';
import { EventSourcePolyfill } from 'event-source-polyfill';@Injectable({providedIn: 'root'
})
export class SseService {private eventSource: EventSource | null = null;constructor(private ngZone: NgZone) {}connect(url: string, eventHandlers: { [eventType: string]: (data: any) => void }) {// 使用 polyfill 支持更多浏览器this.eventSource = new EventSourcePolyfill(url, {withCredentials: true});this.eventSource.onopen = () => {console.log('SSE 连接成功');};this.eventSource.onerror = (error) => {console.error('SSE 连接错误:', error);};// 注册事件处理器Object.keys(eventHandlers).forEach(eventType => {this.eventSource!.addEventListener(eventType, (event: any) => {this.ngZone.run(() => {eventHandlers[eventType](JSON.parse(event.data));});});});}disconnect() {if (this.eventSource) {this.eventSource.close();this.eventSource = null;}}
}
// app.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { SseService } from './sse.service';@Component({selector: 'app-root',template: `<div><h3>SSE 实时消息</h3><div [class]="'status ' + (isConnected ? 'connected' : 'disconnected')">{{ isConnected ? '✅ 已连接' : '❌ 未连接' }}</div><div class="messages"><div *ngFor="let msg of messages" class="message"><strong>{{ msg.message }}</strong><time>{{ formatTime(msg.timestamp) }}</time></div></div></div>`,styles: [`.status.connected { background: #d4edda; color: #155724; padding: 10px; }.status.disconnected { background: #f8d7da; color: #721c24; padding: 10px; }.message { padding: 10px; border-left: 3px solid #007bff; margin: 10px 0; }`]
})
export class AppComponent implements OnInit, OnDestroy {messages: any[] = [];isConnected = false;constructor(private sseService: SseService) {}ngOnInit() {this.sseService.connect('http://your-domain.com/api/sse/connect', {connected: (data) => {this.isConnected = true;console.log('连接确认:', data);},register: (data) => {this.messages.unshift(data);this.showNotification(data.message);},heartbeat: (data) => {console.log('心跳:', data);},error: () => {this.isConnected = false;}});}ngOnDestroy() {this.sseService.disconnect();}private showNotification(message: string) {if ('Notification' in window && Notification.permission === 'granted') {new Notification('系统通知', { body: message });}}private formatTime(timestamp: string): string {return new Date(timestamp).toLocaleString();}
}

2. 浏览器兼容性

Polyfill (支持旧浏览器)

<!-- 安装 event-source-polyfill -->
npm install event-source-polyfill<!-- 或使用 CDN -->
<script src="https://cdn.jsdelivr.net/npm/event-source-polyfill@1.0.25/src/eventsource.min.js"></script><script>
// 使用 polyfill
const EventSource = window.EventSourcePolyfill || window.EventSource;const eventSource = new EventSource('/api/sse/connect');
// 正常使用...
</script>

3. 移动端支持

// 移动端 SSE 连接
function connectMobile() {const eventSource = new EventSource('http://your-domain.com/api/sse/connect', {// 移动端优化withCredentials: true});eventSource.onopen = () => {console.log('移动端 SSE 连接成功');};// 监听消息eventSource.addEventListener('notification', (event) => {const data = JSON.parse(event.data);// 移动端推送通知if ('vibrate' in navigator) {navigator.vibrate(200); // 震动}// 显示本地通知showLocalNotification(data.title, data.body);});return eventSource;
}// 显示本地通知(iOS/Android)
function showLocalNotification(title, body) {// 检查支持if (!('Notification' in window)) {console.log('不支持通知');return;}if (Notification.permission === 'granted') {new Notification(title, { body, icon: '/icon.png' });} else if (Notification.permission !== 'denied') {Notification.requestPermission().then(permission => {if (permission === 'granted') {new Notification(title, { body, icon: '/icon.png' });}});}
}

最佳实践

1. 连接管理

自动重连

const MAX_RETRIES = 5;
let retryCount = 0;function connect() {const eventSource = new EventSource('/api/sse/connect');eventSource.onerror = () => {retryCount++;if (retryCount <= MAX_RETRIES) {console.log(`重连中... (${retryCount}/${MAX_RETRIES})`);setTimeout(() => {connect();}, 1000 * retryCount); // 指数退避} else {console.error('重连次数超过限制');alert('连接失败,请刷新页面');}};eventSource.onopen = () => {retryCount = 0; // 重置重连计数console.log('连接成功');};
}connect();

心跳机制

服务端:

// 每 30 秒发送心跳
while (!HttpContext.RequestAborted.IsCancellationRequested)
{await Task.Delay(30000);await _sseService.SendToClientAsync(connectionId, "heartbeat", DateTime.Now.ToString());
}

客户端:

let lastHeartbeat = Date.now();eventSource.addEventListener('heartbeat', () => {lastHeartbeat = Date.now();console.log('心跳正常');
});// 检查连接状态
setInterval(() => {const now = Date.now();if (now - lastHeartbeat > 60000) { // 超过60秒未收到心跳console.warn('连接可能已断开');eventSource.close();connect(); // 重新连接}
}, 10000);

2. 消息格式设计

推荐格式

// 统一的消息格式
{"type": "notification",     // 消息类型"id": "msg-123",         // 消息ID(用于去重)"timestamp": 1705699200000, // 时间戳"data": {                 // 业务数据"title": "新订单","content": "订单 #12345 已创建","url": "/orders/12345"}
}

服务端发送

var message = Newtonsoft.Json.JsonConvert.SerializeObject(new
{type = "notification",id = Guid.NewGuid().ToString(),timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),data = new{title = "新订单",content = $"订单 #{orderId} 已创建",url = $"/orders/{orderId}"}
});await _sseService.BroadcastAsync("notification", message);

客户端处理

const processedIds = new Set();eventSource.addEventListener('notification', (event) => {const msg = JSON.parse(event.data);// 去重处理if (processedIds.has(msg.id)) {return;}processedIds.add(msg.id);// 清理旧的ID(避免内存泄漏)if (processedIds.size > 1000) {const firstId = processedIds.values().next().value;processedIds.delete(firstId);}// 处理消息handleNotification(msg);
});

3. 性能优化

连接池管理

// 服务端 - 限制连接数
const MAX_CONNECTIONS = 10000;function addConnection(connection) {if (connections.size >= MAX_CONNECTIONS) {throw new Error('连接数已达上限');}connections.add(connection);
}// 客户端 - 连接复用
class SseManager {constructor(url) {this.url = url;this.eventSource = null;this.listeners = new Map();}connect() {if (this.eventSource) {return; // 已连接}this.eventSource = new EventSource(this.url);// 事件分发this.eventSource.onmessage = (event) => {const data = JSON.parse(event.data);const listeners = this.listeners.get(data.type) || [];listeners.forEach(callback => callback(data));};}on(eventType, callback) {if (!this.listeners.has(eventType)) {this.listeners.set(eventType, []);}this.listeners.get(eventType).push(callback);}off(eventType, callback) {const listeners = this.listeners.get(eventType) || [];const index = listeners.indexOf(callback);if (index > -1) {listeners.splice(index, 1);}}close() {if (this.eventSource) {this.eventSource.close();this.eventSource = null;}}
}// 使用
const sseManager = new SseManager('/api/sse/connect');
sseManager.connect();
sseManager.on('notification', (data) => {console.log('通知:', data);
});

批量发送

// 批量发送优化
public async Task BroadcastBatchAsync(string eventType, List<string> messages)
{var batchData = Newtonsoft.Json.JsonConvert.SerializeObject(messages);foreach (var connection in _connections){await SendMessageAsync(connection.Value, eventType, batchData);}
}
// 客户端批量处理
eventSource.addEventListener('notification-batch', (event) => {const messages = JSON.parse(event.data); // 消息数组messages.forEach(msg => {processNotification(msg);});
});

4. 错误处理

服务端错误处理

try
{await _sseService.SendToClientAsync(connectionId, eventType, data);
}
catch (IOException ex)
{// 客户端断开连接await _sseService.RemoveConnectionAsync(connectionId);
}
catch (Exception ex)
{_logger.LogError(ex, "SSE 发送消息失败");
}

客户端错误处理

eventSource.onerror = (error) => {// 判断错误类型if (error.target.readyState === EventSource.CLOSED) {console.error('连接已关闭');} else if (error.target.readyState === EventSource.CONNECTING) {console.warn('正在重连...');} else {console.error('未知错误:', error);}
};

常见问题

Q1: SSE 连接频繁断开

原因

  • 代理服务器超时设置过短
  • 防火墙或负载均衡器配置问题
  • 客户端网络不稳定

解决方案

  1. 调整代理超时(Nginx):
location /api/sse/ {proxy_pass http://backend;proxy_set_header Connection '';proxy_http_version 1.1;proxy_read_timeout 86400s;  # 24小时proxy_send_timeout 86400s;chunked_transfer_encoding on;
}
  1. 调整心跳间隔
// 根据代理超时设置调整心跳间隔
await Task.Delay(25000); // 小于代理超时时间

Q2: 消息延迟或丢失

原因

  • 网络拥塞
  • 客户端处理速度慢
  • 缓冲区溢出

解决方案

  1. 调整缓冲区大小
// 客户端设置最大缓冲区
var eventSource = new EventSource(url);
eventSource.MAX_BUFFER_SIZE = 1024 * 1024; // 1MB
  1. 实现消息确认
// 服务端 - 消息队列
public class MessageQueue
{private readonly ConcurrentQueue<Message> _queue = new();public async Task EnqueueAsync(Message message){_queue.Enqueue(message);await ProcessQueueAsync();}private async Task ProcessQueueAsync(){while (_queue.TryDequeue(out var message)){try{await _sseService.BroadcastAsync(message.Type, message.Data);await Task.Delay(100); // 批次间隔}catch{_queue.Enqueue(message); // 重新入队break;}}}
}

Q3: 跨域问题

错误: Access-Control-Allow-Origin 错误

解决方案

服务端(ASP.NET Core):

builder.Services.AddCors(options =>
{options.AddPolicy("AllowSSE", builder =>{builder.WithOrigins("http://yourdomain.com").AllowAnyMethod().AllowAnyHeader().AllowCredentials();});
});app.UseCors("AllowSSE");

客户端:

// EventSource 不支持自定义 headers
// 确保服务端允许跨域
const eventSource = new EventSource('http://api.example.com/sse');

Q4: 浏览器兼容性问题

问题: IE 或旧版浏览器不支持 EventSource

解决方案

// 使用 polyfill
import { EventSourcePolyfill } from 'event-source-polyfill';const EventSource = window.EventSourcePolyfill || window.EventSource;const eventSource = new EventSource('/api/sse/connect', {withCredentials: true
});

或使用轮询降级:

class SseOrPolling {constructor(url) {this.url = url;this.usePolling = typeof EventSource === 'undefined';}connect() {if (this.usePolling) {this.startPolling();} else {this.startSSE();}}startSSE() {this.eventSource = new EventSource(this.url);this.eventSource.onmessage = this.onMessage;}startPolling() {this.interval = setInterval(() => {fetch(this.url).then(res => res.json()).then(data => this.onMessage({ data: JSON.stringify(data) }));}, 3000); // 每3秒轮询}
}

Q5: 内存泄漏

原因

  • 未正确关闭连接
  • 事件监听器未清理
  • 消息队列无限增长

解决方案

class SseConnection {constructor(url) {this.url = url;this.eventSource = null;this.handlers = [];this.messageBuffer = [];}connect() {this.eventSource = new EventSource(this.url);this.eventSource.onmessage = (event) => {// 限制缓冲区大小if (this.messageBuffer.length > 100) {this.messageBuffer.shift();}this.messageBuffer.push(event.data);// 触发处理器this.handlers.forEach(handler => handler(event.data));};}on(handler) {this.handlers.push(handler);}destroy() {// 清理资源if (this.eventSource) {this.eventSource.close();this.eventSource = null;}this.handlers = [];this.messageBuffer = [];}
}

性能优化

1. 服务端优化

连接池限制

public class SseConnectionLimiter
{private readonly SemaphoreSlim _semaphore;private readonly int _maxConnections;public SseConnectionLimiter(int maxConnections){_maxConnections = maxConnections;_semaphore = new SemaphoreSlim(maxConnections, maxConnections);}public async Task<bool> TryAcquireAsync(){return await _semaphore.WaitAsync(0);}public void Release(){_semaphore.Release();}public int AvailableConnections => _semaphore.CurrentCount;
}

消息压缩

using System.IO.Compression;public async Task SendCompressedAsync(HttpResponse response, string data)
{response.Headers.Add("Content-Encoding", "gzip");using var gzip = new GZipStream(response.Body, CompressionMode.Compress);using var writer = new StreamWriter(gzip);await writer.WriteAsync(data);
}

异步处理

// 使用 Channel 进行异步消息传递
public class SseBroadcaster
{private readonly Channel<string> _channel;public SseBroadcaster(){_channel = Channel.CreateUnbounded<string>();Task.Run(ProcessMessagesAsync);}public async Task BroadcastAsync(string message){await _channel.Writer.WriteAsync(message);}private async Task ProcessMessagesAsync(){await foreach (var message in _channel.Reader.ReadAllAsync()){// 异步广播_ = Task.Run(() => SendToAllClientsAsync(message));}}
}

2. 客户端优化

节流处理

class ThrottledSseHandler {constructor(throttleMs = 100) {this.throttleMs = throttleMs;this.lastMessageTime = 0;this.pendingMessage = null;this.timeout = null;}handle(message) {const now = Date.now();this.pendingMessage = message;if (now - this.lastMessageTime >= this.throttleMs) {this.processMessage(message);this.lastMessageTime = now;} else {clearTimeout(this.timeout);this.timeout = setTimeout(() => {this.processMessage(this.pendingMessage);this.lastMessageTime = Date.now();}, this.throttleMs);}}processMessage(message) {// 实际处理逻辑console.log('处理消息:', message);}
}// 使用
const handler = new ThrottledSseHandler(200);
eventSource.addEventListener('data', (e) => handler.handle(e.data));

批量更新 UI

class BatchedUiUpdater {constructor(batchSize = 10, delayMs = 100) {this.batch = [];this.batchSize = batchSize;this.delayMs = delayMs;this.timeout = null;}add(item) {this.batch.push(item);if (this.batch.length >= this.batchSize) {this.flush();} else {clearTimeout(this.timeout);this.timeout = setTimeout(() => this.flush(), this.delayMs);}}flush() {if (this.batch.length === 0) return;// 批量更新 DOMconst fragment = document.createDocumentFragment();this.batch.forEach(item => {const element = this.createElement(item);fragment.appendChild(element);});document.getElementById('messages').appendChild(fragment);this.batch = [];}
}

安全考虑

1. 认证与授权

Token 认证

// 方式1: URL 参数
const token = localStorage.getItem('token');
const eventSource = new EventSource(`/api/sse/connect?token=${token}`);// 方式2: Cookie(推荐)
// 服务端在登录时设置 HttpOnly Cookie
// EventSource 自动携带 Cookie
const eventSource = new EventSource('/api/sse/connect');

服务端验证:

[HttpGet]
[Route("connect")]
public async Task<IActionResult> Connect()
{// 从 URL 参数获取 Tokenvar token = HttpContext.Request.Query["token"];// 从 Cookie 获取 Tokenvar cookieToken = HttpContext.Request.Cookies["auth_token"];// 验证 Tokenif (!_authService.ValidateToken(token ?? cookieToken)){return Unauthorized();}// 建立连接// ...
}

2. 限流

public class SseRateLimiter
{private readonly Dictionary<string, (int Count, DateTime ResetTime)> _clients;private readonly int _maxRequestsPerMinute;public bool IsAllowed(string clientId){var now = DateTime.UtcNow;if (!_clients.ContainsKey(clientId)){_clients[clientId] = (1, now.AddMinutes(1));return true;}var (count, resetTime) = _clients[clientId];if (now >= resetTime){_clients[clientId] = (1, now.AddMinutes(1));return true;}if (count >= _maxRequestsPerMinute){return false;}_clients[clientId] = (count + 1, resetTime);return true;}
}

3. 数据加密

public string EncryptMessage(string data, string key)
{using var aes = Aes.Create();aes.Key = Encoding.UTF8.GetBytes(key);aes.IV = new byte[16];using var encryptor = aes.CreateEncryptor();using var ms = new MemoryStream();using (var cs = new CryptoStream(ms, encryptor, CryptoStreamMode.Write)){using var sw = new StreamWriter(cs);sw.Write(data);}return Convert.ToBase64String(ms.ToArray());
}

生产环境部署

1. Nginx 配置

upstream backend {server 127.0.0.1:5000;
}server {listen 80;server_name your-domain.com;# SSE 路径配置location /api/sse/ {proxy_pass http://backend;# SSE 必需配置proxy_set_header Connection '';proxy_http_version 1.1;chunked_transfer_encoding on;# 超时设置proxy_read_timeout 86400s;    # 24小时proxy_send_timeout 86400s;# 禁用缓冲proxy_buffering off;proxy_cache off;# 连接保持proxy_connect_timeout 60s;# 转发事件流头proxy_pass_header Content-Type;proxy_pass_header Cache-Control;}# 其他 API 请求location /api/ {proxy_pass http://backend;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header X-Forwarded-Proto $scheme;}
}

2. Docker 部署

# Dockerfile
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 80FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["SmartConv/SmartConv.csproj", "SmartConv/"]
RUN dotnet restore "SmartConv/SmartConv.csproj"
COPY . .
WORKDIR "/src/SmartConv"
RUN dotnet build "SmartConv.csproj" -c Release -o /app/buildFROM build AS publish
RUN dotnet publish "SmartConv.csproj" -c Release -o /app/publishFROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "SmartConv.dll"]
# docker-compose.yml
version: '3.8'services:app:build: .ports:- "8080:80"environment:- ASPNETCORE_ENVIRONMENT=Production- ASPNETCORE_URLS=http://+:80restart: unless-stoppedhealthcheck:test: ["CMD", "curl", "-f", "http://localhost/health"]interval: 30stimeout: 10sretries: 3

3. Kubernetes 部署

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:name: sse-app
spec:replicas: 3selector:matchLabels:app: sse-apptemplate:metadata:labels:app: sse-appspec:containers:- name: sse-appimage: your-registry/sse-app:latestports:- containerPort: 80env:- name: ASPNETCORE_ENVIRONMENTvalue: "Production"resources:requests:memory: "256Mi"cpu: "250m"limits:memory: "512Mi"cpu: "500m"livenessProbe:httpGet:path: /healthport: 80initialDelaySeconds: 30periodSeconds: 10readinessProbe:httpGet:path: /readyport: 80initialDelaySeconds: 5periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:name: sse-app-service
spec:selector:app: sse-appports:- protocol: TCPport: 80targetPort: 80type: LoadBalancer

4. 监控与日志

// 添加监控指标
public class SseMetrics
{public int TotalConnections { get; set; }public int ActiveConnections { get; set; }public long MessagesSent { get; set; }public long BytesTransferred { get; set; }
}// Prometheus 指标
public class SseMetricsExporter
{private readonly Counter _connectionsTotal;private readonly Gauge _connectionsActive;private readonly Counter _messagesTotal;public SseMetricsExporter(){_connectionsTotal = Metrics.CreateCounter("sse_connections_total", "Total SSE connections");_connectionsActive = Metrics.CreateGauge("sse_connections_active", "Active SSE connections");_messagesTotal = Metrics.CreateCounter("sse_messages_total", "Total SSE messages sent");}public void RecordConnection(){_connectionsTotal.Inc();_connectionsActive.Inc();}public void RecordDisconnection(){_connectionsActive.Dec();}public void RecordMessage(){_messagesTotal.Inc();}
}

5. 负载均衡注意事项

重要: SSE 是有状态连接,需要会话保持(Sticky Session)。

upstream backend {ip_hash;  # 使用 IP 哈希实现会话保持server 127.0.0.1:5000;server 127.0.0.1:5001;server 127.0.0.1:5002;
}

或者使用 Redis 存储连接状态,实现跨节点广播。


总结

SSE 是实现服务器推送的简单高效方案,特别适合:

✅ 实时通知
✅ 数据更新推送
✅ 系统监控
✅ 日志流式输出

相比 WebSocket,SSE 更简单、更容易部署、且天然支持自动重连。

核心要点

  1. 配置正确的响应头: Content-Type: text/event-stream
  2. 实现心跳机制: 保持连接活跃
  3. 处理连接异常: 自动重连
  4. 优化性能: 连接池、批量处理
  5. 安全保障: 认证、限流、加密
  6. 生产部署: 代理配置、会话保持

快速开始

// 客户端
const eventSource = new EventSource('/api/sse/connect');
eventSource.addEventListener('notification', e => {console.log(JSON.parse(e.data));
});
// 服务端
Response.Headers.Add("Content-Type", "text/event-stream");
Response.Headers.Add("Cache-Control", "no-cache");await Response.WriteAsync($"event: notification\ndata: {message}\n\n");
await Response.Body.FlushAsync();

祝开发顺利!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1194018.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【法律咨询】哪家好:廊坊地区专业深度测评

随着社会法治意识的增强,个人与企业面临的法律问题日益复杂多样。在廊坊地区,如何从众多法律服务机构中,找到专业、可靠、匹配自身需求的那一家,成为许多人的核心关切。本次测评旨在通过客观、量化的方式,对廊坊地…

TurboDiffusion成本控制:高算力需求下的经济型部署策略

TurboDiffusion成本控制&#xff1a;高算力需求下的经济型部署策略 1. TurboDiffusion是什么&#xff1f; TurboDiffusion是由清华大学、生数科技与加州大学伯克利分校联合研发的视频生成加速框架&#xff0c;专为解决文生视频&#xff08;T2V&#xff09;和图生视频&#xf…

如何用AI自动生成ContextMenuManager的右键菜单代码

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个基于JavaScript的ContextMenuManager实现方案&#xff0c;要求&#xff1a;1.支持多级嵌套的右键菜单结构 2.支持动态添加/删除菜单项 3.支持自定义菜单样式 4.包含点击事…

AI如何帮你一键生成WPS离线安装包解决方案

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 开发一个WPS Office离线安装包生成工具&#xff0c;要求&#xff1a;1.自动检测用户操作系统版本和位数(32/64位) 2.根据检测结果智能选择匹配的WPS版本 3.自动下载所有必需组件和…

ms-swift性能优化秘籍:让大模型训练速度提升3倍的小技巧

ms-swift性能优化秘籍&#xff1a;让大模型训练速度提升3倍的小技巧 你是否也遇到过这样的情况&#xff1a;明明配置了高端GPU&#xff0c;但大模型训练却像“蜗牛爬行”&#xff1f;一个epoch跑几个小时&#xff0c;显存还动不动就爆掉。更让人头疼的是&#xff0c;调参试错成…

18-经过actions方法封装请求以及补充计算属性

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

1小时打造KB2533623漏洞检测原型

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 快速开发一个KB2533623检测工具原型&#xff0c;要求&#xff1a;1. 系统信息采集 2. 补丁状态检查 3. 风险等级评估 4. 简易修复建议 5. 结果导出功能。使用Python开发控制台应用…

2026年工程咨询公司排名,中恒通达项目管理公司值得推荐吗?

本榜单依托全维度工程行业调研与真实合作口碑,深度筛选出五家标杆工程咨询企业,为工程从业者及创业团队提供客观选型依据,助力精准匹配适配的资质合作与项目管理伙伴。 TOP1 推荐:中恒通达项目管理有限公司 推荐指…

亲测有效:用verl在Qwen模型上跑PPO全流程分享

亲测有效&#xff1a;用verl在Qwen模型上跑PPO全流程分享 最近在尝试使用强化学习&#xff08;RL&#xff09;对大语言模型进行后训练优化&#xff0c;目标是提升其在特定任务上的推理能力。经过一番调研和测试&#xff0c;我选择了字节跳动火山引擎团队开源的 verl 框架&…

2026年1月市面上乳化剂推荐榜:CO436/A501/COPS -1/SR10/LCN118等不同乳化剂厂家哪家好深入剖析!

2026年1月【乳化剂】优质之选:A501与CO43深入剖析 在化工领域,【乳化剂】作用关键,像 2A1阴离子乳化剂、CO436乳液聚合乳化剂、A501造纸用丁苯胶乳乳化剂、COPS - 1反应型乳化剂、SR10耐水乳化剂、LCN118环保非离子…

echart 格式化水平坐标 tooltip数据

在做图表的时候,总是会遇到提示的内容和水平的内容格式不统一。然后每次都要做两次处理,鼠标滑过的提示x坐标内容 和 水平x显示的刻度要不一样,或者把外部的数据传入到内部进行切割。 更好的做法,提示的显示全部,…

Hutool + AI:如何用智能工具提升Java开发效率

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个Java项目&#xff0c;使用Hutool工具库结合AI能力实现以下功能&#xff1a;1. 自动生成常用工具类代码模板 2. 智能识别并处理常见异常场景 3. 根据数据库表结构自动生成C…

AlexNet vs 传统CV算法:效率对比实验

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 构建一个对比测试平台&#xff0c;比较AlexNet与传统CV方法&#xff08;如SIFTSVM&#xff09;在图像分类任务中的表现。包括&#xff1a;1) 相同测试数据集&#xff1b;2) 准确率…

2026年无缝钢管供应商综合评估与推荐榜单研究报告

在工业制造与基础设施建设领域,无缝钢管作为关键的基础材料,其供应链的稳定性、产品的可靠性直接关系到下游项目的成本、进度与安全。当前,采购决策者普遍面临一个核心挑战:在众多供应商中,如何精准识别那些不仅能…

AI如何解决‘连接被阻止‘的常见开发问题

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个能够自动诊断连接被阻止错误的AI工具。该工具应能&#xff1a;1. 分析浏览器控制台错误日志&#xff1b;2. 识别CORS策略、混合内容安全策略等常见原因&#xff1b;3. 根据…

2026年细胞器取样系统/细胞器提取系统/细胞器细胞取样系统/细胞器提取分析系统品牌大盘点:从入门到精通

随着精准医学、单细胞组学、合成生物学和再生医学等前沿领域的迅猛发展,对细胞内部结构——尤其是细胞器(如线粒体、溶酶体、内质网、高尔基体、细胞核等)的精细操作与分析需求日益增长。传统基于群体细胞的批量处理…

QOJ #1823. Permutation CFG

现在网上找不到题解,QOJ上的论文看不了了,来贡献一篇。 题目链接 记 \(U(x, s)\) 表示从 \(x\) 一个单独的数开始,进行 \(s\) 次操作后得到的序列。 举个例子,若 \(p = \{1, 4, 3, 2\}\),那么 \(U(4, 0) = \{4\},…

AI配音降本增效:CosyVoice2-0.5B批量生成实战指南

AI配音降本增效&#xff1a;CosyVoice2-0.5B批量生成实战指南 1. 引言&#xff1a;为什么你需要关注AI语音合成&#xff1f; 你有没有遇到过这样的问题&#xff1a;做短视频需要配音&#xff0c;但请人录一次成本高、周期长&#xff1b;写好的文章想转成有声内容&#xff0c;…

Hunyuan-MT-7B连接超时?反向代理配置修复网页访问问题

Hunyuan-MT-7B连接超时&#xff1f;反向代理配置修复网页访问问题 1. 问题背景&#xff1a;Hunyuan-MT-7B-WEBUI 访问异常 你是不是也遇到过这种情况&#xff1a;刚部署完腾讯混元开源的 Hunyuan-MT-7B 翻译模型&#xff0c;满怀期待地点击“网页推理”按钮&#xff0c;结果浏…