1. base 模块
1.1 API
1.1.1 eventfd
int eventfd(unsigned int initval, int flags);
(1)类似信号量;其内部保存了一个 uint64_t 计数器 count,使用 initval 初始化;
(2)read
没有设置 EFD_SEMAPHORE 并且 count 不为 0,返回 count 值,并将 count 设为 0;
如果 count 值为 0,阻塞直到其非 0; 设置 EFD_NONBLOCK 后,返回 EAGAIN
(3)write 调用整数相加,count 最大值为 (64位最大值 - 1),如果超过该值会阻塞;
(4)IO 复用事件
可读:计数器大于 0
可写:不会阻塞的情况下,至少还能加 1;
(5)主要作用:用来替换只作为发出事件信号的 pipe
参考网址:https://www.man7.org/linux/man-pages/man2/eventfd.2.html
1.1.2 timerfd
参考网址:https://www.man7.org/linux/man-pages/man2/timerfd_create.2.html
(1)timerfd_create
创建定时器对象
(2)timerfd_settime
启动 / 停止定时器( new_value.it_value 设为非 0 或 0,该值是相对值,相对调用时的当前时间)
struct itimerspec
{struct timespec it_interval;struct timespec it_value;
};
it_interval 设为 0,只触发一次;非 0 会重复触发;
(3)交互
可读:上次调用 timerfd_settime() 启动后,触发一次或多次
调用 read 时,返回 uint64_t 整数,表明触发次数;
1.2 其他类
1.2.1 ThreadPool
ThreadPool任务队列:生产者消费者模型(1) setMaxQueueSize 设置队列的最大大小(2) start 创建并启动 numThreads 个线程,都放入了 threads_ 中,线程函数为 runInThread(3) run 是向任务队列中添加任务;(4) runInThread 不断调用 take 函数,从任务队列中取出任务并执行;(如果有 threadInitCallback_,需要先执行该初始化函数)
1.2.2 CountDownLatch
CountDownLatch
倒计时,需要显式初始化指明最开始的计数值;内部封装了一个条件变量和一个锁(只有自己使用),当计数减为 0 时,会等待;(1) countDown 函数;计数减1,若减为 0,则唤醒所有阻塞线程;(2) wait 函数;等待 当前计数变为 0;(若为 0,则返回);
1.2.3 Thread
Threadtemplate< class R, class... Args >
class function<R(Args...)>;
R 为返回值类型,Args 为可变参数(1) start; 内部调用了 pthread_create 创建线程,使用初始计数为 1 的 CountDownLatch 来确保获取到 tid 后再返回(2) join; 内部调用了 pthread_join(3) ~Thread; 不会等待线程结束,如果没有调用 join 函数,就会调用 pthread_detach;
(1)pthread_create 和 gettid 区别
pthread_create 返回的是 pthread_t 类型,是一个进程中各个线程之间的标识号,对于这个进程内是唯一的,而不同进程中,每个线程返回线程 ID 的可能是一样的。glibc 的 Pthreads 只保证同一进程内,同一时刻的各个线程 id 不同,不能保证同一进程先后多个线程具有不同 id;
在 Linux 下,建议使用gettid 的返回值作为线程 id;其类型是 pid_t,保证任何时刻都是全局唯一的,0 是非法值;
而gettid 获取的 线程ID 和 PID 是有关系的,因为在linux中线程其实也是一个进程(clone)。在一个进程中,主线程的线程ID 和进程 PID 是一样的,gettid是不可移植的。
(2)为了效率,使用 thread local 缓存了线程 tid
// 内部调用 gettid() ,并缓存了线程 tid;
CurrentThread::tid()
调用 fork 如何保证子进程不会看到这个缓存结果?
使用 pthread_atfork 注册回调函数 child 清空缓存,重新调用 tid();
1.2.4 Condition
对条件变量进行了封装;MutexLock 的 holder_ 中记录了当前持有锁的线程 TID;
(1)Condition 构造时,MutexLock 已经构造完毕,当该线程阻塞在该条件变量时,其他线程获取到锁时,需要改变 holder_ 值;
直观想法是,在调用 pthread_cond_wait 前后重新对 holder_ 进行赋值,这里采用了 UnassignGuard 的思想,构造时,将 holder_ 赋值为 0,析构时,赋值为当前线程 TID;
1.2.5 ThreadLocal
(1)内部创建了 pthread_key_t 类型变量;
1.2.6 FixedBuffer
内部为 固定大小的字符数组 char data_[SIZE]
向 Buffer 中写数据时调用 memcpy
cur_ 指向当前待写入的位置;
全初始化为 0,内部调用 memset
1.2.7 AsyncLogging
使用 unique_ptr 指向缓存块 Buffer;
双缓存方案,前端和后端分别使用两块缓存;
(1)构造时
初始化一个后台线程,申请两块内存,buffers_ 数组大小设置为 16;
(2)start
启动后台线程,这里 CountDownLatch 的作用是确保线程函数执行后,再退出 (确保在线程中 running_ 一定为 Ture ,确保不发生重排序? );
(3)append
buffers_ 为当前写满的缓存块数组;
前端调用,向缓存区写入 Log,写满缓存后,唤醒后台线程清理缓存;
currentBuffer_ 写满后会替换为 nextBuffer_,若其也写满则申请新的缓存块;
(4)threadFunc
持有两个空的缓存块,newBuffer1 和 newBuffer2,局部变量数组 buffersToWrite
后端线程调用(若当前 buffers_ 为空,则阻塞;每隔一段时间醒来或可有前端提前唤醒);
将 currentBuffer_ 放入 buffers_,使用 newBuffer1替换 currentBuffer_ ,交换 buffersToWrite 和 buffers_,若 nextBuffer_ 为空,使用 newBuffer2 替换;
将 buffersToWrite 中缓存块写入日志文件,对 newBuffer1 和 newBuffer2 重新赋值,清空其他缓存块;
2. net 模块
2.1 时间相关类
2.1.1 Timestamp
内部使用 int64_t 表示,以 微秒 为单位
(1)now() 静态方法,返回 Timestamp(调用时的当前时间构造)
内部调用 gettimeofday(&tv, NULL);,
会计算从1970年1月1号00:00(UTC)到当前的时间跨度
struct timeval
{__time_t tv_sec; /* Seconds. */__suseconds_t tv_usec; /* Microseconds. */
};
time() 返回的 time_t ,是以秒为单位的;
(2)toFormattedString
gmtime_r 将 time_t 转换为年月日时分秒的形式;
2.1.2 Timer
时间事件,包含 到期时间、时间回调函数、时间间隔 interval_、是否可重复触发、sequence_(使用静态原子整数生成序列号)
(1)run 调用时间回调函数
(2)restart( now ) 将到期时间改为 now + interval_
2.1.3 TimerId
包含一个指向 Timer 的指针和一个 sequence_
2.1.4 TimerQueue
内部使用 set 作为 TimerList timers_,按到期时间 Timestamp 排序各时间事件
(1)构造函数
创建 timerfd 并使用 timerfd 设置 Channel,设置其读回调函数为 handleRead
(2)handleRead
-
1.获取所有过期的时间事件,从
activeTimers_中删除这些事件; -
2.清空
cancelingTimers_,调用每个事件对应的回调函数 -
3.调用
reset函数
(3)insert
将 Timer 根据到期时间插入到 timers_ 和 activeTimers_ 中;
其返回值标志,新加入的 Timer 是否是第一个元素(即到期时间最早的)
(4)reset
将可重复触发并且不在 cancelingTimers_ 中的时间事件调用 restart,重新插入到 timers_ 和 activeTimers_ 中;
否则就进行删除
如果当前 timers_ 不为空,获取下次到期时间,如果该时间有效,使用该时间调用 resetTimerfd 设置下次定时器到期时间;
(5)resetTimerfd 设置时钟下次到期时间,最小为 100 微妙
(6)addTimer
新构造一个 Timer(动态创建)
(7)addTimerInLoop
对时间事件调用 insert ;
(8)cancelInLoop
若其在 activeTimers_ 中,则将其从中删除,并释放;
否则,若 callingExpiredTimers_ 为 True,将其加入到 cancelingTimers_ 中;(只有在 handleRead中的第 2 步才有可能发生)
2.2 Channel
封装了 一个 fd 对应的事件及回调函数;(只属于一个 EventLoop)
events_ 表示 fd 上的注册事件
revents_ 接收到的事件(触发的事件)
封装了一些 readCallback_ 回调函数;
index_ 表明是否添加到了 Poller 中(-1 为未添加,1 为已添加,2 为已删除)
(1)handleEvent 根据 revents_ 调用相应的回调函数
(2)disenableXXX/enableXXX 方法都会调用 update() 注册读 / 写事件
设置 addedToLoop_ 为 Ture
调用 EventLoop 的 updateChannel 函数
调用该方法时,必定是 loop 线程;
(
在 EventLoop 构造时,注册 timefd、wakefd 可读
客户端 Connector::connecting 中,注册 sockfd 可写
服务端 Acceptor::listen(),注册 listenfd 可读
回调函数中调用
)

