spring响应式编程系列:异步消费数据

目录

示例

大致流程

parallel

cache

PARALLEL_SUPPLIER

newParallel

init

publishOn

new MonoSubscribeOnValue

​​​​​​​subscribe

​​​​​​​new LambdaMonoSubscriber

​​​​​​​MonoSubscribeOnValue.subscribe

​​​​​​​onSubscribe

​​​​​​​request

​​​​​​​schedule

​​​​​​​directSchedule

​​​​​​​run

​​​​​​​onNext

时序图

类图

数据发布者

MonoSubscribeOnValue

调度器

ParallelScheduler

数据订阅者

LambdaMonoSubscriber

订阅的消息体

ScheduledScalar


       本篇文章我们来研究如何控制数据流在特定线程池上的执行。即将操作符的执行切换到指定调度器(Scheduler)的线程。在Project Reactor框架中,Mono.publishOn()就可以实现该功能,示例如下所示:

示例

CountDownLatch countDownLatch = new CountDownLatch(1);

// 创建一个包含数据的 Mono

Mono<String> mono = Mono.just("Hello, Reactive World!");

mono.publishOn(Schedulers.parallel())

    .subscribe(x -> {

        log.info("Sub thread, subscribe: {}", x);

        countDownLatch.countDown();

    });

log.info("Main thread, blocking");

countDownLatch.await();

       首先,通过mono.publishOn方法指定线程池;

       其次,通过subscribe指定的消费者处理逻辑会在前一步指定的线程池里执行。

大致流程

       点击Schedulers.parallel(),如下所示:

parallel

public static Scheduler parallel() {

return cache(CACHED_PARALLEL, PARALLEL, PARALLEL_SUPPLIER);

}

       在这里,调用cache方法并且PARALLEL_SUPPLIER参数,返回Scheduler调度器。cache方法如下所示:

cache

static CachedScheduler cache(AtomicReference<CachedScheduler> reference, String key, Supplier<Scheduler> supplier) {

CachedScheduler s = reference.get();

if (s != null) {

return s;

}

s = new CachedScheduler(key, supplier.get());

if (reference.compareAndSet(null, s)) {

return s;

}

... ...

}

       在这里,调用supplier.get()方法来生成CachedScheduler调度器。supplier.get()方法的定义如下所示:

PARALLEL_SUPPLIER

static final Supplier<Scheduler> PARALLEL_SUPPLIER =
      () -> newParallel(PARALLEL, DEFAULT_POOL_SIZE, true);

newParallel

public static Scheduler newParallel(int parallelism, ThreadFactory threadFactory) {

final Scheduler fromFactory = factory.newParallel(parallelism, threadFactory);

fromFactory.init();

return fromFactory;

}

       在这里,通过工厂方法模式创建Scheduler对象,并调用init()初始化方法,如下所示:

init

@Override

public void init() {

... ...

SchedulerState<ScheduledExecutorService[]> b =

SchedulerState.init(new ScheduledExecutorService[n]);

for (int i = 0; i < n; i++) {

b.currentResource[i] = Schedulers.decorateExecutorService(this, this.get());

}

if (!STATE.compareAndSet(this, null, b)) {

for (ScheduledExecutorService exec : b.currentResource) {

exec.shutdownNow();

}

if (isDisposed()) {

throw new IllegalStateException(

"Initializing a disposed scheduler is not permitted"

);

}

}}

       在这里,new了一个ScheduledExecutorService线程池对象作为调度器的底导线程池实现。

       点击示例里的mono.publishOn()方法,如下所示:

​​​​​​​publishOn

public final Mono<T> publishOn(Scheduler scheduler) {
   if(this instanceof Callable) {
      if (this instanceof Fuseable.ScalarCallable) {
         try {
            T value = block();
            return onAssembly(new MonoSubscribeOnValue<>(value, scheduler));
         }
         catch (Throwable t) {
            //leave MonoSubscribeOnCallable defer error
         }
      }
      @SuppressWarnings("unchecked")
      Callable<T> c = (Callable<T>)this;
      return onAssembly(new MonoSubscribeOnCallable<>(c, scheduler));
   }
   return onAssembly(new MonoPublishOn<>(this, scheduler));
}

       在这里,new 了一个MonoSubscribeOnValue对象,并且传递了scheduler对象作为构造参数。

       如下所示:

​​​​​​​new MonoSubscribeOnValue

final class MonoSubscribeOnValue<T> extends Mono<T> implements Scannable {
   final T value;
   final Scheduler scheduler;
   MonoSubscribeOnValue(@Nullable T value, Scheduler scheduler) {
      this.value = value;
      this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
   }

       在这里,将入参scheduler作为MonoSubscribeOnValue对象的属性scheduler保留下来。

       点击示例里的mono.subscribe()方法,如下所示:

​​​​​​​subscribe

public final Disposable subscribe(
      @Nullable Consumer<? super T> consumer,
      @Nullable Consumer<? super Throwable> errorConsumer,
      @Nullable Runnable completeConsumer,
      @Nullable Context initialContext) {
   return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
         completeConsumer, null, initialContext));
}

      在这里,new了一个LambdaMonoSubscriber对象,这里与《spring响应式编程系列:总体流程》类似。LambdaMonoSubscriber对象的构造函数如下所示:

​​​​​​​new LambdaMonoSubscriber

LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer,
      @Nullable Consumer<? super Throwable> errorConsumer,
      @Nullable Runnable completeConsumer,
      @Nullable Consumer<? super Subscription> subscriptionConsumer,
      @Nullable Context initialContext) {
   this.consumer = consumer;
   this.errorConsumer = errorConsumer;
   this.completeConsumer = completeConsumer;
   this.subscriptionConsumer = subscriptionConsumer;
   this.initialContext = initialContext == null ? Context.empty() : initialContext;
}

​​​​​​​MonoSubscribeOnValue.subscribe

@Override

public void subscribe(CoreSubscriber<? super T> actual) {

T v = value;

if (v == null) {

ScheduledEmpty parent = new ScheduledEmpty(actual);

actual.onSubscribe(parent);

try {

parent.setFuture(scheduler.schedule(parent));

}

catch (RejectedExecutionException ree) {

if (parent.future != OperatorDisposables.DISPOSED) {

actual.onError(Operators.onRejectedExecution(ree,

actual.currentContext()));

}

}

}

else {

actual.onSubscribe(new ScheduledScalar<>(actual, v, scheduler));

}

}

       在这里,new一个ScheduledScalar对象,传入消费者和scheduler对象,然后同样是调用消费者的onSubscribe()方法。

       如下所示:

​​​​​​​onSubscribe

public final void onSubscribe(Subscription s) {
   if (Operators.validate(subscription, s)) {
      this.subscription = s;
      if (subscriptionConsumer != null) {
         try {
            subscriptionConsumer.accept(s);
         }
         catch (Throwable t) {
            Exceptions.throwIfFatal(t);
            s.cancel();
            onError(t);
         }
      }
      else {
         s.request(Long.MAX_VALUE);
      }
   }
}

       在这里,与《spring响应式编程系列:总体流程》类似,调用订阅消费体的request()方法,如下所示:

​​​​​​​request

@Override
public void request(long n) {
   if (Operators.validate(n)) {
      if (ONCE.compareAndSet(this, 0, 1)) {
         try {
            Disposable f = scheduler.schedule(this);
            if (!FUTURE.compareAndSet(this,
                  null,
                  f) && future != FINISHED && future != OperatorDisposables.DISPOSED) {
               f.dispose();
            }
         }
         catch (RejectedExecutionException ree) {
            if (future != FINISHED && future != OperatorDisposables.DISPOSED) {
               actual.onError(Operators.onRejectedExecution(ree,
                     this,
                     null,
                     value, actual.currentContext()));
            }
         }
      }
   }
}

       在这里,调用scheduler.schedule(this)方法,并将订阅的消息体作为参数传入。如下所示:

​​​​​​​schedule

