【linux】深入了解线程池:基本概念与代码实例(C++)

文章目录

  • 1. 前言
    • 1.1 概念
    • 1.2 应用场景
    • 1.3 线程池的种类
    • 1.4 线程池的通常组成
  • 2. 代码示例
    • 2.1 log.hpp
    • 2.2 lockGuard.hpp
      • ① pthread_mutex_t
    • 2.3 Task.hpp
    • 2.4 thread.hpp
    • 2.5 threadPool.hpp
      • ① 基本框架
      • ② 成员变量
      • ③ 构造函数
      • ④ 其余功能函数:
    • main.cc
    • 结果演示
  • 完整代码

1. 前言

1.1 概念

在这里插入图片描述

线程池是一种并发编程的解决方案(线程使用模式),它由一组工作线程和一个任务队列组成

  • 工作线程在初始化时被创建并持续运行,等待从任务队列中获取任务并执行

  • 当任务执行完成后,线程不会退出,而是继续保持运行状态,等待下一个任务的到来。

  • 线程池不仅能够保证内核的充分利用,还能防止过分调度


1.2 应用场景

  1. 高并发服务器:在高并发的网络服务器中,每个客户端请求通常都需要创建一个新的线程来处理,并发量较高时会导致线程的频繁创建和销毁,浪费大量的系统资源。使用线程池可以复用已有的线程,降低线程创建和销毁的开销

  2. 大规模数据处理在需要大量计算的任务中,使用线程池可以将计算分配到多个线程中进行,提高计算效率

  3. 定时任务在定时任务中,可以将任务提交到线程池中,在预定时间执行任务,避免在定时任务到来时才创建新的线程,提高程序的响应速度

  4. 图像、视频等多媒体处理在多媒体处理中,通常需要对大量的数据进行处理,使用线程池可以将处理任务分配到多个线程中进行,提高处理效率

  5. 批量请求处理在需要批量处理请求的场景中,使用线程池可以将请求分配到多个线程中进行处理,提高处理效率和响应速度

1.3 线程池的种类

根据线程池的实现方式和特性,可以将线程池分为以下几种类型:

  1. 固定大小线程池固定大小线程池是最简单的线程池实现方式,它在初始化时创建一定数量的工作线程,这些线程会一直存在直到线程池销毁

    • 当有新的任务提交到线程池时,会将任务放入任务队列中,并由空闲的工作线程执行。这种线程池的优点是稳定可靠,缺点是无法动态调整线程数量。
  2. 动态大小线程池动态大小线程池可以根据系统负载情况自动调整线程数量,以适应不同的并发量。

    • 当有大量任务需要处理时,会自动增加线程数量,反之则会减少线程数量,从而节约系统资源。这种线程池的优点是能够自适应负载,缺点是实现较为复杂,容易出现线程数量过多或过少的情况。
  3. 定时线程池定时线程池可以在指定时间执行任务,通常用于定时任务、定期检查等场景

    • 当任务提交到线程池后,会设置一个定时器,在指定的时间到达时执行任务。这种线程池的优点是精确可靠,缺点是对定时器进行管理需要额外的开销。
  4. 工作窃取线程池工作窃取线程池是一种高效的线程池实现方式,它采用工作窃取算法,让空闲的线程从其他线程的任务队列中窃取任务,从而避免了任务分配不均的问题。这种线程池的优点是高效均衡,缺点是实现较为复杂。


1.4 线程池的通常组成

线程池通常由以下几个核心组件组成:

  • 线程池管理器(ThreadPool Manager):负责创建和管理线程池的生命周期。
  • 工作队列(Work Queue):用于存储待执行的任务,当线程池中的线程完成当前任务后,会从工作队列中获取新的任务进行处理。
  • 线程池(Thread Pool):包含一组预先创建的线程,这些线程可用于执行任务。
  • 任务(Task):要在线程池中执行的具体操作或代码逻辑。

2. 代码示例

下面我们会介绍线程池的代码示例,主要功能包括:

  1. 创建固定数量线程池,循环从任务队列中获取任务对象
  2. 获取到任务对象后,执行任务对象中的任务接口

2.1 log.hpp

  • 首先对于log.hpp,是一个日志文件,用于在部分正常情况或者程序运行异常时输出信息。

