Rust从入门到精通之进阶篇:14.并发编程

并发编程

并发编程允许程序同时执行多个独立的任务,充分利用现代多核处理器的性能。Rust 提供了强大的并发原语,同时通过类型系统和所有权规则在编译时防止数据竞争和其他常见的并发错误。在本章中,我们将探索 Rust 的并发编程模型。

线程基础

创建线程

Rust 标准库提供了 std::thread 模块,用于创建和管理线程:

use std::thread;
use std::time::Duration;fn main() {// 创建一个新线程let handle = thread::spawn(|| {for i in 1..10 {println!("在新线程中: {}", i);thread::sleep(Duration::from_millis(1));}});// 主线程中的代码for i in 1..5 {println!("在主线程中: {}", i);thread::sleep(Duration::from_millis(1));}// 等待新线程完成handle.join().unwrap();
}

thread::spawn 函数接受一个闭包,该闭包包含要在新线程中执行的代码,并返回一个 JoinHandle。调用 join 方法会阻塞当前线程,直到新线程完成。

线程与所有权

当我们将闭包传递给 thread::spawn 时,Rust 需要知道闭包将在哪个线程中运行以及它将使用哪些数据。闭包默认会捕获其环境中的变量,但在线程间传递数据时,我们需要考虑所有权问题。

use std::thread;fn main() {let v = vec![1, 2, 3];// 错误:Rust 无法确定 v 的生命周期// let handle = thread::spawn(|| {//     println!("这是向量: {:?}", v);// });// 使用 move 关键字转移所有权let handle = thread::spawn(move || {println!("这是向量: {:?}", v);});// 错误:v 的所有权已经转移到新线程// println!("向量: {:?}", v);handle.join().unwrap();
}

使用 move 关键字可以强制闭包获取其使用的值的所有权,而不是借用它们。这对于确保数据在线程运行期间有效非常重要。

消息传递

一种处理并发的流行方法是消息传递,其中线程或执行者通过发送消息进行通信。Rust 的标准库提供了通道(channel)实现,这是一种实现消息传递并发的方式。

创建通道

use std::sync::mpsc;
use std::thread;fn main() {// 创建一个通道let (tx, rx) = mpsc::channel();// 在新线程中发送消息thread::spawn(move || {let val = String::from("你好");tx.send(val).unwrap();// 错误:val 的所有权已转移// println!("val 是 {}", val);});// 在主线程中接收消息let received = rx.recv().unwrap();println!("收到: {}", received);
}

mpsc 代表"多生产者,单消费者"(multiple producers, single consumer)。通道有两部分:发送端(tx)和接收端(rx)。

发送多个值

