rxjava 3.0 BehaviorProcessor底层源代码分析

这段代码是 RxJava 中 BehaviorProcessor 类的实现,它是一个特殊的处理器(Processor),可以缓存并向新的订阅者发出最后一个观察到的项以及所有后续的项。以下是对该类的详细分析:

类的结构和字段

  1. 字段

    • subscribers: 存储当前所有订阅者的原子引用数组。
    • EMPTY, TERMINATED: 表示订阅者数组的两种特殊状态。
    • lock, readLock, writeLock: 读写锁,用于确保线程安全的读写操作。
    • value: 原子引用,存储当前缓存的值。
    • terminalEvent: 原子引用,存储终止事件(错误或完成)。
    • index: 一个简单的计数器,用于跟踪项的顺序。
  2. 静态方法

    • create(): 创建一个新的 BehaviorProcessor 实例,不包含初始值。
    • createDefault(T defaultValue): 创建一个包含默认初始值的 BehaviorProcessor 实例。
  3. 构造函数

    • 无参数构造函数:初始化字段。
    • 带默认值的构造函数:调用无参数构造函数,并设置初始值。

主要方法

  1. 订阅和取消订阅

    • subscribeActual(Subscriber<? super T> s): 订阅者订阅时被调用,创建 BehaviorSubscription 实例并尝试添加到订阅者数组。
    • remove(BehaviorSubscription<T> rs): 从订阅者数组中移除指定的订阅者。
  2. 事件处理

    • onSubscribe(Subscription s): 处理订阅事件,但实际上此类通常作为事件源,不需要调用该方法。
    • onNext(T t): 处理下一个事件,将事件传播给所有订阅者。
    • onError(Throwable t): 处理错误事件,将错误事件传播给所有订阅者,并进入终止状态。
    • onComplete(): 处理完成事件,将完成事件传播给所有订阅者,并进入终止状态。
    • offer(T t): 尝试向所有订阅者发送事件,如果所有订阅者都已请求,则返回 true,否则返回 false。
  3. 状态检查

    • hasSubscribers(): 检查是否有订阅者。
    • getThrowable(): 获取终止时的错误(如果有)。
    • getValue(): 获取当前缓存的值。
    • hasComplete(): 检查是否已完成。
    • hasThrowable(): 检查是否有错误。
    • hasValue(): 检查是否有缓存值。
  4. 内部辅助方法

    • add(BehaviorSubscription<T> rs): 添加订阅者到订阅者数组。
    • terminate(Object terminalValue): 终止处理器,并返回终止时的订阅者数组。
    • setCurrent(Object o): 设置当前缓存值,同时增加索引。

内部类 BehaviorSubscription

BehaviorSubscriptionSubscription 的实现,代表一个订阅关系。其主要职责是处理订阅者的请求、取消订阅和事件的发送。

  1. 字段

    • downstream: 实际的订阅者。
    • state: 所属的 BehaviorProcessor 实例。
    • next, emitting, fastPath, cancelled: 标志位,控制订阅状态和事件发送。
    • queue: 用于存储待发送的事件。
    • index: 当前事件的索引。
  2. 方法

    • request(long n): 处理订阅者的请求。
    • cancel(): 取消订阅。
    • emitFirst(): 向新的订阅者发送缓存的值。
    • emitNext(Object value, long stateIndex): 向订阅者发送下一个事件。
    • test(Object o): 处理事件,并根据订阅者的状态决定是否继续发送。
    • emitLoop(): 循环发送缓存的事件。
    • isFull(): 检查订阅者是否已满(请求数为零)。

BehaviorProcessor 是 RxJava 中一种强大的处理器,允许缓存并向新的订阅者发送最后一个事件和所有后续事件。它利用线程安全的原子操作和锁机制,确保在并发环境中的安全性。通过提供多种状态检查方法和事件处理方法,它为开发者提供了丰富的功能和灵活性。

联系我

补充:
这段代码实现了 RxJava 中的 BehaviorProcessor 类。它是一个处理器,用于缓存最近一次的事件并向新订阅者发送,同时向所有订阅者发送后续事件。以下是对订阅和事件处理的详细分解和分析:

1. 订阅相关的方法

subscribeActual

这个方法在订阅者订阅时被调用:

@Override
protected void subscribeActual(@NonNull Subscriber<@NonNull ? super T> s) {BehaviorSubscription<T> bs = new BehaviorSubscription<>(s, this);s.onSubscribe(bs);if (add(bs)) {if (bs.cancelled) {remove(bs);} else {bs.emitFirst();}} else {Throwable ex = terminalEvent.get();if (ex == ExceptionHelper.TERMINATED) {s.onComplete();} else {s.onError(ex);}}
}
  • 创建一个新的 BehaviorSubscription 实例,表示一个订阅关系。
  • 调用订阅者的 onSubscribe 方法,将订阅关系传递给订阅者。
  • 尝试将订阅关系添加到 subscribers 数组中。如果添加成功且订阅未取消,调用 emitFirst 发送缓存的值。如果订阅已经取消,则将其从 subscribers 数组中移除。
  • 如果处理器已经终止,则向订阅者发送终止事件。
addremove

这些方法用于管理订阅者:

boolean add(BehaviorSubscription<T> rs) {for (;;) {BehaviorSubscription<T>[] a = subscribers.get();if (a == TERMINATED) {return false;}int len = a.length;@SuppressWarnings("unchecked")BehaviorSubscription<T>[] b = new BehaviorSubscription[len + 1];System.arraycopy(a, 0, b, 0, len);b[len] = rs;if (subscribers.compareAndSet(a, b)) {return true;}}
}@SuppressWarnings("unchecked")
void remove(BehaviorSubscription<T> rs) {for (;;) {BehaviorSubscription<T>[] a = subscribers.get();int len = a.length;if (len == 0) {return;}int j = -1;for (int i = 0; i < len; i++) {if (a[i] == rs) {j = i;break;}}if (j < 0) {return;}BehaviorSubscription<T>[] b;if (len == 1) {b = EMPTY;} else {b = new BehaviorSubscription[len - 1];System.arraycopy(a, 0, b, 0, j);System.arraycopy(a, j + 1, b, j, len - j - 1);}if (subscribers.compareAndSet(a, b)) {return;}}
}
  • add 方法将新的订阅关系添加到 subscribers 数组中。
  • remove 方法从 subscribers 数组中移除指定的订阅关系。

2. 事件处理相关的方法

onSubscribe
@Override
public void onSubscribe(@NonNull Subscription s) {if (terminalEvent.get() != null) {s.cancel();return;}s.request(Long.MAX_VALUE);
}
  • 处理订阅事件。如果处理器已经终止,取消订阅。否则,请求 Long.MAX_VALUE 表示无限量请求。
onNext
@Override
public void onNext(@NonNull T t) {ExceptionHelper.nullCheck(t, "onNext called with a null value.");if (terminalEvent.get() != null) {return;}Object o = NotificationLite.next(t);setCurrent(o);for (BehaviorSubscription<T> bs : subscribers.get()) {bs.emitNext(o, index);}
}
  • 处理下一个事件。首先检查值是否为空,如果为空则抛出异常。
  • 检查处理器是否已终止,如果是,则返回。
  • 创建事件对象,并更新当前值。
  • 将事件传播给所有订阅者。
onError
@Override
public void onError(@NonNull Throwable t) {ExceptionHelper.nullCheck(t, "onError called with a null Throwable.");if (!terminalEvent.compareAndSet(null, t)) {RxJavaPlugins.onError(t);return;}Object o = NotificationLite.error(t);for (BehaviorSubscription<T> bs : terminate(o)) {bs.emitNext(o, index);}
}
  • 处理错误事件。首先检查错误是否为空,如果为空则抛出异常。
  • 更新终止事件为错误。如果已经有终止事件,则将错误报告给全局错误处理器。
  • 创建错误事件对象,并将其传播给所有订阅者。
