Rust多线程:Worker 结构体与线程池中任务的传递机制

本文分享自天翼云开发者社区《Rust多线程:Worker 结构体与线程池中任务的传递机制》,作者:l****n

Rust多线程:Worker 结构体与线程池中任务的传递机制

**在实现一个多线程的 Web 服务器时,我们会遇到一个问题:如何在创建线程之后让它们在没有任务时保持等待状态,并且在任务到来时可以立即执行。这是一个典型的线程池设计问题。在 Rust 中,我们需要通过自定义设计来实现这个功能,因为标准库中的 **thread::spawn 并不直接支持这种用法。

问题描述

**Rust 的 **thread::spawn 方法会立即执行传入的闭包。如果我们想要在线程池中创建线程并让它们等待任务(即在创建时不执行任何任务),我们就需要自己设计一种机制,能够在稍后将任务传递给这些已经创建好的线程。

解决方案:引入 Worker 结构体

**为了解决这个问题,我们引入了一个 **Worker 结构体来管理线程池中的每个线程。Worker 的作用类似于一个工人,它等待任务的到来并在接收到任务时执行。

1. Worker 结构体的定义

Worker 结构体包含两个字段:

  • id:用于标识每个 Worker
  • thread:存放线程的 JoinHandle<()>,它是由 thread::spawn 返回的。

代码如下:

struct Worker {id: usize,thread: thread::JoinHandle<()>,
}
2. 创建 Worker 实例

**为了让 **Worker 在没有任务时处于等待状态,我们可以在 Worker::new 函数中使用 thread::spawn 创建线程,并传入一个空的闭包:

impl Worker {fn new(id: usize) -> Worker {let thread = thread::spawn(|| {});
​Worker { id, thread }}
}

**在这里,我们创建了一个 **Worker 实例,每个 Worker 都会启动一个线程。但这个线程目前还什么都不做,因为我们传递给 spawn 的闭包是空的。

3. 将 Worker 集成到线程池中

**接下来,我们修改 **ThreadPool 的实现,使其存储 Worker 的实例而不是直接存储线程的 JoinHandle<()>。在 ThreadPool::new 中,我们使用一个 for 循环创建多个 Worker 实例,并将它们存储在一个 Vec<Worker> 中:

pub struct ThreadPool {workers: Vec<Worker>,
}
​
impl ThreadPool {pub fn new(size: usize) -> ThreadPool {assert!(size > 0);
​let mut workers = Vec::with_capacity(size);
​for id in 0..size {workers.push(Worker::new(id));}
​ThreadPool { workers }}
}

**这样,我们就为线程池创建了一个由多个 **Worker 组成的集合。每个 Worker 都有一个唯一的 ID,并且都启动了一个线程,虽然这些线程目前还没有执行任何有用的任务。

向 Worker 发送任务

现在,我们解决了创建线程并让它们等待任务的问题。接下来,我们需要设计一个机制,使得线程池能够在任务到来时将任务发送给等待中的线程。

1. 使用信道传递任务

**在 Rust 中,信道(channel)是一种非常适合在线程之间传递数据的工具。我们可以使用一个信道来传递任务。线程池会创建一个信道的发送端,每个 **Worker 会拥有信道的接收端。任务通过信道从线程池传递到 Worker,再由 Worker 中的线程执行。

use std::{sync::mpsc, thread};
​
pub struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>,
}
​
struct Job;
​
impl ThreadPool {pub fn new(size: usize) -> ThreadPool {assert!(size > 0);
​let (sender, receiver) = mpsc::channel();
​let mut workers = Vec::with_capacity(size);
​for id in 0..size {workers.push(Worker::new(id));}
​ThreadPool { workers, sender }}
}
2. Worker 处理任务

**为了让 **Worker 能够处理任务,我们将信道的接收端传递给每个 Worker 的线程。线程会不断地从信道中接收任务,并执行这些任务。

impl Worker {fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {let thread = thread::spawn(move || {receiver;});
​Worker { id, thread }}
}

**不过在这段代码中,存在一个问题:信道的接收端 **receiver 被移交给了第一个 Worker,导致无法将其传递给其他 Worker

