简述JAVA线程调度的原理,Rxjava原理(二)--线程调度

1. 创建线程池和线程管理策略分析

// 在开发中使用Rxjava来完成线程切换会调用到以下方法(还有几个就不一一列举了,原理一样的),那么就从这里开始分析

Schedulers.io()

Schedulers.computation()

Schedulers.newThread()

AndroidSchedulers.mainThread()

当我们调用以上方法中的任意一个,都会调到Schedulers类中,Schedulers使用策略模式封装了所有线程切换策略(因此后面以io()分析)。

// 1. Schedulers类中,静态创建IOTask(),当调用Schedulers.io()的时候,就是返回这个Callable.

static {

SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

IO = RxJavaPlugins.initIoScheduler(new IOTask());

TRAMPOLINE = TrampolineScheduler.instance();

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());

}

// 2.创建IoScheduler

static final class IOTask implements Callable {

@Override

public Scheduler call() throws Exception {

return IoHolder.DEFAULT;

}

}

static final class IoHolder {

static final Scheduler DEFAULT = new IoScheduler();

}

// 3.创建线程池

public IoScheduler(ThreadFactory threadFactory) {

this.threadFactory = threadFactory;

this.pool = new AtomicReference(NONE);

start();

}

public void start() {

// CachedWorkerPool任务池,里面持有任务队列和线程池

CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);

if (!pool.compareAndSet(NONE, update)) {

update.shutdown();

}

}

//4. CachedWorkerPool构造方法中创建线程池,并且暴露get()提供需要执行的任务

static final class CachedWorkerPool implements Runnable {

private final long keepAliveTime;

private final ConcurrentLinkedQueue expiringWorkerQueue;

final CompositeDisposable allWorkers;

private final ScheduledExecutorService evictorService;

private final Future> evictorTask;

private final ThreadFactory threadFactory;

CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {

......

if (unit != null) {

// 创建线程池

evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);

task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);

}

......

}

ThreadWorker get() {

.....

while (!expiringWorkerQueue.isEmpty()) {

// 任务队列不为空,从队列中取一个并返回

ThreadWorker threadWorker = expiringWorkerQueue.poll();

if (threadWorker != null) {

return threadWorker;

}

}

// 如果任务队列是空的,就创建一个并返回

ThreadWorker w = new ThreadWorker(threadFactory);

allWorkers.add(w);

return w;

}

......

}

用一张图可能说明得比较清楚一些。

ce19d5012d66

Schedulers调度过程.png

2. Rxjava上游任务在子线程中执行分析

// 上游线程切换使用过程

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter e) throws Exception {

e.onNext("JackOu");

}

})

// ObservableCreate.subscribeOn

.subscribeOn(Schedulers.io())

// ObservableSubscribeOn.subscribe

.subscribe(new Observer() {

......

@Override

public void onNext(String s) {

}

......

});

从上面使用过程的代码看下面的图,分析Rxjava封装任务和抛任务到线程池的过程。

ce19d5012d66

上游任务在线程池执行流程图.png

当我们一订阅(调用subscribe(Observer)方法)的时候,Rxjava将会把上游需要执行的任务和下游的观察者经过层层包裹,包裹好之后,就会得到一个Scheduler.Worker任务对象。当调用发射器的onNext的方式的时候,结合第一小节的图片,ObservableSubscribeOn就会将任务抛到线程池执行,在子线程中执行任务并且返回,从而完成线程切换功能。

3. Rxjava下游任务在主线程中执行分析

3.1 创建AndroidSchedulers.mainThread的过程

如第一节Schedulers的创建流程一样,当调用AndroidSchedulers.mainThread()之后,最终会创建HandlerScheduler。

// 1.创建HandlerScheduler,并且传入MainLooper

public final class AndroidSchedulers {

private static final class MainHolder {

// 创建HandlerScheduler

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(

new Callable() {

@Override public Scheduler call() throws Exception {

return MainHolder.DEFAULT;

}

});

public static Scheduler mainThread() {

return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);

}

}

// 2.当创建任务的时候,创建HandlerWorker

final class HandlerScheduler extends Scheduler {

private final Handler handler;

HandlerScheduler(Handler handler) {

this.handler = handler;

}

@Override

public Worker createWorker() {

return new HandlerWorker(handler);

}

}

// 3.当执行任务的时候

private static final class HandlerWorker extends Worker {

private final Handler handler;

HandlerWorker(Handler handler) {

this.handler = handler;

}

@Override

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {

......

// 包装任务

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

// 创建Message包装任务

Message message = Message.obtain(handler, scheduled);

message.obj = this;

// 发送任务到MainLooper中,该任务就在主线程中执行了

handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

......

return scheduled;

}

}

