响应式编程知多少 | Rx.NET 了解下

640?wx_fmt=png

1. 引言

An API for asynchronous programming with observable streams. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.ReactiveX 使用可观察数据流进行异步编程的API。 ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。

关于Reactive(本文统一译作响应式),有一个The Reactive Manifesto【响应式宣言】:响应式系统(Reactive System)具备以下特质:即时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)以及消息驱动(Message Driven)。

640?wx_fmt=png

很显然开发一个响应式系统,并不简单。 那本文就来讲一讲如何基于Rx.NET进行响应式编程,进而开发更加灵活、松耦合、可伸缩的响应式系统。

2. 编程范式

在开始之前呢,我们有必要了解下几种编程范式:命令式编程、声明式编程、函数式编程和响应式编程。

命令式编程:命令式编程的主要思想是关注计算机执行的步骤,即一步一步告诉计算机先做什么再做什么。

  1. //1. 声明变量

  2. List<int> results = new List<int>();

  3. //2. 循环变量

  4. foreach(var num in Enumerable.Range(1,10))

  5. {

  6.    //3. 添加条件

  7.    if (num > 5)

  8.    {  

  9.        //4. 添加处理逻辑

  10.        results.Add(num);

  11.        Console.WriteLine(num);

  12.    }

  13. }

声明式编程:声明式编程是以数据结构的形式来表达程序执行的逻辑。它的主要思想是告诉计算机应该做什么,但不指定具体要怎么做。

  1. var nums = from num in Enumerable.Range(1,10) where num > 5 select num

函数式编程:主要思想是把运算过程尽量写成一系列嵌套的函数调用。

  1. Enumerable.Range(1, 10).Where(num => num > 5).ToList().ForEach(Console.WriteLine);

响应式编程:响应式编程是一种面向数据流和变化传播的编程范式,旨在简化事件驱动应用的实现。响应式编程专注于如何创建依赖于变更的数据流并对变化做出响应。

  1. IObservable<int> nums = Enumerable.Range(1, 10).ToObservable();


  2. IDisposable subscription = nums.Where(num => num > 5).Subscribe(Console.WriteLine);


  3. subscription.Dispose();

3. Hello Rx.NET

从一个简单的Demo开始。 假设我们现在模拟电热壶烧水,实时输出当前水温,一般我们会这样做:

  1. Enumerable.Range(1, 100).ToList().ForEach(Console.WriteLine);

  2. // do something else. 阻塞

假设当前程序是智能家居的中控设备,不仅控制电热壶烧水,还控制其他设备,为了避免阻塞主线程。一般我们会创建一个Thread或Task去做。

  1. Task.Run(() => Enumerable.Range(1, 100).ToList().ForEach(Console.WriteLine));

  2. // do something else. 非阻塞

假设现在我们不仅要在控制台输出而且还要实时通过扬声器报警。这时我们应该想到委托和事件。

  1. class Heater

  2. {

  3.    private delegate void TemperatureChanged(int temperature);

  4.    private event TemperatureChanged TemperatureChangedEvent;

  5.    public void BoilWater()

  6.    {

  7.        TemperatureChangedEvent += ShowTemperature;

  8.        TemperatureChangedEvent += MakeAlerm;

  9.        Task.Run(

  10.            () =>

  11.        Enumerable.Range(1, 100).ToList().ForEach((temperature) => TemperatureChangedEvent(temperature))

  12.        );

  13.    }

  14.    private void ShowTemperature(int temperature)

  15.    {

  16.        Console.WriteLine($"当前温度:{temperature}");

  17.    }

  18.    private void MakeAlerm(int temperature)

  19.    {

  20.        Console.WriteLine($"嘟嘟嘟,当前水温{temperature}");

  21.    }

  22. }

  23. class Program

  24. {

  25.    static void Main(string[] args)

  26.    {

  27.        Heater heater = new Heater();        

  28.        heater.BoilWater();

  29.    }

  30. }

瞬间代码量就上去了。但是借助Rx.NET,我们可以简化成以下代码:

  1. var observable = Enumerable.Range(1, 100).ToObservable(NewTheadScheduler.Default);//申明可观察序列

  2. Subject<int> subject = new Subject<int>();//申明Subject

  3. subject.Subscribe((temperature) => Console.WriteLine($"当前温度:{temperature}"));//订阅subject

  4. subject.Subscribe((temperature) => Console.WriteLine($"嘟嘟嘟,当前水温:{temperature}"));//订阅subject

  5. observable.Subscribe(subject);//订阅observable

