聊聊并发——生产者消费者模式

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

生产者消费者模式实战

我和同事一起利用业余时间开发的Yuna工具中使用了生产者和消费者模式。首先我先介绍下Yuna工具,在阿里巴巴很多同事都喜欢通过邮件分享技术文章,因为通过邮件分享很方便,同学们在网上看到好的技术文章,复制粘贴发送就完成了分享,但是我们发现技术文章不能沉淀下来,对于新来的同学看不到以前分享的技术文章,大家也很难找到以前分享过的技术文章。为了解决这问题,我们开发了Yuna工具。Yuna取名自我喜欢的一款游戏最终幻想里的女主角。

首先我们申请了一个专门用来收集分享邮件的邮箱,比如share@alibaba.com,同学将分享的文章发送到这个邮箱,让同学们每次都抄送到这个邮箱肯定很麻烦,所以我们的做法是将这个邮箱地址放在部门邮件列表里,所以分享的同学只需要象以前一样向整个部门分享文章就行,Yuna工具通过读取邮件服务器里该邮箱的邮件,把所有分享的邮件下载下来,包括邮件的附件,图片,和邮件回复,我们可能会从这个邮箱里下载到一些非分享的文章,所以我们要求分享的邮件标题必须带有一个关键字,比如[内贸技术分享],下载完邮件之后,通过confluence的web service接口,把文章插入到confluence里,这样新同事就可以在confluence里看以前分享过的文章,并且Yuna工具还可以自动把文章进行分类和归档。

为了快速上线该功能,当时我们花了三天业余时间快速开发了Yuna1.0版本。在1.0版本中我并没有使用生产者消费模式,而是使用单线程来处理,因为当时只需要处理我们一个部门的邮件,所以单线程明显够用,整个过程是串行执行的。在一个线程里,程序先抽取全部的邮件,转化为文章对象,然后添加全部的文章,最后删除抽取过的邮件。代码如下:

public void extract() {logger.debug("开始" + getExtractorName() + "。。");//抽取邮件List<Article> articles = extractEmail();//添加文章for (Article article : articles) {addArticleOrComment(article);}//清空邮件cleanEmail();logger.debug("完成" + getExtractorName() + "。。");}

Yuna工具在推广后,越来越多的部门使用这个工具,处理的时间越来越慢,Yuna是每隔5分钟进行一次抽取的,而当邮件多的时候一次处理可能就花了几分钟,于是我在Yuna2.0版本里使用了生产者消费者模式来处理邮件,首先生产者线程按一定的规则去邮件系统里抽取邮件,然后存放在阻塞队列里,消费者从阻塞队列里取出文章后插入到conflunce里。代码如下:

public class QuickEmailToWikiExtractor extends AbstractExtractor {private ThreadPoolExecutor      threadsPool;private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue;public QuickEmailToWikiExtractor() {emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>();int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000));}public void extract() {logger.debug("开始" + getExtractorName() + "。。");long start = System.currentTimeMillis();//抽取所有邮件放到队列里new ExtractEmailTask().start();// 把队列里的文章插入到WikiinsertToWiki();long end = System.currentTimeMillis();double cost = (end - start) / 1000;logger.debug("完成" + getExtractorName() + ",花费时间:" + cost + "秒");}/*** 把队列里的文章插入到Wiki*/private void insertToWiki() {//登录wiki,每间隔一段时间需要登录一次confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD);while (true) {//2秒内取不到就退出ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS);if (email == null) {break;}threadsPool.submit(new insertToWikiTask(email));}}protected List<Article> extractEmail() {List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails();if (allEmails == null) {return null;}for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) {emailQueue.offer(exchangeEmailShallowDTO);}return null;}/*** 抽取邮件任务* * @author tengfei.fangtf*/public class ExtractEmailTask extends Thread {public void run() {extractEmail();}}
}

多生产者和多消费者场景

在多核时代,多线程并发处理速度比单线程处理速度更快,所以我们可以使用多个线程来生产数据,同样可以使用多个消费线程来消费数据。而更复杂的情况是,消费者消费的数据,有可能需要继续处理,于是消费者处理完数据之后,它又要作为生产者把数据放在新的队列里,交给其他消费者继续处理。如下图:

我们在一个长连接服务器中使用了这种模式,生产者1负责将所有客户端发送的消息存放在阻塞队列1里,消费者1从队列里读消息,然后通过消息ID进行hash得到N个队列中的一个,然后根据编号将消息存放在到不同的队列里,每个阻塞队列会分配一个线程来消费阻塞队列里的数据。如果消费者2无法消费消息,就将消息再抛回到阻塞队列1中,交给其他消费者处理。

以下是消息总队列的代码;

