其实所有的节日都不是为了礼物和红包而生,而是为了提醒我们,不要忘记爱与被爱,生活需要仪式感,而你需要的是在乎和关爱
目录
前言
一,Hook点
二,RXJava的观察者模式
三,Map操作符原理
前言
关于RXJava的使用请移步文章
android--RXJava详细使用篇-CSDN博客
android--RXJava+Retrofit封装使用-CSDN博客
一,Hook点
首先我们写一个RxJava从订阅到分发的简单实例:
Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {e.onNext("袁震");}
}).map(new Function<Object, Boolean>() {@Overridepublic Boolean apply(Object o) throws Exception {return true;}
})
.subscribe(new Consumer<Boolean>() {@Overridepublic void accept(Boolean aBoolean) throws Exception {}
});
然后我们点击Observable.create:
@CheckReturnValue@SchedulerSupport(SchedulerSupport.NONE)public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {//空检查,不用管ObjectHelper.requireNonNull(source, "source is null");//关键代码return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));}
第一句代码不用管,是一个空检查。我们看这句代码
RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
参数是关于创建的被观察者的,我们先不用管。先看下onAssembly:
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {Function<? super Observable, ? extends Observable> f = onObservableAssembly;if (f != null) {return apply(f, source);}return source;
}
在这段代码中,如果onObservableAssembly为空,则直接返回我们传入的被观察者。
如果不为空,则继续走apply函数:
@NonNull
static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {try {return f.apply(t);} catch (Throwable ex) {throw ExceptionHelper.wrapOrThrow(ex);}
}
f.apply(t)函数:
public interface Function<T, R> {/*** Apply some calculation to the input value and return some other value.* @param t the input value* @return the output value* @throws Exception on error*/R apply(@NonNull T t) throws Exception;
}
这也就是我们前面的map等方法的入参Function。
然后我们继续回到上面查看我们的onObservableAssembly是不是为空,我们发现只有在一个地方给onObservableAssembly赋值了:
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {if (lockdown) {throw new IllegalStateException("Plugins can't be changed anymore");}RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
所以说,如果我们不手动调用RxJavaPlugins.setOnObservableAssembly,onObservableAssembly就一定是空的。
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {//不主动赋值,这段代码一定不会执行 ------------ Function<? super Observable, ? extends Observable> f = onObservableAssembly;if (f != null) {return apply(f, source);}
//----------------------------return source;
}
然后我们看一下map的源码:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {ObjectHelper.requireNonNull(mapper, "mapper is null");return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
显然和上面create是一样的原理。
所以,如果我们手动的设置了onObservableAssembly的值,在调用map的时候,就会走到apply方法里面,并且把上一步的pbservable传递过来:
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {@Overridepublic Observable apply(Observable observable) throws Exception {Log.d(TAG, "走到了apply方法:" + observable);return observable; }
});
这样就实现了在调用create或者map等操作符之前,实现hook,先调用自己的方法
二,RXJava的观察者模式
RxJava的使用步骤,三个最主要的流程:
1,创建Observable 被观察者
2,创建Observer 观察者
3,使用subcribe订阅
下面我们先来实现一下基本的使用流程:
//创建Observable 被观察者
Observable.create(// 自定义sourcenew ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {emitter.onNext("yuanzhen");}
})
//subscribe订阅
.subscribe(//创建观察者new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(String s) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});
首先,我们先看一下最简单的创建观察者:
new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(String s) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}
});
Observer的全部源码如下:
public interface Observer<T> {/*** Provides the Observer with the means of cancelling (disposing) the* connection (channel) with the Observable in both* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.* @param d the Disposable instance whose {@link Disposable#dispose()} can* be called anytime to cancel the connection* @since 2.0*/void onSubscribe(@NonNull Disposable d);/*** Provides the Observer with a new item to observe.* <p>* The {@link Observable} may call this method 0 or more times.* <p>* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or* {@link #onError}.** @param t* the item emitted by the Observable*/void onNext(@NonNull T t);/*** Notifies the Observer that the {@link Observable} has experienced an error condition.* <p>* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or* {@link #onComplete}.** @param e* the exception encountered by the Observable*/void onError(@NonNull Throwable e);/*** Notifies the Observer that the {@link Observable} has finished sending push-based notifications.* <p>* The {@link Observable} will not call this method if it calls {@link #onError}.*/void onComplete();}
可以看到,Observer使用了泛型,传入了什么,onNext的参数就是什么类型。
有四个函数:
第一个函数 onSubscribe:一订阅马上调用这个函数
第二个函数onNext:拿到上一个操作符留下来的数据
第三个函数onError:拿到上一个操作符留下来的错误数据
第四个函数onComplete:事件结束
创建观察者,其实就是new了一个接口,让用户自己实现方法,这个非常简单。
下面我们看一下Observable的创建过程:
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {emitter.onNext("袁震");}
})
我们还是看一下create方法:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {ObjectHelper.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
前面我们已经分析过RxJavaPlugins.onAssembly方法,但是参数我们说后面分析。
接下来,我们就来分析一下这个参数:
首先source我们必须搞清楚是什么。他就是上面我们自己创建的ObservableOnSubscribe:
new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {emitter.onNext("袁震");}
然后把source作为参数,传递给了ObservableCreate:
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;
}
到这里,观察者和被观察者都创建完了,接下来就是最关键的订阅流程了:
分析了观察者和被观察者的创建之后,上面的使用流程,实际上我们可以简化为:
被观察者= new ObservableCreate
观察者= new Observer
被观察者.subscribe(观察者)
所以,subscribe方法肯定是在ObservableCreate这个对象或者其父类里面实现的
我们先看subscribe的源码:
它实在ObservableCreate的父类Observable里面实现的
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");subscribeActual(observer);} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a Disposable has been set or not// can't call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}
}
/**
其他代码都是异常判断,可以忽略,主要看这行代码:
subscribeActual(observer);
然后看他的实现:
protected abstract void subscribeActual(Observer<? super T> observer);
发现是一个抽象方法,所以它的实现肯定在其子类ObservableCreate中:
@Override
protected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}
}
这里非常重要,首先他将我们自己创建的观察者传递了进来,然后:
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
他创建了一个CreateEmitter,并且将观察者传递了进去,我们看看CreateEmitter这个类:
static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {private static final long serialVersionUID = -3434801548987643227L;final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {observer.onNext(t);}}@Overridepublic void onError(Throwable t) {if (!tryOnError(t)) {RxJavaPlugins.onError(t);}}@Overridepublic boolean tryOnError(Throwable t) {if (t == null) {t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");}if (!isDisposed()) {try {observer.onError(t);} finally {dispose();}return true;}return false;}@Overridepublic void onComplete() {if (!isDisposed()) {try {observer.onComplete();} finally {dispose();}}}@Overridepublic void setDisposable(Disposable d) {DisposableHelper.set(this, d);}@Overridepublic void setCancellable(Cancellable c) {setDisposable(new CancellableDisposable(c));}@Overridepublic ObservableEmitter<T> serialize() {return new SerializedEmitter<T>(this);}@Overridepublic void dispose() {DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}}
这里面有我们熟悉的onNext,onError,onComplete等方法,调用这些方法的时候,就会调用我们传递进来的观察者的相应的方法。
然后再分析上面的subscribeActual方法
接下来执行observer.onSubscribe(parent);也就是我们自己创建的观察者的onSubscribe方法。
执行完了之后,就会执行source.subscribe(parent);这个source就是我们自己创建的被观察者。
并把创建的包含观察者的CreateEmitter对象传递到这个方法里面去。
我们在使用时是这样实现的:
Observable.create(// 自定义sourcenew ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> emitter) throws Exception {// CreateEmitter.onNextemitter.onNext("袁震");}
})
这时候就会调用CreateEmitter的onNext方法,然后就会调用到观察者的onNext方法。
或者调用onCpmlete方法。
这样就完成了RxJava的观察者模式。
注意,RXJava中的观察者模式 和传统的观察者模式是不同的,关于传统的观察者模式可以看我的文章:Android设计模式--观察者模式-CSDN博客
区别主要是:在标准的观察者设计模式中,是一个“被观察者”,多个“观察者“,并且需要“被观察者”发出改变通知后,所有的”观察者”才能观察到
在RxJava观察者设计模式中,是多个“被观察者”,一个“观察者”,并且需要 起点 和 终点 在 “订阅” 一次后,才发出改变通知,终点(观察者)才能观察到
为什么说是多个被观察者呢?我们看下map源码:
@CheckReturnValue@SchedulerSupport(SchedulerSupport.NONE)public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {ObjectHelper.requireNonNull(mapper, "mapper is null");return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));}
所以说我们每调用一次map,就会创建一个被观察者,但是最终的观察者只有一个
而且,相比于传统的观察模式,Rxjava在观察者和被观察者之间添加了一个抽象层的发射器,降低了耦合度。所以说其实他更接近于发布订阅模式。
三,Map操作符原理
首先我们要理解map的作用,map其实就是把数据从一种类型转换到了另一种类型。
下面我们先实现一个map,它的作用就是把string类型转换为Bitmap
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {}
}).map(new Function<String, Bitmap>() {@Overridepublic Bitmap apply(String s) throws Exception {return null;}
}).subscribe(new Observer<Bitmap>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Bitmap bitmap) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});
首先,Observable.create上面我们已经分析过了,就是创建了ObservableCreate对象,
然后调用了这个对象里面的map方法,我们看下map的源码:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {ObjectHelper.requireNonNull(mapper, "mapper is null");return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
这个源码上面我们已经看过很多次了,下面我们主要看下这句代码:
new ObservableMap<T, R>(this, mapper)
它创建了一个ObservableMap对象,并且把ObservableCreate对象和我们自己创建的Function传递进去。
实际上我们使用时的代码就可以理解为这样的:
被观察者 =new ObservableCreate
ObservableMap =被观察者.map
ObservableMap.subscribe(观察者)
所以,当我们执行subscribe时,就走到了ObservableMap里面的subscribeActual方法:
@Override
public void subscribeActual(Observer<? super U> t) {source.subscribe(new MapObserver<T, U>(t, function));
}
这里又用MapObserver将观察者包装了起来:
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {final Function<? super T, ? extends U> mapper;MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {super(actual);this.mapper = mapper;}@Overridepublic void onNext(T t) {if (done) {return;}if (sourceMode != NONE) {actual.onNext(null);return;}U v;try {v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {fail(ex);return;}actual.onNext(v);}@Overridepublic int requestFusion(int mode) {return transitiveBoundaryFusion(mode);}@Nullable@Overridepublic U poll() throws Exception {T t = qs.poll();return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;}
}
这里就是每添加一个map,就会用MapObserver包装一层,有多少map就包装多少层。
注意:ObservableMap里面的source,实际上就是我们传递进来的ObservableCreate
所以
source.subscribe(new MapObserver<T, U>(t, function));
实际上就会调用到ObservableCreate的subscribeActual:
@Override
protected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}
}
但是这里传递进来的是MapObserver,它包装了观察者
之后又用CreateEmitter将MapObserver包装了起来。
下面的 observer.onSubscribe(parent);我们前面分析过了,就是调用我们自定义的观察者的onSubscribe方法
然后下面的source.subscribe(parent)会调用到我们自定义的source:
Observable.create(// 自定义sourcenew ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("袁震");}
})
然后这个发射器ObservableEmitter<String> e实际上就是包装了MapObserver的CreateEmitter
调用e.onNext("袁震") 会走到CreateEmitter的onNext(T t):
@Override
public void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {observer.onNext(t);}
}
这里面的observer实际上是包装了观察者的MapObserver。然后就会走到MapObserver的onNext方法中:
@Override
public void onNext(T t) {if (done) {return;}if (sourceMode != NONE) {actual.onNext(null);return;}U v;try {v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {fail(ex);return;}actual.onNext(v);
}
这里面的actual就是我们传进来的观察者, actual.onNext(v);实际上就调用到了我们自定义的onNext方法中去。
mapper就是map中自定义的Function方法,mapper.apply(t)是一个抽象的方法:
public interface Function<T, R> {/*** Apply some calculation to the input value and return some other value.* @param t the input value* @return the output value* @throws Exception on error*/R apply(@NonNull T t) throws Exception;
}
其具体的实现由我们自己定义:
.map(new Function<String, Bitmap>() {@Overridepublic Bitmap apply(String s) throws Exception {//自己实现return null;}
})
如果有多个map,就是这样一层一层的去包装,然后再一层一层的去拆包。
这样就实现了RxJava链式编程的核心思想。
这个可能比较绕,但是理解了安卓之神的RxJava设计思想,其他的框架看起来也就很easy了。