2.3 Poller
IO 复用基类(纯虚基类)
channels_ 为哈希表,key 为 fd,val 为 channel
(1)poll
内部调用 epoll_wait
如果有就绪的事件,调用 fillActiveChannels
(2)fillActiveChannels
设置触发 channel 的 revents_
将当前触发的 channel,加入到 activeChannels_
(3)updateChannel
根据 channel 的 index_ 调用 epoll_ctl;
如果 index_ 为 -1,需要添加到 channels_ 中
(4)removeChannel
从 channels_ 中删除 channel;
2.4 事件循环类
2.4.1 EventLoop

记录下创建 EventLoop 的线程 threadId_
(1)loop
循环执行下面语句:
- 清空
activeChannels_ - 调用
poll - 调用
activeChannels_中的每个 channel 的handleEvent - 调用
doPendingFunctors
(2)doPendingFunctors
callingPendingFunctors_ 设为 Ture
使用 swap 技巧减少了临界区;
调用 pendingFunctors_ 中的每个函数;
callingPendingFunctors_ 设为 False
(3)runInLoop
如果调用线程是 threadId_,直接运行函数;否则,调用 queueInLoop
(4)queueInLoop
将 回调函数 加入到 pendingFunctors_ 中(此步有锁保护)
如果调用线程不是 threadId_ ,调用 wakeup
(5)wakeup
eventfd_ 加 1
(6)updateChannel
断言调用线程为 threadId_
调用 poller 的 updateChannel
(7)添加定时器事件的流程如下图所示(在非 IO 线程中)