日志文件完全根据需要编写,对于该文件就不再过多描述。

#pragma once#include <iostream>
#include <cstdio>
#include <cstdarg>// 宏定义 日志级别
#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4// 全局字符串数组 : 将日志级别映射为对应的字符串
const char *gLevelMap[] = {"DEBUG","NORMAL","WARNING","ERROR","FATAL"
};#define LOGFILE "./threadpool.log" // LOGFILE: 表示日志文件的路径void logMessage(int level, const char* format, ...)
{// 判断DEBUG_SHOW 是否定义,分别执行操作#ifndef DEBUG_SHOW // 将日志级别映射为对应的字符串if(level == DEBUG) return; // DEBUG_SHOW不存在 且 日志级别为 DEBUG时,返回
#endif// DEBUG_SHOW存在 则执行下面的日志信息 char stdBuffer[1024];time_t timestamp = time(nullptr);// 将日志级别和时间戳格式化后的字符串将会被写入到 stdBuffer 缓冲区中snprintf(stdBuffer, sizeof(stdBuffer), "[%s] [%ld] ", gLevelMap[level], timestamp);char logBuffer[1024];va_list args;va_start(args, format);vsnprintf(logBuffer, sizeof(logBuffer), format, args);va_end(args);FILE* fp = fopen(LOGFILE, "a");
}

2.2 lockGuard.hpp

lockGuard.hpp中我们 实现了一个 需封装了互斥锁的Mutex类 和一个 实现自动加解锁的lockGuard类

  • Mutex类封装了pthread_mutex_t类型的互斥锁
  • lockGuard类是一个RAII风格的加锁方式。

① pthread_mutex_t

pthread_mutex_t是POSIX线程库提供的互斥锁类型与C++标准库中的std::mutex类似,pthread_mutex_t也可以用于实现线程间的互斥访问

详情接口可以看下图
在这里插入图片描述


通过这种方式,lockGuard对象的生命周期和锁的生命周期绑定在一起,可以确保在任何情况下都能保证锁的正确释放,避免死锁等问题

对于lockGuard.hpp,其中包含两个类, Mutex类(封装一个互斥锁) lockGuard类(用于自动释放互斥锁)

Mutex类:

#pragma once#include <mutex>
#include <pthread.h>// Mutex 类是对 pthread_mutex_t 的封装
class Mutex
{
public:Mutex(pthread_mutex_t *mtx):_pmtx(mtx){}~Mutex(){}void lock() // 加锁{pthread_mutex_lock(_pmtx);}void unlock() // 解锁{pthread_mutex_unlock(_pmtx);}private:pthread_mutex_t* _pmtx; // POSIX线程库提供的互斥锁类型
};

lockGuard类:

// RAII 加锁方式
// lockGuard 实现了自动释放互斥锁的效果
class lockGuard
{
public:lockGuard(pthread_mutex_t *mtx):_mtx(mtx){_mtx.lock();}~lockGuard(){_mtx.unlock();}private:Mutex _mtx;
};

2.3 Task.hpp

  • Task类即线程池的任务对象,这里实现的功能很简单,该任务类接收两个参数以及一个处理函数,比如传入 (1, 2 , ADD) 就可以进行相加的操作
#pragma once#include <iostream>
#include <string>
#include <functional>
#include "log.hpp"typedef std::function<int(int, int)> func_t; // 定义一个函数类型,用于传递给Taskclass Task
{
public:// 构造Task(){}Task(int x, int y, func_t func):_x(x),_y(y),_func(func){}void operator()(const std::string& name){// __FILE__ 和 __LINE__ : 预定义宏,用于获取当前文件名和行号。确保它们能够正确展开并提供有效的日志输出信息。logMessage(WARNING, "%s 处理完成: %d+%d=%d | %s | %d",name.c_str(), _x, _y, _func(_x, _y), __FILE__, __LINE__);}private:// 成员变量:两个参数以及处理方法int _x;int _y;func_t _func;
};

2.4 thread.hpp

该文件中 我们实现 一个用于创建和管理线程的简单封装类 Thread使用 POSIX 线程库(pthread)来实现线程的创建和管理。