仅仅通过以下三步:

  1. 调用 ToObservable将枚举序列转换为可观察序列。

  2. 通过指定 NewTheadScheduler.Default来指定在单独的线程进行枚举。

  3. 调用 Subscribe方法进行事件注册。

  4. 借助 Subject进行多播传输

通过以上我们可以看到Rx.NET大大简化了事件处理的步骤,而这只是Rx的冰山一角。

4. Rx.NET 核心

Reactive Extensions(Rx)是一个为.NET应用提供响应式编程模型的库,用来构建异步基于事件流的应用,通过安装 System.ReactiveNuget包进行引用。Rx将事件流抽象为Observable sequences(可观察序列)表示异步数据流,使用LINQ运算符查询异步数据流,并使用 Scheduler来控制异步数据流中的并发性。简单地说:Rx = Observables + LINQ + Schedulers。

640?wx_fmt=png

在软件系统中,事件是一种消息用于指示发生了某些事情。事件由Event Source(事件源)引发并由Event Handler(事件处理程序)使用。 在Rx中,事件源可以由observable表示,事件处理程序可以由observer表示。 但是应用程序使用的数据如何表示呢,例如数据库中的数据或从Web服务器获取的数据。而在应用程序中我们一般处理的数据无外乎两种:静态数据和动态数据。 但无论使用何种类型的数据,其都可以作为流来观察。换句话说,数据流本身也是可观察的。也就意味着,我们也可以用observable来表示数据流。

640?wx_fmt=png

讲到这里,Rx.NET的核心也就一目了然了:

  1. 一切皆为数据流

  2. Observable 是对数据流的抽象

  3. Observer是对Observable的响应

在Rx中,分别使用 IObservable<T>IObserver<T>接口来表示可观察序列和观察者。它们预置在system命名空间下,其定义如下:

  1. public interface IObservable<out T>

  2. {

  3.      //Notifies the provider that an observer is to receive notifications.

  4.      IDisposable Subscribe(IObserver<T> observer);

  5. }


  6. public interface IObserver<in T>

  7. {

  8.    //Notifies the observer that the provider has finished sending push-based notifications.

  9.    void OnCompleted();


  10.    //Notifies the observer that the provider has experienced an error condition.

  11.    void OnError(Exception error);


  12.    //Provides the observer with new data.

  13.    void OnNext(T value);

  14. }

640?wx_fmt=png

5. 创建IObservable

创建 IObservable<T>主要有以下几种方式:

1. 直接实现 IObservable<T>接口

2. 使用 Observable.Create创建

  1. Observable.Create<int>(observer=>{

  2.    for (int i = 0; i < 5; i++)

  3.    {

  4.        observer.OnNext(i);

  5.    }

  6.    observer.OnCompleted();

  7.    return Disposable.Empty;

  8. })

3. 使用 Observable.Deffer进行延迟创建(当有观察者订阅时才创建)比如要连接数据库进行查询,如果没有观察者,那么数据库连接会一直被占用,这样会造成资源浪费。使用Deffer可以解决这个问题。

  1. Observable.Defer(() =>

  2. {

  3.    var connection = Connect(user, password);

  4.    return connection.ToObservable();

  5. });

4. 使用 Observable.Generate创建迭代类型的可观察序列

  1. IObservable<int> observable =

  2.    Observable.Generate(

  3.        0,              //initial state

  4.        i => i < 10,    //condition (false means terminate)

  5.        i => i + 1,     //next iteration step

  6.        i => i * 2);      //the value in each iteration

5. 使用 Observable.Range创建指定区间的可观察序列

  1. IObservable<int> observable = Observable.Range (0, 10).Select (i => i * 2);

6. 创建特殊用途的可观察序列

  1. Observable.Return ("Hello World");//创建单个元素的可观察序列

  2. Observable.Never<string> ();//创建一个空的永远不会结束的可观察序列

  3. Observable.Throw<ApplicationException> (

  4. new ApplicationException ("something bad happened"))//创建一个抛出指定异常的可观察序列

  5. Observable.Empty<string> ()//创建一个空的立即结束的可观察序列

7. 使用 ToObservable转换 IEnumerate和Task类型

  1. Enumerable.Range(1, 10).ToObservable();

  2. IObservable<IEnumerable<string>> resultsA = searchEngineA.SearchAsync(term).ToObservable();

