文章目录
- 背景与动机
- 30.1 条件变量的定义与基本操作 (Definition and Routines)
- 30.2 生产者/消费者问题 (Bounded Buffer Problem)
- 30.3 覆盖条件 (Covering Conditions) 与 `pthread_cond_broadcast`
- 30.4 总结
背景与动机
到目前为止,我们已经学习了锁 (Locks) 作为并发原语,它允许线程互斥地访问临界区,防止数据竞争。然而,仅有锁是不够的。在许多并发场景中,一个线程可能需要等待某个条件 (condition) 变为真之后才能继续执行。
例子:等待子线程完成 (Thread Join)
一个父线程创建了一个子线程。父线程可能希望等待子线程执行完毕后,再继续执行自己的后续任务。这种等待子线程完成的操作通常称为 join()
。
简单但不高效的尝试:自旋等待 (Spin-waiting using a shared variable - Figure 30.2)
// Figure 30.2: Parent Waiting For Child: Spin-based Approach
volatile int done = 0; // 共享变量,标记子线程是否完成void *child(void *arg) {printf("child\n");done = 1; // 子线程完成,设置标志return NULL;
}int main(int argc, char *argv[]) {printf("parent: begin\n");pthread_t c;pthread_create(&c, NULL, child, NULL); // 创建子线程while (done == 0) // 父线程在此自旋等待; // spinprintf("parent: end\n");return 0;
}
- 问题: 父线程会持续地检查
done
变量,这是一种忙等待 (busy-waiting) 或自旋 (spinning)。这种方式极度浪费 CPU 时间,因为父线程在等待期间本可以被调度出去,让其他有用的工作执行。
核心问题 (THE CRUX): 如何优雅地等待一个条件?
在多线程程序中,线程经常需要等待某个条件成立才能继续。简单地自旋等待不仅效率低下,浪费CPU,有时甚至可能不正确。那么,线程应该如何有效地等待一个条件呢?
30.1 条件变量的定义与基本操作 (Definition and Routines)
条件变量 (Condition Variable - CV) 是一种显式的队列,线程可以在某个条件不满足时,将自己放入这个队列中并进入休眠状态(通过等待该条件)。当其他线程改变了可能影响该条件的状态时,它可以通知 (signal) 一个或多个正在等待该条件的休眠线程,唤醒它们以重新检查条件并继续执行。
- 历史渊源: 条件变量的思想可以追溯到 Dijkstra 的“私有信号量 (private semaphores)”和 Hoare 在其监视器 (Monitors) 工作中提出的“条件变量”。
- 声明 (POSIX Pthreads):
pthread_cond_t c;
(还需要正确初始化) - 核心操作 (POSIX Pthreads):
pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mutex)
:- 原子地 (atomically):
- 释放传入的
mutex
。 - 将调用线程置于休眠状态,并将其加入与
cv
关联的等待队列。
- 释放传入的
- 当线程被唤醒时(通常由其他线程调用
pthread_cond_signal
或pthread_cond_broadcast
),pthread_cond_wait
会在返回之前重新获取 (re-acquire) 传入的mutex
。 - 前提条件: 调用
pthread_cond_wait
时,当前线程必须已经持有mutex
。
- 原子地 (atomically):
pthread_cond_signal(pthread_cond_t *cv)
:- 唤醒至少一个(通常是一个)正在
cv
上等待的线程(如果存在的话)。 - 如果没有任何线程在
cv
上等待,signal
操作通常无效(即信号丢失)。
- 唤醒至少一个(通常是一个)正在
pthread_cond_broadcast(pthread_cond_t *cv)
:- 唤醒所有正在
cv
上等待的线程。
- 唤醒所有正在
为什么 wait()
需要一个互斥锁参数?
这是条件变量设计的核心,为了防止竞争条件和“丢失的唤醒 (lost wakeup)”问题。
- 保护条件检查: 线程在决定是否需要等待之前,必须检查某个共享状态(即条件)。这个检查本身以及后续可能的
wait
操作必须是原子的。互斥锁确保了在检查条件和进入等待状态之间,条件本身不会被其他线程修改。 - 原子释放锁和休眠:
pthread_cond_wait
的关键在于它能原子地释放锁并将线程置于休眠。如果这两步不是原子的(例如,先释放锁,再尝试休眠),那么在释放锁之后、线程实际休眠之前,另一个线程可能已经修改了条件并发送了信号。这个信号就会因为没有线程在等待而被丢失,导致调用wait
的线程永久休眠。
使用条件变量解决父线程等待子线程 (Figure 30.3):
// Figure 30.3: Parent Waiting For Child: Use A Condition Variable
int done = 0; // 共享状态变量,表示子线程是否完成
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; // 互斥锁,保护 done
pthread_cond_t c = PTHREAD_COND_INITIALIZER; // 条件变量// 子线程退出时调用
void thr_exit() {pthread_mutex_lock(&m); // 获取锁,保护 donedone = 1; // 修改共享状态pthread_cond_signal(&c); // 通知等待在 c 上的线程pthread_mutex_unlock(&m); // 释放锁
}void *child(void *arg) {printf("child\n");thr_exit(); // 调用封装好的退出函数return NULL;
}// 父线程等待子线程完成时调用
void thr_join() {pthread_mutex_lock(&m); // 获取锁,保护 done 的检查while (done == 0) { // 关键:使用 while 循环检查条件// 在调用 wait 时,锁 m 被持有// wait 会原子地释放 m 并使父线程休眠pthread_cond_wait(&c, &m);// 当被唤醒时,wait 会重新获取 m,然后返回}// 此处,父线程重新持有了锁 m,并且知道 done == 1pthread_mutex_unlock(&m); // 释放锁
}int main(int argc, char *argv[]) {printf("parent: begin\n");pthread_t p_child; // 将变量名从 c 改为 p_child 以示区分pthread_create(&p_child, NULL, child, NULL);thr_join(); // 父线程等待子线程printf("parent: end\n");return 0;
}
执行流程分析 (两种情况):
- 父线程先运行到
thr_join()
:- 父线程获取锁
m
。 - 检查
done
(为0),进入while
循环。 - 调用
pthread_cond_wait(&c, &m)
。父线程原子地释放锁m
并进入休眠状态,等待条件变量c
。 - 子线程运行,打印 “child”。
- 子线程调用
thr_exit()
:- 获取锁
m
。 - 设置
done = 1
。 - 调用
pthread_cond_signal(&c)
,唤醒正在c
上等待的父线程。 - 释放锁
m
。
- 获取锁
- 父线程从
pthread_cond_wait
返回,此时它已重新获取了锁m
。 - 父线程再次检查
while (done == 0)
,此时done
为 1,循环终止。 - 父线程释放锁
m
,打印 “parent: end”。
- 父线程获取锁
- 子线程先运行并完成:
- 子线程运行,打印 “child”。
- 子线程调用
thr_exit()
:- 获取锁
m
。 - 设置
done = 1
。 - 调用
pthread_cond_signal(&c)
。此时可能没有线程在c
上等待(如果父线程还没调用thr_join
),信号“丢失”(这是正常的,信号不是计数器)。 - 释放锁
m
。子线程结束。
- 获取锁
- 父线程运行,调用
thr_join()
:- 获取锁
m
。 - 检查
done
(为1),while (done == 0)
条件不满足,循环不执行。
- 获取锁
- 父线程释放锁
m
,打印 “parent: end”。
关键点:while
循环 vs. if
语句
- 必须使用
while
循环来重新检查条件:while (condition_is_false) { pthread_cond_wait(...); }
- 原因 (Mesa Semantics & Spurious Wakeups):
- Mesa 语义 (Mesa Semantics): 当一个线程被
pthread_cond_signal
唤醒时,它仅仅是一个提示 (hint),表明条件可能已经为真。在被唤醒的线程重新获取锁并实际运行之前,其他线程可能已经运行并再次改变了条件,使得条件又变回假。因此,被唤醒的线程必须重新检查条件。 - 虚假唤醒 (Spurious Wakeups): 在某些操作系统或线程库的实现中,线程有时可能会在没有被显式
signal
或broadcast
的情况下从wait
中“虚假地”唤醒。虽然不常见,但为了代码的健壮性,必须重新检查条件。 if
语句只检查一次条件,如果条件在被唤醒后到实际执行前又变假,或者发生了虚假唤醒,程序逻辑就会出错。
- Mesa 语义 (Mesa Semantics): 当一个线程被
错误尝试分析 (加深理解):
-
没有状态变量
done
(Figure 30.4):// thr_exit() // Pthread_mutex_lock(&m); // Pthread_cond_signal(&c); // 只有信号,没有状态 // Pthread_mutex_unlock(&m);// thr_join() // Pthread_mutex_lock(&m); // Pthread_cond_wait(&c, &m); // 直接等待,不检查状态 // Pthread_mutex_unlock(&m); ```* **问题:** 如果子线程先运行并调用 `thr_exit()` 发送信号,此时父线程可能还没调用 `thr_join()` 进入等待。信号会丢失。当父线程稍后调用 `thr_join()` 并进入 `wait` 时,它将永远等待,因为已经没有信号会再来唤醒它了。 * **教训:** 条件变量通常需要与一个**显式的状态变量**一起使用,该状态变量记录了线程感兴趣的实际条件。锁、等待和信号都是围绕这个状态变量进行的。
-
没有锁的
wait
和signal
(Figure 30.5 - 概念性错误,实际 Pthreads API 不允许):// thr_exit() // done = 1; // Pthread_cond_signal(&c); // 没有锁// thr_join() // if (done == 0) // Pthread_cond_wait(&c); // 没有锁
- 问题 (细微的竞争条件 - Lost Wakeup):
- 父线程调用
thr_join()
。 - 检查
done
(为0)。 - 在父线程调用
Pthread_cond_wait
使自己休眠之前,父线程被中断。 - 子线程运行,设置
done = 1
,并调用Pthread_cond_signal
。由于父线程还没进入等待,信号丢失。 - 父线程恢复运行,执行
Pthread_cond_wait
,进入永久休眠。
- 父线程调用
- 教训:
pthread_cond_wait
必须与互斥锁一起使用,并且互斥锁在调用wait
时必须被持有,以原子地完成条件检查、释放锁和进入休眠的过程。
- 问题 (细微的竞争条件 - Lost Wakeup):
TIP: ALWAYS HOLD THE LOCK WHILE SIGNALING (通常情况下)
- 虽然在某些特定情况下,不持有锁进行
signal
可能是安全的(例如,如果确信条件状态的改变和signal
操作本身是原子的,并且之后不会再访问受锁保护的数据),但最简单和最安全的做法是在调用pthread_cond_signal
或pthread_cond_broadcast
时持有相关的互斥锁。这可以确保在修改共享状态和发出信号之间没有竞争。 - 对于
pthread_cond_wait
,持有锁是强制的语义要求。
30.2 生产者/消费者问题 (Bounded Buffer Problem)
这是一个经典的并发同步问题,用于演示条件变量的强大功能。
- 场景:
- 生产者 (Producer) 线程: 生成数据项并将其放入一个共享的、有限大小的缓冲区 (bounded buffer) 中。
- 消费者 (Consumer) 线程: 从该缓冲区中取出数据项并进行处理。
- 同步要求:
- 生产者不能在缓冲区已满时放入数据(需要等待缓冲区有空位)。
- 消费者不能在缓冲区为空时取出数据(需要等待缓冲区有数据)。
- 对共享缓冲区的访问(放入和取出操作)必须是互斥的。
简单版本:单元素缓冲区 (Single Buffer - Figure 30.6, 30.7)
buffer
: 一个共享的整数变量,用于存放数据。count
: 一个共享的整数变量,0表示缓冲区为空,1表示缓冲区已满。put(value)
: 假设缓冲区为空 (count==0
),放入数据,设置count=1
。get()
: 假设缓冲区为满 (count==1
),取出数据,设置count=0
。
首次尝试:单个条件变量和 if
语句 (Figure 30.8 - Broken)
// 共享变量
// int loops;
// cond_t cond; // 单个条件变量
// mutex_t mutex; // 单个互斥锁
// int count = 0; // 缓冲区状态 (0:空, 1:满)
// int buffer; // 单元素缓冲区void *producer(void *arg) {for (int i = 0; i < loops; i++) {pthread_mutex_lock(&mutex); // p1if (count == 1) // p2: 检查缓冲区是否已满pthread_cond_wait(&cond, &mutex); // p3: 如果满了,等待put(i); // p4: 放入数据 (内部会设置 count=1)pthread_cond_signal(&cond); // p5: 通知消费者有数据了pthread_mutex_unlock(&mutex); // p6}return NULL;
}void *consumer(void *arg) {// for (int i = 0; i < loops; i++) { // 书中是 while(1)while(1) {pthread_mutex_lock(&mutex); // c1if (count == 0) // c2: 检查缓冲区是否为空pthread_cond_wait(&cond, &mutex); // c3: 如果空了,等待int tmp = get(); // c4: 取出数据 (内部会设置 count=0)pthread_cond_signal(&cond); // c5: 通知生产者有空位了pthread_mutex_unlock(&mutex); // c6printf("%d\n", tmp);}return NULL;
}
-
问题1 (使用
if
而不是while
- Mesa Semantics):- 如 Figure 30.9 的线程轨迹所示:
- 消费者 Tc1 运行,发现
count == 0
,调用wait
并休眠。 - 生产者 Tp 运行,
put()
数据,count
变为 1,signal()
唤醒 Tc1。Tc1 进入就绪队列,但尚未运行。 - 关键: 另一个消费者 Tc2 “潜入”,获取锁,发现
count == 1
(因为 Tp 刚生产完),于是 Tc2 执行get()
,count
变为 0。Tc2 消耗了数据。 - 现在 Tc1 终于运行,它从
wait
返回 (因为之前被 Tp 唤醒了)。由于用的是if
,它不会重新检查count
。 - Tc1 直接调用
get()
,但此时count
已经是 0 了!get()
内部的assert(count == 1)
会失败。
- 消费者 Tc1 运行,发现
- 修复: 将
if (condition)
改为while (condition)
,如 Figure 30.10。这样,Tc1 被唤醒后,会重新检查while (count == 0)
,发现条件仍然为真 (因为 Tc2 已经消费了),于是 Tc1 会再次进入wait
休眠。
- 如 Figure 30.9 的线程轨迹所示:
-
问题2 (单个条件变量的混淆 - “Everyone asleep…” bug):
- 即使使用了
while
循环,单个条件变量仍然存在问题,如 Figure 30.11 的线程轨迹所示:- 两个消费者 Tc1 和 Tc2 先后运行,都发现缓冲区为空,都在
cond
上调用wait
休眠。 - 生产者 Tp 运行,
put()
数据,count
变为 1。Tp 调用signal()
唤醒一个等待者(假设是 Tc1)。 - Tp 尝试再次生产,发现
count == 1
(缓冲区已满),Tp 也在cond
上调用wait
休眠。- 现在状态:Tc1 就绪 (刚被唤醒),Tc2 休眠 (等待数据),Tp 休眠 (等待空位)。
- Tc1 运行,从
wait
返回,重新检查while (count == 0)
,条件为假 (因为count == 1
)。 - Tc1 调用
get()
,count
变为 0。 - 关键: Tc1 调用
pthread_cond_signal(&cond)
。此时,等待在cond
上的有两个线程:Tc2 (消费者,等待数据) 和 Tp (生产者,等待空位)。 - 假设
signal
唤醒了 Tc2 (另一个消费者)。 - Tc2 运行,从
wait
返回,重新检查while (count == 0)
。由于 Tc1 刚消费完,count
是 0,所以条件为真,Tc2 再次进入wait
休眠。 - 灾难发生: Tp (唯一能生产数据的线程) 仍然在休眠,等待空位。Tc1 已经消费完退出了循环(或准备下一次消费)。Tc2 也在休眠,等待数据。所有相关的线程都可能陷入休眠,没有人能打破僵局。
- 两个消费者 Tc1 和 Tc2 先后运行,都发现缓冲区为空,都在
- 原因: 消费者 Tc1 消耗完数据后,它应该唤醒一个生产者(因为现在有空位了)。但是由于只有一个条件变量,它无法区分应该唤醒谁,它错误地唤醒了另一个消费者 Tc2。
- 即使使用了
正确解决方案:使用两个条件变量 (Figure 30.12)
cond_t empty;
// 当缓冲区从满变为空时,生产者在此等待,消费者发出信号cond_t fill;
// 当缓冲区从空变为有时,消费者在此等待,生产者发出信号
// 共享变量
// int loops;
// cond_t empty, fill; // 两个条件变量
// mutex_t mutex;
// int count = 0;
// int buffer;void *producer(void *arg) {for (int i = 0; i < loops; i++) {pthread_mutex_lock(&mutex);while (count == 1) // 缓冲区满了?pthread_cond_wait(&empty, &mutex); // 等待 empty 条件 (由消费者发出)put(i);pthread_cond_signal(&fill); // 通知消费者有数据了 (发信号给 fill 条件)pthread_mutex_unlock(&mutex);}return NULL;
}void *consumer(void *arg) {while (1) {pthread_mutex_lock(&mutex);while (count == 0) // 缓冲区空了?pthread_cond_wait(&fill, &mutex); // 等待 fill 条件 (由生产者发出)int tmp = get();pthread_cond_signal(&empty); // 通知生产者有空位了 (发信号给 empty 条件)pthread_mutex_unlock(&mutex);printf("%d\n", tmp);}return NULL;
}
- 逻辑:
- 生产者在缓冲区满时,等待在
empty
条件变量上(期望消费者消费后发出empty
信号)。生产数据后,它向fill
条件变量发信号,通知消费者有数据可取。 - 消费者在缓冲区空时,等待在
fill
条件变量上(期望生产者生产后发出fill
信号)。消费数据后,它向empty
条件变量发信号,通知生产者有空位可用。
- 生产者在缓冲区满时,等待在
- 优点: 信号被导向了正确的类型的等待线程,避免了之前消费者唤醒消费者或生产者唤醒生产者的混淆问题。
最终的生产者/消费者解决方案 (多个缓冲区槽位 - Figure 30.13, 30.14)
- 将单元素缓冲区扩展为一个真正的循环队列(数组实现),包含
fill_ptr
(下一个填充位置),use_ptr
(下一个使用位置), 和count
(当前缓冲区中的元素数量)。 put()
和get()
函数相应修改以操作这个循环队列。- 条件检查的改变:
- 生产者:
while (count == MAX)
(缓冲区是否已满,MAX 是缓冲区总容量) - 消费者:
while (count == 0)
(缓冲区是否为空)
- 生产者:
- 条件变量和锁的逻辑与双条件变量的单槽版本相同:
- 生产者等待
empty
,通知fill
。 - 消费者等待
fill
,通知empty
。
- 生产者等待
- 好处:
- 更高的并发性和效率: 允许多个数据项在被消费前先生产出来,减少了生产者和消费者的直接同步等待,尤其是在生产和消费速率不均时,可以起到缓冲作用。
- 减少上下文切换: 如果缓冲区足够大,生产者可以连续生产多个条目而无需等待消费者,反之亦然。
30.3 覆盖条件 (Covering Conditions) 与 pthread_cond_broadcast
有时,一个线程发出信号时,它可能不清楚究竟应该唤醒哪一个或哪些等待的线程,或者多个等待线程的条件都可能因为这次状态改变而变为真。
例子:内存分配器 (Figure 30.15)
allocate(size)
: 线程请求分配size
大小的内存。如果当前剩余内存bytesLeft < size
,则线程需要等待。free(ptr, size)
: 线程释放size
大小的内存,bytesLeft += size
。此时,应该唤醒等待分配内存的线程。
// 共享变量
// int bytesLeft = MAX_HEAP_SIZE;
// cond_t c;
// mutex_t m;void *allocate(int size) {pthread_mutex_lock(&m);while (bytesLeft < size) { // 内存不足,等待pthread_cond_wait(&c, &m);}// 从堆中获取内存 ...void *ptr = ...; // 假设从某处获取了内存bytesLeft -= size;pthread_mutex_unlock(&m);return ptr;
}void free(void *ptr, int size) {pthread_mutex_lock(&m);bytesLeft += size;// 问题:应该唤醒谁?pthread_cond_signal(&c); // 只唤醒一个,可能是错误的那个pthread_mutex_unlock(&m);
}
- 问题:
- 线程 Ta 调用
allocate(100)
,bytesLeft
为 0,Ta 休眠。 - 线程 Tb 调用
allocate(10)
,bytesLeft
仍为 0,Tb 也休眠。 - 线程 Tc 调用
free(ptr, 50)
,bytesLeft
变为 50。Tc 调用pthread_cond_signal(&c)
。 - 如果
signal
唤醒了 Ta (需要100字节),Ta 重新检查条件bytesLeft < 100
,发现仍然为真 (50 < 100),Ta 再次休眠。 - 而 Tb (只需要10字节) 本应该被唤醒,因为现在有足够的内存 (50 > 10),但它没有被唤醒,仍在休眠。
- 线程 Ta 调用
- 原因:
free
操作无法知道哪个等待的分配请求现在可以被满足。简单地signal
一个线程可能唤醒了错误的线程。
解决方案:pthread_cond_broadcast()
- 当一个条件可能满足多个等待者,或者发出信号的线程不确定哪个等待者最适合被唤醒时,可以使用
pthread_cond_broadcast(&c)
。 broadcast
会唤醒所有正在该条件变量c
上等待的线程。- 每个被唤醒的线程都会重新获取锁,并重新检查其等待的特定条件 (在
while
循环中)。- 那些条件仍然不满足的线程会再次调用
wait
休眠。 - 那些条件已经满足的线程会跳出
while
循环并继续执行。
- 那些条件仍然不满足的线程会再次调用
- 在内存分配器的例子中: 当
free
调用pthread_cond_broadcast
时,Ta 和 Tb 都会被唤醒。- Ta 检查
bytesLeft < 100
(50 < 100),仍然休眠。 - Tb 检查
bytesLeft < 10
(50 < 10 不成立),跳出循环,成功分配内存。
- Ta 检查
- 这种条件被称为“覆盖条件 (Covering Condition)”: 一个信号(或状态改变)覆盖了所有可能需要被唤醒的情况。代价是可能会唤醒过多不必要的线程,这些线程醒来后检查条件发现不满足又会立即睡去,带来一些性能开销。但在难以精确判断唤醒对象时,这是一种保守且正确的做法。
TIP: USE WHILE (NOT IF) FOR CONDITIONS
- 再次强调: 即使不考虑
broadcast
,也始终使用while
循环来检查条件变量相关的条件。这是因为:- Mesa Semantics: 信号只是一个提示。
- Spurious Wakeups: 线程可能在没有信号的情况下被唤醒。
broadcast
的情况: 多个线程被唤醒,只有一个或少数几个的条件真正满足。
while
循环确保了线程在继续执行前,其等待的条件确实为真。
30.4 总结
- 条件变量是锁之外的另一种重要的同步原语。
- 它们允许线程在某个程序状态(条件)不满足期望时进入休眠,并在状态改变时被其他线程唤醒。
- 关键组件:
- 一个互斥锁,用于保护共享的状态变量和条件检查。
- 一个或多个条件变量,线程在上面等待。
- 一个共享的状态变量,代表线程实际关心的条件。
- 核心操作:
wait()
(原子释放锁并休眠,唤醒后重新获取锁) 和signal()
/broadcast()
(唤醒等待者)。 - 重要实践:
- 始终在
while
循环中调用wait()
并重新检查条件。 - 通常在持有相关互斥锁的情况下调用
signal()
或broadcast()
。 - 仔细设计条件和信号逻辑,考虑是否需要多个条件变量或使用
broadcast
。
- 始终在
- 条件变量优雅地解决了许多重要的同步问题,包括生产者/消费者问题和覆盖条件问题。