其中包含两个类:

  • ThreadData用于存储线程的额外数据
  • Thread用于创建和管理线程的简单封装类

ThreadData类:

  • ThreadData 即用于存储线程的内容(所含变量)
#pragma once#include <iostream>
#include <string>
#include <pthread.h>
#include <functional>
#include <cstdio>typedef void *(*func_t)(void *);// ThreadData: 存储线程的额外数据
class ThreadData
{
public:void* _args; // 线程参数std::string _name; // 线程名称
};

Thread类:

对于封装的Thread类,其包含以下功能:

  • 创建
  • 等待
  • 获取名称
class Thread
{
public:// 构造函数接受三个参数:num 表示线程编号,callback 是一个函数指针,表示线程要执行的函数,args 是传递给线程函数的参数。Thread(int num, func_t callback, void* args):_func(callback) // 回调指针{// 线程编号和函数指针存储在成员变量 _name 和 _func 中,并初始化 _tData 对象的成员变量 _args 和 _name。char nameBuffer[64]; // 存储线程名称snprintf(nameBuffer, sizeof(nameBuffer), "Thread-%d", num); // 将格式化字符串 "Thread-%d" 和 num 参数合并,生成线程的名称_name = nameBuffer; // 赋值名称_tData._args = args;_tData._name = _name;}~Thread(){}void start() // 创建线程{pthread_create(&_tid, nullptr, _func, (void*)&_tData);}void join() // 等待线程完成{pthread_join(_tid, nullptr);}std::string name() // 返回线程名称{return _name;}private:std::string _name; // 线程名称func_t _func;   // 函数指针:存储线程要执行的函数ThreadData _tData; // ThreadData变量,线程数据pthread_t _tid; // 标识唯一线程
};

2.5 threadPool.hpp

threadPool.hpp 文件主要封装了一个线程池类ThreadPool,作为重点,这里会对每一个函数单独讲解:

① 基本框架

下面的框架展示整个文件的基本内容,将具体的函数功能先省略:

#pragma once#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include <mutex>#include "lockGuard.hpp"
#include "thread.hpp"
#include "log.hpp"const int g_thread_num = 3; // 默认线程个数// 线程池其本质是生产消费者模型
template<class T>
class ThreadPool
{
private:// 构造函数 ThreadPool(int thread_num = g_thread_num):_num(thread_num){}// 禁用拷贝 && 赋值// 防止多个线程池对象共享同一个线程池内部资源而造成混乱ThreadPool(const ThreadPool<T>& other) = delete;const ThreadPool<T> &operator=(const ThreadPool<T> &other) = delete;public:pthread_mutex_t *getMutex(){}bool isEmpty() // 判断任务队列是否为空{}void waitCond() //等待条件变量{}T getTask() // 获取任务{}// 获取线程池实例static ThreadPool<T> *getThreadPool(int num = g_thread_num){}// 启动线程void run(){}// 每个线程的执行函数: 消费者执行逻辑static void* routine(void *args){}// 添加任务void pushTask(const T& task){}private:// 成员变量...
};template <typename T>
ThreadPool<T> *ThreadPool<T>::thread_ptr = nullptr;// PTHREAD_MUTEX_INITIALIZER 是一个宏: 用于初始化互斥锁的静态常量。
template <typename T>
pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;// 这样的定义和初始化可以确保在使用 ThreadPool<T> 类时, thread_ptr 和 mutex 这两个静态成员变量被正确创建和初始化。
// 静态成员变量是该类所有实例共享的,通过初始化为适当值,确保在使用时具有预期的初始状态。

② 成员变量

该ThreadPool类包含以下成员变量

每个成员变量的具体作用都在注释中标出👇

private:std::vector<Thread*> _threads; // 线程集合std::queue<T> _task_queue; // 任务队列int _num; // 表示线程数量static ThreadPool<T>* thread_ptr; // 静态成员指针,用于保存 ThreadPool 类的唯一实例static pthread_mutex_t mutex;   // 静态互斥锁,用于保证对 ThreadPool 类唯一实例的访问的互斥性pthread_mutex_t lock; // 互斥锁pthread_cond_t cond; // 条件变量

③ 构造函数