use std::sync::mpsc;
use std::thread;
use std::time::Duration;fn main() {let (tx, rx) = mpsc::channel();thread::spawn(move || {let vals = vec![String::from("你好"),String::from("来自"),String::from("线程"),];for val in vals {tx.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});for received in rx {println!("收到: {}", received);}
}

多个生产者

通过克隆发送者,我们可以有多个生产者:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;fn main() {let (tx, rx) = mpsc::channel();let tx1 = tx.clone();thread::spawn(move || {let vals = vec![String::from("你好"),String::from("来自"),String::from("线程1"),];for val in vals {tx1.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});thread::spawn(move || {let vals = vec![String::from("更多"),String::from("来自"),String::from("线程2"),];for val in vals {tx.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});for received in rx {println!("收到: {}", received);}
}

共享状态

另一种处理并发的方法是允许多个线程访问同一块数据。这种方法需要特别小心,以避免数据竞争。

互斥锁(Mutex)

互斥锁(Mutex,mutual exclusion)确保在任何时刻只有一个线程可以访问数据:

use std::sync::Mutex;fn main() {let m = Mutex::new(5);{let mut num = m.lock().unwrap();*num = 6;} // 锁在这里被释放println!("m = {:?}", m);
}

在线程间共享 Mutex

use std::sync::{Arc, Mutex};
use std::thread;fn main() {let counter = Arc::new(Mutex::new(0));let mut handles = vec![];for _ in 0..10 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {let mut num = counter.lock().unwrap();*num += 1;});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("结果: {}", *counter.lock().unwrap());
}

Arc<T> 是一个原子引用计数类型,允许在线程间安全地共享所有权。它类似于 Rc<T>,但可以在并发环境中使用。

死锁

使用互斥锁时需要注意死锁问题,当两个线程各自持有一个锁并尝试获取对方的锁时,就会发生死锁:

use std::sync::{Mutex, MutexGuard};
use std::thread;
use std::time::Duration;fn main() {let mutex_a = Mutex::new(5);let mutex_b = Mutex::new(5);let thread_a = thread::spawn(move || {// 线程 A 先锁定 mutex_alet mut a: MutexGuard<i32> = mutex_a.lock().unwrap();println!("线程 A 获取了 mutex_a");// 睡眠一段时间,让线程 B 有机会锁定 mutex_bthread::sleep(Duration::from_millis(100));// 线程 A 尝试锁定 mutex_bprintln!("线程 A 尝试获取 mutex_b");let mut b: MutexGuard<i32> = mutex_b.lock().unwrap();*a += *b;});let thread_b = thread::spawn(move || {// 线程 B 先锁定 mutex_blet mut b: MutexGuard<i32> = mutex_b.lock().unwrap();println!("线程 B 获取了 mutex_b");// 睡眠一段时间,让线程 A 有机会锁定 mutex_athread::sleep(Duration::from_millis(100));// 线程 B 尝试锁定 mutex_aprintln!("线程 B 尝试获取 mutex_a");let mut a: MutexGuard<i32> = mutex_a.lock().unwrap();*b += *a;});// 注意:这个例子会导致死锁!
}

读写锁(RwLock)

读写锁允许多个读取器或一个写入器访问数据:

use std::sync::RwLock;fn main() {let lock = RwLock::new(5);// 多个读取器可以同时访问数据{let r1 = lock.read().unwrap();let r2 = lock.read().unwrap();println!("读取器: {} {}", r1, r2);} // 读锁在这里被释放// 只能有一个写入器{let mut w = lock.write().unwrap();*w += 1;println!("写入后: {}", *w);} // 写锁在这里被释放
}

原子类型

对于简单的计数器和标志,可以使用原子类型,它们提供了无锁的线程安全操作:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;fn main() {let counter = Arc::new(AtomicUsize::new(0));let mut handles = vec![];for _ in 0..10 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {for _ in 0..1000 {counter.fetch_add(1, Ordering::SeqCst);}});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("结果: {}", counter.load(Ordering::SeqCst));
}

Ordering 参数指定内存顺序约束:

  • Relaxed:最宽松的顺序,只保证原子性
  • Release:写操作使用,确保之前的操作不会被重排到此操作之后
  • Acquire:读操作使用,确保之后的操作不会被重排到此操作之前
  • AcqRel:结合了 AcquireRelease
  • SeqCst:最严格的顺序,提供全序关系

条件变量

条件变量允许线程等待某个条件变为真:

use std::sync::{Arc, Mutex, Condvar};
use std::thread;fn main() {let pair = Arc::new((Mutex::new(false), Condvar::new()));let pair2 = Arc::clone(&pair);thread::spawn(move || {let (lock, cvar) = &*pair2;let mut started = lock.lock().unwrap();println!("改变条件变量之前");*started = true;// 通知等待的线程cvar.notify_one();println!("条件变量已改变");});let (lock, cvar) = &*pair;let mut started = lock.lock().unwrap();// 等待条件变为真while !*started {started = cvar.wait(started).unwrap();}println!("条件已满足,主线程继续执行");
}

屏障(Barrier)

屏障确保多个线程在某一点同步:

use std::sync::{Arc, Barrier};
use std::thread;fn main() {let mut handles = Vec::with_capacity(10);let barrier = Arc::new(Barrier::new(10));for i in 0..10 {let b = Arc::clone(&barrier);handles.push(thread::spawn(move || {println!("线程 {} 开始工作", i);// 模拟工作thread::sleep(std::time::Duration::from_millis(i * 100));println!("线程 {} 到达屏障", i);// 等待所有线程到达屏障b.wait();println!("线程 {} 继续执行", i);}));}for handle in handles {handle.join().unwrap();}
}

线程池

创建线程有开销,线程池可以重用线程,提高性能:

use std::sync::{mpsc, Arc, Mutex};
use std::thread;struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>,
}type Job = Box<dyn FnOnce() + Send + 'static>;impl ThreadPool {fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);self.sender.send(job).unwrap();}
}struct Worker {id: usize,thread: thread::JoinHandle<()>,
}impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {let job = receiver.lock().unwrap().recv().unwrap();println!("工作线程 {} 获得了一个任务", id);job();});Worker { id, thread }}
}fn main() {let pool = ThreadPool::new(4);for i in 0..8 {pool.execute(move || {println!("执行任务 {}", i);thread::sleep(std::time::Duration::from_secs(1));println!("任务 {} 完成", i);});}// 给线程池一些时间来处理任务thread::sleep(std::time::Duration::from_secs(10));
}

并发最佳实践

1. 优先使用消息传递

当可能时,优先使用消息传递而不是共享状态:

use std::sync::mpsc;
use std::thread;fn main() {let (tx, rx) = mpsc::channel();// 启动工作线程for i in 0..4 {let tx = tx.clone();thread::spawn(move || {// 模拟工作let result = i * i;tx.send(result).unwrap();});}// 丢弃原始发送者drop(tx);// 收集结果let mut results = Vec::new();for received in rx {results.push(received);}println!("结果: {:?}", results);
}

2. 使用适当的同步原语

根据需求选择合适的同步原语:

  • 对于简单计数器:使用 AtomicUsize
  • 对于需要独占访问的数据:使用 Mutex
  • 对于读多写少的数据:使用 RwLock
  • 对于一次性初始化:使用 lazy_staticOnceCell/OnceLock

3. 避免过度同步

过度同步会导致性能下降:

use std::sync::{Arc, Mutex};
use std::thread;// 不好的做法:锁的粒度太大
fn process_data_bad(data: &[i32]) -> i32 {let result = Arc::new(Mutex::new(0));let mut handles = vec![];for chunk in data.chunks(100) {let result = Arc::clone(&result);let chunk = chunk.to_vec();handles.push(thread::spawn(move || {// 计算和let sum: i32 = chunk.iter().sum();// 获取锁并更新结果let mut result = result.lock().unwrap();*result += sum;}));}for handle in handles {handle.join().unwrap();}*result.lock().unwrap()
}// 好的做法:减少锁的竞争
fn process_data_good(data: &[i32]) -> i32 {let mut handles = vec![];for chunk in data.chunks(100) {let chunk = chunk.to_vec();handles.push(thread::spawn(move || {// 计算和并返回chunk.iter().sum::<i32>()}));}// 收集结果let mut final_result = 0;for handle in handles {final_result += handle.join().unwrap();}final_result
}

4. 使用线程局部存储

对于每个线程需要独立状态的情况,使用线程局部存储:

use std::cell::RefCell;
use std::thread;
use std::thread_local;thread_local! {static COUNTER: RefCell<u32> = RefCell::new(0);
}fn main() {let mut handles = vec![];for _ in 0..5 {handles.push(thread::spawn(|| {for _ in 0..10 {COUNTER.with(|c| {*c.borrow_mut() += 1;});}let result = COUNTER.with(|c| *c.borrow());println!("线程计数器: {}", result);}));}for handle in handles {handle.join().unwrap();}
}

5. 使用 parking_lot 库

parking_lot 库提供了更高性能的同步原语:

// 在 Cargo.toml 中添加:
// [dependencies]
// parking_lot = "0.12.0"use parking_lot::{Mutex, RwLock};
use std::sync::Arc;
use std::thread;fn main() {let counter = Arc::new(Mutex::new(0));let mut handles = vec![];for _ in 0..10 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {let mut num = counter.lock();*num += 1;});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("结果: {}", *counter.lock());
}

练习题

  1. 实现一个并发计数器,使用不同的同步原语(MutexRwLockAtomicUsize)并比较它们的性能。

  2. 创建一个简单的生产者-消费者系统,其中多个生产者线程生成随机数,多个消费者线程计算这些数字的平方并打印结果。

  3. 实现一个并发哈希表,允许多个线程同时读取,但只允许一个线程写入。使用适当的同步原语确保线程安全。

  4. 编写一个程序,使用屏障(Barrier)同步多个线程,让它们同时开始执行一个计算密集型任务,并测量完成时间。

  5. 实现一个简单的线程池,可以提交任务并等待所有任务完成。包括一个优雅的关闭机制,确保所有任务都能完成。

总结

在本章中,我们探讨了 Rust 的并发编程模型:

  • 线程基础和所有权规则
  • 消息传递并发(通道)
  • 共享状态并发(互斥锁、读写锁、原子类型)
  • 条件变量和屏障
  • 线程池实现
  • 并发编程最佳实践

Rust 的类型系统和所有权规则使得并发编程更加安全,在编译时就能捕获许多常见的并发错误。通过选择适当的并发模型和同步原语,你可以编写高效、安全的并发代码。在下一章中,我们将探索 Rust 的异步编程模型,它提供了一种更轻量级的并发方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/75567.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法训练营第二十三天 | 贪心算法(一)

文章目录 一、贪心算法理论基础二、Leetcode 455.分发饼干二、Leetcode 376. 摆动序列三、Leetcode 53. 最大子序和 一、贪心算法理论基础 贪心算法是一种在每一步选择中都采取当前状态下的最优决策&#xff0c;从而希望最终达到全局最优解的算法设计技术。 基本思想 贪心算…

css基础-display 常用布局

CSS display 属性详解 属性设置元素是否被视为块级或行级盒子以及用于子元素的布局&#xff0c;例如流式布局、网格布局或弹性布局。 一、基础显示模式 1. block 作用&#xff1a; 元素独占一行可设置宽高和内外边距默认宽度撑满父容器 应用场景&#xff1a; 布局容器&a…

速卖通API数据清洗实战:从原始JSON到结构化商品数据库

下面将详细介绍如何把速卖通 API 返回的原始 JSON 数据清洗并转换为结构化商品数据库。 1. 数据获取 首先要借助速卖通 API 获取商品数据&#xff0c;以 Python 为例&#xff0c;可使用requests库发送请求并得到 JSON 数据。 import requests# 替换为你的 API Key 和 Secret …

【零基础入门unity游戏开发——2D篇】2D物理系统 —— 2D刚体组件(Rigidbody2D)

考虑到每个人基础可能不一样,且并不是所有人都有同时做2D、3D开发的需求,所以我把 【零基础入门unity游戏开发】 分为成了C#篇、unity通用篇、unity3D篇、unity2D篇。 【C#篇】:主要讲解C#的基础语法,包括变量、数据类型、运算符、流程控制、面向对象等,适合没有编程基础的…

Collectors.toMap / list 转 map

前言 略 Collectors.toMap List<User> userList ...; Map<Long, User> userMap userList.stream().collect(Collectors.toMap(User::getUserId, Function.identity()));假如id存在重复值&#xff0c;则会报错Duplicate key xxx, 解决方案 两个重复id中&#…

热门面试题第13天|Leetcode 110.平衡二叉树 257. 二叉树的所有路径 404.左叶子之和 222.完全二叉树的节点个数

222.完全二叉树的节点个数&#xff08;优先掌握递归&#xff09; 需要了解&#xff0c;普通二叉树 怎么求&#xff0c;完全二叉树又怎么求 题目链接/文章讲解/视频讲解&#xff1a;https://programmercarl.com/0222.%E5%AE%8C%E5%85%A8%E4%BA%8C%E5%8F%89%E6%A0%91%E7%9A%84%E8…

关于Object.assign

Object.assign 基本用法 Object.assign() 方法用于将所有可枚举属性的值从一个或者多个源对象source复制到目标对象。它将返回目标对象target const target { a: 1, b: 2 } const source { b: 4, c: 5 }const returnedTarget Object.assign(target, source)target // { a…

GitHub高级筛选小白使用手册

GitHub高级筛选小白使用手册 GitHub 提供了强大的搜索功能&#xff0c;允许用户通过高级筛选器来精确查找仓库、Issues、Pull Requests、代码等。下面是一些常用的高级筛选用法&#xff0c;帮助你更高效地使用 GitHub 搜索功能。 目录 搜索仓库搜索Issues搜索Pull Requests搜…

手动集成sqlite的方法

注意到sqlite有backup方法&#xff08;https://www.sqlite.org/backup.html&#xff09;。 也注意到android中sysroot下&#xff0c;没有sqlite3的库&#xff0c;也没有相关头文件。 如果要使用 sqlite 的backup&#xff0c;那么就需要手动集成sqlite代码到项目中。可以如下操…

蓝桥杯真题 2109.统计子矩阵

原题地址:1.统计子矩阵 - 蓝桥云课 问题描述 给定一个 NMNM 的矩阵 AA, 请你统计有多少个子矩阵 (最小 1111, 最大 NM)NM) 满足子矩阵中所有数的和不超过给定的整数 KK ? 输入格式 第一行包含三个整数 N,MN,M 和 KK. 之后 NN 行每行包含 MM 个整数, 代表矩阵 AA. 输出格…

蓝桥杯—最少操作数

一.题目 分析:每次可以进行三次操作&#xff0c;求在n步操作后可以达到目标数的最小n&#xff0c;和最短路径问题相似&#xff0c;分层遍历加记忆化搜索防止时间复杂度过高&#xff0c;还需要减枝操作 import java.util.HashSet; import java.util.LinkedList; import java.ut…

Linux内核NIC网卡驱动实战案例分析

以下Linux 内核模块实现了一个虚拟网络设备驱动程序&#xff0c;其作用和意义如下&#xff1a; 1. 作用 &#xff08;1&#xff09;创建虚拟网络设备对 驱动程序动态创建了两个虚拟网络设备&#xff08;nic_dev[0]和nic_dev[1]&#xff09;&#xff0c;模拟物理网卡的功能。这两…

Trae初使用心得(Java后端)

1.前提 2025年3月3日&#xff0c;字节跳动正式官宣“中国首个 AI 原生集成开发环境&#xff08;AI IDE&#xff09;”Trae 国内版正式上线&#xff0c;由于之前项目的原因小编没有及时的去体验&#xff0c;这几日专门抽空去体验了一下感觉还算可以。 2.特点 Trade重在可以白嫖…

[项目]基于FreeRTOS的STM32四轴飞行器: 十二.角速度加速度滤波

基于FreeRTOS的STM32四轴飞行器: 十二.滤波 一.滤波介绍二.对角速度进行一阶低通滤波三.对加速度进行卡尔曼滤波 一.滤波介绍 模拟信号滤波&#xff1a; 最常用的滤波方法可以在信号和地之间并联一个电容&#xff0c;因为电容通交隔直&#xff0c;信号突变会给电容充电&#x…

UNIX网络编程笔记:TCP、UDP、SCTP编程的区别

一、核心特性对比 特性TCPUDPSCTP连接方式面向连接&#xff08;三次握手&#xff09;无连接面向连接&#xff08;四次握手&#xff09;可靠性可靠传输&#xff08;重传、确认机制&#xff09;不可靠传输可靠传输&#xff08;多路径冗余&#xff09;传输单位字节流&#xff08;…

Python爬虫异常处理:自动跳过无效URL

爬虫在运行过程中常常会遇到各种异常情况&#xff0c;其中无效URL的出现是较为常见的问题之一。无效URL可能导致爬虫程序崩溃或陷入无限等待状态&#xff0c;严重影响爬虫的稳定性和效率。因此&#xff0c;掌握如何在Python爬虫中自动跳过无效URL的异常处理技巧&#xff0c;对于…

C++语法学习的主要内容

科技特长生方向&#xff0c;主要学习的内容为 一&#xff0c;《C语法》 二&#xff0c;《数据结构》 三&#xff0c;《算法》 四&#xff0c;《计算机基础知识》 五&#xff0c;《初高中的数学知识》 其中&#xff0c;《C语法》学习的主要内容如下: 1,cout输出语句和键盘…

3、孪生网络/连体网络(Siamese Network)

目的: 用Siamese Network (孪生网络) 解决Few-shot learning (小样本学习)。 Siamese Network并不是Meta Learning最好的方法, 但是通过学习Siamese Network,非常有助于理解其他Meta Learning算法。 这里介绍了两种方法:Siamese Network (孪生网络)、Trplet Loss Siam…

从零构建大语言模型全栈开发指南:第二部分:模型架构设计与实现-2.2.1从零编写类GPT-2模型架构(规划模块与代码组织)

👉 点击关注不迷路 👉 点击关注不迷路 👉 点击关注不迷路 文章大纲 2.2.1 从零编写类GPT-2模型架构(规划模块与代码组织)1. 模型架构设计规划1.1 架构核心组件2. 模块化设计实现2.1 输入处理模块2.1.1 分词与嵌入2.1.2 位置编码2.2 解码块设计2.2.1 多头注意力子层2.2.…

消息队列(Kafka及RocketMQ等对比联系)

目录 消息队列 一、为什么使用消息队列&#xff1f;消息队列有什么优点/缺点&#xff1f;介绍下Kafka、ActiveMQ、RabbitMQ、RocketMQ有什么优点缺点&#xff0c;如何取舍&#xff1f; 1.公司业务场景是什么&#xff0c;这个业务场景有什么挑战&#xff0c;如果不用MQ有什么麻…