EPOLL(C/S模型)实现I/O复用多进程聊天室,通过共享内存、socketpair实现父子进程通信,通过信号量回收进程

这里只展示了server端,client端可以用之前的poll写的。
每个client我们fork一个子进程用epoll来实现它的I/O复用。
非常巧妙的使用共享内存,通过给每个client编号以及BUFFER_SIZE保存需要广播和接受的内容,因为有了编号,所以父子进程的socketpair通信,我们只要传编号就可以表示这个client需要广播的内容了。
最后就是里面注册的信号量,父进程不能直接说关闭就关闭,否则没有及时关闭的子进程会变成僵尸进程,所以我们通过注册的信号量来让系统走我们把所以子进程都关闭再关闭自己的逻辑。

缺点
当一部分client频繁发送时,容易出现所处的共享内存上的buffer未发出但是新的又来了,所以可能会导致吞消息的现象,这时我们需要设置缓冲区来解决(下次一定)。

#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <set>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/shm.h>
#include <signal.h>#define  USER_LIMIT 5  //最大用户数量
#define  BUFFER_SIZE 1024  //读缓冲区的大小
#define  FD_LIMIT 65535 //文件描述符数量限制
#define  MAX_EVENT_NUMBER 1024
#define  PROCESS_LIMIT 65536+65536
//客户数据 : socket地址、待写到客户端的数据的位置、从客户端读入的数据
struct client_data
{sockaddr_in address;int connfd;      ///socket文件描述符pid_t pid;      //处理这个连接的子进程pidint pipefd[2];    //与父进程通信的管道
};
static const char* shm_name = "/my_shm";
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;
char* share_men = 0;
//客户连接的数组,进程用客户连接编号来索引,获得相关的数据
client_data* users = 0;
//子进程和客户的连接映射关系表,用进程的pid来索引数据,获取该进程处理的客户连接编号
int* sub_process = 0;
int user_counet = 0 ;
std::set<int> nost;
//当前客户数量
std::set<int> curst;
bool stop_child = false;int setnoblocking(int fd)
{int old_option = fcntl(fd,F_GETFL);int new_option = old_option | O_NONBLOCK;fcntl(fd,F_SETFL,new_option);return old_option;
}
void addfd(int epollfd,int fd)
{epoll_event event ;event.data.fd = fd ;event.events = EPOLLIN | EPOLLET;epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);setnoblocking(fd);
}
void sig_handler(int sig)
{int save_errno = errno;int msg = sig;send(sig_pipefd[1],(char *)&msg,1,0);errno = save_errno;
}
void addsig(int sig , void(*handler)(int) ,bool restart = true )
{struct  sigaction sa ;memset(&sa,'\0',sizeof(sa));sa.sa_handler = handler;if ( restart )sa.sa_flags |= SA_RESTART;sigfillset(&sa.sa_mask);assert(sigaction(sig,&sa,NULL) != 1 );
}
void del_resource()
{close(sig_pipefd[0]);close(sig_pipefd[1]);close(listenfd);close(epollfd);shm_unlink(shm_name);delete [] users;delete sub_process;
}
void child_term_handler(int sig)
{stop_child = true;
}
//子进程运行的函数,参数idx指出该子进程处理的客户连接的编号,user是保存所有客户连接数据的数组,参数share_men指出共享内存的起始位置
int run_child(int idx,client_data* users,char* share_mem)
{epoll_event events[MAX_EVENT_NUMBER];//子进程使用I/O复用技术来同时监控两个文件描述符:客户连接socket、与父进程通信的管道文件描述符int child_epollfd = epoll_create(5);assert(child_epollfd != -1);int connfd = users[idx].connfd;addfd(child_epollfd,connfd);int pipefd = users[idx].pipefd[1];addfd(child_epollfd,pipefd);int ret;//子进程设置自己的信号处理函数addsig(SIGTERM,child_term_handler,false);while ( !stop_child ){int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);if ((number < 0) && (errno != EINTR)){printf("epoll failure\n");break;}for (int i = 0 ; i < number ; i++ ){int sockfd = events[i].data.fd;//本子进程负责的客户连接有数据到达if ( (sockfd == connfd ) && ( events[i].events & EPOLLIN) ){memset(share_mem + idx*BUFFER_SIZE ,'\0',BUFFER_SIZE);//将客户数据读取到对应的读缓存中,该读缓存时共享内存的一段ret = recv(connfd,share_mem+idx*BUFFER_SIZE,BUFFER_SIZE-1,0);if ( ret < 0 ){if ( errno != EAGAIN ){stop_child = true;}}else if ( ret == 0 ){stop_child = true;}else{send(pipefd,(char *)&idx,sizeof(idx),0);}}else if ( (sockfd == pipefd) && (events[i].events & EPOLLIN) ){int client = 0 ;//接受主进程发来的数据ret = recv(sockfd,(char *)&client , sizeof(client),0);if ( ret < 0 ){if ( errno != EAGAIN ){stop_child = true;}}else if ( ret == 0 ){stop_child = true;}else{send(connfd,share_mem+client*BUFFER_SIZE,BUFFER_SIZE,0);}}}}close(connfd);close(pipefd);close(child_epollfd);return 0;
}
int main()
{const char* ip = "192.168.174.129" ;int port = 5050 ;int ret = 0;sockaddr_in address;bzero(&address,sizeof(address));address.sin_family = AF_INET;inet_pton(AF_INET,ip,&address.sin_addr);address.sin_port = htons(port);listenfd = socket(PF_INET,SOCK_STREAM,0);assert(listenfd >=0 );ret = bind(listenfd,(struct sockaddr *)&address,sizeof(address));assert(ret != -1);ret = listen(listenfd,5);assert(ret != -1);user_counet = 0;users = new client_data [USER_LIMIT+1];sub_process = new int[PROCESS_LIMIT];for (int i = 1 ; i < PROCESS_LIMIT ; i++ )sub_process[i] = -1;for (int i = 0 ; i < USER_LIMIT ; i++ )nost.insert(i);epoll_event ev;epoll_event events[MAX_EVENT_NUMBER];epollfd = epoll_create(USER_LIMIT);assert(epollfd != -1);addfd(epollfd,listenfd);ret = socketpair(PF_UNIX,SOCK_STREAM,0,sig_pipefd);assert(ret != -1);setnoblocking(sig_pipefd[1]);addfd(epollfd,sig_pipefd[0]);addsig(SIGCLD,sig_handler);addsig(SIGTERM,sig_handler);addsig(SIGINT,sig_handler);addsig(SIGPIPE,SIG_IGN);bool stop_server = false;bool terminate = false;//创建共享内存,作为客户socket连接的读缓存shmfd = shm_open(shm_name,O_CREAT|O_RDWR,0666);assert(shmfd != -1);ret = ftruncate(shmfd,USER_LIMIT*BUFFER_SIZE);assert(ret != -1);share_men = (char *) mmap(NULL,USER_LIMIT*BUFFER_SIZE,PROT_READ|PROT_WRITE,MAP_SHARED,shmfd,0);assert(share_men != MAP_FAILED);close(shmfd);while ( !stop_server ){int number = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);if ( (number < 0 ) && ( errno != EINTR ) ){printf("epoll falure\n");break;}for (int i  = 0 ; i < number; i++ ){int sockfd = events[i].data.fd;if ( sockfd == listenfd ){struct sockaddr_in client_address;socklen_t client_addrlength = sizeof(client_address);int connfd = accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);if ( connfd < 0 ) {printf("errno is : %d \n", errno);continue;}if ( curst.size() >= USER_LIMIT ){const char * info = "too many users\n";printf("%s",info);send(connfd,info, strlen(info),0);close(connfd);continue;}user_counet = *nost.begin();nost.erase(nost.begin());curst.insert(user_counet);users[user_counet].address = client_address;users[user_counet].connfd = connfd;ret = socketpair(PF_UNIX,SOCK_STREAM,0,users[user_counet].pipefd);assert(ret != -1);pid_t  pid = fork();if ( pid < 0 ){close(connfd);continue;}else if ( pid == 0 ){close(epollfd);close(listenfd);close(users[user_counet].pipefd[0]);close(sig_pipefd[0]);close(sig_pipefd[1]);run_child(user_counet,users,share_men);munmap((void*)share_men,USER_LIMIT*BUFFER_SIZE);exit(0);}else{close(connfd);close(users[user_counet].pipefd[1]);addfd(epollfd,users[user_counet].pipefd[0]);users[user_counet].pid = pid;printf("client %d join , now curclient %d \n",user_counet,curst.size());sub_process[pid] = user_counet;}}else if ( (sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)){int sig;char signals[1024];printf("recv sig !!!\n");ret = recv(sig_pipefd[0],signals,sizeof(signals),0);if ( ret == -1 ){continue;}else if ( ret == 0 ){continue;}else{for (int k = 0 ; k < ret; ++k ){switch (signals[k]){case SIGCHLD:{pid_t pid;int stat;while ( (pid = waitpid(-1,&stat,WNOHANG)) > 0 ){//用子进程的pid取需要关闭的客户连接idint del_user = sub_process[pid];sub_process[pid] = -1;if ( ( del_user < 0 ) || ( del_user > USER_LIMIT ) ){continue;}printf("close : %d \n",del_user);nost.insert(del_user);curst.erase(del_user);//清除第del_user个客户端连接使用的相关数据epoll_ctl(epollfd,EPOLL_CTL_DEL,users[del_user].pipefd[0],0);close(users[del_user].pipefd[0]);users[del_user] = users[USER_LIMIT];sub_process[users[del_user].pid] = del_user;}if ( terminate && curst.empty() ){stop_server = true;}break;}case SIGTERM:case SIGINT:{//结束服务器程序printf("kill all the child now\n");if ( curst.empty() ){stop_server = true ;break;}for (auto & j : curst){int pid = users[j].pid;kill(pid,SIGTERM);}terminate = true ;break;}default:{break;}}}}}//某个子进程向父进程写了数据else if ( events[i].events & EPOLLIN ){int child = 0;//读取管道数据,child变量记录了是哪个客户连接有数据可达ret = recv(sockfd,(char *)&child,sizeof(child),0);//printf("read data from child accross pipe\n");if ( ret == -1 ){continue;}else if ( ret == 0 ){continue;}else{printf("read data from child accross pipe %s \n",share_men+child*BUFFER_SIZE);//向除了负责第child个客户连接的子进程之外的其他进程发送消息,通知他们的客户端有数据要写for (auto j : curst){if ( users[j].pipefd[0] != sockfd ){printf("send data to child accross pipe\n");send(users[j].pipefd[0],(char* )&child,sizeof(child),0);}}}}}}del_resource();return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/130399.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis7.x 高级篇

Redis7.x 高级篇 Redis版本发行时间Redis单线程说的是什么东西 Redis版本发行时间 Redis单线程说的是什么东西

蓝桥杯官网填空题(合法括号序列)

题目描述 本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 由 1 对括号&#xff0c;可以组成一种合法括号序列&#xff1a;()。 由 2 对括号&#xff0c;可以组成两种合法括号序列&#xff1a;()()、(())。 由 4 对括号组…

intellij idea拉取最新的依赖包

intellij idea setting 拉取最新的依赖包 File --> Settings --> Build, Execution, Deployment --> Build Tools --> Maven --> 勾选 Always update snapshots. 一般情况下&#xff0c;设置完就可以拉取到最新的依赖包了。 如下&#xff1a; 安装最新的依赖…

配置git并把本地项目连接github

一.配置git 1.下载git&#xff08;Git&#xff09;&#xff0c;但推荐使用国内镜像下载&#xff08;CNPM Binaries Mirror&#xff09; 选好64和版本号下载&#xff0c;全部点下一步 下载完成后打开终端&#xff0c;输入 git --version 出现版本号则说明安装成功 然后继续…

将Bean注入Spring容器的五种方式

将bean放入Spring容器中有哪些方式&#xff1f; 我们知道平时在开发中使用Spring的时候&#xff0c;都是将对象交由Spring去管理&#xff0c;那么将一个对象加入到Spring容器中&#xff0c;有哪些方式呢&#xff0c;下面我就来总结一下 1、Configuration Bean 这种方式其实也是…

pyodbc库删除MSSQL表中重复字段及排序函数ROW_NUMBER()运用

ROW_NUMBER() 是一个窗口函数&#xff0c;用于在 SQL 查询结果集中为每一行分配一个唯一的数字标识符。这个标识符通常用于数据排序、分组或者在结果集中标识特定行。 ROW_NUMBER() 函数的语法如下&#xff1a; ROW_NUMBER() OVER ([PARTITION BY column1, column2, ...][ORD…

修改docker 版本的mysql 8.0 本机Navicat 连不上的问题

1.进入容器 docker exec -it xxxx bash 2.使用root账号登录mysql mysql -u root -p 3.查看当前加密方式 use mysql; SELECT Host, User, plugin from user; 我这是改过了&#xff0c;应该都是caching_sha2_password 4. 修改加密方式 ALTER USER root% IDENTIFIED WITH m…

关于单片机CPU如何控制相关引脚

目录 1、相关的单片机结构 2、通过LED的实例解释 1、相关的单片机结构 在寄存器中每一块都有一根导线与引脚对应&#xff0c;通过cpu改变寄存器内的数据&#xff08;0或1&#xff09;&#xff0c;通过驱动器来控制对于的引脚。 2、通过LED的实例解释 如图所示&#xff0c;芯片…

C语言 程序环境 编译和链接

目录 1.程序的翻译环境和执行环境 2.详解C语言程序的编译和链接 2.1翻译环镜 2.2翻译的几个阶段 2.2.1预编译 2.2.2编译 词法分析 符号汇总 2.2.3汇编 生成符号表 2.3链接 1.合并段表 2.合并符号表和重定位 2.4运行环境 1.程序的翻译环境和执行环境 在ANSI C的任…

rhcsa-vim

命令行的三种模式 将ets下的passwd文件复制到普通用户下面 编辑模式的快捷方式 a--光标后插入 A--行尾插入 o--光标所在上一行插入 O--光标所在上一行插入 i--光标前插入 I--行首插入 s--删除光标所在位然后进行插入模式 S--删除光标所在行然后进行插入 命令模式的快捷…

vue详细安装教程

这里写目录标题 一、下载和安装node二、创建全局安装目录和缓存日志目录三、安装vue四、创建一个应用程序五、3x版本创建六、创建一个案例 一、下载和安装node 官网下载地址&#xff1a;https://nodejs.org/en/download 选择适合自己的版本&#xff0c;推荐LTS&#xff0c;长久…

代码随想录算法训练营第10天|232. 用栈实现队列 225. 用队列实现栈

JAVA代码编写 232. 用栈实现队列 请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作&#xff08;push、pop、peek、empty&#xff09;&#xff1a; 实现 MyQueue 类&#xff1a; void push(int x) 将元素 x 推到队列的末尾int pop() 从队列的开头移除…

Quartus II 13.0波形仿真(解决无法产生仿真波形问题)

目录 前言 新建工程 创建Verilog文件&#xff0c;写代码 波形仿真&#xff08;解决没有输出波问题&#xff09; 前言 这么说把Quartus II 13.0是我目前来讲见过最恶心的软件&#xff0c;总是一大堆麻烦事&#xff0c;稍微哪里没弄好就后面全都出问题。很多人在写完Verilog代…

数组反转(LeetCode)

凑数 ... 描述 : 给你一个 32 位的有符号整数 x &#xff0c;返回将 x 中的数字部分反转后的结果。 如果反转后整数超过 32 位的有符号整数的范围 [−231, 231 − 1] &#xff0c;就返回 0。 假设环境不允许存储 64 位整数&#xff08;有符号或无符号&#xff09;。 题目…

OpenCV检测圆(Python版本)

文章目录 示例代码示例结果调参 示例代码 import cv2 import numpy as np# 加载图像 image_path DistanceComparison/test_image/1.png image cv2.imread(image_path, cv2.IMREAD_COLOR)# 将图像转换为灰度 gray cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)# 使用高斯模糊消除…

node插件express(路由)的基本使用(一)

文章目录 一、express的了解1.定义2.作用3.使用express的前提条件&#xff08;1&#xff09;如果是新文件夹需要薪资package.json文件&#xff0c;如果有就忽略&#xff08;2&#xff09;安装第三方依赖包&#xff08;3&#xff09;在使用的地方导入express 二、express的基本使…

Jetpack:028-Jetpack中的Card

文章目录 1. 概念介绍2. 使用方法2.1 主要类型2.2 其它类型 3. 示例代码4. 内容总结 我们在上一章回中介绍了Jetpack中Switch相关的内容&#xff0c;本章回中 主要介绍Card。闲话休提&#xff0c;让我们一起Talk Android Jetpack吧&#xff01; 1. 概念介绍 我们在本章回中介…

node教程(四)Mongodb+mongoose

文章目录 一、mongodb1.简介1.1Mongodb是什么&#xff1f;1.2数据库是什么&#xff1f;1.3数据库的作用1.4数据库管理数据的特点 2.核心概念3.下载安装与启动4.命令行交互4.1数据库命令4.3文档命令 二、Mongoose1.介绍2.作用3.使用流程4.插入文档5.mongoose字段类型 一、mongod…

el-cascader级联选择器选中一个全选中问题

问题 只选中一个却把同级全选中 解决 :props中添加label、value、children属性 label、value、children属性值需要和后端返回的集合中的字段名保持一致 后端返回数据&#xff1a;

3000 台 Apache ActiveMQ 服务器易受 RCE 攻击

超过三千个暴露在互联网上的 Apache ActiveMQ 服务器容易受到最近披露的关键远程代码执行 (RCE) 漏洞的影响。 Apache ActiveMQ 是一个可扩展的开源消息代理&#xff0c;可促进客户端和服务器之间的通信&#xff0c;支持 Java 和各种跨语言客户端以及许多协议&#xff0c;包括…