3. 使用 Arc 和 Mutex 共享接收端

**为了解决这个问题,我们需要使用 **Arc<Mutex<T>> 来共享信道的接收端,这样所有的 Worker 都可以安全地从同一个信道接收任务:

use std::{sync::{mpsc, Arc, Mutex}, thread};
​
type Job = Box<dyn FnOnce() + Send + 'static>;
​
impl ThreadPool {pub fn new(size: usize) -> ThreadPool {assert!(size > 0);
​let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));
​let mut workers = Vec::with_capacity(size);
​for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}
​ThreadPool { workers, sender }}pub fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);
​self.sender.send(job).unwrap();}
}
​
impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {let job = receiver.lock().unwrap().recv().unwrap();
​println!("Worker {id} got a job; executing.");
​job();});
​Worker { id, thread }}
}

**在 **Worker::new 中,线程会不断地尝试获取锁来接收任务,并在收到任务后执行。这里我们使用了 Arc 来共享接收端,使用 Mutex 来确保一次只有一个 Worker 能够接收任务。

type Job = Box<dyn FnOnce() + Send + 'static>;

**这行代码定义了一个类型别名 **Job。它代表了一个特定的任务类型:

  • Box<dyn FnOnce() + Send + 'static> 是一个动态分发的闭包(或函数),其具体实现类型在编译时不确定。Box 是一个堆分配的智能指针,用于将闭包存储在堆上。
  • dyn FnOnce() 表示这个闭包实现了 FnOnce trait,可以被调用一次。
  • Send 表示这个闭包可以在线程之间安全地传递。
  • 'static 表示闭包的生命周期是整个程序的生命周期,确保闭包在多个线程中可以安全使用。
execute 方法

**这个方法的功能是将一个新的任务(闭包)添加到线程池的任务队列中,以供线程池中的工作线程执行。下面是对 **F: FnOnce() + Send + 'static 的解释:

  • F: FnOnce() + Send + 'static
    

    ** 是一个泛型约束,表示必须是一个实现了 FnOnce、Send和 'static的闭包类型。**

    • FnOnce() 确保闭包可以被调用一次。
    • Send 确保闭包可以安全地在线程之间传递。
    • 'static 确保闭包的生命周期足够长,可以在整个程序运行期间有效。

