重点内容

视频讲解:《C++Linux编程进阶:从0实现muduo C++网络框架系列》-第6讲.C++死锁问题如何分析调试-原子操作,互斥量,条件变量的封装
代码改动
lesson6代码
-  
实现:base/Atomic.h
 -  
实现:base/Mutex.h
 -  
实现:base/Condition.h/cc
 -  
examples/test_atomic_mutex.cc
 

1 AtomicIntegerT原子操作封装
1.1 封装意义
目的是更符号项目的用法,用起来更得心应手。
这种封装在网络库中特别有用,因为网络库经常需要处理并发场景,比如:
-  
统计连接数
 -  
管理连接状态
 -  
处理引用计数
 -  
实现无锁队列
 -  
实现线程安全的计数器
 
1.2 类图
AtomicIntegerT<T>
├── 私有成员
│   └── volatile T value_
├── 构造/析构
│   ├── AtomicIntegerT()	// 默认构造函数,初始值为0
│   └── explicit AtomicIntegerT(T x)	// 构造函数,禁止隐式类型转换
├── 原子读取操作
│   └── T get() // 原子性地读取value_的值
├── 原子修改操作
│   ├── T getAndSet(T newValue) // 原子性地将value_设置为newValue,并返回之前的值
│   ├── T getAndAdd(T x)// 原子性地将value_加上x,并返回之前的值
│   ├── T addAndGet(T x)// 原子性地将value_加上x,并返回新的值
│   ├── T incrementAndGet()	// 原子性地将value_加1,并返回之前的值
│   └── T decrementAndGet()	// 原子性地将value_减1,并返回之前的值
└── 便捷操作├── void add(T x)	// 原子性地将value_加上x├── void increment()	// 原子性地将value_加1└── void decrement()	// 原子性地将value_减1 
使用typedef 定义了AtomicInt32 AtomicInt64
// 定义32位和64位原子整数类型
typedef detail::AtomicIntegerT<int32_t> AtomicInt32;
typedef detail::AtomicIntegerT<int64_t> AtomicInt64; 
1.3 原子操作实现原理
-  
使用GCC内置的原子操作函数:
 
__sync_val_compare_and_swap  // CAS操作
__sync_lock_test_and_set     // 原子交换
__sync_fetch_and_add         // 原子加法// 原子性地读取value_的值
T get()
{return __sync_val_compare_and_swap(&value_, 0, 0);
}// 原子性地将value_设置为newValue,并返回之前的值
T getAndSet(T newValue)
{return __sync_lock_test_and_set(&value_, newValue);
}// 原子性地将value_加上x,并返回之前的值
T getAndAdd(T x)
{return __sync_fetch_and_add(&value_, x);
} 
-  
这些函数在编译时会被转换为CPU的原子指令
 -  
比如x86平台会使用lock前缀的指令
 
1.4 在muduo中的使用场景
class Thread : noncopyable
{static AtomicInt32 numCreated_;  //静态变量,用来计算创建的线程序号
};void Thread::setDefaultName()
{int num = numCreated_.incrementAndGet();   //创建的线程序号if (name_.empty()){char buf[32];snprintf(buf, sizeof buf, "Thread%d", num);  //设置唯一的线程名字name_ = buf;}
} 
class TcpServer {AtomicInt32 started_;  // 标记服务是否启动
};void TcpServer::start()
{if (started_.getAndSet(1) == 0)  // 默认值是0,获取之前的值并设置新值,避免程序二次启动{threadPool_->start(threadInitCallback_);assert(!acceptor_->listening());loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));}
} 
class Timer : noncopyable
{static AtomicInt64 s_numCreated_; //创建的定时器序号Timer(TimerCallback cb, Timestamp when, double interval): callback_(std::move(cb)),expiration_(when),interval_(interval),repeat_(interval > 0.0),sequence_(s_numCreated_.incrementAndGet())  //当前定时器的序号{ }
} 
1.5 使用示例
// 测试原子操作
void testAtomic()
{printf("Testing Atomic operations...\n");AtomicInt32 a0;assert(a0.get() == 0);assert(a0.getAndAdd(1) == 0);assert(a0.get() == 1);assert(a0.addAndGet(2) == 3);assert(a0.get() == 3);assert(a0.incrementAndGet() == 4);assert(a0.get() == 4);assert(a0.decrementAndGet() == 3);assert(a0.get() == 3);printf("Atomic test passed!\n");
} 
2 MutexLock/MutexLockGuard互斥量封装
2.1 封装意义
-  
提供RAII风格的互斥锁管理
 -  
