目录
- RxJava是什么?为什么使用。
- RxJava是如何使用的呢?
- RxJava如何和Retrofit一起使用。
- RxJava源码分析。
- (1)他执行流程是如何的。
- (2)map
- (3)线程的切换。
- 如何自定义RxJava操作符?
一、RxJava是什么?为什么使用
RxJava 是一个基于 响应式编程范式 的库,用于通过观察者模式和链式操作符,简化异步、事件驱动、多线程数据流处理的开发。
简单来说,RxJava 就像是一个“流水线工厂”,专门处理需要等待的任务(比如网络请求、数据库查询、复杂计算等)。它能把这些任务串成一条流水线,每个环节处理完数据后,自动传给下一个环节,还能灵活控制任务在哪个线程执行(比如后台线程干活,主线程更新UI)。
1.1 为什么要使用RxJava呢?
接下来,我们看看不使用的问题,以网络请求为例。
需求:按顺序做三件事(登录 → 查询订单 → 更新UI)。
传统写法:“回调地狱”,层层嵌套,像俄罗斯套娃!
// 伪代码:传统嵌套回调(问题代码)
api.login(new Callback<User>() {@Overridepublic void onSuccess(User user) {api.getOrders(user.getId(), new Callback<List<Order>>() {@Overridepublic void onSuccess(List<Order> orders) {runOnUiThread(() -> {showOrders(orders); // 切主线程更新UI});}@Overridepublic void onFailure(Throwable error) {showError(error); // 每个回调都要处理错误!}});}@Overridepublic void onFailure(Throwable error) {showError(error);}
});
问题总结:
- 代码缩进成“金字塔”,维护困难。
- 重复处理错误,每个回调都要写
onFailure
。 - 手动切换线程(如
runOnUiThread
),容易遗漏。
RxJava写法:
// RxJava 链式调用(解决方案)
api.rxLogin() // 1. 登录(被观察者).flatMap(user -> api.rxGetOrders(user.getId())) // 2. 查询订单(操作符).observeOn(AndroidSchedulers.mainThread()) // 3. 切到主线程.subscribe(orders -> showOrders(orders), // 4. 观察者消费数据error -> showError(error)); // 统一错误处理!
优势:
1️ 代码变“直线”,逻辑清晰。
2️ 统一错误处理,一个 onError
搞定所有。
3️ 自动线程切换,不用写 runOnUiThread
。
二、RxJava是如何使用的呢?
(1)添加依赖
implementation 'io.reactivex.rxjava3:rxjava:3.1.8'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2' // Android 需要
(2)使用
Observable.just("你好").subscribe(new Observer<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "onSubscribe: 订阅开始");}@Overridepublic void onNext(@NonNull String s) {Log.d(TAG, "onNext: 拿到事件"+s);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError: 错误事件");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: 事件完成");}});
解释一下执行流程:
(1)首先我们先需要知道几个角色:观察者( Observer
),被观察者( Observable
)。
(2)当被观察者发送出数据(调用just方法)你好
的时候,那么观察者就会收到消息(subscribe方法就是订阅)。
(3) subscribe()
将观察者与被观察者连接,触发 Observable 开始发射数据,Observer接收并处理事件(数据、错误、完成信号)。
(4)Observer的 onSubscribe
方法订阅时立即调用(最先执行)。 通知观察者订阅已建立。
(5) onNext
方法Observable 发射数据时调用。
(6) onComplete()
方法Observable 正常完成数据发射后调用 。onError
反之。
三、RxJava如何和Retrofit一起使用
其实就是将Retrofit的响应结果交给RxJava来处理
3.1 发起一个请求
(1)我们需要在Retrofit这里配置 RxJava
适配器
public class RetrofitClient {private static final String BASE_URL = "https://www.wanandroid.com/";private static Retrofit retrofit;public static WanAndroidService getService() {if (retrofit == null) {retrofit = new Retrofit.Builder().baseUrl(BASE_URL).addConverterFactory(GsonConverterFactory.create()) // Gson 解析.addCallAdapterFactory(RxJava3CallAdapterFactory.create()) // RxJava 支持.build();}return retrofit.create(WanAndroidService.class);}
}
这行代码的作用是 让 Retrofit 支持返回 RxJava 3 的响应式类型(如 Observable
、Flowable
、Single
等),使得网络请求的结果可以直接通过 RxJava 的流式操作符处理。
(2)在接口这里,我们也是使用Observable来接收。
public interface WanAndroidService {// 示例1:登录接口(POST)@FormUrlEncoded@POST("/user/login")Observable<ApiResponse<User>> login(@Field("username") String username,@Field("password") String password);// 示例2:获取首页文章列表(GET)@GET("/article/list/{page}/json")Observable<ApiResponse<List<Article>>> getHomeArticles(@Path("page") int page);
}
(3)调用接口
private void login(String username, String password) {WanAndroidService service = RetrofitClient.getService();service.login(username, password).subscribeOn(Schedulers.io()) // 在IO线程发起请求.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理结果.subscribe(new Observer<ApiResponse<User>>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {compositeDisposable.add(d); // 统一管理订阅Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(@NonNull ApiResponse<User> response) {Log.d(TAG, "onNext: "+response);}@Overridepublic void onError(@NonNull Throwable e) {// 网络错误Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {// 请求完成Log.d(TAG, "onComplete: ");}});
}
执行逻辑:
service.login(username, password)
:
通过 Retrofit 定义的接口发起网络请求,返回一个Observable<ApiResponse<User>>
。注意 此时网络请求尚未执行,只是定义了数据源。.subscribeOn(Schedulers.io())
:
指定 Observable 的工作线程为 IO 线程observeOn(AndroidSchedulers.mainThread())
:
指定 Observer 的回调方法(onNext
、onError
、onComplete
)在 主线程 执行。-
.subscribe(Observer)
:
订阅 Observable,触发网络请求执行,并绑定观察者处理结果。 此时网络请求正式启动。
3.2 发起两个请求,网络嵌套
需求:先登录,登录成功后再获取用户的收藏列表。
// 获取收藏列表 (GET, 需要登录态)
@GET("/lg/collect/list/{page}/json")
Observable<CollectListResponse> getCollectList(@Path("page") int page
);
private void nestedNetworkRequest() {WanAndroidService service = RetrofitClient.getService();service.login("xxx", "xxx").flatMap(loginResponse -> {Log.d(TAG, "nestedNetworkRequest: "+loginResponse);// 登录成功后,获取收藏列表if (loginResponse.getErrorCode() == 0) {return service.getCollectList(0); // 第0页} else {return Observable.error(new Throwable("登录失败"));}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<CollectListResponse>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull CollectListResponse response) {Log.d(TAG, "onNext: "+response);}@Overridepublic void onError(@NonNull Throwable e) {Log.d(TAG, "onError: "+e.getMessage());}@Overridepublic void onComplete() {// 请求完成}});
}
flatMap方法是当 login
请求成功返回数据后才会调用。
四、源码分析:
4.1 先从Observer(观察者)开始
里面会有一个泛型,onNext也是使用这个泛型。
4.2 Observable(被观察者)
(1)调用create方法的时候,就会创建一个ObservableCreate
ObservableCreate里面,包裹了source,按照上面的例子,就类似于调用了我们的login方法,将要发送的请求包裹起来。那么包裹起来干嘛?因为他不是现在执行的,我们都知道,需要调用订阅,才会触发整个流程执行。
4.3 Observable的subscribe订阅过程
订阅发生:调用 subscribe()
时,触发 subscribeActual
。
进来以后,我们就可以看到,先执行了我们的onSubscribe方法,然后再去执行source,source就是我们前面说的,将请求包裹起来的内容。
我们看一个非常原始的代码。
然后再里面调用了onNext,就是执行了观察者的onNext方法,然后执行onCompleter那么到这里,整个执行流程就结束了。
4.4 Map变换操作符原理
为什么map可以改变onnext的接收类型呢?我们继续看看。
可以看到,这里的类型就进行了转换。但是为什么观察者也跟着变了呢?
在这里的时候,就已经处理。 返回一个R类型的Observable实例,那么T也就变成了R。
4.5 异步线程切换:subscribeOn(Schedulers.io())
我们拿一个代码来进行分析。
service.login(username, password).subscribeOn(Schedulers.io()) // 在IO线程发起请求.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理结果.subscribe(new Observer<ApiResponse<User>>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {compositeDisposable.add(d); // 统一管理订阅Log.d(TAG, "onSubscribe: ");}...}
}
subscribeOn接收一个Scheduler参数,Scheduler类它封装了线程池和线程切换逻辑。
那么Schedulers.io()做了什么?Schedulers.io()
是一种策略,他会将内部的线程池配置成IO密集型的。我们会发现里面有很多种策略。
那么我们看看subscribeOn做了什么呢?他拿到线程池以后,做什么呢?先保存起来。我们先记住subscribeOn返回了一个ObservableSubscribeOn类
因为我们知道任务需要靠订阅方法才触发的,所以我们来看看ObservableSubscribeOn中的
subscribeActual订阅方法
scheduler.scheduleDirect
createWorker是一个抽象方法,因为前面我们配置的是Schedulers.io(),所以打开IoScheduler的createWorker
,然后会调用schedult方法执行现成。
所以我们的任务就被异步线程执行了。
4.5 主线程切换
那么他如何从异步线程,又切换回主线程的?
.observeOn(AndroidSchedulers.mainThread())
AndroidSchedulers.mainThread()也是一个Scheduler,这里就不多介绍了。我们主要看看他返回的Scheduler
最终切换主线程,还是使用到了handler
我们记住HandlerScheduler类
我们来到observeOn方法
所以,然后最终交给了HandlerScheduler类来执行。
好了,到这里,源码分析就结束了。
那么我们看源码是为了什么?我们可以自定义RxJava操作符来玩玩,也会让我们对前面学习更加的通透理解。
五、自定义RxJava操作符
我们之定义RxJava操作符,并不是说我们自己实现Observable,而是继承他去实现一些功能。
我们看过just方法就知道,其实继承了Observable
然后重写subscribeActual方法,将value包裹进行处理,然后再调用观察者进行分发。
下面我们就写一个防抖操作符,帮助我们理解整个流程。
5.1 自定义防抖操作符
public class RxView {private final static String TAG = RxView.class.getSimpleName();// 我们自己的操作符 == 函数public static Observable<Object> clicks(View view) {return new ViewClickObservable(view);}}
public class ViewClickObservable extends Observable<Object> {private final View view;private static final Object EVENT = new Object();public ViewClickObservable(View view) {this.view = view;}@Overrideprotected void subscribeActual(Observer<? super Object> observer) {MyListener myListener = new MyListener(view, observer);//1.拿到view进行处理observer.onSubscribe(myListener);this.view.setOnClickListener(myListener);}// 拿到viewstatic final class MyListener implements View.OnClickListener, Disposable {private final View view;private Observer<Object> observer;private final AtomicBoolean isDisposable = new AtomicBoolean();public MyListener(View view, Observer<Object> observer) {this.view = view;this.observer = observer;}@Overridepublic void onClick(View v) {if (isDisposed() == false) {observer.onNext(EVENT);}}// 如果用调用了 中断@Overridepublic void dispose() {// 如果没有中断过,才有资格, 取消view.setOnClickListener(null);if (isDisposable.compareAndSet(false, true)) {// 主线程 很好的中断if (Looper.myLooper() == Looper.getMainLooper()) {view.setOnClickListener(null);} else { // 主线程,通过Handler的切换/*new Handler(Looper.getMainLooper()) {@Overridepublic void handleMessage(@NonNull Message msg) {super.handleMessage(msg);view.setOnClickListener(null);}};*///放到主线程执行。AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {@Overridepublic void run() {view.setOnClickListener(null);}});}}}@Overridepublic boolean isDisposed() {return isDisposable.get();}}
}
RxView.clicks(button).throttleFirst(2000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {}});}
}
执行流程:
- 用户通过
RxView.clicks(view)
创建ViewClickObservable
。 - 调用
subscribe()
后触发subscribeActual
,创建MyListener
并绑定到View
的点击事件。 - 点击事件触发
onClick
,通过Observer
发送onNext
事件。然后Observer
的accept
方法就收到了事件(object) - 调用
dispose()
时移除View
的点击监听。