onComplete
@Override
public void onComplete() {if (!terminalEvent.compareAndSet(null, ExceptionHelper.TERMINATED)) {return;}Object o = NotificationLite.complete();for (BehaviorSubscription<T> bs : terminate(o)) {bs.emitNext(o, index);  // relaxed read okay since this is the only mutator thread}
}
  • 处理完成事件。更新终止事件为完成。
  • 创建完成事件对象,并将其传播给所有订阅者。

3. 辅助方法

setCurrent
void setCurrent(Object o) {Lock wl = writeLock;wl.lock();index++;value.lazySet(o);wl.unlock();
}
  • 设置当前值,并更新索引。
terminate
@SuppressWarnings("unchecked")
BehaviorSubscription<T>[] terminate(Object terminalValue) {setCurrent(terminalValue);return subscribers.getAndSet(TERMINATED);
}
  • 终止处理器,并返回终止时的订阅者数组。

4. 内部类 BehaviorSubscription

字段
  • downstream: 实际的订阅者。
  • state: 所属的 BehaviorProcessor 实例。
  • next, emitting, fastPath, cancelled: 标志位,控制订阅状态和事件发送。
  • queue: 用于存储待发送的事件。
  • index: 当前事件的索引。
方法
requestcancel
@Override
public void request(long n) {if (SubscriptionHelper.validate(n)) {BackpressureHelper.add(this, n);}
}@Override
public void cancel() {if (!cancelled) {cancelled = true;state.remove(this);}
}
  • request 处理订阅者的请求。
  • cancel 取消订阅。
emitFirstemitNext
void emitFirst() {if (cancelled) {return;}Object o;synchronized (this) {if (cancelled) {return;}if (next) {return;}BehaviorProcessor<T> s = state;Lock readLock = s.readLock;readLock.lock();index = s.index;o = s.value.get();readLock.unlock();emitting = o != null;next = true;}if (o != null) {if (test(o)) {return;}emitLoop();}
}void emitNext(Object value, long stateIndex) {if (cancelled) {return;}if (!fastPath) {synchronized (this) {if (cancelled) {return;}if (index == stateIndex) {return;}if (emitting) {AppendOnlyLinkedArrayList<Object> q = queue;if (q == null) {q = new AppendOnlyLinkedArrayList<>(4);queue = q;}q.add(value);return;}next = true;}fastPath = true;}test(value);
}
  • emitFirst 向新的订阅者发送缓存的值。
  • emitNext 向订阅者发送下一个事件。
testemitLoop
@Override
public boolean test(Object o) {if (cancelled) {return true;}if (NotificationLite.isComplete(o)) {downstream.onComplete();return true;} else if (NotificationLite.isError(o)) {downstream.onError(NotificationLite.getError(o));return true;}long r = get();if (r != 0L) {downstream.onNext(NotificationLite.<T>getValue(o));if (r != Long.MAX_VALUE) {decrementAndGet();}return false;}cancel();downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));return true;
}void emitLoop() {for (;;) {if (cancelled) {return;}AppendOnlyLinkedArrayList<Object> q;synchronized (this) {q = queue;if (q == null) {emitting = false;

联系我

还是有一点不明白,subscribeActual这个方法在订阅者订阅时如何被调用
再分析:
在 RxJava 的架构中,subscribeActual 方法是由 Flowable.subscribe 方法间接调用的。当你创建一个 Flowable 并调用 subscribe 方法时,RxJava 框架内部会进行一系列操作,最终调用到 subscribeActual 方法。以下是一个简单的调用链,展示了 subscribeActual 方法如何在订阅者订阅时被调用。

调用链

  1. 用户代码调用 subscribe 方法:
BehaviorProcessor<Object> processor = BehaviorProcessor.create();
processor.subscribe(subscriber);
  1. Flowable.subscribe 方法:

当你调用 subscribe 方法时,实际上调用的是 Flowable 类中的 subscribe 方法。

@Override
public final void subscribe(Subscriber<? super T> s) {Objects.requireNonNull(s, "s is null");try {Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);Objects.requireNonNull(z, "Plugin returned null Subscriber");subscribeActual(z);} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a subscription has happened so can't call cancelRxJavaPlugins.onError(e);throw ExceptionHelper.wrapOrThrow(e);}
}

这个方法做了以下几件事:

  • 确保订阅者 s 不为空。
  • 调用 RxJavaPlugins.onSubscribe 方法,这是一个钩子方法,允许 RxJava 插件处理订阅逻辑。
  • 调用 subscribeActual(z),这里的 z 是经过插件处理后的订阅者。
  1. 调用 BehaviorProcessor.subscribeActual 方法:

subscribeActual 方法在 BehaviorProcessor 类中被重写,它会处理实际的订阅逻辑。

@Override
protected void subscribeActual(@NonNull Subscriber<@NonNull ? super T> s) {BehaviorSubscription<T> bs = new BehaviorSubscription<>(s, this);s.onSubscribe(bs);if (add(bs)) {if (bs.cancelled) {remove(bs);} else {bs.emitFirst();}} else {Throwable ex = terminalEvent.get();if (ex == ExceptionHelper.TERMINATED) {s.onComplete();} else {s.onError(ex);}}
}

具体过程

  1. 创建 BehaviorSubscription 实例:
BehaviorSubscription<T> bs = new BehaviorSubscription<>(s, this);

这里 BehaviorSubscription 是一个内部类,表示一个具体的订阅关系。

  1. 调用订阅者的 onSubscribe 方法:
s.onSubscribe(bs);

BehaviorSubscription 实例传递给订阅者。

  1. BehaviorSubscription 添加到订阅者列表:
if (add(bs)) {if (bs.cancelled) {remove(bs);} else {bs.emitFirst();}
}
  • 如果添加成功且订阅未取消,调用 emitFirst 方法发送缓存的值。
  • 如果订阅已经取消,则将其从 subscribers 列表中移除。
  1. 处理终止状态:
else {Throwable ex = terminalEvent.get();if (ex == ExceptionHelper.TERMINATED) {s.onComplete();} else {s.onError(ex);}
}

如果处理器已经终止,则向订阅者发送终止事件。

在 RxJava 中,subscribeActual 方法在订阅者调用 subscribe 方法时被框架内部调用。这个过程涉及一系列方法调用和检查,确保订阅关系被正确建立,并处理可能的并发和终止状态。通过这种设计,RxJava 提供了一个灵活且强大的异步编程模型。

联系我

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/28806.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python中的np.setdiff1d()函数

Python中的np.setdiff1d()函数可用于找出两个序列集合中元素的差异 API及参数说明如下&#xff1a; np.setdiff1d(ar1,ar2,assume_unique)&#xff1a;找出序列1在序列2中的差异&#xff0c;并返回序列1中不在序列2中的元素 ar1&#xff1a;输入数组ar2&#xff1a;输入比较…

网络安全等级保护制度详解,一文掌握核心要点!

一、等级保护制度发展情况 等级保护制度的法律依据 《计算机信息系统安全保护条例》&#xff08;1994年General Office of the State Council第147号令&#xff09; 公安部主管全国计算机信息系统安全保护工作。 计算机信息系统实行安全等级保护&#xff0c;安全等级的划分…

目录文件管理

文章目录 Linux目录结构树形目录结构根目录常见的子目录子目录的作用 查看及检索文件查看文件内容cat格式 more格式操作方法 less格式操作方法 head格式 tail格式 统计文件内容wc格式选项 检索和过滤文件内容grep格式选项查找条件 备份及恢复文档压缩命令gzip bzip2格式压缩解压…

Redis 网络模型

一、用户空间和内核空间 1.1 linux 简介 服务器大多采用 Linux 系统&#xff0c;这里我们以 Linux 为例来讲解&#xff0c;下面有两个不同的 linux 发行版&#xff0c;分别位 ubuntu 和 centos&#xff0c;其实发行版就是在 Linux 系统上包了一层壳。 任何 Linux 发行版&#…

Draw.io

Draw.io Draw.io是一款很实用的思维导图软件&#xff0c;现在已经推出了桌面版&#xff0c;对于很多办公人士而言画张流程图可以轻松的罗列整个流程&#xff0c;比使用文字或者幻灯片制作简单并且容易理解&#xff0c;该应用程序不仅免费&#xff0c;还拥有专业的工具&#xf…

详细图文手把手教你阿里云注册域名如何托管到CloudFlare DNS服务

1.第一步&#xff1a;注册并登录Cloudflare账号&#xff0c;点击右上角“添加站点”&#xff0c;进入下图页面填写域名&#xff0c;点击继续。 2.第二步&#xff1a;进入页面滑动到最下方&#xff0c;选择Free免费套餐&#xff0c;再次点击继续。 3.第三步&#xff1a;这个页面…

基于starknet构建应用链之Madara

文章目录 什么是Madara应用链模板其他仓库为何要构建应用链?什么是Madara 欢迎来到Madara,使用Cairo和Starknet技术构建链的模块化堆栈。像dYdX V3、Immutable和Sorare这样的应用程序已经使用StarkEx进行扩展有一段时间了,现在有了Madara,它是开源的,每个人都可以使用。 …

c# 二维图形绘制实践

1.等边三角形 1.1 概述 1.2 代码 using System; using System.Drawing; using System.Windows.Forms;public partial class TriangleForm : Form {public TriangleForm(){//InitializeComponent();// 确保窗体大小足够大&#xff0c;以容纳三角形 this.ClientSize new Siz…

AbMole带你探索细胞的“铁”门:Piezo1通道在椎间盘退变中的关键角色

在生物医学领域&#xff0c;铁是细胞功能不可或缺的元素&#xff0c;但铁的异常积累却可能成为细胞的“隐形杀手”。最近&#xff0c;一项发表在《Bone Research》上的研究&#xff0c;为我们揭开了铁代谢与椎间盘退变之间神秘联系的一角。这项研究不仅深化了我们对铁离子通道P…

5个超实用1688选品技巧!轻松出单999+

1、研究市场需求 通过市场调查和分析&#xff0c;了解目标市场的消费者喜好和趋势。选择具有市场需求且竞争相对较小的产品类别。 用店雷达热销商 品榜和飙升商 品榜。比如做女装类目&#xff0c;选择“女士T恤”我们可以根据日、周、月为时间维度下商品的销售笔数、件数、销…

Browserslist: caniuse-lite is outdated。浏览器列表:caniuse lite已经过时???

一、最近运行项目启动时提示 Browserslist: caniuse-lite is outdated. Please run: npx update-browserslist-dblatest Why you should do it regularly: https://github.com/browserslist/update-db#readme 这要是这一句&#xff0c;Browserslist: caniuse-lite is outdated.…

大神出新品,吴恩达开源机器翻译智能体项目

节前&#xff0c;我们星球组织了一场算法岗技术&面试讨论会&#xff0c;邀请了一些互联网大厂朋友、参加社招和校招面试的同学。 针对算法岗技术趋势、大模型落地项目经验分享、新手如何入门算法岗、该如何准备、面试常考点分享等热门话题进行了深入的讨论。 合集&#x…

走近科学之《netty 的秘密》

Approaching science《the secret of netty》 IO 相关概念、五种 IO 模型、BIO NIO AIO 特点及区别、NIO 设计原理及核心组件、netty 简介及应用场景、netty 线程模型&#xff08;Reactor 线程模型&#xff09;、netty 设计原理及核心组件、netty 常用技巧实现&#xff08;心跳…

Django REST framework序列化器详解:普通序列化器与模型序列化器的选择与运用

系列文章目录 Django入门全攻略&#xff1a;从零搭建你的第一个Web项目Django ORM入门指南&#xff1a;从概念到实践&#xff0c;掌握模型创建、迁移与视图操作Django ORM实战&#xff1a;模型字段与元选项配置&#xff0c;以及链式过滤与QF查询详解Django ORM深度游&#xff…

【unity笔记】二、海洋系统Crest Ocean System基础

1. 创建海平面 首先确定项目中导入了HDRP插件。这里使用Crest Ocean System HDRP插件。 在场景下创建空对象&#xff0c;这里命名为Ocean。将 OceanRenderer 组件分配给Ocean。该组件将生成海洋几何图形并执行所有必需的初始化。其中Global Wind Speed 属性可以调节风浪大小。…

vuedraggable在vue2.0和vue3.0 中的应用

文章目录 vuedraggable在vue2.0中的应用在vue3.0中的应用 vuedraggable Vue.Draggable是一款基于Sortable.js实现的vue拖拽插件。支持移动设备、拖拽和选择文本、智能滚动&#xff0c;可以在不同列表间拖拽、不依赖jQuery为基础、vue 2过渡动画兼容、支持撤销操作&#xff0c;…

部署远程控制台访问服务Rttys,第一部分客户端(安装CMAKE)

背景&#xff1a;现公司有一需求&#xff0c;需要开发一个程序&#xff0c;实现页面点击按钮后跳转&#xff0c;远程连接到指定的虚拟机&#xff0c;并可以进行linux命令操作&#xff0c;在网上找了很多文章&#xff0c;发现都没有详细的步骤和部署问题处理&#xff0c;所以自己…

Web渗透信息收集进阶

网站敏感目录与文件 网站敏感目录表示网站目录中容易被恶意人员利用的一些目录。通常恶意人员都是通过工具扫描&#xff0c;来扫出网站的敏感目录&#xff0c;敏感目录是能够得到其他网页的信息&#xff0c;从而找到后台管理页面&#xff0c;尝试进入后台等&#xff0c;扫描网…

33、matlab矩阵分解汇总:LU矩阵分解、Cholesky分解、QR分解和SVD分解

1、矩阵分解简介 矩阵分解是指将一个矩阵分解成子矩阵或其他形式的矩阵表示的过程。常见的矩阵分解方法包括LU分解、QR分解、奇异值分解&#xff08;SVD&#xff09;、特征值分解等。 LU分解&#xff1a;将一个矩阵分解为一个下三角矩阵L和一个上三角矩阵U的乘积&#xff0c;…

淘宝评论电商API接口,揭示用户真实评价

随着互联网的快速发展&#xff0c;电子商务已经成为了人们生活中不可或缺的一部分。淘宝作为中国最大的在线购物平台&#xff0c;拥有数以亿计的消费者和商家。而用户评价作为消费者了解商品和服务的重要途径&#xff0c;对于商家的信誉和销售有着至关重要的影响。因此&#xf…