其实真正将任务放在主线程中执行就是上面三个步骤,但是Rxjava增加了很多其他功能,例如解除订阅(将任务包装在Disposable中),增加hook功能(在任务外面在包装了ScheduledRunnable)等等,其最内层的本质就是我们需要执行的任务。细化的包裹情况如下图:

ce19d5012d66

主线程执行任务.png

4.多个线程切换,以哪个为准

如下面代码,我们作死得切换线程,那么哪些线程会最终执行我们的任务呢

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter e) throws Exception {

e.onNext("JackOu");

}

})

.subscribeOn(Schedulers.io()) // 上游切换,靠近上游的生效

.subscribeOn(Schedulers.newThread())

.subscribeOn(Schedulers.computation())

.observeOn(Schedulers.io())

.observeOn(Schedulers.computation())

.observeOn(AndroidSchedulers.mainThread()) // 下游切换,靠近下游的生效

.subscribe(new Observer() {

......

@Override

public void onNext(String s) {

}

......

});

我们可以从第二节和第三节看出,当我们每调用一次subscribeOn方法,上游就会多包装一层Scheduler,在订阅之后,解包裹的时候越靠近“待执行任务”的subscribeOn越后解包,所以最靠近任务的subscribeOn调用会是最终被执行,也就是最终被执行的线程。

因此我们可以总结得到:

总结一: 在多次调用线程切换的时候,第一次调用subscribeOn的线程切换会是最后执行任务的线程;最后调用observeOn切换的线程会是最后执行的线程。

总结二:从调用关系来看,越靠近上游的线程切换,将是最终执行任务的线程;越靠近下游的线程切换,将是最终执行任务的线程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/394930.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[前端随笔][css] 弹性布局

说在前面 弹性布局&#xff0c;顾名思义就是有弹性&#xff0c;能够根据屏幕/当前空间大小自由伸缩的。使用弹性布局可以很好的适应各种尺寸的客户端。 关键代码 display:flex;    设定元素为弹性布局  <文档传送门> box-flex: 参数;   设定元素为弹性布局  &…

不同的模块中定义同样的宏为不同的值合法吗_如何创建自定义的建模规范

本文摘要&#xff1a;主要介绍如何创建自定义的建模规范检查&#xff0c;以及在建模规范检查中&#xff0c;如何增加自动修正模型使之符合规范。比如我们想创建一个自定义的规则&#xff0c;对于constant模块&#xff0c;1. 如果value是参数的话&#xff0c;则输出数据类型必须…

DBCP连接池配置常用参数说明

参数默认值说明username\传递给JDBC驱动的用于建立连接的用户名password\传递给JDBC驱动的用于建立连接的密码url\传递给JDBC驱动的用于建立连接的URLdriverClassName\使用的JDBC驱动的完整有效的Java 类名initialSize 0初始化连接:连接池启动时创建的初始化连接数量,1.2版本后…

科大讯飞 ai算法挑战赛_为井字游戏挑战构建AI算法

科大讯飞 ai算法挑战赛by Ben Carp通过本卡尔普 为井字游戏挑战构建AI算法 (Building an AI algorithm for the Tic-Tac-Toe challenge) As part of the freeCodeCamp curriculum, I was challenged build a Tic-Tac-Toe web app. It was a real pleasure.作为freeCodeCamp课程…

js serialize php 解,[转]JavaScript 版本的 PHP serialize/unserialize 完整实现