public Disposable schedule(Runnable task) {
 return Schedulers.directSchedule(pick(), task, null, 0L, TimeUnit.MILLISECONDS);
}

       在这里,首先,调用pick()方法获取当前调度器的线程池对象,与订阅的消息体一起作为参数传入directSchedule()方法,如下所示:

​​​​​​​directSchedule

static Disposable directSchedule(ScheduledExecutorService exec,
      Runnable task,
      @Nullable Disposable parent,
      long delay,
      TimeUnit unit) {
   task = onSchedule(task);
   SchedulerTask sr = new SchedulerTask(task, parent);
   Future<?> f;
   if (delay <= 0L) {
      f = exec.submit((Callable<?>) sr);
   }
   else {
      f = exec.schedule((Callable<?>) sr, delay, unit);
   }
   sr.setFuture(f);
   return sr;
}

       在这里,将订阅的消息体任务提交到线程池去执行。接下来,我们看看为什么订阅的消息体是一个线程池可以执行的任务呢?如下所示:

​​​​​​​run

static final class ScheduledScalar<T>

implements QueueSubscription<T>, InnerProducer<T>, Runnable {

        ... ...

@Override

public void run() {

try {

if (fusionState == NO_VALUE) {

fusionState = HAS_VALUE;

}

actual.onNext(value);

actual.onComplete();

}

finally {

FUTURE.lazySet(this, FINISHED);

}

}

       在这里,ScheduledScalar实现了Runnable 接口,并且实现了run()方法,所以,订阅的消息体就是一个线程池可以执行的任务了。该线程池任务的执行逻辑如下所示:

​​​​​​​onNext

public final void onNext(T x) {
   Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
   if (s == Operators.cancelledSubscription()) {
      Operators.onNextDropped(x, this.initialContext);
      return;
   }
   if (consumer != null) {
      try {
         consumer.accept(x);
      }
      catch (Throwable t) {
         Exceptions.throwIfFatal(t);
         s.cancel();
         doError(t);
      }
   }
   if (completeConsumer != null) {
      try {
         completeConsumer.run();
      }
      catch (Throwable t) {
         Operators.onErrorDropped(t, this.initialContext);
      }
   }
}

       在这里,调用数据消费者的onNext()方法执行相关的消费逻辑。

       至此,大致流程就结束了。

时序图

  1. 类关系的设计,与《spring响应式编程系列:总体流程》类似,主要包括数据发布者对象、数据订阅者对象及订阅的消息体对象;
  2. Mono和MonoSubscribeOnValue是数据发布者,LambdaMonoSubscriber是数据订阅者,ScheduledScalar是订阅的消息体;
  3. 不同点在于,多了Schedulers(暂且叫着调度器工厂)和ParallelScheduler调度器;以及ScheduledScalar在执行request方法时,需要将任务交由调度器来处理。
  4. Schedulers类里有一个Factory接口,该接口可以默认创建各种Scheduler调度器对象,如(ElasticScheduler、BoundedElasticScheduler、ParallelScheduler、SingleScheduler),这就是典型的工厂方法设计模式。

类图

数据发布者

MonoSubscribeOnValue

MonoSubscribeOnValue与《spring响应式编程系列:总体流程》介绍的类似,都是继承于Mono类,并且实现了CorePublisher和Publisher接口。

不同点在于,该数据发布者多了一个属性,如下所示:

final Scheduler scheduler;

该属性带有线程池ScheduledExecutorService信息,可以为数据订阅者提供异步执行的功能。

调度器

ParallelScheduler

  1. Scheduler

    提供了接口:Disposable schedule(Runnable task);

  1. ParallelScheduler

    该类封装了线程池信息;实现了接口schedule(Runnable task),用于提供对所封装的线程池的调度。

数据订阅者

LambdaMonoSubscriber

LambdaMonoSubscriber与《spring响应式编程系列:总体流程》介绍的一样。

订阅的消息体

ScheduledScalar

       ScheduledScalar与《spring响应式编程系列:总体流程》介绍的类似,都实现了Subscription接口。

       不同点在于,ScheduledScalar实现了Runnable接口,从而可以提供给线程池执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/79857.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

视频编解码学习十二之Android疑点

一、android.view.SurfaceControl.setDisplaySurface的作用 android.view.SurfaceControl.setDisplaySurface 是 Android 系统中一个 native 层级别的 API&#xff0c;主要用于 设置某个物理显示屏&#xff08;Display&#xff09;的输出 Surface&#xff0c;属于 SurfaceFlin…

家用或办公 Windows 电脑玩人工智能开源项目配备核显的必要性(含 NPU 及显卡类型补充)

一、GPU 与显卡的概念澄清 首先需要明确一个容易误解的概念&#xff1a;GPU 不等同于显卡。 显卡和GPU是两个不同的概念。 【概念区分】 在讨论图形计算领域时&#xff0c;需首先澄清一个常见误区&#xff1a;GPU&#xff08;图形处理单元&#xff09;与显卡&#xff08;视…

Python----神经网络(《Deep Residual Learning for Image Recognition》论文和ResNet网络结构)

一、论文 1.1、论文基本信息 标题&#xff1a;Deep Residual Learning for Image Recognition 作者&#xff1a;Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun 单位&#xff1a;Microsoft Research 会议&#xff1a;CVPR 2016 主要贡献&#xff1a;提出了一种深度残…

Qt/C++开发监控GB28181系统/录像文件查询/录像回放/倍速播放/录像文件下载

一、前言 搞定了实时预览后&#xff0c;另一个功能就是录像回放&#xff0c;录像回放和视频点播功能完全一致&#xff0c;唯一的区别就是发送点播的sdp信息中携带了开始时间和结束时间&#xff0c;因为是录像文件&#xff0c;所以有这个时间&#xff0c;而实时视频预览这个对应…

在Spark搭建YARN

&#xff08;一&#xff09;什么是SparkONYarn模式 Spark on YARN&#xff08;Yet Another Resource Negotiator&#xff09;是 Spark 框架在 Hadoop 集群中运行的一种部署模式&#xff0c;它借助 Hadoop YARN 来管理资源和调度任务。 架构组成 ResourceManager&#xff1a;作…

SpringAI

机器学习&#xff1a; 定义&#xff1a;人工智能的子领域&#xff0c;通过数据驱动的方法让计算机学习规律&#xff0c;进行预测或决策。 核心方法&#xff1a; 监督学习&#xff08;如线性回归、SVM&#xff09;。 无监督学习&#xff08;如聚类、降维&#xff09;。 强化学…

如何用Redis实现分布式锁?RedLock算法的核心思想?Redisson的看门狗机制原理?

一、Redis分布式锁基础实现 public class RedisDistributedLock {private JedisPool jedisPool;private String lockKey;private String clientId;private int expireTime 30; // 默认30秒public boolean tryLock() {try (Jedis jedis jedisPool.getResource()) {// NX表示不…

前端面试宝典---js垃圾回收机制

什么是垃圾回收 垃圾回收是指一种自动内存管理机制&#xff0c;当声明一个变量时&#xff0c;会在内存中开辟一块内存空间用于存放这个变量。当这个变量被使用过后&#xff0c;可能再也不需要它了&#xff0c;此时垃圾回收器会自动检测并回收这些不再使用的内存空间。垃圾回收…

阿里妈妈LMA2新进展:集成大语言模型与电商知识的通用召回大模型URM

近日&#xff0c;阿里妈妈在国际顶级学术会议 —— 国际万维网大会&#xff08;International World Wide Web Conference, 简称WWW&#xff09;上共同主持了计算广告算法技术相关的Tutorial&#xff08;讲座&#xff09;&#xff0c;介绍了计算广告领域的技术发展脉络&#xf…

数字孪生实时监控汽车零部件工厂智能化巡检新范式

在汽车制造业面临数字化转型时&#xff0c;汽车零部件工厂也面临着提升生产效率、降低运营成本和增强市场竞争力的多重挑战。传统的巡检方式已经难以满足现代工厂对高效、精准管理和实时决策的需求。数字孪生系统的出现&#xff0c;为汽车零部件工厂提供了一种创新的智能化巡检…

【计算机网络】3数据链路层②

1. 数据链路层所处的地位 数据链路层使用的信道主要有两种: ①点对点信道:PPP协议 ②广播信道:有线局域网,CSMA/CD协议;无线局域网,CSMA/CA协议 对比项点对点信道 vs 单播广播信道 vs 广播核心是否一致✅ 一致(一对一传输)✅ 一致(一对所有传输)差异点前者是物理层…

c++中的函数(默认参数,占位参数,重载)

1&#xff0c;函数默认参数 在c中&#xff0c;函数的形参列表中的形参是可以有默认值得 语法&#xff1a;返回值类型 函数名 &#xff08;参数 默认值&#xff09;{} 示例&#xff1a; #include<iostream> using namespace std;//函数默认参数 // 就是如果传了就…

【原创】使用阿里云存放一个临时共享的文件

在某些场合&#xff0c;需要临时将一个文件存储到一个可被公网访问的地方&#xff0c;某个服务需要访问一下这个文件。这个文件基本上就是一次寿命&#xff0c;也就是你上传一下&#xff0c;然后被访问一下&#xff0c;这个文件的寿命就结束了。 对于这种需求&#xff0c;自建…

Python中列表(list)知识详解(2)和注意事项以及应用示例

在 Python 中列表&#xff08;list&#xff09; 的包括其结构、常见操作&#xff08;更新、添加、删除、查找、队列栈行为等&#xff09;&#xff0c;下面将逐一的进行讲解并附相关的示例。 一、列表的基础知识 1. 定义与特点 定义方式&#xff1a;用 [] 包裹的有序可变集合 …

vscode extention踩坑记

# npx vsce package --allow-missing-repository --no-dependencies #耗时且不稳定 npx vsce package --allow-missing-repository #用这行 code --install-extension $vsixFileName --force我问ai&#xff1a;为什么我的.vsix文件大了那么多 ai答&#xff1a;因为你没有用 --n…

移动端巡检点检,让设备管理更便捷高效

在企业设备管理的日常工作中&#xff0c;巡检点检是保障设备正常运行的重要环节。传统的巡检方式依赖纸质记录、人工操作&#xff0c;效率低、易出错&#xff0c;已难以满足现代企业的管理需求。随着技术发展&#xff0c;越来越多设备管理系统引入移动端功能&#xff0c;为设备…

laravel 中使用的pdf 扩展包 laravel-snappy(已解决中文乱码)

Centos7 安装 wkhtmltopdf 1、先查看系统是 32 位的还是 64 位的 uname -a2、通过 composer 安装 wkhtmltopdf 32位: $ composer require h4cc / wkhtmltopdf-i386 0.12.x $ composer require h4cc / wkhtmltoimage-i386 0.12.x 64位: $ composer require h4cc/wkhtmltopdf-…

Rust:重新定义系统编程的安全与效率边界

在软件工程领域&#xff0c;内存安全漏洞每年造成数千亿美元损失&#xff0c;而C/C生态中60%的漏洞源于指针误用。正是在这样的背景下&#xff0c;Rust凭借其革命性的内存安全机制异军突起。作为一门现代系统级编程语言&#xff0c;Rust不仅解决了困扰开发者数十年的内存管理难…

C++学习细节回顾(汇总二)

一.初始化列表相关 1.初始化顺序受申明顺序影响 2.在必要时可以部分不采用初始化列表&#xff0c;避免受特性1影响 二.非类型模板参数 template< class T , size_t N 10 > 三.特化–特殊化处理 template< class T > bool less(T left , T right) { return left&…

勾选某一行的勾选框,更改当前行的颜色,ALV数据发生变化的事件

文章目录 屏幕ALV的创建定义变量注册事件方法定义方法实现frm_data_change 效果 屏幕 ALV的创建 DATA: g_gui_custom_container TYPE REF TO cl_gui_custom_container. DATA: g_gui_alv_grid TYPE REF TO cl_gui_alv_grid.DATA: gt_listheader TYPE slis_t_listheader, &quo…