条件变量是一种线程同步机制,用于等待或唤醒特定条件的线程。

  • pthread库中,可以使用pthread_cond_init函数来初始化条件变量对象。该函数的原型如下:
int pthread_cond_init(pthread_cond_t* cond, const pthread_condattr_t* attr)
ThreadPool(int thread_num = g_thread_num):_num(thread_num){// 初始化互斥锁与条件变量pthread_mutex_init(&lock, nullptr);pthread_cond_init(&cond, nullptr);for(int i = 1; i <= _num; ++i)_threads.push_back(new Thread(i, routine, this));}// 禁用拷贝 && 赋值// 防止多个线程池对象共享同一个线程池内部资源而造成混乱ThreadPool(const ThreadPool<T>& other) = delete;const ThreadPool<T> &operator=(const ThreadPool<T> &other) = delete;

对于上面的部分函数进行解释:

互斥锁是一种线程同步机制,用于保护共享资源,避免多个线程同时访问而引起的竞态条件。在pthread库中,可以使用pthread_mutex_init函数来初始化互斥锁对象。

该函数的原型如下:

int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* attr);

④ 其余功能函数:

getMutex()

  • 返回互斥锁
pthread_mutex_t *getMutex()
{return &lock; // 返回互斥锁指针
}

isEmpty()

  • 判断任务队列是否为空
bool isEmpty() // 判断任务队列是否为空
{return _task_queue.empty();
}

waitCond()

  • 等待条件变量的信号
  • 在调用该函数之前,需要先获得互斥锁,并将其传递给函数,以确保线程在等待条件时不会被其他线程同时访问共享资源。
void waitCond() //等待条件变量
{pthread_cond_wait(&cond, &lock);
}

getTask()

  • 用于线程获取队列
T getTask() // 获取任务
{T t = _task_queue.front(); // 获取队列首位元素(任务)_task_queue.pop(); // 删除该任务return t;
}

getThreadPool()

  • 获取线程池实例
// 获取线程池实例
static ThreadPool<T> *getThreadPool(int num = g_thread_num)
{// 判断是否已经创建了线程池实例if(thread_ptr == nullptr){// 多个线程可能同时访问 thread_ptr,需要通过互斥锁来保护对其的操作lockGuard lockguard(&mutex);// 为了避免在多个线程被阻塞等待锁资源时,当第一个线程创建了线程池后,其他线程仍然会创建新的线程池 的情况// 再次判断if(thread_ptr == nullptr){thread_ptr = new ThreadPool<T>(num);}   }return thread_ptr;
}

run()

  • 用于启动线程池中的线程
// 启动线程
void run()
{// 遍历线程池中的每个线程并依次创建for(auto& iter : _threads){iter->start();logMessage(NORMAL, "%s %s", iter->name().c_str(), "Creation completed");}    
}

rountine()

  • 线程内部的执行逻辑:
    • 获取传来的参数(线程数据)后,在循环中建立任务对象:
    • 依次进行等待条件变量、读取任务