下载: phpserializer.js/* phpserializer.js - JavaScript to PHP serialize / unserialize class.** This class is designed to convert php variables to javascript* and javascript variables to php with a php serialize unserialize* compatible way.** Copyright (C) …

Git 的 .gitignore 配置

.gitignore 配置文件用于配置不需要加入版本管理的文件&#xff0c;配置好该文件可以为我们的版本管理带来很大的便利&#xff0c;以下是个人对于配置 .gitignore 的一些心得。 1、配置语法&#xff1a; 以斜杠“/”开头表示目录&#xff1b; 以星号“*”通配多个字符&#xff…

wsdl文件是怎么生成的_C++ 动态库.dll的生成---超级详细!!!

怎么将建好的工程生成.dll工程&#xff1f;1、在C中打开工程2、运行结果&#xff1a;输出Print修改开始&#xff1a;1、打开属性。2、修改以下内容&#xff1a;目标文件扩展名&#xff0c;由.exe--》.dll,直接删除修改即可配置类型&#xff0c;由.exe--》.dll,下拉菜单可选择最…

时钟设置

date --set"05/31/16 18:16" 时钟设置 设置系统时间# date --set“07/07/06 10:19" &#xff08;月/日/年 时:分:秒&#xff09;2、hwclock/clock查看硬件时# hwclock --show# clock --show设置硬件时间# hwclock --set --date"07/07/06 10:19" &…

《成为一名机器学习工程师》_成为机器学习的拉斐尔·纳达尔

《成为一名机器学习工程师》by Sudharsan Asaithambi通过Sudharsan Asaithambi 成为机器学习的拉斐尔纳达尔 (Become the Rafael Nadal of Machine Learning) One year back, I was a newbie to the world of Machine Learning. I used to get overwhelmed by small decisions…

HTTP基本认证(Basic Authentication)的JAVA示例

大家在登录网站的时候&#xff0c;大部分时候是通过一个表单提交登录信息。但是有时候浏览器会弹出一个登录验证的对话框&#xff0c;如下图&#xff0c;这就是使用HTTP基本认证。下面来看看一看这个认证的工作过程:第一步: 客户端发送http request 给服务器,服务器验证该用户…

php-fpm 内存 facebook,【百家号】脸书百科,安装php-fpm-5.4.16-42.遇到的小问题 Web程序 - 贪吃蛇学院-专业IT技术平台...

环境&#xff1a;redhat 7.2版本 yum源也是7.2的iso[[email protected] lnmp_soft]# yum -y install php-fpm-5.4.16-42.el7.x86_64.rpm已加载插件&#xff1a;langpacks, product-id, search-disabled-repos, subscription-managerThis system is not registered to Red Hat S…

Codeforces Round #424 (Div. 2, rated, based on VK Cup Finals)

昨晚的没来得及打&#xff0c;最近错过好几场CF了&#xff0c;这场应该不算太难 A. Unimodal Arraytime limit per test1 secondmemory limit per test256 megabytesinputstandard inputoutputstandard outputArray of integers is unimodal, if: it is strictly increasing in…

python能print中文吗_python怎么print汉字

今天就为大家分享一篇python中使用print输出中文的方法&#xff0c;具有很好的参考价值&#xff0c;希望对大家有所帮助。看Python简明教程&#xff0c;学习使用print打印字符串&#xff0c;试了下打印中文&#xff0c;不行。&#xff08;推荐学习&#xff1a;Python视频教程&a…

ajax的一些相关

1、AJAX Asynchronous&#xff08;异步的&#xff09; JavaScript and XML AJAX是能不刷新整个网页的前提下&#xff0c;更新内容。通过少量的数据交换&#xff0c;达成局部页面刷新的效果。 而form表单提交经常是刷新整个页面&#xff0c;很繁琐 2、AJAX是基于现有的Internet…

select ...as_一起使用.select .map和.reduce方法可充分利用Ruby

select ...asby Declan Meehan由Declan Meehan 一起使用.select .map和.reduce方法可充分利用Ruby (Get the most out of Ruby by using the .select .map and .reduce methods together) You should absolutely never ever repeat yourself when writing code. In other word…

一些书单

仅对近来的学习做些回顾吧 学习永无止境--> 2015年已完成书单&#xff1a; 文学&#xff1a; 硅谷之火浪潮之巅天才在左疯子在右从0到1生命咖啡馆黑客与画家奇思妙想&#xff1a;15位计算机天才及其重大发现乔布斯传平凡的世界&#xff08;三部全&#xff09;一只iphone的全…

oracle 11gogg,【OGG】Oracle GoldenGate 11g (二) GoldenGate 11g 单向同步配置 上

Oracle GoldenGate 11g (二)GoldenGate 11g 单向同步配置 上ItemSource SystemTarget SystemPlatformRHEL6.4 - 64bitRHEL6.4 - 64bitHostnamerhel64.oracle.comora11g.oracle.comDatabaseOracle 11.2.0.3Oracle 11.2.0.3Character SetAL32UTF8AL32UTF8ORACLE_SIDPRODEMREPList…

今天听说了一个压缩解压整型的方式-group-varint

group varint https://github.com/facebook/folly/blob/master/folly/docs/GroupVarint.md 这个是facebook的实现 https://www.slideshare.net/parallellabs/building-software-systems-at-google-and-lessons-learned/48-Group_Varint_Encoding_Idea_encode

Centos7-卸载自带的jdk 安装jdk8

卸载JDK Centos7一般都会带有自己的openjdk,我们一般都回用oracle的jdk,所以要卸载 步骤一&#xff1a;查询系统是否以安装jdk #rpm -qa|grep java 或 #rpm -qa|grep jdk 或 #rpm -qa|grep gcj 步骤二&#xff1a;卸载已安装的jdk #rpm -e --nodeps java-1.8.0-openjdk…

小猪佩奇python_python画个小猪佩奇

#!/usr/bin/python #-*- coding: utf-8 -*-import turtleast def nose(x,y):#鼻子 t.pu() t.goto(x,y) t.pd() t.seth(-30) t.begin_fill() a0.4 for i in range(120):if 0<i<30 or 60<i<90: aa0.08t.lt(3) #向左转3度 t.fd(a) #向前走a的步长else: aa-0.08t.lt(3)…