飞书服务端SDK已全面支持Java、Go、Python与Node.js等主流开发语言,然而.NET生态系统的开发者们却面临着官方SDK缺失的困境,这无疑为.NET社区接入飞书平台带来了不便。
一、.net中如何实现飞书WebSocket长连接
为什么选择飞书WebSocket?
相较于传统的Webhook模式,长连接模式大大降低了接入成本,将原先1周左右的开发周期降低到5分钟。
核心优势:
- 开发便捷:无需公网IP或域名,本地环境即可接收回调
- 安全传输:内置加密和鉴权,无需额外安全处理
- 实时性强:消息延迟从分钟级降至毫秒级
- 资源高效:避免频繁HTTP请求,连接复用多种事件类型
企业级应用场景
飞书平台提供丰富的事件类型,支持:
- 用户事件:员工入职/离职自动化处理
- 消息事件:智能客服、消息机器人
- 审批事件:OA系统集成、自动审批
- 部门事件:组织架构同步管理
- 其它事件:.....
Mud.Feishu架构设计
二、抽象层设计(Mud.Feishu.Abstractions)
事件处理策略模式
Mud.Feishu采用策略模式实现灵活的事件处理机制,核心接口设计简洁而强大:
/// <summary>
/// 飞书事件处理器接口
/// </summary>
public interface IFeishuEventHandler
{/// <summary>/// 支持的事件类型/// </summary>string SupportedEventType { get; }/// <summary>/// 处理事件/// </summary>Task HandleAsync(EventData eventData, CancellationToken cancellationToken = default);
}
策略模式核心优势:
- 🎯 单一职责:每个处理器专注特定事件类型
- 🔧 开闭原则:对扩展开放,对修改封闭
- 🧪 可测试性:独立测试,依赖清晰
- ⚡ 运行时多态:动态选择处理策略
实际应用示例:
// 用户创建事件处理器
public class UserCreatedEventHandler : IFeishuEventHandler
{public string SupportedEventType => "contact.user.created_v3";public async Task HandleAsync(EventData eventData, CancellationToken cancellationToken = default){var user = JsonSerializer.Deserialize<UserData>(eventData.EventJson);await _userService.CreateUserAsync(user, cancellationToken);_logger.LogInformation("用户创建事件处理完成: {UserId}", user.UserId);}
}// 消息接收事件处理器
public class MessageReceiveEventHandler : IFeishuEventHandler
{public string SupportedEventType => "im.message.receive_v1";public async Task HandleAsync(EventData eventData, CancellationToken cancellationToken = default){var message = JsonSerializer.Deserialize<MessageData>(eventData.EventJson);await _messageService.ProcessMessageAsync(message, cancellationToken);}
}
事件类型与数据模型
EventData统一事件模型
/// <summary>
/// 飞书事件数据模型 - 统一的事件载体
/// </summary>
public class EventData
{public string EventId { get; set; } = string.Empty;public string EventType { get; set; } = string.Empty;public DateTime EventTime { get; set; } = DateTime.UtcNow;public object? Event { get; set; }public string EventJson { get; set; } = string.Empty;
}
主要事件类型
| 类别 | 事件类型 | 说明 |
|---|---|---|
| 用户管理 | contact.user.created_v3 |
用户创建 |
contact.user.updated_v3 |
用户更新 | |
contact.user.deleted_v3 |
用户删除 | |
| 消息事件 | im.message.receive_v1 |
接收消息 |
im.message.message_read_v1 |
消息已读 | |
| 部门管理 | contact.department.created_v3 |
部门创建 |
contact.department.updated_v3 |
部门更新 | |
| 审批流程 | approval.approval.approved_v1 |
审批通过 |
approval.approval.rejected_v1 |
审批拒绝 | |
| ... | ... |
... |
强类型数据模型示例
// 用户创建事件数据
public class UserCreatedEvent
{[JsonPropertyName("user_id")]public string UserId { get; set; }[JsonPropertyName("name")]public string Name { get; set; }[JsonPropertyName("email")]public string Email { get; set; }
}// 消息接收事件数据
public class MessageReceiveEvent
{[JsonPropertyName("message_id")]public string MessageId { get; set; }[JsonPropertyName("chat_id")]public string ChatId { get; set; }[JsonPropertyName("content")]public string Content { get; set; }
}
工厂模式应用
事件处理器工厂
/// <summary>
/// 事件处理器工厂接口
/// </summary>
public interface IFeishuEventHandlerFactory
{IFeishuEventHandler? GetHandler(string eventType);void RegisterHandler(IFeishuEventHandler handler);IReadOnlyList<string> GetRegisteredEventTypes();
}
使用示例:
// 注册事件处理器
var factory = serviceProvider.GetService<IFeishuEventHandlerFactory>();
factory.RegisterHandler(new UserCreatedEventHandler());
factory.RegisterHandler(new MessageReceiveEventHandler());// 获取处理器处理事件
var handler = factory.GetHandler("contact.user.created_v3");
if (handler != null)
{await handler.HandleAsync(eventData);
}
三、核心实现层(Mud.Feishu.WebSocket)
组件化架构设计
Mud.Feishu.WebSocket采用严格的组件化设计,包含四个核心组件:
3.1 连接管理器
核心职责
- 连接生命周期管理:建立、维护、恢复WebSocket连接
- 线程安全:使用SemaphoreSlim确保并发安全
- 超时控制:多级取消令牌支持精确超时控制
- 自动重连:智能重连策略,支持指数退避算法
连接状态管理
关键实现特点
- 事件驱动通知机制(Connected/Disconnected/Error事件)
- 线程安全的连接操作
- 灵活的重连策略配置
3.2 认证管理器
认证流程设计
核心功能
- 令牌验证:应用访问令牌有效性检查
- 消息构建:标准化认证消息格式
- 状态管理:认证状态实时跟踪
- 自动重试:认证失败智能重试机制
认证消息格式
{"timestamp": 1703920800,"data": {"app_access_token": "your_app_access_token_here"}
}
认证机制架构设计
认证管理器采用命令模式和回调机制,将认证逻辑与网络通信解耦,确保认证过程的可测试性和可扩展性。
/// <summary>
/// 认证管理器 - 处理WebSocket认证相关逻辑
/// </summary>
public class AuthenticationManager
{private readonly ILogger<AuthenticationManager> _logger;private readonly Func<string, Task> _sendMessageCallback;private bool _isAuthenticated = false;private readonly FeishuWebSocketOptions _options;public event EventHandler<EventArgs>? Authenticated;public event EventHandler<WebSocketErrorEventArgs>? AuthenticationFailed;public bool IsAuthenticated => _isAuthenticated;public AuthenticationManager(ILogger<AuthenticationManager> logger,FeishuWebSocketOptions options,Func<string, Task> sendMessageCallback){_logger = logger ?? throw new ArgumentNullException(nameof(logger));_sendMessageCallback = sendMessageCallback ?? throw new ArgumentNullException(nameof(sendMessageCallback));_options = options;}/// <summary>/// 发送认证消息/// </summary>public async Task AuthenticateAsync(string appAccessToken, CancellationToken cancellationToken = default){if (string.IsNullOrEmpty(appAccessToken))throw new ArgumentException("应用访问令牌不能为空", nameof(appAccessToken));try{_logger.LogInformation("正在进行WebSocket认证...");_isAuthenticated = false; // 重置认证状态// 创建认证消息var authMessage = new AuthMessage{Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),Data = new AuthData{AppAccessToken = appAccessToken}};var jsonOptions = new JsonSerializerOptions{DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,WriteIndented = false};var authJson = JsonSerializer.Serialize(authMessage, jsonOptions);await _sendMessageCallback(authJson);_logger.LogInformation("已发送认证消息,等待响应...");}catch (Exception ex){_isAuthenticated = false;_logger.LogError(ex, "WebSocket认证失败");var errorArgs = new WebSocketErrorEventArgs{Exception = ex,ErrorMessage = $"WebSocket认证失败: {ex.Message}",ErrorType = ex.GetType().Name,IsAuthError = true};AuthenticationFailed?.Invoke(this, errorArgs);throw;}}/// <summary>/// 处理认证响应/// </summary>public void HandleAuthResponse(string responseMessage){try{var authResponse = JsonSerializer.Deserialize<AuthResponseMessage>(responseMessage);if (authResponse?.Code == 0){_isAuthenticated = true;_logger.LogInformation("WebSocket认证成功: {Message}", authResponse.Message);Authenticated?.Invoke(this, EventArgs.Empty);}else{_isAuthenticated = false;_logger.LogError("WebSocket认证失败: {Code} - {Message}", authResponse?.Code, authResponse?.Message);var errorArgs = new WebSocketErrorEventArgs{ErrorMessage = $"WebSocket认证失败: {authResponse?.Code} - {authResponse?.Message}",IsAuthError = true};AuthenticationFailed?.Invoke(this, errorArgs);}}catch (JsonException ex){_isAuthenticated = false;_logger.LogError(ex, "解析认证响应失败: {Message}", responseMessage);var errorArgs = new WebSocketErrorEventArgs{Exception = ex,ErrorMessage = $"解析认证响应失败: {ex.Message}",ErrorType = ex.GetType().Name,IsAuthError = true};AuthenticationFailed?.Invoke(this, errorArgs);}catch (Exception ex){_isAuthenticated = false;_logger.LogError(ex, "处理认证响应时发生错误");var errorArgs = new WebSocketErrorEventArgs{Exception = ex,ErrorMessage = $"处理认证响应时发生错误: {ex.Message}",ErrorType = ex.GetType().Name,IsAuthError = true};AuthenticationFailed?.Invoke(this, errorArgs);}}/// <summary>/// 重置认证状态/// </summary>public void ResetAuthentication(){_isAuthenticated = false;_logger.LogDebug("已重置认证状态");}
}
认证流程详细机制
认证管理器的实现遵循安全性和可靠性原则,确保认证过程的安全和稳定:
-
参数验证机制:在认证开始前进行严格的参数验证,包括:
- AppAccessToken的非空检查
- 令牌格式验证
- 权限范围确认
-
消息构建策略:采用标准化的认证消息格式,包含:
- 时间戳(防重放攻击)
- 应用访问令牌
- 协议版本信息
-
状态管理:通过内部状态标志确保认证状态的一致性:
_isAuthenticated标志控制认证状态- 状态变化时触发相应事件
- 支持状态查询和重置
-
异常处理机制:完善的错误处理和恢复策略:
- 网络异常处理
- 认证失败处理
- 超时和取消支持
认证交互时序图
核心认证消息格式
飞书WebSocket认证使用标准化的JSON消息格式:
{"timestamp": 1703920800,"data": {"app_access_token": "your_app_access_token_here"}
}
响应消息格式
{"code": 0,"msg": "success","data": {"session_id": "websocket_session_id","expires_in": 3600}
}
认证安全考虑
-
令牌安全:
- AppAccessToken不应在代码中硬编码
- 支持令牌自动刷新机制
- 令牌存储和传输加密
-
防重放攻击:
- 使用时间戳验证消息新鲜度
- 支持消息签名机制
- 会话唯一性标识
-
错误处理:
- 敏感信息不记录到日志
- 认证失败不暴露具体错误
- 支持优雅降级处理
3.3 消息路由器(MessageRouter)
消息路由器是飞书WebSocket框架的消息分发核心,负责识别消息类型、版本信息,并将消息精确路由到对应的处理器。它采用策略模式和责任链模式的组合,确保消息处理的高效性和可扩展性。
消息路由架构
消息类型识别
消息路由器首先需要对接收到的消息进行类型和版本识别,以确定合适的处理策略。飞书WebSocket支持多种消息格式,包括v1.0和v2.0协议版本。
/// <summary>
/// 消息处理器接口
/// </summary>
public interface IMessageHandler
{/// <summary>/// 是否可以处理指定类型的消息/// </summary>/// <param name="messageType">消息类型</param>/// <returns>是否可以处理</returns>bool CanHandle(string messageType);/// <summary>/// 处理消息/// </summary>/// <param name="message">消息内容</param>/// <param name="cancellationToken">取消令牌</param>/// <returns>处理任务</returns>Task HandleAsync(string message, CancellationToken cancellationToken = default);
}/// <summary>
/// 消息路由器 - 负责将消息分发给合适的处理器
/// </summary>
public class MessageRouter
{private readonly ILogger<MessageRouter> _logger;private readonly List<IMessageHandler> _handlers;private readonly FeishuWebSocketOptions _options;public MessageRouter(ILogger<MessageRouter> logger, FeishuWebSocketOptions options){_logger = logger ?? throw new ArgumentNullException(nameof(logger));_handlers = new List<IMessageHandler>();_options = options;}/// <summary>/// 注册消息处理器/// </summary>public void RegisterHandler(IMessageHandler handler){if (handler == null)throw new ArgumentNullException(nameof(handler));_handlers.Add(handler);_logger.LogDebug("已注册消息处理器: {HandlerType}", handler.GetType().Name);}/// <summary>/// 移除消息处理器/// </summary>public bool UnregisterHandler(IMessageHandler handler){var removed = _handlers.Remove(handler);if (removed){if (_options.EnableLogging){_logger.LogDebug("已移除消息处理器: {HandlerType}", handler.GetType().Name);}}return removed;}/// <summary>/// 路由消息到合适的处理器/// </summary>public async Task RouteMessageAsync(string message, CancellationToken cancellationToken = default){if (string.IsNullOrWhiteSpace(message)){if (_options.EnableLogging){_logger.LogWarning("收到空消息,跳过路由");}return;}await RouteMessageInternalAsync(message, "Text", cancellationToken);}/// <summary>/// 路由从二进制消息转换而来的JSON消息到合适的处理器/// </summary>public async Task RouteBinaryMessageAsync(string jsonContent, string messageType, CancellationToken cancellationToken = default){if (string.IsNullOrWhiteSpace(jsonContent)){if (_options.EnableLogging){_logger.LogWarning("收到空的二进制转换消息,跳过路由");}return;}await RouteMessageInternalAsync(jsonContent, $"Binary_{messageType}", cancellationToken);}/// <summary>/// 提取消息类型/// </summary>private string ExtractMessageType(string message){try{using var jsonDoc = System.Text.Json.JsonDocument.Parse(message);var root = jsonDoc.RootElement;// 检查是否为v2.0版本if (root.TryGetProperty("schema", out var schemaElement) &&schemaElement.GetString() == "2.0"){if (root.TryGetProperty("header", out var headerElement) &&headerElement.TryGetProperty("event_type", out var eventTypeElement)){return "event"; // v2.0主要是事件消息}}// v1.0版本处理if (root.TryGetProperty("type", out var typeElement)){return typeElement.GetString()?.ToLowerInvariant() ?? string.Empty;}return string.Empty;}catch (System.Text.Json.JsonException ex){_logger.LogError(ex, "解析消息JSON失败: {Message}", message);return string.Empty;}}/// <summary>/// 内部消息路由处理/// </summary>private async Task RouteMessageInternalAsync(string message, string sourceType, CancellationToken cancellationToken){try{// 提取消息类型var messageType = ExtractMessageType(message);if (string.IsNullOrEmpty(messageType)){_logger.LogWarning("无法提取消息类型 (来源: {SourceType}): {Message}", sourceType, message);return;}// 查找能处理该消息类型的处理器var handler = _handlers.FirstOrDefault(h => h.CanHandle(messageType));if (handler == null){_logger.LogWarning("未找到能处理消息类型 {MessageType} 的处理器 (来源: {SourceType})", messageType, sourceType);return;}_logger.LogDebug("将消息路由到处理器: {HandlerType} (来源: {SourceType}, 消息类型: {MessageType})",handler.GetType().Name, sourceType, messageType);await handler.HandleAsync(message, cancellationToken);}catch (Exception ex){_logger.LogError(ex, "路由消息时发生错误 (来源: {SourceType}): {Message}", sourceType, message);}}
}
消息路由流程详解
消息路由器的核心职责是根据消息特征进行智能分发,其工作流程包括以下几个关键步骤:
- 消息预验证:检查消息的基本格式和有效性
- 版本识别:通过schema字段或type字段确定消息版本
- 类型匹配:根据消息内容找到对应的处理器
- 异步分发:将消息异步发送给匹配的处理器
- 结果聚合:收集处理结果并进行后续处理
路由策略设计
消息路由器采用多维度匹配策略,确保消息能够精确路由:
-
基于版本的匹配:
- v1.0协议:通过
type字段识别消息类型 - v2.0协议:通过
schema字段确认版本 - 向后兼容:支持旧版本消息格式
- v1.0协议:通过
-
基于类型的匹配:
- 事件消息:
event类型,如消息接收、状态变更 - 响应消息:
response类型,如认证响应、操作结果 - 通知消息:
notification类型,如系统通知
- 事件消息:
-
基于优先级的匹配:
- 高优先级处理器:认证、心跳等关键消息
- 中优先级处理器:业务事件、用户消息
- 低优先级处理器:统计数据、日志信息
处理器注册机制
消息路由器支持动态注册和注销消息处理器:
// 注册处理器
router.RegisterHandler(new AuthMessageHandler());
router.RegisterHandler(new EventMessageHandler());
router.RegisterHandler(new BinaryMessageHandler());// 处理器接口定义
public interface IMessageHandler
{bool CanHandle(string messageType, MessageVersionInfo version);Task<HandlerResult> HandleAsync(string message, CancellationToken cancellationToken);int Priority { get; }
}
性能优化策略
- 并行处理:支持多个消息并行处理,提高吞吐量
- 缓存机制:缓存处理器匹配结果,减少重复计算
- 批处理:支持批量消息处理,减少网络开销
- 连接池:复用连接资源,提高连接效率
错误处理机制
- 容错设计:单个处理器异常不影响其他处理器
- 降级策略:无匹配处理器时使用默认处理逻辑
- 重试机制:处理失败时支持自动重试
- 监控告警:记录处理异常并触发告警机制
处理器分发机制
/// <summary>
/// 路由策略实现
/// </summary>
public class RoutingStrategy
{private readonly ILogger<MessageRouter> _logger;private readonly FeishuWebSocketOptions _options;public async Task RouteMessageAsync(string message, string sourceType, List<IMessageHandler> handlers, CancellationToken cancellationToken){// 提取消息类型var messageType = ExtractMessageType(message);if (string.IsNullOrEmpty(messageType)){if (_options.EnableLogging)_logger.LogWarning("无法提取消息类型 (来源: {SourceType}): {Message}", sourceType, message);return;}// 查找能处理该消息类型的处理器var handler = handlers.FirstOrDefault(h => h.CanHandle(messageType));if (handler == null){if (_options.EnableLogging)_logger.LogWarning("未找到能处理消息类型 {MessageType} 的处理器 (来源: {SourceType})", messageType, sourceType);return;}if (_options.EnableLogging)_logger.LogDebug("将消息路由到处理器: {HandlerType} (来源: {SourceType}, 消息类型: {MessageType})",handler.GetType().Name, sourceType, messageType);await handler.HandleAsync(message, cancellationToken);}/// <summary>/// 提取消息类型/// </summary>private string ExtractMessageType(string message){try{using var jsonDoc = JsonDocument.Parse(message);var root = jsonDoc.RootElement;// 检查是否为v2.0版本if (root.TryGetProperty("schema", out var schemaElement) &&schemaElement.GetString() == "2.0"){if (root.TryGetProperty("header", out var headerElement) &&headerElement.TryGetProperty("event_type", out var eventTypeElement)){return "event"; // v2.0主要是事件消息}}// v1.0版本处理if (root.TryGetProperty("type", out var typeElement)){return typeElement.GetString()?.ToLowerInvariant() ?? string.Empty;}return string.Empty;}catch (JsonException ex){_logger.LogError(ex, "解析消息JSON失败");return string.Empty;}}
}
3.4 二进制消息处理器(BinaryMessageProcessor)
二进制消息处理器是飞书WebSocket框架中专门处理二进制数据流的核心组件,负责增量接收、数据组装、格式解析和消息分发。它采用流式处理架构,支持大消息的分片接收和解析。
二进制消息处理架构
增量数据接收
/// <summary>
/// 二进制消息处理器 - 负责处理二进制数据的增量接收和解析
/// </summary>
public class BinaryMessageProcessor : IDisposable
{private readonly ILogger<BinaryMessageProcessor> _logger;private readonly FeishuWebSocketOptions _options;private MemoryStream? _binaryDataStream;private readonly object _binaryDataStreamLock = new object();private DateTime _binaryDataReceiveStartTime = DateTime.MinValue;private bool _disposed = false;private readonly MessageRouter? _messageRouter;private readonly WebSocketConnectionManager? _connectionManager;public event EventHandler<WebSocketBinaryMessageEventArgs>? BinaryMessageReceived;public event EventHandler<WebSocketErrorEventArgs>? Error;/// <summary>/// 处理二进制数据/// </summary>public async Task ProcessBinaryDataAsync(byte[] data, int offset, int count, bool endOfMessage, CancellationToken cancellationToken = default){try{lock (_binaryDataStreamLock){// 如果是新消息的开始,初始化内存流if (_binaryDataStream == null){_binaryDataStream = new MemoryStream();_binaryDataReceiveStartTime = DateTime.UtcNow;if (_options.EnableLogging)_logger.LogDebug("开始接收新的二进制消息");}// 写入数据片段_binaryDataStream.Write(data, offset, count);// 检查数据大小限制if (_binaryDataStream.Length > _options.MaxBinaryMessageSize){var errorMessage = $"二进制消息大小超过限制 ({_binaryDataStream.Length} > {_options.MaxBinaryMessageSize})";_logger.LogError(errorMessage);// 清理当前数据流_binaryDataStream.Dispose();_binaryDataStream = null;// 触发错误事件OnError(errorMessage, "MessageSizeExceeded");return;}// 如果消息接收完成if (endOfMessage){var completeData = _binaryDataStream.ToArray();var receiveDuration = DateTime.UtcNow - _binaryDataReceiveStartTime;if (_options.EnableLogging)_logger.LogInformation("二进制消息接收完成,大小: {Size} 字节,耗时: {Duration}ms",completeData.Length, receiveDuration.TotalMilliseconds);// 异步处理完整的二进制消息_ = Task.Run(async () =>{await ProcessCompleteBinaryMessageAsync(completeData, cancellationToken);}, cancellationToken);// 清理资源_binaryDataStream.Dispose();_binaryDataStream = null;}else{if (_options.EnableLogging)_logger.LogDebug("已接收二进制消息片段,当前总大小: {Size} 字节", _binaryDataStream.Length);}}}catch (Exception ex){// 发生异常时清理资源lock (_binaryDataStreamLock){_binaryDataStream?.Dispose();_binaryDataStream = null;}if (_options.EnableLogging)_logger.LogError(ex, "处理二进制消息时发生错误");OnError($"处理二进制消息时发生错误: {ex.Message}", ex.GetType().Name);}}
}
核心特性
- 📦 流式处理:支持大消息分片接收
- 🔄 双格式支持:ProtoBuf优先,JSON回退
- 📊 大小限制:可配置的消息大小限制
- 🎯 自动路由:解析后自动路由到消息处理器
处理流程
- 增量接收:分片接收二进制数据,写入内存流
- 大小检查:实时监控数据大小,防止内存溢出
- 格式解析:ProtoBuf反序列化,失败则回退到JSON
- 消息路由:提取JSON Payload,路由到对应处理器
- ACK确认:向服务器发送处理确认
主客户端集成(FeishuWebSocketClient)
FeishuWebSocketClient是整个框架的门面组件,负责协调和编排各个子组件的工作。它采用组件化设计模式,将复杂的WebSocket连接管理功能分解为独立的、可复用的组件,并通过统一的事件系统进行集成。
客户端架构集成
组件协调与编排
/// <summary>
/// 飞书WebSocket客户端 - 采用组件化设计提高可维护性
/// </summary>
public sealed class FeishuWebSocketClient : IFeishuWebSocketClient, IDisposable
{private readonly ILogger<FeishuWebSocketClient> _logger;private readonly FeishuWebSocketOptions _options;private readonly IFeishuEventHandlerFactory _eventHandlerFactory;private readonly WebSocketConnectionManager _connectionManager;private readonly AuthenticationManager _authManager;private readonly MessageRouter _messageRouter;private readonly BinaryMessageProcessor _binaryProcessor;private readonly ConcurrentQueue<string> _messageQueue = new();private readonly List<Func<string, Task>> _messageProcessors = new();private Task? _messageProcessingTask;private ILoggerFactory _loggerFactory;private bool _disposed = false;private CancellationTokenSource? _cancellationTokenSource;/// <inheritdoc/>public WebSocketState State => _connectionManager.State;/// <inheritdoc/>public bool IsAuthenticated => _authManager.IsAuthenticated;/// <inheritdoc/>public event EventHandler<EventArgs>? Connected;/// <inheritdoc/>public event EventHandler<WebSocketCloseEventArgs>? Disconnected;/// <inheritdoc/>public event EventHandler<WebSocketMessageEventArgs>? MessageReceived;/// <inheritdoc/>public event EventHandler<WebSocketErrorEventArgs>? Error;/// <inheritdoc/>public event EventHandler<EventArgs>? Authenticated;/// <inheritdoc/>public event EventHandler<WebSocketPingEventArgs>? PingReceived;/// <inheritdoc/>public event EventHandler<WebSocketPongEventArgs>? PongReceived;/// <inheritdoc/>public event EventHandler<WebSocketHeartbeatEventArgs>? HeartbeatReceived;/// <inheritdoc/>public event EventHandler<WebSocketFeishuEventArgs>? FeishuEventReceived;/// <inheritdoc/>public event EventHandler<WebSocketBinaryMessageEventArgs>? BinaryMessageReceived;/// <summary>/// 默认构造函数/// </summary>public FeishuWebSocketClient(ILogger<FeishuWebSocketClient> logger,IFeishuEventHandlerFactory eventHandlerFactory,ILoggerFactory loggerFactory,FeishuWebSocketOptions? options = null){_logger = logger ?? throw new ArgumentNullException(nameof(logger));_eventHandlerFactory = eventHandlerFactory ?? throw new ArgumentNullException(nameof(eventHandlerFactory));_options = options ?? new FeishuWebSocketOptions();_loggerFactory = loggerFactory;// 初始化组件_connectionManager = new WebSocketConnectionManager(_loggerFactory.CreateLogger<WebSocketConnectionManager>(), _options);_authManager = new AuthenticationManager(_loggerFactory.CreateLogger<AuthenticationManager>(), _options, (message) => SendMessageAsync(message));_messageRouter = new MessageRouter(_loggerFactory.CreateLogger<MessageRouter>(), _options);_binaryProcessor = new BinaryMessageProcessor(_loggerFactory.CreateLogger<BinaryMessageProcessor>(), _connectionManager, _options, _messageRouter);// 订阅组件事件SubscribeToComponentEvents();// 注册消息处理器RegisterMessageHandlers();}/// <summary>/// 订阅组件事件/// </summary>private void SubscribeToComponentEvents(){// 连接管理器事件_connectionManager.Connected += (s, e) => Connected?.Invoke(this, e);_connectionManager.Disconnected += (s, e) => Disconnected?.Invoke(this, e);_connectionManager.Error += (s, e) => Error?.Invoke(this, e);// 认证管理器事件_authManager.Authenticated += (s, e) => Authenticated?.Invoke(this, e);_authManager.AuthenticationFailed += (s, e) => Error?.Invoke(this, e);// 二进制处理器事件_binaryProcessor.BinaryMessageReceived += (s, e) => BinaryMessageReceived?.Invoke(this, e);_binaryProcessor.Error += (s, e) => Error?.Invoke(this, e);}/// <summary>/// 注册消息处理器/// </summary>private void RegisterMessageHandlers(){var pingPongHandler = new PingPongMessageHandler(_loggerFactory.CreateLogger<PingPongMessageHandler>(),_options,(message) => SendMessageAsync(message));var authHandler = new AuthMessageHandler(_loggerFactory.CreateLogger<AuthMessageHandler>(),(success) =>{if (success){_authManager.HandleAuthResponse("{\"code\":0,\"msg\":\"Authentication successful\"}");}else{_authManager.HandleAuthResponse("{\"code\":-1,\"msg\":\"Authentication failed\"}");}});var heartbeatHandler = new HeartbeatMessageHandler(_loggerFactory.CreateLogger<HeartbeatMessageHandler>(), _options);_messageRouter.RegisterHandler(pingPongHandler);_messageRouter.RegisterHandler(authHandler);_messageRouter.RegisterHandler(heartbeatHandler);}/// <summary>/// 建立WebSocket连接/// </summary>public async Task ConnectAsync(WsEndpointResult endpoint, CancellationToken cancellationToken = default){if (endpoint == null)throw new ArgumentNullException(nameof(endpoint));await _connectionManager.ConnectAsync(endpoint.Url, cancellationToken);// 启动消息接收_cancellationTokenSource = new CancellationTokenSource();_ = Task.Run(() => StartReceivingAsyncInternal(_cancellationTokenSource.Token), _cancellationTokenSource.Token);// 启动心跳_ = Task.Run(() => StartHeartbeatAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);// 启动消息队列处理if (_options.EnableMessageQueue){_messageProcessingTask = ProcessMessageQueueAsync(_cancellationTokenSource.Token);}}/// <summary>/// 建立WebSocket连接并进行认证/// </summary>public async Task ConnectAsync(WsEndpointResult endpoint, string appAccessToken, CancellationToken cancellationToken = default){await ConnectAsync(endpoint, cancellationToken);await _authManager.AuthenticateAsync(appAccessToken, cancellationToken);}/// <summary>/// 断开WebSocket连接/// </summary>public async Task DisconnectAsync(CancellationToken cancellationToken = default){_cancellationTokenSource?.Cancel();await _connectionManager.DisconnectAsync(cancellationToken);}/// <summary>/// 发送消息/// </summary>public async Task SendMessageAsync(string message, CancellationToken cancellationToken = default){await _connectionManager.SendMessageAsync(message, cancellationToken);}/// <summary>/// 注册消息处理器/// </summary>public void RegisterMessageProcessor(Func<string, Task> processor){if (processor == null)throw new ArgumentNullException(nameof(processor));_messageProcessors.Add(processor);}
}
客户端生命周期管理
组件协作时序图
核心设计模式
-
门面模式(Facade Pattern):
- FeishuWebSocketClient作为统一入口
- 隐藏内部组件复杂性
- 提供简洁的API接口
-
观察者模式(Observer Pattern):
- 事件驱动的组件通信
- 松耦合的组件协作
- 支持多订阅者监听
-
策略模式(Strategy Pattern):
- 可插拔的消息处理器
- 灵活的路由策略
- 动态处理器注册
-
工厂模式(Factory Pattern):
- EventHandlerFactory创建处理器
- 统一的组件初始化
- 依赖注入支持
四、应用示例
4.1 事件处理器实现
用户创建事件处理器示例
/// <summary>
/// 演示用户事件处理器 - 继承预定义的用户创建事件处理器
/// </summary>
public class DemoUserEventHandler : DefaultFeishuEventHandler<UserCreatedResult>
{private readonly DemoEventService _eventService;private readonly INotificationService _notificationService;public DemoUserEventHandler(ILogger<DemoUserEventHandler> logger,INotificationService notificationService) : base(logger){_eventService = eventService ?? throw new ArgumentNullException(nameof(eventService));_notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService));}protected override async Task ProcessBusinessLogicAsync(EventData eventData, UserCreatedResult? userCreated, CancellationToken cancellationToken = default){if (eventData == null)throw new ArgumentNullException(nameof(eventData));_logger.LogInformation("👤 [用户事件] 开始处理用户创建事件: {EventId}", eventData.EventId);try{// 1. 数据验证和转换var user = ValidateAndTransformUser(userCreated?.Object);// 2. 保存到数据库await _eventService.RecordUserEventAsync(user, cancellationToken);// 3. 业务逻辑处理await ProcessUserEventAsync(user, cancellationToken);// 4. 更新统计信息_eventService.IncrementUserCount();_logger.LogInformation("✅ [用户事件] 用户创建事件处理完成: 用户ID {UserId}, 用户名 {UserName}",user.UserId, user.UserName);}catch (Exception ex){_logger.LogError(ex, "❌ [用户事件] 处理用户创建事件失败: {EventId}", eventData.EventId);throw;}}private UserData ValidateAndTransformUser(UserCreatedResult? userCreated){if (userCreated == null)throw new ArgumentException("用户创建数据不能为空");if (string.IsNullOrWhiteSpace(userCreated.UserId))throw new ArgumentException("用户ID不能为空");if (string.IsNullOrWhiteSpace(userCreated.Name))throw new ArgumentException("用户名不能为空");return new UserData{UserId = userCreated.UserId,UserName = userCreated.Name,Email = userCreated.Email ?? string.Empty,Department = userCreated.Department ?? string.Empty,EmployeeType = userCreated.EmployeeType ?? string.Empty,CreatedAt = userCreated.CreatedAt};}private async Task ProcessUserEventAsync(UserData user, CancellationToken cancellationToken){// 模拟异步业务操作await Task.Delay(100, cancellationToken);// 验证必要字段if (string.IsNullOrWhiteSpace(user.UserId)){throw new ArgumentException("用户ID不能为空");}// 模拟发送欢迎通知if (_options.EnableLogging)_logger.LogInformation("📧 [用户事件] 发送欢迎通知给用户: {UserName} ({Email})",user.UserName, user.Email);await _notificationService.SendWelcomeEmailAsync(user, cancellationToken);// 模拟用户配置文件创建if (_options.EnableLogging)_logger.LogInformation("⚙️ [用户事件] 创建用户配置文件: {UserId}", user.UserId);await _eventService.CreateUserProfileAsync(user, cancellationToken);// 模拟权限初始化if (_options.EnableLogging)_logger.LogInformation("🔐 [用户事件] 初始化用户权限: {UserId}", user.UserId);await _eventService.InitializeUserPermissionsAsync(user, cancellationToken);await Task.CompletedTask;}
}
4.2 继承预定义事件处理器
/// <summary>
/// 演示部门事件处理器 - 继承预定义的部门创建事件处理器
/// </summary>
public class DemoDepartmentEventHandler : DepartmentCreatedEventHandler
{private readonly DemoEventService _eventService;private readonly INotificationService _notificationService;private readonly IPermissionService _permissionService;public DemoDepartmentEventHandler(ILogger<DemoDepartmentEventHandler> logger, DemoEventService eventService,INotificationService notificationService,IPermissionService permissionService) : base(logger){_eventService = eventService ?? throw new ArgumentNullException(nameof(eventService));_notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService));_permissionService = permissionService ?? throw new ArgumentNullException(nameof(permissionService));}protected override async Task ProcessBusinessLogicAsync(EventData eventData, ObjectEventResult<DepartmentCreatedResult>? departmentData, CancellationToken cancellationToken = default){if (eventData == null)throw new ArgumentNullException(nameof(eventData));_logger.LogInformation("[部门事件] 开始处理部门创建事件: {EventId}", eventData.EventId);try{// 1. 验证部门数据var department = ValidateDepartmentData(departmentData?.Object);// 2. 记录事件到服务await _eventService.RecordDepartmentEventAsync(department, cancellationToken);// 3. 处理部门业务逻辑await ProcessDepartmentEventAsync(department, cancellationToken);// 4. 初始化部门权限await InitializeDepartmentPermissionsAsync(department, cancellationToken);// 5. 更新统计信息_eventService.IncrementDepartmentCount();_logger.LogInformation("[部门事件] 部门创建事件处理完成: 部门ID {DepartmentId}, 部门名 {DepartmentName}",department.DepartmentId, department.Name);}catch (Exception ex){_logger.LogError(ex, "[部门事件] 处理部门创建事件失败: {EventId}", eventData.EventId);throw;}}private DepartmentData ValidateDepartmentData(DepartmentCreatedResult? departmentResult){if (departmentResult == null)throw new ArgumentException("部门创建数据不能为空");if (string.IsNullOrWhiteSpace(departmentResult.DepartmentId))throw new ArgumentException("部门ID不能为空");if (string.IsNullOrWhiteSpace(departmentResult.Name))throw new ArgumentException("部门名不能为空");return new DepartmentData{DepartmentId = departmentResult.DepartmentId,Name = departmentResult.Name,ParentDepartmentId = departmentResult.ParentDepartmentId,LeaderUserId = departmentResult.LeaderUserId,DepartmentLevel = departmentResult.DepartmentLevel,CreatedAt = DateTime.UtcNow};}private async Task ProcessDepartmentEventAsync(DepartmentData department, CancellationToken cancellationToken){// 模拟异步业务操作await Task.Delay(100, cancellationToken);// 模拟设置部门配置if (_options.EnableLogging)_logger.LogInformation("[部门事件] 设置部门配置: {DepartmentName}", department.Name);await _eventService.ConfigureDepartmentSettingsAsync(department, cancellationToken);// 通知部门主管if (!string.IsNullOrWhiteSpace(department.LeaderUserId)){if (_options.EnableLogging)_logger.LogInformation("[部门事件] 通知部门主管: {LeaderUserId}", department.LeaderUserId);await _notificationService.NotifyDepartmentLeaderAsync(department, cancellationToken);}// 处理层级关系if (!string.IsNullOrWhiteSpace(department.ParentDepartmentId)){if (_options.EnableLogging)_logger.LogInformation("[部门事件] 建立层级关系: {DepartmentId} -> {ParentDepartmentId}",department.DepartmentId, department.ParentDepartmentId);await _eventService.UpdateDepartmentHierarchyAsync(department, cancellationToken);}await Task.CompletedTask;}private async Task InitializeDepartmentPermissionsAsync(DepartmentData department, CancellationToken cancellationToken){if (_options.EnableLogging)_logger.LogInformation("[部门事件] 初始化部门权限: {DepartmentName}", department.Name);// 创建部门默认权限var defaultPermissions = new[]{"department.view","department.edit","department.member.manage"};foreach (var permission in defaultPermissions){await _permissionService.GrantPermissionAsync(department.DepartmentId, permission, cancellationToken);}// 为部门主管分配管理员权限if (!string.IsNullOrWhiteSpace(department.LeaderUserId)){await _permissionService.GrantPermissionAsync(department.DepartmentId, "department.admin", cancellationToken);}}
}
4.3 依赖注入配置
建造者模式应用
/// <summary>
/// 飞书WebSocket服务注册扩展
/// </summary>
public static class FeishuWebSocketServiceExtensions
{/// <summary>/// 使用建造者模式注册飞书WebSocket服务/// </summary>public static IFeishuWebSocketBuilder AddFeishuWebSocketBuilder(this IServiceCollection services){return new FeishuWebSocketBuilder(services);}
}/// <summary>
/// 飞书WebSocket服务建造者
/// </summary>
public class FeishuWebSocketBuilder
{private readonly IServiceCollection _services;private readonly List<Type> _handlerTypes = new();private FeishuWebSocketOptions _options = new();private bool _useMultiHandler = false;public FeishuWebSocketBuilder(IServiceCollection services){_services = services ?? throw new ArgumentNullException(nameof(services));}/// <summary>/// 从配置文件读取配置/// </summary>public FeishuWebSocketBuilder ConfigureFrom(IConfiguration configuration){configuration.GetSection("Feishu:WebSocket").Bind(_options);return this;}/// <summary>/// 配置选项/// </summary>public FeishuWebSocketBuilder ConfigureOptions(Action<FeishuWebSocketOptions> configure){configure?.Invoke(_options);return this;}/// <summary>/// 启用多处理器模式/// </summary>public FeishuWebSocketBuilder UseMultiHandler(){_useMultiHandler = true;return this;}/// <summary>/// 添加事件处理器/// </summary>public FeishuWebSocketBuilder AddHandler<THandler>() where THandler : class, IFeishuEventHandler{_handlerTypes.Add(typeof(THandler));return this;}/// <summary>/// 构建服务注册/// </summary>public IServiceCollection Build(){// 注册配置选项_services.Configure(_options);if (_useMultiHandler){// 多处理器模式_services.AddSingleton<IFeishuEventHandlerFactory, DefaultFeishuEventHandlerFactory>();// 注册所有处理器类型foreach (var handlerType in _handlerTypes){_services.AddSingleton(typeof(IFeishuEventHandler), handlerType);}_services.AddSingleton<IFeishuWebSocketManager, FeishuWebSocketManager>();}else{// 单处理器模式var handlerType = _handlerTypes.FirstOrDefault();if (handlerType != null){_services.AddSingleton(typeof(IFeishuEventHandler), handlerType);_services.AddSingleton<IFeishuWebSocketManager, FeishuWebSocketManager>();}}return _services;}
}
实际使用示例
// Program.cs 中的配置示例
var builder = WebApplication.CreateBuilder(args);// 方式一:建造者模式注册(推荐)
builder.Services.AddFeishuWebSocketBuilder().ConfigureFrom(builder.Configuration) // 从配置文件读取.UseMultiHandler() // 启用多处理器模式.AddHandler<ReceiveMessageEventHandler>() // 添加消息处理器.AddHandler<UserCreatedEventHandler>() // 添加用户事件处理器.AddHandler<DepartmentCreatedEventHandler>() // 添加部门事件处理器.Build(); // 构建服务注册// 方式二:简化注册
builder.Services.AddFeishuWebSocketServiceWithSingleHandler<ReceiveMessageEventHandler>(options => {options.AutoReconnect = true;options.MaxReconnectAttempts = 5;options.HeartbeatIntervalMs = 30000;options.EnableLogging = true;});// 方式三:从配置文件注册
builder.Services.AddFeishuWebSocketService(builder.Configuration);
4.4 Web API集成
RESTful API设计
/// <summary>
/// WebSocket管理控制器
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class WebSocketController : ControllerBase
{private readonly IFeishuWebSocketManager _webSocketManager;private readonly ILogger<WebSocketController> _logger;public WebSocketController(IFeishuWebSocketManager webSocketManager,ILogger<WebSocketController> logger){_webSocketManager = webSocketManager ?? throw new ArgumentNullException(nameof(webSocketManager));_logger = logger ?? throw new ArgumentNullException(nameof(logger));}/// <summary>/// 启动WebSocket连接/// </summary>[HttpPost("connect")]public async Task<IActionResult> ConnectAsync(){try{await _webSocketManager.StartAsync();return Ok(new { Success = true,Message = "WebSocket连接启动成功",Timestamp = DateTime.UtcNow});}catch (Exception ex){_logger.LogError(ex, "启动WebSocket连接失败");return BadRequest(new { Success = false,Message = $"WebSocket连接启动失败: {ex.Message}",Timestamp = DateTime.UtcNow});}}/// <summary>/// 断开WebSocket连接/// </summary>[HttpPost("disconnect")]public async Task<IActionResult> DisconnectAsync(){try{await _webSocketManager.StopAsync();return Ok(new { Success = true,Message = "WebSocket连接断开成功",Timestamp = DateTime.UtcNow});}catch (Exception ex){_logger.LogError(ex, "断开WebSocket连接失败");return BadRequest(new { Success = false,Message = $"WebSocket连接断开失败: {ex.Message}",Timestamp = DateTime.UtcNow});}}/// <summary>/// 获取连接状态/// </summary>[HttpGet("status")]public IActionResult GetStatus(){var stats = _webSocketManager.GetConnectionStats();var state = _webSocketManager.GetConnectionState();return Ok(new WebSocketStatusResponse{IsConnected = _webSocketManager.IsConnected,State = state.Status.ToString(),Uptime = stats.Uptime,ReconnectCount = stats.ReconnectCount,LastError = stats.LastError?.Message,Timestamp = DateTime.UtcNow});}
}
实时状态监控
/// <summary>
/// WebSocket状态监控服务
/// </summary>
public class WebSocketMonitoringService : IHostedService
{private readonly IFeishuWebSocketManager _webSocketManager;private readonly ILogger<WebSocketMonitoringService> _logger;private readonly Timer _monitoringTimer;private readonly ConcurrentQueue<ConnectionSnapshot> _snapshots = new();public WebSocketMonitoringService(IFeishuWebSocketManager webSocketManager,ILogger<WebSocketMonitoringService> logger){_webSocketManager = webSocketManager;_logger = logger;// 每30秒收集一次状态快照_monitoringTimer = new Timer(CollectStatusSnapshot, null,TimeSpan.Zero, TimeSpan.FromSeconds(30));}/// <summary>/// 收集状态快照/// </summary>private void CollectStatusSnapshot(object? state){var stats = _webSocketManager.GetConnectionStats();var connectionState = _webSocketManager.GetConnectionState();var snapshot = new ConnectionSnapshot{Timestamp = DateTime.UtcNow,IsConnected = stats.IsConnected,Uptime = stats.Uptime,ReconnectCount = stats.ReconnectCount,LastError = stats.LastError,ConnectionState = connectionState.Status.ToString()};_snapshots.Enqueue(snapshot);// 保持最近100个快照while (_snapshots.Count > 100){_snapshots.TryDequeue(out _);}// 分析连接质量AnalyzeConnectionQuality();}/// <summary>/// 分析连接质量/// </summary>private void AnalyzeConnectionQuality(){var recentSnapshots = _snapshots.TakeLast(10).ToList();if (recentSnapshots.Count < 5) return;var connectedCount = recentSnapshots.Count(s => s.IsConnected);var connectivityRate = (double)connectedCount / recentSnapshots.Count;if (connectivityRate < 0.9){_logger.LogWarning("连接质量较差 - 连接率: {ConnectivityRate:P2}", connectivityRate);}// 分析重连频率var reconnectEvents = recentSnapshots.Where(s => s.ReconnectCount > 0).ToList();if (reconnectEvents.Count > 3){_logger.LogWarning("重连频率过高 - 最近10次检测中有{Count}次重连", reconnectEvents.Count);}}
}
🎯 总结
使用 Mud.Feishu.Abstractions 和 Mud.Feishu.WebSocket 两个核心项目构建企业级的飞书WebSocket长连接应用。通过组件化架构设计、策略模式的事件处理、完善的错误隔离机制和全面的监控体系,开发者可以快速构建稳定、可靠的实时事件处理系统。
总的来说,Mud.Feishu.Abstractions 和 Mud.Feishu.WebSocket 两个核心项目的目标是提供一个可靠、易于集成、高度可扩展的飞书WebSocket长连接应用框架,大大降低了飞书WebSocket集成的开发复杂度,让开发者能够专注于业务逻辑的实现。