追踪锁的持有者,方便调试
 -  
防止误用和死锁
 
2.2 类图
MutexLock
├── 私有成员
│   ├── pthread_mutex_t mutex_
│   └── pid_t holder_
├── 构造/析构
│   ├── MutexLock()
│   └── ~MutexLock()
├── 锁操作
│   ├── void lock()
│   └── void unlock()
├── 调试功能
│   ├── bool isLockedByThisThread()
│   └── void assertLocked()
└── 友元类└── ConditionMutexLockGuard
├── 私有成员
│   └── MutexLock& mutex_
├── 构造/析构
│   ├── MutexLockGuard(MutexLock& mutex)
│   └── ~MutexLockGuard()
└── 禁止拷贝└── noncopyable 
2.3 实现原理
-  
使用pthread_mutex作为底层实现
 -  
通过RAII自动管理锁的获取和释放
 -  
通过gettid()追踪锁的持有者
 
2.4 在muduo中的使用场景
使用的场景较多,这里只展示了在EventLoop,ThreadPool中的使用。
void EventLoop::queueInLoop(Functor cb)
{{MutexLockGuard lock(mutex_);  //加锁pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_){wakeup();}
} 
size_t ThreadPool::queueSize() const
{MutexLockGuard lock(mutex_);   //加锁return queue_.size();
} 
2.4 使用示例
muduo库封装的Mutex功能很多,但我们目前知道调用
MutexLockGuard lock(mutex);  加锁就行,其他的用法不做要求。 
isLockedByThisThread 是一个用于调试和断言的重要函数,让我详细解释它的使用:
1.基本用法(掌握这里就行)
MutexLock mutex;
{MutexLockGuard lock(mutex);
} 
2.常见使用场景(不做要求)
class ThreadSafeClass {
public:void method1() {MutexLockGuard lock(mutex_);// 确保在持有锁的情况下调用method2method2();  // 内部方法调用}private:void method2() {// 确保调用此方法时已经持有锁mutex_.assertLocked();  // 如果未持有锁,会触发断言// 执行操作...}MutexLock mutex_;
}; 
3.调试死锁(不做要求)
class Resource {
public:void access() {MutexLockGuard lock(mutex_);// 检查是否真的持有锁if (!mutex_.isLockedByThisThread()) {printf("Warning: Lock not held by current thread!\n");}// 执行操作...}private:MutexLock mutex_;
}; 
4.防止误用(不做要求)
class SafeCounter {
public:void increment() {// 确保在持有锁的情况下修改数据if (!mutex_.isLockedByThisThread()) {printf("Error: Must hold lock to modify counter!\n");return;}count_++;}private:MutexLock mutex_;int count_;
}; 
5.实际应用示例(不做要求)
class ThreadSafeQueue {
public:void push(int value) {MutexLockGuard lock(mutex_);// 确保在持有锁的情况下调用内部方法doPush(value);}private:void doPush(int value) {// 确保调用此方法时已经持有锁mutex_.assertLocked();queue_.push(value);}MutexLock mutex_;std::queue<int> queue_;
}; 
3 Condition条件变量的封装
3.1 封装意义
-  
提供线程间的同步机制
 -  
实现生产者-消费者模式
 -  
支持超时等待功能
 -  
与MutexLock配合使用,确保线程安全
 
3.2 类图
Condition
├── 私有成员
│   ├── MutexLock& mutex_      // 互斥锁引用
│   └── pthread_cond_t pcond_  // 条件变量
├── 构造/析构
│   ├── explicit Condition(MutexLock& mutex)
│   └── ~Condition()
├── 等待操作
│   ├── void wait()           // 等待通知
│   └── bool waitForSeconds(double seconds)  // 超时等待
└── 通知操作├── void notify()         // 通知一个等待线程└── void notifyAll()      // 通知所有等待线程 
3.3 实现原理
本质是调用pthread线程库的接口 以及封装的MutexLock。
class Condition : noncopyable....MutexLock& mutex_;        // 互斥锁引用pthread_cond_t pcond_;    // 条件变量
};
 
3.4 在muduo中的使用场景
用于ThreadPool任务队列的生产者-消费者模型。
用于EventLoopThread等等IO Loop线程创建成功。
class ThreadPool : noncopyable
{mutable MutexLock mutex_;Condition notEmpty_ GUARDED_BY(mutex_);Condition notFull_ GUARDED_BY(mutex_);  //用来处理任务队列(生产者消费者模式)
} 
EventLoop* EventLoopThread::startLoop()
{assert(!thread_.started());thread_.start();EventLoop* loop = NULL;{MutexLockGuard lock(mutex_);while (loop_ == NULL){cond_.wait();    //等等loop线程创建成功}loop = loop_;}return loop;
}void EventLoopThread::threadFunc()
{EventLoop loop;if (callback_){callback_(&loop);}{MutexLockGuard lock(mutex_);loop_ = &loop;cond_.notify();  //发通知,io loop线程已经准备好了}loop.loop();//assert(exiting_);MutexLockGuard lock(mutex_);loop_ = NULL;
} 
使用示例
// 生产者-消费者模式
class Buffer {
public:Buffer(int size) : size_(size), notEmpty_(mutex_), notFull_(mutex_) {}void put(int item) {MutexLockGuard lock(mutex_);while (queue_.size() == size_) {notFull_.wait();  // 等待队列不满}queue_.push(item);notEmpty_.notify();  // 通知消费者}int get() {MutexLockGuard lock(mutex_);while (queue_.empty()) {notEmpty_.wait();  // 等待队列不空}int item = queue_.front();queue_.pop();notFull_.notify();  // 通知生产者return item;}private:MutexLock mutex_;Condition notEmpty_;Condition notFull_;std::queue<int> queue_;int size_;
}; 
关键特性
-  
与MutexLock配合使用
 -  
支持超时等待
 -  
提供广播通知
 -  
自动管理锁的释放和获取
 
使用建议
-  
总是与MutexLock配合使用
 -  
使用while循环检查条件
 -  
注意通知的时机
 -  
合理使用notify和notifyAll
 -  
考虑使用超时机制避免死锁
 
这种封装在网络库中特别有用,因为:
-  
需要处理异步事件
 -  
需要实现任务队列
 -  
需要处理定时器
 -  
需要实现线程池
 -  
需要处理生产者-消费者模式
 
4 完整的测试范例
完整测试代码:examples/test_atomic_mutex.cc
4.1 测试框架图
test_atomic_mutex.cc
├── 基础功能测试
│   ├── testAtomic()     // 原子操作测试
│   └── testMutex()      // 互斥锁测试
└── 条件变量测试└── testCondition()  // 生产者-消费者模式测试├── Buffer类├── producer线程└── consumer线程 
4.2 原子操作测试流程图
testAtomic()
├── 初始化测试
│   └── AtomicInt32 a0
├── 基本操作测试
│   ├── get() == 0
│   ├── getAndAdd(1) == 0
│   └── get() == 1
├── 复合操作测试
│   ├── addAndGet(2) == 3
│   └── get() == 3
└── 自增自减测试├── incrementAndGet() == 4├── get() == 4├── decrementAndGet() == 3└── get() == 3 
4.3 互斥锁测试流程图
testMutex()
├── 创建互斥锁
│   └── MutexLock mutex
├── 加锁测试
│   ├── MutexLockGuard lock(mutex)
│   └── assert(mutex.isLockedByThisThread())
└── 解锁测试└── 作用域结束自动解锁 
4.4 生产者-消费者模式原理图
[生产者线程1]     [生产者线程2]↓               ↓└──────┐    ┌──┘↓    ↓[Buffer缓冲区]↓    ↓┌──────┘    └──┐↓               ↓
[消费者线程1]  [消费者线程2]  [消费者线程3] 
4.5 Buffer类结构图
Buffer类
├── 私有成员
│   ├── size_t size_           // 缓冲区大小
│   ├── MutexLock mutex_       // 互斥锁
│   ├── Condition notEmpty_    // 不空条件变量
│   ├── Condition notFull_     // 不满条件变量
│   └── std::queue<int> queue_ // 数据队列
└── 公共方法├── put(int item)          // 生产者方法└── get()                  // 消费者方法 
4.6 线程同步流程图

4.6 测试输出
具体看代码打印
lesson6/build$ ./bin/test_atomic_mutex
5 章节总结
重点:
-  
原子封装的目的是提供对于项目更易用的接口,并大致了解下
 -  
gcc原子操作相关的函数,比如__sync_fetch_and_add
 -  
了解Atomic MutexLock和Condition等封装在muduo网络库的使用。
 -  
掌握MutexLock类里保存获得锁的线程ID的作用,这样更方便分析死锁问题。