/*** 总消息队列管理* * @author tengfei.fangtf*/
public class MsgQueueManager implements IMsgQueue{private static final Logger              LOGGER             = LoggerFactory.getLogger(MsgQueueManager.class);/*** 消息总队列*/public final BlockingQueue<Message> messageQueue;private MsgQueueManager() {messageQueue = new LinkedTransferQueue<Message>();}public void put(Message msg) {try {messageQueue.put(msg);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}public Message take() {try {return messageQueue.take();} catch (InterruptedException e) {Thread.currentThread().interrupt();}return null;}}

启动一个消息分发线程。在这个线程里子队列自动去总队列里获取消息。

/*** 分发消息,负责把消息从大队列塞到小队列里* * @author tengfei.fangtf*/static class DispatchMessageTask implements Runnable {@Overridepublic void run() {BlockingQueue<Message> subQueue;for (;;) {//如果没有数据,则阻塞在这里Message msg = MsgQueueFactory.getMessageQueue().take();//如果为空,则表示没有Session机器连接上来,
需要等待,直到有Session机器连接上来while ((subQueue = getInstance().getSubQueue()) == null) {try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}//把消息放到小队列里try {subQueue.put(msg);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}}

使用Hash算法获取一个子队列。

/*** 均衡获取一个子队列。* * @return*/public BlockingQueue<Message> getSubQueue() {int errorCount = 0;for (;;) {if (subMsgQueues.isEmpty()) {return null;}int index = (int) (System.nanoTime() % subMsgQueues.size());try {return subMsgQueues.get(index);} catch (Exception e) {//出现错误表示,在获取队列大小之后,队列进行了一次删除操作LOGGER.error("获取子队列出现错误", e);if ((++errorCount) < 3) {continue;}}}}

使用的时候我们只需要往总队列里发消息。

//往消息队列里添加一条消息IMsgQueue messageQueue = MsgQueueFactory.getMessageQueue();Packet msg = Packet.createPacket(Packet64FrameType.
TYPE_DATA, "{}".getBytes(), (short) 1);messageQueue.put(msg);

小结

本章讲解了生产者消费者模式,并给出了实例。读者可以在平时的工作中思考下哪些场景可以使用生产者消费者模式,我相信这种场景应该非常之多,特别是需要处理任务时间比较长的场景,比如上传附件并处理,用户把文件上传到系统后,系统把文件丢到队列里,然后立刻返回告诉用户上传成功,最后消费者再去队列里取出文件处理。比如调用一个远程接口查询数据,如果远程服务接口查询时需要几十秒的时间,那么它可以提供一个申请查询的接口,这个接口把要申请查询任务放数据库中,然后该接口立刻返回。然后服务器端用线程轮询并获取申请任务进行处理,处理完之后发消息给调用方,让调用方再来调用另外一个接口拿数据。

另外Java中的线程池类其实就是一种生产者和消费者模式的实现方式,但是实现方法更高明。生产者把任务丢给线程池,线程池创建线程并处理任务,如果将要运行的任务数大于线程池的基本线程数就把任务扔到阻塞队列里,这种做法比只使用一个阻塞队列来实现生产者和消费者模式显然要高明很多,因为消费者能够处理直接就处理掉了,这样速度更快,而生产者先存,消费者再取这种方式显然慢一些。

我们的系统也可以使用线程池来实现多生产者消费者模式。比如创建N个不同规模的Java线程池来处理不同性质的任务,比如线程池1将数据读到内存之后,交给线程池2里的线程继续处理压缩数据。线程池1主要处理IO密集型任务,线程池2主要处理CPU密集型任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/447731.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

多个 VUE 前端工程部署设置、nginx 代理配置

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 PS&#xff1a;早期 我只有一个 VUE 前端工程&#xff1a;gentle-vue &#xff0c;加一个 java 后端工程&#xff1a;gentle &#xff0…

FreeSql (二十六)贪婪加载 Include、IncludeMany、Dto、ToList

贪婪加载顾名思议就是把所有要加载的东西一次性读取。 本节内容为了配合【延时加载】而诞生&#xff0c;贪婪加载和他本该在一起介绍&#xff0c;开发项目的过程中应该双管齐下&#xff0c;才能写出高质量的程序。 Dto 映射查询 Select<Tag>().Limit(10).ToList(a > n…

FreeSql (二十七)将已写好的 SQL 语句,与实体类映射进行二次查询

有时候&#xff0c;我们希望将写好的 sql 语句&#xff0c;甚至是存储过程进行查询&#xff0c;虽然效率不高&#xff08;有时候并不是效率至上&#xff09;。 巧用AsTable var sql fsql.Select<UserX>().AsTable((a, b) > "(select * from user where clicks &…

解决: 网站访问报错 AccessDenied (阿里云 OSS + CDN )

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 1. 使用阿里云 OSS服务 CDN 服务后&#xff0c;直接用顶级域名访问个人站点失败&#xff0c;报错如下&#xff1a; <Code>Acces…

二十一世纪Windows简史

摘要&#xff1a;Windows拥有者超过90%的消费级操作系统市场份额&#xff0c;处于龙头老大的位置&#xff0c;那其成长的故事是怎么的&#xff1f;ZDNet总结了21世纪Windows的发展史&#xff0c;以及围绕微软操作系统发生的事情&#xff0c;不妨一看。 据微软4月26日的官方通知…

FreeSql (二十八)事务

FreeSql实现了四种数据库事务的使用方法&#xff0c;脏读等事务相关方法暂时未提供。主要原因系这些方法各大数据库、甚至引擎的事务级别五花八门较难统一。 事务用于处理数据的一致性&#xff0c;处于同一个事务中的操作是一个UnitOfWork&#xff0c;要么全部执行成功&#xf…

VUE 项目中引入 json 配置

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 这个写法还是第一次见到&#xff0c;也不知道是否还有什么环境配置&#xff0c;记录一下&#xff0c;或许以后什么时候也可以参考&#…

新手课堂之汽车灯光操作及位置

驾考科目三模拟夜间灯光操作你了解多少&#xff1f;汽车灯光该如何操作&#xff1f;下面我们随众悦学车网编辑一起来看看吧&#xff01; 科目三考试中&#xff0c;模拟夜间灯光使用是每个学员都要参加的一项考试&#xff0c;那么&#xff0c;汽车灯光包括些什么灯呢&#xff1f…

FreeSql (二十九)Lambda 表达式

FreeSql 支持功能丰富的表达式函数解析&#xff0c;方便程序员在不了解数据库函数的情况下编写代码。这是 FreeSql 非常特色的功能之一&#xff0c;深入细化函数解析尽量做到满意&#xff0c;所支持的类型基本都可以使用对应的表达式函数&#xff0c;例如 日期、字符串、IN查询…

Spring注解 @Qualifier 说明、用法

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 -------------------------------------------- 我是分隔线 --------------------------------------------------- Qualifier&#xf…

科目三中模拟灯光使用考试常见的错误 广州学车网光大国际驾校学车

夜间驾驶虽是 驾照考试中抽选的内容&#xff0c;但科目三中模拟灯光使用考试&#xff0c;还是要了解一下的。以下列出了考试中常见的错误。 1.前照灯非远光状态&#xff0c;听到“请将前照灯变换成远光”指令&#xff0c;不变换或变换错误的或者前照灯在远光状态下&#xff0c;…

FreeSql (三十)读写分离

FreeSql 支持数据库读写分离&#xff0c;本功能是客户端的读写分离行为&#xff0c;数据库服务器该怎么配置仍然那样配置&#xff0c;不受本功能影响&#xff0c;为了方便描术后面讲到的【读写分离】都是指客户端的功能支持。 各种数据库的读写方案不一&#xff0c;数据库端开启…

把 excel 表中的数据 批量修改到指定数据库表中、根据 excel 表中数据修改数据库表中数据

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 1. 收到一张 excel 表&#xff0c;要求根据 “转账时间”一列的值批量修改数据库表中 "放款时间"一列的值。 2. 写出 sql 模…

科​目​三​智​能​考​试​系​统​实​际​道​路​考​试​项​目​评​判​标​准

科目三智能考试系统是将公安部实际道路考试项目评判标准加以量化&#xff0c;重点考学员的驾驶技能、安全意识及文明驾车的理念&#xff0c;并将其融入 各个考试项目之中。 二考试技术指标 1、上车准备 考试开始后&#xff0c;学员应当根据考试员发出的“上车准备”指令后…

FreeSql (三十一)分区分表

分区 分区就是把一个数据表的文件和索引分散存储在不同的物理文件中。把一张表的数据分成N多个区块&#xff0c;这些区块可以在同一个磁盘上&#xff0c;也可以在不同的磁盘上&#xff0c;数据库不同实现方式有所不同。 与分表不同&#xff0c;一张大表进行分区后&#xff0c;他…

FreeSql (三十二)Aop

FreeSql AOP 已有的功能介绍&#xff0c;未来为会根据用户需求不断增强。 审计 CRUD 马云说过&#xff0c;996是修福报。对于多数程序员来说&#xff0c;加班是好事。。。起码不是闲人&#xff0c;不会下岗。 当如果因为某个 sql 骚操作耗时很高&#xff0c;没有一个相关的审计…

SpringMvc 注解 @InitBinder 表单多对象精准绑定接收

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 InitBinder用于在Controller中标注于方法&#xff0c;表示为当前控制器注册一个属性编辑器或者其他&#xff0c;只对当前的Controller有…

2014年科目三智能化考试十大必知事项

一、模拟夜考灯光使用 1&#xff0c;夜间通过急转弯/坡道/拱桥/人行横道/或没有交通信号灯控制的路口&#xff1b; 灯光使用&#xff1a;交替使用远近光(变光2次以上) 2&#xff0c;夜间在窄路窄桥与非机动车会车 灯光使用&#xff1a;近光灯 3&#xff0c;夜间在道路上发生故障…

SpringMVC注解 @initbinder 解决类型转换问题

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到教程。 使用 SpringMVC 时&#xff0c;常遇到表单中日期字符串和 JavaBean 的 Date 类型的转换&#xff0c;而 SpringMVC 默认不支持这个格式的…

看了就彻底明白人生!!!

出生一张纸&#xff0c;开始一辈子&#xff1b; 毕业一张纸&#xff0c;奋斗一辈子&#xff1b; 婚姻一张纸&#xff0c;折磨一辈子&#xff1b; 做官一张纸&#xff0c;斗争一辈子&#xff1b; 金钱一张纸&#xff0c;辛苦一辈子&#xff1b; 荣誉一张纸&#xff0c;虚名一辈子…