关于建设网站的合作合同范本商会网站建设
news/
2025/9/23 5:32:33/
文章来源:
关于建设网站的合作合同范本,商会网站建设,google 网站收录,wordpress 显示指定分类文章第 7 章 并行与并发 文章目录第 7 章 并行与并发7.1 并行基础7.2 互斥量与临界区7.3 期物7.4 条件变量7.5 原子操作与内存模型原子操作一致性模型内存顺序总结习题进一步阅读的参考资料7.1 并行基础
std::thread 用于创建一个执行的线程实例#xff0c;所以它是一切并发编程的…第 7 章 并行与并发 文章目录第 7 章 并行与并发7.1 并行基础7.2 互斥量与临界区7.3 期物7.4 条件变量7.5 原子操作与内存模型原子操作一致性模型内存顺序总结习题进一步阅读的参考资料7.1 并行基础
std::thread 用于创建一个执行的线程实例所以它是一切并发编程的基础使用时需要包含 thread 头文件 它提供了很多基本的线程操作例如 get_id() 来获取所创建线程的线程 ID使用 join() 来加入一个线程等等例如
#include iostream
#include threadint main() {std::thread t([](){std::cout hello world. std::endl;});t.join();return 0;
}7.2 互斥量与临界区
我们在操作系统、亦或是数据库的相关知识中已经了解过了有关并发技术的基本知识mutex 就是其中的核心之一。 C11 引入了 mutex 相关的类其所有相关的函数都放在 mutex 头文件中。
std::mutex 是 C11 中最基本的 mutex 类通过实例化 std::mutex 可以创建互斥量 而通过其成员函数 lock() 可以进行上锁unlock() 可以进行解锁。 但是在在实际编写代码的过程中最好不去直接调用成员函数 因为调用成员函数就需要在每个临界区的出口处调用 unlock()当然还包括异常。 这时候 C11 还为互斥量提供了一个 RAII 语法的模板类 std::lock_guard。 RAII 在不失代码简洁性的同时很好的保证了代码的异常安全性。
在 RAII 用法下对于临界区的互斥量的创建只需要在作用域的开始部分例如
#include iostream
#include threadint v 1;void critical_section(int change_v) {static std::mutex mtx;std::lock_guardstd::mutex lock(mtx);// 执行竞争操作v change_v;// 离开此作用域后 mtx 会被释放
}int main() {std::thread t1(critical_section, 2), t2(critical_section, 3);t1.join();t2.join();std::cout v std::endl;return 0;
}由于 C 保证了所有栈对象在生命周期结束时会被销毁所以这样的代码也是异常安全的。 无论 critical_section() 正常返回、还是在中途抛出异常都会引发堆栈回退也就自动调用了 unlock()。
而 std::unique_lock 则相对于 std::lock_guard 出现的std::unique_lock 更加灵活 std::unique_lock 的对象会以独占所有权没有其他的 unique_lock 对象同时拥有某个 mutex 对象的所有权 的方式管理 mutex 对象上的上锁和解锁的操作。所以在并发编程中推荐使用 std::unique_lock。
std::lock_guard 不能显式的调用 lock 和 unlock 而 std::unique_lock 可以在声明后的任意位置调用 可以缩小锁的作用范围提供更高的并发度。
如果你用到了条件变量 std::condition_variable::wait 则必须使用 std::unique_lock 作为参数。
例如
#include iostream
#include threadint v 1;void critical_section(int change_v) {static std::mutex mtx;std::unique_lockstd::mutex lock(mtx);// 执行竞争操作v change_v;std::cout v std::endl;// 将锁进行释放lock.unlock();// 在此期间任何人都可以抢夺 v 的持有权// 开始另一组竞争操作再次加锁lock.lock();v 1;std::cout v std::endl;
}int main() {std::thread t1(critical_section, 2), t2(critical_section, 3);t1.join();t2.join();return 0;
}7.3 期物
期物Future表现为 std::future它提供了一个访问异步操作结果的途径这句话很不好理解。 为了理解这个特性我们需要先理解一下在 C11 之前的多线程行为。
试想如果我们的主线程 A 希望新开辟一个线程 B 去执行某个我们预期的任务并返回我一个结果。 而这时候线程 A 可能正在忙其他的事情无暇顾及 B 的结果 所以我们会很自然的希望能够在某个特定的时间获得线程 B 的结果。
在 C11 的 std::future 被引入之前通常的做法是 创建一个线程 A在线程 A 里启动任务 B当准备完毕后发送一个事件并将结果保存在全局变量中。 而主函数线程 A 里正在做其他的事情当需要结果的时候调用一个线程等待函数来获得执行的结果。
而 C11 提供的 std::future 简化了这个流程可以用来获取异步任务的结果。 自然地我们很容易能够想象到把它作为一种简单的线程同步手段即屏障barrier。
为了看一个例子我们这里额外使用 std::packaged_task它可以用来封装任何可以调用的目标从而用于实现异步的调用。 举例来说
#include iostream
#include future
#include threadint main() {// 将一个返回值为7的 lambda 表达式封装到 task 中// std::packaged_task 的模板参数为要封装函数的类型std::packaged_taskint() task([](){return 7;});// 获得 task 的期物std::futureint result task.get_future(); // 在一个线程中执行 taskstd::thread(std::move(task)).detach();std::cout waiting...;result.wait(); // 在此设置屏障阻塞到期物的完成// 输出执行结果std::cout done! std:: endl future result is result.get() std::endl;return 0;
}在封装好要调用的目标后可以使用 get_future() 来获得一个 std::future 对象以便之后实施线程同步。
7.4 条件变量
条件变量 std::condition_variable 是为了解决死锁而生当互斥操作不够用而引入的。 比如线程可能需要等待某个条件为真才能继续执行 而一个忙等待循环中可能会导致所有其他线程都无法进入临界区使得条件为真时就会发生死锁。 所以condition_variable 实例被创建出现主要就是用于唤醒等待线程从而避免死锁。 std::condition_variable的 notify_one() 用于唤醒一个线程 notify_all() 则是通知所有线程。下面是一个生产者和消费者模型的例子
#include queue
#include chrono
#include mutex
#include thread
#include iostream
#include condition_variableint main() {std::queueint produced_nums;std::mutex mtx;std::condition_variable cv;bool notified false; // 通知信号// 生产者auto producer []() {for (int i 0; ; i) {std::this_thread::sleep_for(std::chrono::milliseconds(900));std::unique_lockstd::mutex lock(mtx);std::cout producing i std::endl;produced_nums.push(i);notified true;cv.notify_all(); // 此处也可以使用 notify_one}};// 消费者auto consumer []() {while (true) {std::unique_lockstd::mutex lock(mtx);while (!notified) { // 避免虚假唤醒cv.wait(lock);}// 短暂取消锁使得生产者有机会在消费者消费空前继续生产lock.unlock();std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // 消费者慢于生产者lock.lock();while (!produced_nums.empty()) {std::cout consuming produced_nums.front() std::endl;produced_nums.pop();}notified false;}};// 分别在不同的线程中运行std::thread p(producer);std::thread cs[2];for (int i 0; i 2; i) {cs[i] std::thread(consumer);}p.join();for (int i 0; i 2; i) {cs[i].join();}return 0;
}值得一提的是在生产者中我们虽然可以使用 notify_one()但实际上并不建议在此处使用 因为在多消费者的情况下我们的消费者实现中简单放弃了锁的持有这使得可能让其他消费者 争夺此锁从而更好的利用多个消费者之间的并发。话虽如此但实际上因为 std::mutex 的排他性 我们根本无法期待多个消费者能真正意义上的并行消费队列的中生产的内容我们仍需要粒度更细的手段。
7.5 原子操作与内存模型
细心的读者可能会对前一小节中生产者消费者模型的例子可能存在编译器优化导致程序出错的情况产生疑惑。 例如布尔值 notified 没有被 volatile 修饰编译器可能对此变量存在优化例如将其作为一个寄存器的值 从而导致消费者线程永远无法观察到此值的变化。这是一个好问题为了解释清楚这个问题我们需要进一步讨论 从 C 11 起引入的内存模型这一概念。我们首先来看一个问题下面这段代码输出结果是多少
#include thread
#include iostreamint main() {int a 0;int flag 0;std::thread t1([]() {while (flag ! 1);int b a;std::cout b b std::endl;});std::thread t2([]() {a 5;flag 1;});t1.join();t2.join();return 0;
}从直观上看t2 中 a 5; 这一条语句似乎总在 flag 1; 之前得到执行而 t1 中 while (flag ! 1) 似乎保证了 std::cout b b std::endl; 不会再标记被改变前执行。从逻辑上看似乎 b 的值应该等于 5。 但实际情况远比此复杂得多或者说这段代码本身属于未定义的行为因为对于 a 和 flag 而言他们在两个并行的线程中被读写 出现了竞争。除此之外即便我们忽略竞争读写仍然可能受 CPU 的乱序执行编译器对指令的重排的影响 导致 a 5 发生在 flag 1 之后。从而 b 可能输出 0。
原子操作
std::mutex 可以解决上面出现的并发读写的问题但互斥锁是操作系统级的功能 这是因为一个互斥锁的实现通常包含两条基本原理
提供线程间自动的状态转换即『锁住』这个状态保障在互斥锁操作期间所操作变量的内存与临界区外进行隔离
这是一组非常强的同步条件换句话说当最终编译为 CPU 指令时会表现为非常多的指令我们之后再来看如何实现一个简单的互斥锁。 这对于一个仅需原子级操作没有中间态的变量似乎太苛刻了。
关于同步条件的研究有着非常久远的历史我们在这里不进行赘述。读者应该明白在现代 CPU 体系结构下提供了 CPU 指令级的原子操作 因此在 C11 中多线程下共享变量的读写这一问题上还引入了 std::atomic 模板使得我们实例化一个原子类型将一个 原子类型读写操作从一组指令最小化到单个 CPU 指令。例如
std::atomicint counter;并为整数或浮点数的原子类型提供了基本的数值成员函数举例来说 包括 fetch_add, fetch_sub 等同时通过重载方便的提供了对应的 - 版本。 比如下面的例子
#include atomic
#include thread
#include iostreamstd::atomicint count {0};int main() {std::thread t1([](){count.fetch_add(1);});std::thread t2([](){count; // 等价于 fetch_addcount 1; // 等价于 fetch_add});t1.join();t2.join();std::cout count std::endl;return 0;
}当然并非所有的类型都能提供原子操作这是因为原子操作的可行性取决于 CPU 的架构以及所实例化的类型结构是否满足该架构对内存对齐 条件的要求因而我们总是可以通过 std::atomicT::is_lock_free 来检查该原子类型是否需支持原子操作例如
#include atomic
#include iostreamstruct A {float x;int y;long long z;
};int main() {std::atomicA a;std::cout std::boolalpha a.is_lock_free() std::endl;return 0;
}一致性模型
并行执行的多个线程从某种宏观层面上讨论可以粗略的视为一种分布式系统。 在分布式系统中任何通信乃至本地操作都需要消耗一定时间甚至出现不可靠的通信。
如果我们强行将一个变量 v 在多个线程之间的操作设为原子操作即任何一个线程在操作完 v 后 其他线程均能同步感知到 v 的变化则对于变量 v 而言表现为顺序执行的程序它并没有由于引入多线程 而得到任何效率上的收益。对此有什么办法能够适当的加速呢答案便是削弱原子操作的在进程间的同步条件。
从原理上看每个线程可以对应为一个集群节点而线程间的通信也几乎等价于集群节点间的通信。 削弱进程间的同步条件通常我们会考虑四种不同的一致性模型 线性一致性又称强一致性或原子一致性。它要求任何一次读操作都能读到某个数据的最近一次写的数据并且所有线程的操作顺序与全局时钟下的顺序是一致的。 x.store(1) x.load()
T1 -------------------------------T2 -------------------±------------ x.store(2)
在这种情况下线程 T1, T2 对 x 的两次写操作是原子的且 x.store(1) 是严格的发生在 x.store(2) 之前x.store(2) 严格的发生在 x.load() 之前。
值得一提的是线性一致性对全局时钟的要求是难以实现的这也是人们不断研究比这个一致性更弱条件下其他一致性的算法的原因。 顺序一致性同样要求任何一次读操作都能读到数据最近一次写入的数据但未要求与全局时钟的顺序一致。 x.store(1) x.store(3) x.load()
T1 -----------------------------------T2 ---------------±--------------------- x.store(2) 或者 x.store(1) x.store(3) x.load()T1 ---------±----------±---------±---- T2 ------±------------------------------ x.store(2)
在顺序一致性的要求下x.load() 必须读到最近一次写入的数据因此 x.store(2) 与 x.store(1) 并无任何先后保障即 只要 T2 的 x.store(2) 发生在 x.store(3) 之前即可。 因果一致性它的要求进一步降低只需要有因果关系的操作顺序得到保障而非因果关系的操作顺序则不做要求。 a 1 b 2
T1 -------------------------------------------T2 ------±-------------------±-------±------- x.store(3) c a b y.load() 或者 a 1 b 2T1 ----±----------±--------------------------- T2 ------±-------------------±-------±------- x.store(3) y.load() c a b 亦或者 b 2 a 1T1 ----±----------±--------------------------- T2 ------±-------------------±-------±------- y.load() c a b x.store(3)
上面给出的三种例子都是属于因果一致的因为整个过程中只有 c 对 a 和 b 产生依赖而 x 和 y
在此例子中表现为没有关系但实际情况中我们需要更详细的信息才能确定 x 与 y 确实无关 最终一致性是最弱的一致性要求它只保障某个操作在未来的某个时间节点上会被观察到但并未要求被观察到的时间。因此我们甚至可以对此条件稍作加强例如规定某个操作被观察到的时间总是有界的。当然这已经不在我们的讨论范围之内了。 x.store(3) x.store(4)
T1 -----------------------------------------------------------T2 ---------±-----------±-------------------±-------±------- x.read() x.read() x.read() x.read()
在上面的情况中如果我们假设 x 的初始值为 0则 T2 中四次 x.read() 结果可能但不限于以下情况 3 4 4 4 // x 的写操作被很快观察到 0 3 3 4 // x 的写操作被观察到的时间存在一定延迟 0 0 0 4 // 最后一次读操作读到了 x 的最终值但此前的变化并未观察到 0 0 0 0 // 在当前时间段内 x 的写操作均未被观察到但未来某个时间点上一定能观察到 x 为 4 的情况 内存顺序
为了追求极致的性能实现各种强度要求的一致性C11 为原子操作定义了六种不同的内存顺序 std::memory_order 的选项表达了四种多线程间的同步模型 宽松模型在此模型下单个线程内的原子操作都是顺序执行的不允许指令重排但不同线程间原子操作的顺序是任意的。类型通过 std::memory_order_relaxed 指定。我们来看一个例子 std::atomicint counter {0};
std::vectorstd::thread vt;
for (int i 0; i 100; i) {vt.emplace_back([](){counter.fetch_add(1, std::memory_order_relaxed);});
}for (auto t : vt) {t.join();
}
std::cout current counter: counter std::endl;释放/消费模型在此模型中我们开始限制进程间的操作顺序如果某个线程需要修改某个值但另一个线程会对该值的某次操作产生依赖即后者依赖前者。具体而言线程 A 完成了三次对 x 的写操作线程 B 仅依赖其中第三次 x 的写操作与 x 的前两次写行为无关则当 A 主动 x.release() 时候即使用 std::memory_order_release选项 std::memory_order_consume 能够确保 B 在调用 x.load() 时候观察到 A 中第三次对 x 的写操作。我们来看一个例子 // 初始化为 nullptr 防止 consumer 线程从野指针进行读取
std::atomicint* ptr(nullptr);
int v;
std::thread producer([]() {int* p new int(42);v 1024;ptr.store(p, std::memory_order_release);
});
std::thread consumer([]() {int* p;while(!(p ptr.load(std::memory_order_consume)));std::cout p: *p std::endl;std::cout v: v std::endl;
});
producer.join();
consumer.join();释放/获取模型在此模型下我们可以进一步加紧对不同线程间原子操作的顺序的限制在释放 std::memory_order_release 和获取 std::memory_order_acquire 之间规定时序即发生在释放操作之前的所有写操作对其他线程的任何获取操作都是可见的亦即发生顺序happens-before。 可以看到std::memory_order_release 确保了它之后的写行为不会发生在释放操作之前是一个向前的屏障而 std::memory_order_acquire 确保了它之前的写行为不会发生在该获取操作之后是一个向后的屏障。对于选项 std::memory_order_acq_rel 而言则结合了这两者的特点唯一确定了一个内存屏障使得当前线程对内存的读写不会被重排到此操作的前后。 我们来看一个例子 std::vectorint v;
std::atomicint flag {0};
std::thread release([]() {v.push_back(42);flag.store(1, std::memory_order_release);
});
std::thread acqrel([]() {int expected 1; // must before compare_exchange_strongwhile(!flag.compare_exchange_strong(expected, 2, std::memory_order_acq_rel)) {expected 1; // must after compare_exchange_strong}// flag has changed to 2
});
std::thread acquire([]() {while(flag.load(std::memory_order_acquire) 2);std::cout v.at(0) std::endl; // must be 42
});
release.join();
acqrel.join();
acquire.join();在此例中我们使用了 compare_exchange_strong它便是比较交换原语Compare-and-swap primitive它有一个更弱的版本即 compare_exchange_weak它允许即便交换成功也仍然返回 false 失败。其原因是因为在某些平台上虚假故障导致的具体而言当 CPU 进行上下文切换时另一线程加载同一地址产生的不一致。除此之外compare_exchange_strong 的性能可能稍差于 compare_exchange_weak但大部分情况下compare_exchange_strong 应该被有限考虑。 顺序一致模型在此模型下原子操作满足顺序一致性进而可能对性能产生损耗。可显式的通过 std::memory_order_seq_cst 进行指定。最后来看一个例子 std::atomicint counter {0};
std::vectorstd::thread vt;
for (int i 0; i 100; i) {vt.emplace_back([](){counter.fetch_add(1, std::memory_order_seq_cst);});
}for (auto t : vt) {t.join();
}
std::cout current counter: counter std::endl;这个例子与第一个宽松模型的例子本质上没有区别仅仅只是将原子操作的内存顺序修改为了 memory_order_seq_cst有兴趣的读者可以自行编写程序测量这两种不同内存顺序导致的性能差异。
总结
C11 语言层提供了并发编程的相关支持本节简单的介绍了 std::thread, std::mutex, std::future 这些并发编程中不可回避的重要工具。 除此之外我们还介绍了 C11 最重要的几个特性之一的『内存模型』 它们为 C 在标准化高性能计算中提供了重要的基础。
习题 请编写一个简单的线程池提供如下功能 ThreadPool p(4); // 指定四个工作线程// 将任务在池中入队并返回一个 std::future
auto f pool.enqueue([](int life) {return meaning;
}, 42);// 从 future 中获得执行结果
std::cout f.get() std::endl;请使用 std::atomicbool 实现一个互斥锁。
返回目录 | 上一章 | 下一章 文件系统
进一步阅读的参考资料
C 并发编程(中文版)线程支持库文档Herlihy, M. P., Wing, J. M. (1990). Linearizability: a correctness condition for concurrent objects. ACM Transactions on Programming Languages and Systems, 12(3), 463–492. https://doi.org/10.1145/78969.78972
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/911481.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!