// 每个线程的执行函数: 消费者执行逻辑
static void* routine(void *args)
{ThreadData* td = (ThreadData*)args;ThreadPool<T>* tp = (ThreadPool<T>*)td->_args;while(true){T task;{// 初始化 task 对象,并调用适当的构造函数lockGuard lockguard(tp->getMutex());while(tp->isEmpty()) // 当线程为空,消费操作等待tp->waitCond();// 读取任务task = tp->getTask();}// 执行任务task(td->_name);}
}

pushTask()

  • 用于向任务队列中添加任务
// 添加任务
void pushTask(const T& task)
{   lockGuard lockguard(&lock);_task_queue.push(task); // 将任务传入到 任务队列pthread_cond_signal(&cond); // 等待条件变量的线程,有新的任务可用
}

main.cc

  • 用于形成最后等待可执行文件,
    • 具体为:通过任务类生成一批 题目,并将任务推送给线程池
#include "threadPool.hpp"
#include "Task.hpp"
#include <iostream>
#include <ctime>
#include <unistd.h>
#include <pthread.h>int main()
{srand((unsigned long)time(nullptr) ^ getpid()); // 设置随机数种子ThreadPool<Task>::getThreadPool()->run();   // while(true){int x = rand() % 100 + 1;usleep(1000);int y = rand() % 30 + 1;Task t(x, y, [](int x, int y)->int{return x + y;});logMessage(DEBUG, "create task success: %d + %d = ?", x, y);logMessage(DEBUG, "create task success: %d + %d = ?", x, y);logMessage(DEBUG, "create task success: %d + %d = ?", x, y);logMessage(DEBUG, "create task success: %d + %d = ?", x, y);// 推送任务到线程池ThreadPool<Task>::getThreadPool()->pushTask(t);sleep(1);}return 0;
}

结果演示

最后可以呈现出类似如下的效果:

ThreadPool started.
create task success: 43 + 18 = ?
create task success: 12 + 27 = ?
create task success: 55 + 8 = ?
create task success: 92 + 5 = ?
Waiting for tasks...
create task success: 37 + 21 = ?
create task success: 67 + 29 = ?
create task success: 89 + 14 = ?
create task success: 25 + 3 = ?

完整代码

上文涉及到的代码完整代码如下👇 :

ThreadPool代码实例

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/16908.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数组与指针声明小问题

1、int *p &a; 是 C 语言中的一条语句&#xff0c;它涉及指针的声明和初始化。让我们逐步解释这一行代码的含义&#xff1a; int *p&#xff1a;这是一个指针声明。它声明了一个名为 p 的变量&#xff0c;该变量是一个指向 int 类型数据的指针。 &a&#xff1a;这是取…

动态规划-似包非包问题

组合总和 Ⅳ&#xff08;377&#xff09; 题目描述&#xff1a; 状态表示&#xff1a; 我们看到这题发现有一个限制条件就是目标整数target并且此时数组中的数字是可以重复选择的&#xff0c;这时候不难联想到前面学习的完全背包问题&#xff0c;这题好像符合完全背包问题的…

关于linux磁盘告警问题

案例&#xff1a;我们在执行df命令时&#xff0c;查看到磁盘利用率很高&#xff0c;但是到相对应的目录执行du -sh *来找大文件时进行删除时&#xff0c;发现各个目录相加并不大&#xff0c;如下图&#xff1a; 使用df命令查看到根(/)目录使用到33G&#xff0c;而du命令显示只使…

vscode终端命令行前面出现两个conda环境名的问题决解方法

已经安装了conda&#xff0c;打开vscode的terminal时&#xff0c;命令行前面有两个虚拟环境名。 进入vscode的setting 找到Python->Python:Default Interpreter Path&#xff0c;把这个值复位&#xff0c;就可以解决。 如果不想前面带(base)&#xff0c;可以运行 conda co…

FLIP动画思想

Aerotwist - FLIP Your Animations 还可以用gsap动画库的FLIP插件 Flip | GSAP | Docs & Learning // Get the first position. var first el.getBoundingClientRect();// Now set the element to the last position. el.classList.add(totes-at-the-end);// Read agai…

STP简介

一、STP介绍 STP 即生成树协议&#xff08;Spanning Tree Protocol&#xff09;一种网络协议 STP 主要用于解决以太网中的环路问题。在具有冗余链路的网络环境中&#xff0c;环路可能导致广播风暴、重复帧等不良后果&#xff0c;严重影响网络性能和稳定性。STP 通过在交换机之…

FuTalk设计周刊-Vol.050

#AI漫谈 热点捕手 1.Canva 宣布收购 Affinity 创意套件 平面设计平台 Canva 于 3 月 26 日宣布收购知名设计软件 Affinity 以“迎战”Adobe&#xff0c;不过此后许多设计师开始担心原本采用“永久授权”付费方案的 Affinity 系列软件是否会转为订阅制&#xff0c;而目前 Canv…

Android Studio开发之路(十四)自定义Titlebar以及设置顶部状态栏颜色

一、描述 项目需求&#xff0c;我要做一个下图这样的titlebar,包括一个返回按钮&#xff0c;一个关闭按钮&#xff0c;一个文本框。默认的titlebar按钮设计不太满足我的需求&#xff0c;于是我打算自定义一个titlebar组件&#xff0c;应用到我的每一个页面 二、titlebar组件设…

【NumPy】关于numpy.searchsorted()函数,看这一篇文章就够了

&#x1f9d1; 博主简介&#xff1a;阿里巴巴嵌入式技术专家&#xff0c;深耕嵌入式人工智能领域&#xff0c;具备多年的嵌入式硬件产品研发管理经验。 &#x1f4d2; 博客介绍&#xff1a;分享嵌入式开发领域的相关知识、经验、思考和感悟&#xff0c;欢迎关注。提供嵌入式方向…

【Qt】数据库(一)SQLITE创建、增删查改

填坑1&#xff1a;如何连续插入 汇总SQlite语句 创建表格&#xff1a;create table <table_name> (f1 type1, f2 type2,…); 增&#xff1a;insert into <table_name> values (value1, value2,…); 改&#xff1a;update <table_name> set <f1value1>,…

AttributeError: module ‘google._upb._message‘ has no attribute ‘Message‘

rolling back to the stable version protobuf3.20.1 resolved the issue for me.

数据结构第二篇【关于java线性表(顺序表)的基本操作】

【关于java线性表&#xff08;顺序表&#xff09;的基本操作】 线性表是什么&#xff1f;&#x1f435;&#x1f412;&#x1f98d;顺序表的定义&#x1f9a7;&#x1f436;&#x1f435;创建顺序表新增元素,默认在数组最后新增在 pos 位置新增元素判定是否包含某个元素查找某个…

.net aot inherit object.Dispose()”: 没有找到适合的方法来重写解决方法

.net aot inherit object.Dispose()”: 没有找到适合的方法来重写解决方法<DisableImplicitFrameworkReferences>true</DisableImplicitFrameworkReferences>

【Python爬虫】案例_百度贴吧

声明&#xff1a;案例只用于学习&#xff0c;不得恶意使用 要求&#xff1a;获取帖子的标题和链接 import requests from lxml import etreeclass Tieba(object):def __init__(self,name):self.url https://tieba.baidu.com/f?ieutf-8&kw{}.format(name)self.headers …

人形机器人建模与控制(三) - 机器人控制

L3 Robot Control Robot Control L3 Robot Control1. RepeatForward KinematicsDifferential KinematicsExternal Wrench (Force & Torque) MappingDynamicsGoal for Today2. Feedback Cancellation3. Joint Space ControlSimpler Control Laws

JDBC批量处理(addBatch/executeBatch/clearBatch)

1-操作多条的时候用批量处理&#xff0c;比单条处理效率更高 2-JDBC常用批量处理方法 &#xff08;1&#xff09;addBatch(string)&#xff1a;添加批量处理的sql语句或参数 &#xff08;2&#xff09;executeBatch&#xff1a;执行批量处理语句 &#xff08;3&#xff09;clea…

Java中IO的四大抽象类

InputStream/OutputStream和Reader/Writer类是所有IO流类的抽象父类&#xff0c;需要先简单了解一下这四个抽象类的作用。然后&#xff0c;通过它们具体的子类熟悉相关的用法。 InputStream 此抽象类是表示字节输入流的所有类的父类。InputStream是一个抽象类&#xff0c;它不…

BUG(18) : Caused by: java.lang.ClassNotFoundException

场景 导入了多个mave依赖, 启动是报java.lang.ClassNotFoundException, 报错位置为依赖包里面的子依赖 解决 什么依赖报错, pom文件直接引入该子依赖

代码随想录算法训练营第二十天 | 654.最大二叉树 、617.合并二叉树、700.二叉搜索树中的搜索、98.验证二叉搜索树

654.最大二叉树 题目链接&#xff1a;https://leetcode.cn/problems/maximum-binary-tree/ 文档讲解&#xff1a;https://programmercarl.com/0654.%E6%9C%80%E5%A4%A7%E4%BA%8C%E5%8F%89%E6%A0%91.html 视频讲解&#xff1a;https://www.bilibili.com/video/BV1MG411G7ox 思路…

使用高性能NIO框架netty实现IM集群对聊方案

文章目录 前言技术积累什么是nettynetty如何实现IM如何实现IM集群 实战演示基础配置netty搭建IM集群redis发布订阅 实战测试 前言 在前面的博文中我们分享了原生websoket集群搭建&#xff0c;也用redis 发布订阅实现了集群消息正常有序分发。但是有不少同学希望风向一期netty实…