8. 使用 Observable.FromEventPattern<T>Observable.FromEvent<TDelegate,TEventArgs>进行事件的转换

  1. public delegate void RoutedEventHandler(object sender,

  2. System.Windows.RoutedEventArgs e)

  3. IObservable<EventPattern<RoutedEventArgs>> clicks =

  4.                Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(

  5.                    h => theButton.Click += h,

  6.                    h => theButton.Click -= h);

  7. clicks.Subscribe(eventPattern => output.Text += "button clicked" + Environment.NewLine);

9. 使用 Observable.Using进行资源释放

  1. IObservable<string> lines =

  2.    Observable.Using (

  3.        () => File.OpenText ("TextFile.txt"), // opens the file and returns the stream we work with

  4.        stream =>

  5.        Observable.Generate (

  6.            stream, //initial state

  7.            s => !s.EndOfStream, //we continue until we reach the end of the file

  8.            s => s, //the stream is our state, it holds the position in the file

  9.            s => s.ReadLine ()) //each iteration will emit the current line (and moves to the next)

  10.    );

10. 使用 Observable.Interval创建指定间隔可观察序列640?wx_fmt=png

11. 使用 Observable.Timer创建可观察的计时器640?wx_fmt=png

6. RX 操作符

创建完IObservable后,我们可以对其应用系列Linq操作符,对其进行查询、过滤、聚合等等。Rx内置了以下系列操作符:640?wx_fmt=png下面通过图示来解释常用操作符的作用:640?wx_fmt=png

7. 多播传输靠:Subject

基于以上示例,我们了解到,借助Rx可以简化事件模型的实现,而其实质上就是对观察者模式的扩展。提到观察者模式,我们知道一个Subject可以被多个观察者订阅,从而完成消息的多播。同样,在Rx中,也引入了Subject用于多播消息传输,不过Rx中的Subject具有双重身份——即是观察者也是被观察者。

  1. interface ISubject<in TSource, out TResult> : IObserver<TSource>,IObservable<TResult>

  2. {

  3. }

Rx中默认提供了以下四种实现:

Subject- 向所有观察者广播每个通知640?wx_fmt=png

AsyncSubject- 当可观察序列完成后有且仅发送一个通知640?wx_fmt=png

ReplaySubject- 缓存指定通知以对后续订阅的观察者进行重放640?wx_fmt=png

BehaviorSubject- 推送默认值或最新值给观察者640?wx_fmt=png

但对于第一种 Subject<T>有一点需要指出,当其有多个观察者序列时,一旦其中一个停止发送消息,则Subject就停止广播所有其他序列后续发送的任何消息。

640?wx_fmt=png

8. 有温度的可观察者序列

对于Observable,它们是有温度的,有冷热之分。它们的区别如下图所示:640?wx_fmt=png

Cold Observable:有且仅当有观察者订阅时才发送通知,且每个观察者独享一份完整的观察者序列。

Hot Observable:不管有无观察者订阅都会发送通知,且所有观察者共享同一份观察者序列。

9. 一切皆在掌控:Scheduler

在Rx中,使用Scheduler来控制并发。而对于Scheduler我们可以理解为程序调度,通过Scheduler来规定在什么时间什么地点执行什么事情。Rx提供了以下几种Scheduler:

  1. NewThreadScheduler:即在新线程上执行

  2. ThreadPoolScheduler:即在线程池中执行

  3. TaskPoolScheduler:同ThreadPoolScheduler

  4. CurrentThreadScheduler:在当前线程执行

  5. ImmediateScheduler:在当前线程立即执行

  6. EventLoopScheduler:创建一个后台线程按序执行所有操作

举例而言:

  1. Observable.Return("Hello",NewThreadScheduler.Default)

  2. .Subscribe(str=>Console.WriteLine($"{str} on ThreadId:{Thread.CurrentThread.ManagedThreadId}")

  3. );

  4. Console.WriteLine($"Current ThreadId:{Thread.CurrentThread.ManagedThreadId}");


  5. 以上输出:

  6. Current ThreadId1

  7. Hello on ThreadId4

10. 最后

罗里吧嗦的总算把《Rx.NET In Action》这本书的内容大致梳理了一遍,对Rx也有了一个更深的认识,Rx扩展了观察者模式用于支持数据和事件序列,内置系列操作符允许我们以声明式的方式组合这些序列,且无需关注底层的实现进行事件驱动开发:如线程、同步、线程安全、并发数据结构和非阻塞IO。

但事无巨细,难免疏漏。对响应式编程有兴趣的不妨拜读下此书,相信对你会大有裨益。

参考资料:

Rx.NET in Action.pdf

ReactiveX