等定时器到期后,会从 poll 中返回,紧接着下一步调用 TimerQueue::handleRead(),然后调用定时器任务的回调函数;
(8)quit
设置 quit_ = true(上图中 loop 的循环条件为该变量)
如果当前不是事件循环的线程,则调用 wakeup() 唤醒事件循环
2.4.2 EventLoopThread
创建额外的一个线程专门执行 loop,并把 EventLoop 对象返回给用户;
2.4.3 EventLoopThreadPool
setThreadNum 设置线程数量
(1)start
创建并启动 numThreads_ 个 EventLoopThread;
2.5 TCP 服务端相关类
2.5.1 InetAddress
封装了 IPv4 或 IPv6 的 ip 地址和端口号;
内部数据为一个 union,为 sockaddr_in 或 sockaddr_in6
strchr() 用于查找字符串中的一个字符,并返回该字符在字符串中第一次出现的位置
(1)resolve(只能解析 IPv4)
调用 gethostbyname_r 解析主机名为 IP 地址,需要为其提供缓存供其内部查找过程使用;
链接: link
2.5.2 Socket
封装 socket fd;及 socket 选项(保活机制等)
listen 等函数的返回值检查工作
析构时关闭 fd(RAII,该类是 fd 的所有者,fd 的创建由 SocketsOps 封装的系统调用完成)
2.5.3 Buffer
vector<char> buffer_,读/写索引;类似循环数组,但这里是通过拷贝的方式实现的;
默认大小为 1024 + 8;
预留区(初始为 8 byte) | 读区 | 写区
(1)append
ensureWritableBytes 判断当前可写的空间是否大于 len
拷贝数据到写区,并 writerIndex_ += len
网络序和主机序互换通过 __bswap_ 函数完成
(2)makeSpace
当前可用空间是否大于 len,若是,则将读区的数据拷贝到数组最前面;
否则,调用 resize
(3)prepend
向预留区写入数据,并向前调整 readerIndex_ -= len;
(4)retrieveAsString 或 readInt32
读取完当前全部字节时,会调用 retrieveAll 将读/写索引设置为初始状态
(5)readFd
直接从 fd 读取数据到 Buffer 中,内部调用了 readv 函数
内部使用了额外的栈缓存char extrabuf[65536];,超出 Buffer 的数据会暂时存储在该缓存中,之后调用 makeSpace 扩容后再进行拷贝;
(6)shrink 使用 swap 技巧减少 Buffer 数组的空间占用;
2.5.4 TcpConnection
保存两个端点的地址
新建 Channel,并设置了 connectfd 的各个回调函数
输入 和 输出 Buffer
highWaterMark_ 高水位标志
(1)sendInLoop
如果当前写缓存中没有数据且没有注册写事件,则直接发送数据;
否则,会将数据向拷贝到 outputBuffer_ 中,如果没有注册写事件会进行注册;(channel_->enableWriting())
(2)handleRead
调用 Buffer 的 readFd 将数据保存到 inputBuffer_ 中
如果读取到了数据,调用 messageCallback_(业务逻辑)
如果 n == 0,调用 handleClose()
(3)handleWrite
如果将 outputBuffer_ 的数据全部发送完毕,就会删除写事件;
调用 writeCompleteCallback_(业务逻辑)
(4)connectEstablished
- 状态从 kConnecting 改为 kConnected
channel_绑定自身的智能指针,并注册读事件\- 调用
connectionCallback_(业务逻辑)
(5)startRead() / stopRead
调用 runInLoop 注册或删除读事件
(6)send
当前状态为 kConnected 时,通过runInLoop将 sendInLoop 添加到事件循环的中;
否则,什么也不做;(即关闭写端后再调用 send 是无效的)
(7)handleClose
删除所有注册事件
调用 connectionCallback_
调用 closeCallback_ (业务逻辑)
(8)shutdown(只是关闭写端)
设置状态为 kDisconnecting
如果当前注册了写事件,则什么也不做(即等待数据发送完毕,在写回调中会处理这种情况)
否则,调用 shutdown 系统调用关闭写端
(9)什么时候关闭读端?如何关闭?
注意:Socket 析构时会调用 close 系统调用,因此其析构时会关闭读写端(即整个连接);
TcpConnection 构造时,会有 socket_(new Socket(sockfd)),因此 TcpConnection 析构时会关闭整个连接;
std::unique_ptr<Socket> socket_
被动关闭时,相当于直接调用了 close 系统调用;
2.5.5 Acceptor
封装 listenfd 作用(非阻塞)
(1)listen() 向 IO 复用注册读事件
必须设置 newConnectionCallback_,否则收到连接就会关闭;
(2)当 accept 出现错误 EMFILE 时(本进程的文件描述符达到上限,无法为新连接创建 socket 文件描述符,因为新连接还在等待处理,epoll_wait 会立即返回,造成忙等待)
构造时创建了一个额外的空闲文件描述符 idleFd_
关闭 idleFd_ ,再调用 accept ,再关闭 idleFd_,最后重新打开一个额外的空闲文件描述符(在多线程中不保证正确,猜想是如果不加临界保护,关闭 idleFd_,其他线程可能正好调用 accept ,此时并未出错,但当前线程又陷入困境)
2.5.6 TcpServer
创建 EventLoopThreadPool
设置 acceptor_ 的回调函数 newConnection
维护各个连接的哈希表,key 为IP、Port、连接号组成的 string,val 为 conn 指针;
(1)newConnection
从线程池轮次获得 EventLoop (如果未设置线程池数量会返回 baseloop );
新建 TcpConnection,设置其一系列回调函数;(创建 TcpConnection 时会新建 Channel)
在对应事件循环 ioLoop 中调用 TcpConnection 的 connectEstablished 方法,注册读事件;
(2)start
调用线程池的 start
在 loop_ 中调用 acceptor_ 的 listen() 函数
(3)setThreadNum
设置线程池中线程的数量,如果不设置将只有一个线程,即一个事件循环;
(4)此处逻辑暗示了:主线程负责 listenfd ,每个子线程轮次负责 connfd
具体读写处理由 loop 中就绪事件的回调函数完成(业务逻辑由 Connect 等类中的回调函数完成,可以看作是在读写回调函数中又调用了业务逻辑函数)
注册修改事件大多都会放到 pendingFunctors_ 中
(5)可以在一个 loop 上运行多个服务端,即监听多个 listenfd;
2.6 客户端

