Android RxJava框架分析:它的执行流程是如何的?它的线程是如何切换的?如何自定义RxJava操作符?

目录

  1. RxJava是什么?为什么使用。
  2. RxJava是如何使用的呢?
  3. RxJava如何和Retrofit一起使用。
  4. RxJava源码分析。
  • (1)他执行流程是如何的。
  • (2)map
  • (3)线程的切换。
  1. 如何自定义RxJava操作符?

一、RxJava是什么?为什么使用

RxJava 是一个基于 ​​响应式编程范式​​ 的库,用于通过​​观察者模式​​和​​链式操作符​​,简化异步、事件驱动、多线程数据流处理的开发。

简单来说,RxJava 就像是一个​​“流水线工厂”​​,专门处理需要等待的任务(比如网络请求、数据库查询、复杂计算等)。它能把这些任务串成一条流水线,每个环节处理完数据后,自动传给下一个环节,还能灵活控制任务在哪个线程执行(比如后台线程干活,主线程更新UI)。

1.1 为什么要使用RxJava呢?

接下来,我们看看不使用的问题,以网络请求为例

需求​​:按顺序做三件事(登录 → 查询订单 → 更新UI)。

​传统写法​​:​​“回调地狱”​​,层层嵌套,像俄罗斯套娃!

// 伪代码:传统嵌套回调(问题代码)
api.login(new Callback<User>() {@Overridepublic void onSuccess(User user) {api.getOrders(user.getId(), new Callback<List<Order>>() {@Overridepublic void onSuccess(List<Order> orders) {runOnUiThread(() -> {showOrders(orders); // 切主线程更新UI});}@Overridepublic void onFailure(Throwable error) {showError(error); // 每个回调都要处理错误!}});}@Overridepublic void onFailure(Throwable error) {showError(error);}
});

​问题总结​​:

  • ​代码缩进成“金字塔”​​,维护困难。
  • ​重复处理错误​​,每个回调都要写 onFailure
  • ​手动切换线程​​(如 runOnUiThread),容易遗漏。

​RxJava写法​​:

// RxJava 链式调用(解决方案)
api.rxLogin()                   // 1. 登录(被观察者).flatMap(user -> api.rxGetOrders(user.getId())) // 2. 查询订单(操作符).observeOn(AndroidSchedulers.mainThread()) // 3. 切到主线程.subscribe(orders -> showOrders(orders), // 4. 观察者消费数据error -> showError(error));   // 统一错误处理!

优势​​:
1️ ​​代码变“直线”​​,逻辑清晰。
2️ ​​统一错误处理​​,一个 onError 搞定所有。
3️ ​​自动线程切换​​,不用写 runOnUiThread


二、RxJava是如何使用的呢?

(1)添加依赖

implementation 'io.reactivex.rxjava3:rxjava:3.1.8'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2' // Android 需要

(2)使用

Observable.just("你好").subscribe(new Observer<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe: 订阅开始");}@Overridepublic void onNext(@NonNull String s) {Log.d(TAG, "onNext: 拿到事件"+s);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError: 错误事件");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: 事件完成");}});

在这里插入图片描述

解释一下执行流程:

(1)首先我们先需要知道几个角色:观察者( Observer),被观察者( Observable)。

(2)当被观察者发送出数据(调用just方法)你好的时候,那么观察者就会收到消息(subscribe方法就是订阅)。

(3) subscribe() 将观察者与被观察者连接,触发 Observable 开始发射数据,Observer接收并处理事件(数据、错误、完成信号)。

(4)Observer的 onSubscribe 方法订阅时立即调用​​(最先执行)。 通知观察者订阅已建立。

(5) onNext 方法Observable 发射数据时调用。

(6) onComplete() 方法Observable ​​正常完成数据发射​​后调用 。onError 反之。


三、RxJava如何和Retrofit一起使用

其实就是将Retrofit的响应结果交给RxJava来处理

3.1 发起一个请求

(1)我们需要在Retrofit这里配置 RxJava适配器

public class RetrofitClient {private static final String BASE_URL = "https://www.wanandroid.com/";private static Retrofit retrofit;public static WanAndroidService getService() {if (retrofit == null) {retrofit = new Retrofit.Builder().baseUrl(BASE_URL).addConverterFactory(GsonConverterFactory.create()) // Gson 解析.addCallAdapterFactory(RxJava3CallAdapterFactory.create()) // RxJava 支持.build();}return retrofit.create(WanAndroidService.class);}
}

这行代码的作用是 ​​让 Retrofit 支持返回 RxJava 3 的响应式类型​​(如 ObservableFlowableSingle 等),使得网络请求的结果可以直接通过 RxJava 的流式操作符处理。

(2)在接口这里,我们也是使用Observable来接收。

public interface WanAndroidService {// 示例1:登录接口(POST)@FormUrlEncoded@POST("/user/login")Observable<ApiResponse<User>> login(@Field("username") String username,@Field("password") String password);// 示例2:获取首页文章列表(GET)@GET("/article/list/{page}/json")Observable<ApiResponse<List<Article>>> getHomeArticles(@Path("page") int page);
}

(3)调用接口

private void login(String username, String password) {WanAndroidService service = RetrofitClient.getService();service.login(username, password).subscribeOn(Schedulers.io()) // 在IO线程发起请求.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理结果.subscribe(new Observer<ApiResponse<User>>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {compositeDisposable.add(d); // 统一管理订阅Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(@NonNull ApiResponse<User> response) {Log.d(TAG, "onNext: "+response);}@Overridepublic void onError(@NonNull Throwable e) {// 网络错误Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {// 请求完成Log.d(TAG, "onComplete: ");}});
}

执行逻辑:

  1. service.login(username, password)​:
    通过 Retrofit 定义的接口发起网络请求,返回一个 Observable<ApiResponse<User>>。注意 ​​此时网络请求尚未执行​​,只是定义了数据源。
  2. .subscribeOn(Schedulers.io())​:
    指定 Observable 的工作线程为 ​​IO 线程​
  3. observeOn(AndroidSchedulers.mainThread())​:
    指定 Observer 的回调方法(onNextonErroronComplete)在 ​​主线程​​ 执行。
  4. .subscribe(Observer)​:
    订阅 Observable,触发网络请求执行,并绑定观察者处理结果。 ​​此时网络请求正式启动​​。

3.2 发起两个请求,网络嵌套

需求:先登录,登录成功后再获取用户的收藏列表。

// 获取收藏列表 (GET, 需要登录态)
@GET("/lg/collect/list/{page}/json")
Observable<CollectListResponse> getCollectList(@Path("page") int page
);
private void nestedNetworkRequest() {WanAndroidService service = RetrofitClient.getService();service.login("xxx", "xxx").flatMap(loginResponse -> {Log.d(TAG, "nestedNetworkRequest: "+loginResponse);// 登录成功后,获取收藏列表if (loginResponse.getErrorCode() == 0) {return service.getCollectList(0); // 第0页} else {return Observable.error(new Throwable("登录失败"));}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<CollectListResponse>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull CollectListResponse response) {Log.d(TAG, "onNext: "+response);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError: "+e.getMessage());}@Overridepublic void onComplete() {// 请求完成}});
}

flatMap方法是当 login 请求成功返回数据后才会调用。

在这里插入图片描述

四、源码分析:

4.1 先从Observer(观察者)开始

里面会有一个泛型,onNext也是使用这个泛型。

在这里插入图片描述

4.2 Observable(被观察者)

(1)调用create方法的时候,就会创建一个ObservableCreate
在这里插入图片描述

ObservableCreate里面,包裹了source,按照上面的例子,就类似于调用了我们的login方法,将要发送的请求包裹起来。那么包裹起来干嘛?因为他不是现在执行的,我们都知道,需要调用订阅,才会触发整个流程执行。

4.3 Observable的subscribe订阅过程

在这里插入图片描述

​订阅发生​​:调用 subscribe() 时,触发 subscribeActual

在这里插入图片描述

进来以后,我们就可以看到,先执行了我们的onSubscribe方法,然后再去执行source,source就是我们前面说的,将请求包裹起来的内容。

我们看一个非常原始的代码。
在这里插入图片描述

然后再里面调用了onNext,就是执行了观察者的onNext方法,然后执行onCompleter那么到这里,整个执行流程就结束了。

4.4 Map变换操作符原理

为什么map可以改变onnext的接收类型呢?我们继续看看。

在这里插入图片描述

可以看到,这里的类型就进行了转换。但是为什么观察者也跟着变了呢?
在这里插入图片描述

在这里的时候,就已经处理。 返回一个R类型的Observable实例,那么T也就变成了R。

4.5 异步线程切换:subscribeOn(Schedulers.io())

我们拿一个代码来进行分析。

service.login(username, password).subscribeOn(Schedulers.io()) // 在IO线程发起请求.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理结果.subscribe(new Observer<ApiResponse<User>>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {compositeDisposable.add(d); // 统一管理订阅Log.d(TAG, "onSubscribe: ");}...}
}

subscribeOn接收一个Scheduler参数,Scheduler类它封装了线程池和线程切换逻辑。

在这里插入图片描述

在这里插入图片描述

那么Schedulers.io()做了什么?Schedulers.io()​是一种策略,他会将内部的线程池配置成IO密集型的。我们会发现里面有很多种策略。

在这里插入图片描述

那么我们看看subscribeOn做了什么呢?他拿到线程池以后,做什么呢?先保存起来。我们先记住subscribeOn返回了一个ObservableSubscribeOn类
在这里插入图片描述

因为我们知道任务需要靠订阅方法才触发的,所以我们来看看ObservableSubscribeOn中的
subscribeActual订阅方法
在这里插入图片描述

scheduler.scheduleDirect

在这里插入图片描述

createWorker是一个抽象方法,因为前面我们配置的是Schedulers.io(),所以打开IoScheduler的createWorker
,然后会调用schedult方法执行现成。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

所以我们的任务就被异步线程执行了。

4.5 主线程切换

那么他如何从异步线程,又切换回主线程的?

.observeOn(AndroidSchedulers.mainThread())

AndroidSchedulers.mainThread()也是一个Scheduler,这里就不多介绍了。我们主要看看他返回的Scheduler

在这里插入图片描述

最终切换主线程,还是使用到了handler

在这里插入图片描述

我们记住HandlerScheduler类

在这里插入图片描述

我们来到observeOn方法

在这里插入图片描述

所以,然后最终交给了HandlerScheduler类来执行。

好了,到这里,源码分析就结束了。

那么我们看源码是为了什么?我们可以自定义RxJava操作符来玩玩,也会让我们对前面学习更加的通透理解。

五、自定义RxJava操作符

我们之定义RxJava操作符,并不是说我们自己实现Observable,而是继承他去实现一些功能。

我们看过just方法就知道,其实继承了Observable
在这里插入图片描述

在这里插入图片描述

然后重写subscribeActual方法,将value包裹进行处理,然后再调用观察者进行分发。

下面我们就写一个防抖操作符,帮助我们理解整个流程。

5.1 自定义防抖操作符

public class RxView {private final static String TAG = RxView.class.getSimpleName();// 我们自己的操作符 == 函数public static Observable<Object> clicks(View view) {return new ViewClickObservable(view);}}

public class ViewClickObservable extends Observable<Object> {private final View view;private static final Object EVENT = new Object();public ViewClickObservable(View view) {this.view = view;}@Overrideprotected void subscribeActual(Observer<? super Object> observer) {MyListener myListener = new MyListener(view, observer);//1.拿到view进行处理observer.onSubscribe(myListener);this.view.setOnClickListener(myListener);}// 拿到viewstatic final class MyListener implements View.OnClickListener, Disposable {private final View view;private Observer<Object> observer;private final AtomicBoolean isDisposable = new AtomicBoolean();public MyListener(View view, Observer<Object> observer) {this.view = view;this.observer = observer;}@Overridepublic void onClick(View v) {if (isDisposed() == false) {observer.onNext(EVENT);}}// 如果用调用了 中断@Overridepublic void dispose() {// 如果没有中断过,才有资格,   取消view.setOnClickListener(null);if (isDisposable.compareAndSet(false, true)) {// 主线程 很好的中断if (Looper.myLooper() == Looper.getMainLooper()) {view.setOnClickListener(null);} else { // 主线程,通过Handler的切换/*new Handler(Looper.getMainLooper()) {@Overridepublic void handleMessage(@NonNull Message msg) {super.handleMessage(msg);view.setOnClickListener(null);}};*///放到主线程执行。AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {@Overridepublic void run() {view.setOnClickListener(null);}});}}}@Overridepublic boolean isDisposed() {return isDisposable.get();}}
}

在这里插入图片描述

RxView.clicks(button).throttleFirst(2000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {}});}
}

执行流程:

  • 用户通过 RxView.clicks(view) 创建 ViewClickObservable
  • 调用 subscribe() 后触发 subscribeActual,创建 MyListener 并绑定到 View 的点击事件。
  • 点击事件触发 onClick,通过 Observer 发送 onNext 事件。然后Observeraccept方法就收到了事件(object)
  • 调用 dispose() 时移除 View 的点击监听。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/80330.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT的初始代码解读及其布局和弹簧

this指的是真正的当前正在显示的窗口 main函数&#xff1a; Widget w是生成了一个主窗口&#xff0c;QT Designer是在这个主窗口里塞组件 w.show()用来展示这个主窗口 头文件&#xff1a; namespace Ui{class Widget;}中的class Widget和下面的class Widget不是一个东西 Ui…

什么是AI写作

一、AI写作简介 AI 写作正在成为未来 10 年最炙手可热的超级技能。已经有越来越多的人通过 AI 写作&#xff0c;在自媒体、公文写作、商业策划等领域实现了提效&#xff0c;甚至产生了变现收益。 掌握 AI 写作技能&#xff0c;不仅能提高个人生产力&#xff0c;还可能在未来的 …

13.原生测试框架Unittest解决用例组织问题 与测试套件的使用

13. 原生测试框架Unittest解决用例组织问题 与测试套件的使用 一、测试架构核心组件解析 1.1 系统组成模块 #mermaid-svg-bYie0B3MLRp0HL4g {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-bYie0B3MLRp0HL4g .erro…

UE5 脚部贴地不穿过地板方案

UE自带的IK RIG和ControlRig技术 【UE5】角色脚部IK——如何让脚贴在不同斜度的地面(设置脚的旋转)_哔哩哔哩_bilibili 实验后这个还是有一部分问题,首先只能保证高度不能穿过,但是脚步旋转还是会导致穿模 IK前,整个模型在斜坡上会浮空 参考制作:https://www.youtube.com/w…

关于 js:4. 异步机制与事件循环

一、同步 vs 异步 1. 什么是同步&#xff08;Synchronous&#xff09; 同步代码就是一行一行、按顺序执行的。当前行没有执行完&#xff0c;下一行不能动。 示例&#xff1a; console.log("A"); console.log("B"); console.log("C");输出&am…

如何通过外网访问内网?对比5个简单的局域网让互联网连接方案

在实际应用中&#xff0c;常常需要从外网访问内网资源&#xff0c;如远程办公访问公司内部服务器、在家访问家庭网络中的设备等。又或者在本地内网搭建的项目应用需要提供互联网服务。以下介绍几种常见的外网访问内网、内网提供公网连接实现方法参考。 一、公网IP路由器端口映…

java的输入输出模板(ACM模式)

文章目录 1、前置准备2、普通输入输出API①、输入API②、输出API 3、快速输入输出API①、BufferedReader②、BufferedWriter 案例题目描述代码 面试有时候要acm模式&#xff0c;刷惯leetcode可能会手生不会acm模式&#xff0c;该文直接通过几个题来熟悉java的输入输出模板&…

什么是移动设备管理(MDM)

移动设备管理&#xff08;MDM&#xff09;是一种安全解决方案&#xff0c;旨在监控、管理和保护企业的移动设备&#xff08;包括智能手机、平板电脑、笔记本电脑和计算机&#xff09;。MDM软件是IT部门的关键工具&#xff0c;其核心功能包括设备配置、安全策略实施、远程控制及…

c++中构造对象实例的两种方式及其返回值

c中&#xff0c;构造对象实例有两种方式&#xff0c;一种返回对象实例&#xff0c;一种返回该对象实例的指针。如下所示&#xff1a; 一、两种返回值 RedisConn conn1; //得到实例conn1;RedisConn *conn2 new RedisConn();//得到指针conn2;RedisConn conn3 new RedisConn()…

【Unity笔记】PathCreator使用教程:用PathCreator实现自定义轨迹动画与路径控制

在Unity开发过程中&#xff0c;角色移动、摄像机动画、轨道系统、AI巡逻等功能中&#xff0c;路径控制是常见又复杂的需求之一。如何优雅、高效地创建路径并控制对象沿路径运动&#xff0c;是游戏开发、动画制作乃至工业仿真中的关键问题。 在这篇文章中&#xff0c;我将介绍一…

JAVA实战开源项目:健身房管理系统 (Vue+SpringBoot) 附源码

本文项目编号 T 180 &#xff0c;文末自助获取源码 \color{red}{T180&#xff0c;文末自助获取源码} T180&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

[人机交互]交互设计过程

*一.设计 1.1什么是设计 设计是一项创新活动&#xff0c;旨在为用户提供可用的产品 –交互设计是“设计交互式产品、以支持人们的生活和工作” 1.2设计包含的四个活动 – 识别用户的需要&#xff08; needs &#xff09;并建立需求&#xff08; requirements &…

1. 视频基础知识

1. 图像基础概念 像素&#xff1a;像素是一个图片的基本单位&#xff0c;pix是英语单词picture&#xff0c;加上英语单词“元素element”&#xff0c;就得到了pixel&#xff0c;简称px。所以“像素”有“图像元素”之意。分辨率&#xff1a;指的是图像的大小或者尺寸。比如 19…

代理IP是什么,有什么用?

一、什么是代理IP&#xff1f; 简单理解&#xff0c;代理IP是一座桥梁——你通过它连接到目标服务器&#xff0c;而不是直接暴露自己。这里的“IP”是网络世界中的地址标签&#xff0c;而代理IP在运行时&#xff0c;蹦跶到台前&#xff0c;成为目标服务器看到的那个“地址”。…

日常代码逻辑实现

日常代码逻辑实现&#xff1a; 1.防抖 解释&#xff1a; 防抖是指n秒内只执行一次&#xff0c;如果n秒内事件再次触发&#xff0c;则重新计算时间 应用场景&#xff1a; 搜索框输入联想&#xff08;避免每次按键都发送请求&#xff09;窗口尺寸调整 代码实现&#xff1a;…

北斗导航 | RTKLib中模糊度解算详解,公式,代码

模糊度解算 一、模糊度解算总体流程二、核心算法与公式推导1. **双差模糊度定义**2. **浮点解方程**三、LAMBDA算法实现细节1. **降相关变换(Z-transform)**2. **整数最小二乘搜索**3. **Ratio检验**四、部分模糊度固定(Partial Ambiguity Resolution, PAR)1. **子集选择策…

基于大模型的母婴ABO血型不合溶血病全方位预测与诊疗方案研究

目录 一、引言 1.1 研究背景与目的 1.2 国内外研究现状 1.3 研究方法与创新点 二、母婴 ABO 血型不合溶血病概述 2.1 发病机制 2.2 临床表现 2.3 流行病学特征 三、大模型在母婴 ABO 血型不合溶血病预测中的应用 3.1 模型选择与构建 3.2 预测指标与数据输入 3.3 模…

驱动-互斥锁

互斥锁可以说是“量值” 为 1 的 信号量&#xff0c; 最终实现的效果相同&#xff0c; 既然有了信号量&#xff0c; 那为什么还要有互斥锁呢&#xff1f; 这就是我们这里需要了解并掌握的 文章目录 参考资料互斥锁的介绍互斥锁结构体 - mutex互斥锁 API互斥锁实验源码程序-mute…

人工智能100问☞第17问:智能体的定义及其基本特征?

目录 一、通俗解释 二、专业解析 三、权威参考 智能体是能够通过传感器感知环境、自主决策并借助执行器采取行动以实现特定目标的智能实体或系统。 一、通俗解释 智能体就像一台能自己“看、想、动”的智能机器。比如你手机里的语音助手&#xff0c;它能听懂你说的话&…

Linux系统入门第十一章 --Shell编程之函数与数组

一、Shell函数 1、函数的用法 Shell函数可用于存放一系列的指令。在Shell脚本执行的过程中&#xff0c;函数被置于内存中&#xff0c;每次调用函数时不需要从硬盘读取&#xff0c;因此运行的速度比较快。在Shell编程中函数并非是必须的元素&#xff0c;但使用函数可以对程序进…