.Net中的反应式编程(Reactive Programming)


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/317167.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构之trie树——First! G,电子字典,Type Printer,Nikitosh and xor

文章目录[USACO12DEC]First! G[JSOI2009]电子字典[IOI2008] Type PrinterNikitosh and xor[USACO12DEC]First! G luogu3065 考虑每一个字符串成为答案的可能 这意味着从字典树根到字符串最后一位就恰好对应重新定义的字典序 在第iii层的时候&#xff0c;想要走特定点&#…

H - Prince and Princess 计蒜客 - 42402

H - Prince and Princess 计蒜客 - 42402 题意: 你现在要寻找公主&#xff0c;有三种人&#xff0c;第一种是说真话的人(至少为1&#xff0c;因为公主是说真话的人)&#xff0c;第二种人是只会说假话的&#xff0c;第三种是胡说八道的(说的话真假都有可能)。现在给你三种人的…

模板:min-max容斥离散随机变量的几何分布(洛谷P3175:[HAOI2015]按位或)

前言 见到一道神题&#xff0c;学会两个知识点… 都是数学。 min-max容斥 给出式子&#xff1a; max⁡(S)∑T⊂S(−1)∣T∣1min⁡(T)\max(S)\sum_{T\sub S}(-1)^{|T|1}\min(T)max(S)T⊂S∑​(−1)∣T∣1min(T) min⁡(S)∑T⊂S(−1)∣T∣1max⁡(T)\min(S)\sum_{T\sub S}(-1)^…

杭电多校杂题收录

前言 和学长学弟一起打的hdu多校&#xff0c;打的很菜没啥难题收录&#xff0c;因为难的我都不会做。 正题 hdu7152-Copy 题目链接:http://acm.hdu.edu.cn/showproblem.php?pid7152 题目大意 nnn个数字的序列aaa&#xff0c;mmm次操作&#xff0c;每次将一段[l,r][l,r][l,r…

.NET Core中的验证组件FluentValidation的实战分享

今天有人问我能不能出一篇FluentValidation的教程&#xff0c;刚好今天在实现我们的.NET Core实战项目之CMS的修改密码部分的功能中有用到FluentValidation&#xff0c;所以就以修改用户密码为实例来为大家进行一下ASP.NET Core中的验证组件FluentValidation的实战分享&#xf…

笛卡尔树详解带建树模板及例题运用(Largest Submatrix of All 1’s,洗车 Myjnie,Removing Blocks,SPOJ PERIODNI)

文章目录笛卡尔树介绍例题Largest Submatrix of All 1’s应用「POI2015」洗车 Myjnie[AGC028B] Removing BlocksSPOJ PERIODNI笛卡尔树 介绍 笛卡尔树是一种数据结构&#xff0c;每个点由两个值&#xff0c;键值key和权值val&#xff0c;组成 其键值满足二叉树性质 即点的左子…

K - Triangle 计蒜客 - 42405

K - Triangle 计蒜客 - 42405 题意&#xff1a; 给你一个三角形的三点&#xff0c;再给你三角形边上一个点&#xff0c;让你求另一个点(也要在三角形上)&#xff0c;使得平分三角形的面积 题解: 计算几何 三角形的三边ab&#xff0c;ac&#xff0c;bc 如果点p在ab上&#x…

P2508-[HAOI2008]圆上的整点【数学】

正题 题目链接:https://www.luogu.com.cn/problem/P2508 题目大意 一个在(0,0)(0,0)(0,0)的圆心&#xff0c;半径为rrr&#xff0c;求圆有多少个整点。 1≤r≤21091\leq r\leq 2\times 10^91≤r≤2109 解题思路 设这个点为(x,y)(x,y)(x,y)&#xff0c;那么有x2y2r2x^2y^2r^2…

如何为ASP.NET Core设置客户端IP白名单验证

本篇博文中展示了如何在ASP.NET Core应用程序中设置IP白名单验证的3种方式。你可以使用一下3种方式&#xff1a;使用中间件检查每个请求的远程IP地址使用Action过滤器为指定的Controller或action方法添加针对远程IP地址的检查使用IPageFilter为Razor Pages应用添加针对远程IP地…

CodeForces - 140C New Year Snowmen

CodeForces - 140C New Year Snowmen 题意&#xff1a; 现在来做雪人&#xff0c;每个雪人由三个不同大小的雪球构成&#xff1a;一个大的&#xff0c;一个中等的&#xff0c;一个小的。现在有 n 个雪球半径分别为 r1, r2, …, rn. 为了做雪人&#xff0c;三个雪球的大小必须…