2.6.1 Connector
(1)start / startInLoop
在 loop 中调用 connect() 函数
(2) connect()
创建 sockfd,调用 connect 函数
如果立即建立或正在建立过程中,调用 connecting(注册写事件)
handleWrite 中如果出错或是服务端、客户端都位于同一主机,调用 retry,
否则,成功建立连接,调用 newConnectionCallback_,即 TcpClient 中的 newConnection
(3)retry
关闭 sockfd
调用 runAfter 设置定时器事件(到期后的回调函数是 startInLoop)
2.6.2 TcpClient
新建 Connector
设置 Connector 的 connectionCallback_
(1)newConnection
新建 TcpConnection,设置其一系列回调函数;
调用 connectEstablished 注册读事件(注意回调、事件都是由 TcpConnection 管理)
(2)connect()
调用 Connector 的start
(3)disconnect()
调用连接的 connection_->shutdown();(有锁保护)
3. 使用教程
(1)IO 复用其实是复用线程,而非 IO 连接;
(2)每千兆比特每秒的吞吐量配一个 event loop;
3.1 本质
1.连接的建立
2.连接的断开
3.消息到达
3.5 消息发送完毕(将数据写入操作系统缓冲区)
在 TcpConnection 中的四个回调函数分别对应这几个事件;
3.2 muduo 线程模型