**在 **execute 方法中,你将传入的闭包 f 转换成 Job 类型(即 Box<dyn FnOnce() + Send + 'static>),然后通过 self.sender 将其发送到任务队列中。这使得线程池的工作线程可以从队列中接收并执行这些任务。

总结

**通过引入 **Worker 结构体并使用信道进行任务传递,我们成功地实现了一个可以延迟分配任务的线程池。每个 Worker 都是在创建时启动的,但它们会等待任务的到来,只有在接收到任务后才会开始执行。这种设计不仅提高了服务器的吞吐量,还确保了线程资源的高效利用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/907380.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

day10-AI短视频01

今日内容 1 AI 短视频介绍 1.1 AI短视频是什么 # 1 AI 短视频是指通过人工智能技术(尤其是生成式 AI、智能剪辑算法等)自动或辅助完成制作的短视频内容,其核心是利用 AI 工具替代或简化传统短视频创作中 “内容生成…

详细介绍:今日分享 KMP算法

详细介绍:今日分享 KMP算法pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco",…

【每日算法】两数相加 LeetCode - 教程

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

04-简单查询

04-简单查询$(".postTitle2").removeClass("postTitle2").addClass("singleposttitle");查询单个字段 案例1:查询公司中所有员工编号案例2:查询公司中所有员工姓名查多个字段 案例1:…

MSS 到底是什么?Wireshark 分析TCP过程 - 教程

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

线性回归与 Softmax 回归核心内容总结 - 教程

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

P6631 [ZJOI2020] 序列 题解

很好的贪心题。 考虑从左到右枚举每个位置,每次在右边添加一个数时更行答案。 容易想到记录当前前缀可以继续向右延伸的 \(1,2,3\) 操作的个数。记当前需要添加的数为 \(i\),用 \(c,x,y\) 分别表示可以继续向右延伸(…

MacCAD2019.dmg 安装包使用教程|Mac电脑安装CAD2019全流程

MacCAD2019.dmg 安装包使用教程|Mac电脑安装CAD2019全流程​一、准备工作​下载文件​ 确保你已经下载了 MacCAD2019.dmg这个安装包,一般是个压缩镜像文件。安装包下载:https://pan.quark.cn/s/3efc4d51839c二、开始…

初始化一个rust环境

初始化一个rust环境初始化一个rust环境 rust 安装工具依赖入门 - Rust 程序设计语言一门帮助每个人构建可靠且高效软件的语言。 https://www.rust-lang.org/zh-CN/learn/get-startedrustup 环境变量 RUSTUP_DIST_SERVE…

编程里边有好多不容易触及的知识点

可能是因为不是原生土长的内容,编程里边有很多没有办法合情理解的领域。框架是其中一个,类似的还有 窗口库,还有好多其它东西。以前刚开始学窗口的时候,总是觉得没办法进行。周围的同学大概都是 拿过来用就可以,创…

PostgreSQL repmgr 高可用之故障转移

PostgreSQL repmgr 高可用之故障转移PostgreSQL高可用之repmgr自动切换 之前写过一个repmgr的高可用搭建的,https://www.cnblogs.com/wy123/p/18531710,repmgr的搭建过程还是比较简单的,具体过程不再赘述。这里为了…

25.9.18随笔联考总结

考试 通读题面,发现前两道是签。然后开做,饭堂,最后花费大部分时间过掉。后面两道题都不会。寄寄。 估计:100+100+0+0。实际:100+100+0+0。 有人藏分,素质有待提高! 改题+总结 T3 需要看出无限制的方案数对应卡…

P3642 [APIO2016] 烟花表演 解题报告

简要题意 给定一颗有根树,边有边权。你可以花费 \(1\) 的代价使任意一条边的边权减一或加一。询问使所有叶子到根的距离相等的最小代价。 分析 首先看上去就很 dp,于是考虑状态设计。设 \(f_{u,i}\) 表示使 \(u\) 子…

Manim实现闪光轨迹特效

在动画制作中,轨迹特效常常用于增强视觉效果,而带有闪光效果的轨迹更是能够吸引观众的注意力。 本文将介绍如何使用Manim动画库实现闪光轨迹特效。 1. 实现原理 下面的GlowingTracedPath类参考了Manim中的TracePath类…

Slope Trick 学习笔记

前言 诚然,虽然它名字里带了"Slope",但是它不是斜率优化,而是一个比它还要难的东西(作者本人主观臆断)。 并且,关于 CF13C,有一点很多文章都没有提及,所以会有人看不懂为什么要这么做(作者本人亲身…

使用 libaudioclient 实现 Android Native层 音频测试工具

libaudioclient 除了支持 setAudioPortConfig() 调用,也支持 setMasterMute()、setStreamMute()、setParameters()、getParameters()、setMode() 等接口调用,满足各种开发测试需求。它让你不需要关注这些细枝末节的差…

03-初始化测试数据

03-初始化测试数据$(".postTitle2").removeClass("postTitle2").addClass("singleposttitle");显示所有数据库 show databases;创建数据库 create database testdb;使用数据库 use test…

漏洞详解--文件上传 如何花样绕过?!

一、漏洞原理 1.1 核心 文件上传漏洞,顾名思义,将攻击者将恶意文件上传到服务器,服务器将恶意文件解析,攻击就达成了。 1.2 漏洞详解 文件上传漏洞非常好理解,有三个关键点,一是上传文件,二是找到文件上传的路径…

深入解析:AI Agent开发秘籍:Prompt工程与测评最佳实践(建议收藏反复研读)

深入解析:AI Agent开发秘籍:Prompt工程与测评最佳实践(建议收藏反复研读)2025-09-18 19:18 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-…

使用Windows客户端访问EDA环境的NFS共享

在IC设计环境中, 也总是会有Windows操作系统的开发服务器或者客户端需要使用IC设计平台中Linux主机使用的NFS服务器。 使用者也是IC设计团队中的一员,可能出于设计工具的原因, 他/她的部分工作必须在Windows中完成,…