洛谷P4389:付公主的背包(多项式、生成函数)

对于一些生成函数累乘的题目&#xff0c;也许可以通过求 ln⁡\lnln 转化为累加问题从而完成简化。 解析 不难写出对于单个物品 kkk 的生成函数&#xff1a; ∑i1xVi11−xVK\sum_{i1}x^{Vi}\frac{1}{1-x^{V_K}}i1∑​xVi1−xVK​1​ 那么答案的生成函数就是所有物品的函数的卷积…

数据结构之fhq-treap——Chef and Sets,[HNOI2012]永无乡,Play with Chain,[NOI2005]维修数列(结构体版代码)

因为非常板&#xff0c;所以主要是代码Tyvj 1728 普通平衡树Chef and Sets[HNOI2012]永无乡Play with Chain[NOI2005]维修数列题目很水&#xff0c;所以可能会出现代码部分细节出锅&#xff0c;但确实这些代码是能过得 还请多多包涵 Tyvj 1728 普通平衡树 luogu3369 #include…

让ASP.NET Core支持GraphQL之-GraphQL的实现原理

众所周知RESTful API是目前最流行的软件架构风格之一&#xff0c;它主要用于客户端和服务器交互类的软件。基于这个风格设计的软件可以更简洁&#xff0c;更有层次&#xff0c;更易于实现缓存等机制。RESTful的优越性是毋庸置疑的&#xff0c;不过GraphQL也可以作为一种补充&am…

CodeForces 1514A Perfectly Imperfect Array

CodeForces 1514A Perfectly Imperfect Array 题意&#xff1a; 给你n个数&#xff0c;是否存在一个数不是平方数 题解&#xff1a; 先开方&#xff0c;转int&#xff0c;判断是否等于平方 代码&#xff1a; #include<bits/stdc.h> #define debug(a,b) printf(&quo…

另一个博客

在博客园搞了个博客&#xff0c;目前来说两边会同时更新的。 有些题目不放出来&#xff0c;都写在来博客园那边&#xff0c;虽然你们也不知道密码 链接:https://www.cnblogs.com/QuantAsk/

洛谷P4173:残缺的字符串(FFT、通配符匹配)

解析 通配符匹配的经典题。 设单词串为 AAA,文章串为 BBB。 把 AAA 翻转一下&#xff0c;判断问题就能转化为一个卷积的形式&#xff1a; F(p)&i0m−1match(Ai1,Bp−i)F(p)\&_{i0}^{m-1}match(A_{i1},B_{p-i})F(p)&i0m−1​match(Ai1​,Bp−i​) match(a,b)match(…

[2021-09-02 contest]CF1251C,可达性统计(bitset优化dp),Boomerang Tournament(状压dp),小蓝的好友(mrx)(treap平衡树)

文章目录CF1251C Minimize The Integeracwing164&#xff1a;可达性统计Facebook Hacker Cup 2016 Round 1 Boomerang Tournament[Zjoi2012]小蓝的好友(mrx)CF1251C Minimize The Integer ………………… 给你一个大整数aaa&#xff0c;它由nnn位数字&#xff0c;也可能有前导…

Entity Framework 的一些性能建议

点击上方蓝字关注“汪宇杰博客”这是一篇我在2012年写的老文章&#xff0c;至今适用&#xff08;没错&#xff0c;我说的就是适用于EF Core&#xff09;。因此使用微信重新推送&#xff0c;希望能帮到大家。自从我用了EF&#xff0c;每次都很关心是否有潜在的性能问题。所以每次…

AND 0, Sum Big CodeForces - 1514B

AND 0, Sum Big CodeForces - 1514B 题意&#xff1a; 构造一个含n个k位二进制数的序列&#xff0c;使得序列中所有数按位与的结果为0&#xff0c;且序列和最大&#xff0c;求构造方案数。 题解&#xff1a; 对于n个数的每一位&#xff0c;都至少有个0&#xff0c;这样可以…

CF438E:The Child and Binary Tree(生成函数)

解析&#xff1a; 设计 fif_ifi​ 表示权值为 iii 的方案数&#xff0c;f01f_01f0​1。 枚举根节点权值&#xff0c;可以写出转移&#xff1a; fn∑gk∑ififn−k−i∑ijknfifjgkf_n\sum g_k\sum_{i}f_if_{n-k-i}\sum_{ijkn}f_if_jg_kfn​∑gk​i∑​fi​fn−k−i​ijkn∑​fi​…