上图为 muduo 库的默认线程模型,适合小规模的计算;
如果有大量计算任务,可以再加一个线程池,如下所示:

3.3 非阻塞 IO 为什么必须使用应用缓存?
(1)接收时,数据可能没有一次性收全,已经收到的数据累积到 Buffer 中;
(2)发送数据时,只发送了一部分就填满了操作系统发送缓冲区,阻塞时间取决于对方接收数据的速率,剩余数据应该复制到应用缓冲区中;
3.4 IO 复用为何最好不要搭配阻塞 IO?
以下假设在 Berkeley 的实现中;
假设 listenfd 为阻塞 IO,以 accpet 为例,select 返回 listenfd 可读后,可能并不会马上调用 accpet(这也是维护一个已完成连接队列的原因),当客户在这期间中止该连接时(收到客户的 RST)这个已完成的连接被服务器 TCP 驱除出队列,如果此时队列中没有其他已完成的连接时,之后调用 accpet 时会一直阻塞,直到其他某个客户建立一个连接为止;
3.5 chat 高性能程序设计
(1)利用了 ThreadLocalSingleton
即每个事件循环都有自身线程的一个连接队列;这样在每个事件循环的内部分发时,由于是单线程就不需要加锁;
只需在 Server 端遍历各个 loop 时加锁即可;此步的速度比较快,因为只需于在事件循环中注册函数即可,不用进行等待;
(2)在设计并发的 Hub 程序时,也可以参考上述思路
每个事件循环都维护自己的 std::map<string, Topic>
(3)multiplexer
在没有连接到后台服务器时,接收到的连接都放弃掉
3.6
(1)Channel 中回调函数如何被调用?

(2)在实现 TimerQueue 中,不能直接使用 Timestamp 为 Key,可能有多个 Timer 到期时间相同,解决办法是:使用 multiset 或者 设法区分 Key;
muduo 中使用 std::pair<Timestamp, Timer*> 作为 Key
(3)既然使用 wakeupfd 是为了唤醒事件循环去执行用户回调,那么 doPendingFunctors 为什么不直接放到 EventLoop::handleRead 中?
- 假设已经执行过
EventLoop::handleRead,此时处理其他就绪事件回调函数,此时如果调用queueInLoop,由于当前处于 IO 线程且callingPendingFunctors_ = false,是无法调用wakeup的,即用户回调不能被及时处理;(要么必须得保证调用中不涉及queueInLoop,需要约束用户,要么设法调用wakeup) - 若把
doPendingFunctors直接放到EventLoop::handleRead,那么想执行用户回调,必须先执行wakeup函数;需要执行三个系统调用,write-poll-read,如果把doPendingFunctors放到外面,那么这些就绪事件回调函数都可以直接使用这个doPendingFunctors,节省系统调用;
例如,removeConnectionInLoop中就直接调用了queueInLoop;
3.6 为何在 removeConnectionInLoop 中调用了 queueInLoop 方法?
(1)这主要是为了延长 TcpConnection 声明周期,使用 queueInLoop 方法后,其生命期会延长到 doPendingFunctors 函数执行完毕;
若不使用 queueInLoop 而使用 runInLoop 时,在 removeConnectionInLoop 执行完毕后,引用计数就会降为 0,将会析构 TcpConnection ,从而析构 Channel,这会导致无法返回到 Channel::handleEvent 中;(如果直接使用 runInLoop ,在程序中的表现是,析构 Channel 会出错)
需要注意的是,调用方式为 closeCallback_(shared_from_this()); ,即使是引用传递但由于是右值,在函数执行完后就会析构;
(2)上述这种做法对 s06 的实现是必须的,但在 muduo 实现中,Channel 使用了弱引用,在调用前会升级成 shared_ptr,这就保证了在从 handleEvent 返回前,Channel 不会被析构,把这部分代码添加到 s06 中 TcpConnection 可以正常析构,那么在 muduo 实现中是否还有必要使用 queueInLoop 呢?
if (tied_)
{guard = tie_.lock();if (guard){handleEventWithGuard(receiveTime);}
}
// 在建立连接时,用细线绑定
void TcpConnection::connectEstablished()
{...channel_->tie(shared_from_this());...
}
在采用线程池的事件循环中,muduo 中处理被动关闭时的流程可能如下所示:

这里的 queueInLoop() 替换为 runInLoop 效果是相同的;对单线程事件循环而言,如果进行替换,会在 EventLoop 中执行完 connectDestroyed 后才返回到 TcpServer 处,逻辑上感觉没什么问题;
可能有问题的地方在于,在就绪事件回调函数中,删除了这个就绪事件,是否会造成迭代器失效等问题,muduo 中采用了复制的方式将就绪事件都保存到了 currentActiveChannel_;在源代码中修改运行部分测试程序后,暂未发现什么问题;
参考
《Linux 多线程服务端编程》