RACSignal的Subscription深入分析

ReactiveCocoa是一个FRP的思想在Objective-C中的实现框架,目前在美团的项目中被广泛使用。对于ReactiveCocoa的基本用法,网上有很多相关的资料,本文不再讨论。RACSignal是ReactiveCocoa中一个非常重要的概念,而本文主要关注RACSignal的实现原理。在阅读之前,你需要基本掌握RACSignal的基本用法

本文主要包含2个部分,前半部分主要分析RACSignal的subscription过程,后半部分是对前半部分的深入,在subscription过程的基础上分析ReactiveCocoa中比较难理解的两个操作:multicast && replay。

PS:为了解释清楚,我们下面只讨论next,不讨论error以及completed,这二者与next类似。本文基于ReactiveCocoa 2.x版本。

我们先刨析RACSignal的subscription过程

RACSignal的常见用法

-(RACSignal *)signInSignal {
// part 1:[RACSignal createSignal]来获得signalreturn [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {[self.signInServicesignInWithUsername:self.usernameTextField.textpassword:self.passwordTextField.textcomplete:^(BOOL success) {// part 3: 进入didSubscribe,通过[subscriber sendNext:]来执行next block[subscriber sendNext:@(success)];[subscriber sendCompleted];}];return nil;}];
}// part 2 : [signal subscribeNext:]来获得subscriber,然后进行subscription
[[self signInSignal] subscribeNext:^(id x) { NSLog(@"Sign in result: %@", x); 
}];

Subscription过程概括

RACSignal的Subscription过程概括起来可以分为三个步骤:

  1. [RACSignal createSignal]来获得signal
  2. [signal subscribeNext:]来获得subscriber,然后进行subscription
  3. 进入didSubscribe,通过[subscriber sendNext:]来执行next block

步骤一:[RACSignal createSignal]来获得signal

RACSignal.m中:
+ ( RACSignal *)createSignal:( RACDisposable * (^)( id < RACSubscriber > subscriber))didSubscribe {return [ RACDynamicSignal   createSignal :didSubscribe];
}
RACDynamicSignal.m中
+ ( RACSignal *)createSignal:( RACDisposable * (^)( id < RACSubscriber > subscriber))didSubscribe {RACDynamicSignal *signal = [[ self   alloc ] init ];signal-> _didSubscribe = [didSubscribe copy ];return [signal setNameWithFormat : @"+createSignal:" ];
}

[RACSignal createSignal]会调用子类RACDynamicSignal的createSignal来返回一个signal,并在signal中保存后面的 didSubscribe这个block

步骤二:[signal subscribeNext:]来获得subscriber,然后进行subscription

RACSignal.m中:
- ( RACDisposable *)subscribeNext:( void (^)( id x))nextBlock {RACSubscriber *o = [ RACSubscriber   subscriberWithNext :nextBlock error : NULL   completed : NULL ];return [ self  subscribe :o];
}
RACSubscriber.m中:+ ( instancetype )subscriberWithNext:( void (^)( id x))next error:( void (^)( NSError *error))error completed:( void (^)( void ))completed {RACSubscriber *subscriber = [[ self   alloc ] init ];subscriber-> _next = [next copy ];subscriber-> _error = [error copy ];subscriber-> _completed = [completed copy ];return subscriber;
}
RACDynamicSignal.m中:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];if (self.didSubscribe != NULL) {RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{RACDisposable *innerDisposable = self.didSubscribe(subscriber);[disposable addDisposable:innerDisposable];}];[disposable addDisposable:schedulingDisposable];}return disposable;
}
  1. [signal subscribeNext]先会获得一个subscriber,这个subscriber中保存了nextBlock、errorBlock、completedBlock
  2. 由于这个signal其实是RACDynamicSignal类型的,这个[self subscribe]方法会调用步骤一中保存的didSubscribe,参数就是1中的subscriber

步骤三:进入didSubscribe,通过[subscriber sendNext:]来执行next block

RACSubscriber.m中:
- (void)sendNext:(id)value {@synchronized (self) {void (^nextBlock)(id) = [self.next copy];if (nextBlock == nil) return;nextBlock(value);}
}

任何时候这个[subscriber sendNext:],就直接调用nextBlock

signal的subscription过程回顾

从上面的三个步骤,我们看出:

  • 先通过createSignal和subscribeNext这两个调用,声明了流中value到来时的处理方式
  • didSubscribe block块中异步处理完毕之后,subscriber进行sendNext,自动处理

搞清楚了RAC的subscription过程,接着在此基础上我们讨论一个RACSignal中比较容易混淆的两个操作:multicast和replay。

为什么要清楚这两者的原理

RACSignal+Operation.h中
- (RACMulticastConnection *)publish;- (RACMulticastConnection *)multicast:(RACSubject *)subject;- (RACSignal *)replay;- (RACSignal *)replayLast;- (RACSignal *)replayLazily;
  • 在RACSignal+Operation.h中,连续定义了5个跟我们这个主题有关的RACSignal的操作,这几个操作的区别很细微,但用错的话很容易出问题。只有理解了原理之后,才明白它们之间的细微区别
  • 很多时候我们意识不到需要用这些操作,这就可能因为side effects执行多次而导致程序bug

multicast && replay的应用场景

“Side effects occur for each subscription by default, but there are certain situations where side effects should only occur once – for example, a network request typically should not be repeated when a new subscriber is added.”

// 引用ReactiveCocoa源码的Documentation目录下的一个例子
// This signal starts a new request on each subscription.
RACSignal *networkRequest = [RACSignal createSignal:^(id<RACSubscriber> subscriber) {AFHTTPRequestOperation *operation = [clientHTTPRequestOperationWithRequest:requestsuccess:^(AFHTTPRequestOperation *operation, id response) {[subscriber sendNext:response];[subscriber sendCompleted];}failure:^(AFHTTPRequestOperation *operation, NSError *error) {[subscriber sendError:error];}];[client enqueueHTTPRequestOperation:operation];return [RACDisposable disposableWithBlock:^{[operation cancel];}];
}];// Starts a single request, no matter how many subscriptions `connection.signal`
// gets. This is equivalent to the -replay operator, or similar to
// +startEagerlyWithScheduler:block:.
RACMulticastConnection *connection = [networkRequest multicast:[RACReplaySubject subject]];
[connection connect];[connection.signal subscribeNext:^(id response) {NSLog(@"subscriber one: %@", response);
}];[connection.signal subscribeNext:^(id response) {NSLog(@"subscriber two: %@", response);
}];
  1. 在上面的例子中,如果我们不用RACMulticastConnection的话,那就会因为执行了两次subscription而导致发了两次网络请求。
  2. 从上面的例子中,我们可以看到对一个Signal进行multicast之后,我们是对connection.signal进行subscription而不是原来的networkRequest。这点是”side effects should only occur once”的关键,我们将在后面解释

multicast原理分析

replay是multicast的一个特殊case而已,而multicast的整个过程可以拆分成两个步骤,下面进行详细讨论。

multicast的机制Part 1:

RACMulticastConnection.m中:
- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {NSCParameterAssert(source != nil);NSCParameterAssert(subject != nil);self = [super init];if (self == nil) return nil;_sourceSignal = source;_serialDisposable = [[RACSerialDisposable alloc] init];_signal = subject;return self;
}
  • 结合上面的例子来看,RACMulticastConnection的init是以networkRequest作为sourceSignal,而最终connnection.signal指的是[RACReplaySubject subject]
RACMulticastConnection.m中:
- (RACDisposable *)connect {BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);if (shouldConnect) {self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];}return self.serialDisposable;
}
  • 结合上面的RACSignal分析的Subscription过程,[self.sourceSignal subscribe:_signal]会执行self.sourceSignal的didSubscribe这个block。再结合上面的例子,也就是说会把_signal作为subscriber,发网络请求,success的时候,_signal会sendNext,这里的这个signal就是[RACReplaySubject subject]。可以看出,一旦进入到这个didSubscribe中,后续的不管是sendNext还是subscription,都是对这个[RACReplaySubject subject]进行的,与原来的sourceSignal彻底无关了。这就解释了为什么”side effects only occur once”。

multicast的机制Part 2:

在进行multicast的步骤二之前,需要介绍一下RACSubject以及RACReplaySubject

RACSubject

“A subject can be thought of as a signal that you can manually control by sending next, completed, and error.”

RACSubject的一个用法如下:

RACSubject *letters = [RACSubject subject];
// Outputs: A B
[letters subscribeNext:^(id x) {NSLog(@"%@ ", x);
}];
[letters sendNext:@"A"];
[letters sendNext:@"B"];

接下来分析RACSubject的原理 :

RACSubject.m中:
- (id)init {self = [super init];if (self == nil) return nil;_disposable = [RACCompoundDisposable compoundDisposable];_subscribers = [[NSMutableArray alloc] initWithCapacity:1];	return self;
}
  • RACSubject中有一个subscribers数组
RACSubject.m中:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {NSCParameterAssert(subscriber != nil);RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];NSMutableArray *subscribers = self.subscribers;@synchronized (subscribers) {[subscribers addObject:subscriber];}return [RACDisposable disposableWithBlock:^{@synchronized (subscribers) {// Since newer subscribers are generally shorter-lived, search// starting from the end of the list.NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {return obj == subscriber;}];if (index != NSNotFound) [subscribers removeObjectAtIndex:index];}}];
}
  • 从subscribe:的实现可以看出,对RACSubject对象的每次subscription,都是将这个subscriber加到subscribers数组中而已
RACSubject.m中:
- (void)sendNext:(id)value {[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {[subscriber sendNext:value];}];
}
  • 从sendNext:的实现可以看出,每次RACSubject对象sendNext,都会对其中保留的subscribers进行sendNext,如果这个subscriber是RACSignal的话,就会执行Signal的next block。

RACReplaySubject

“A replay subject saves the values it is sent (up to its defined capacity) and resends those to new subscribers.“,可以看出,replaySubject是可以对它send next(error,completed)的东西进行buffer的。 RACReplaySubject是继承自RACSubject的,它的内部的实现例如subscribe:、sendNext:的实现也会调用super的实现。

RACReplaySubject.m中:
- (instancetype)initWithCapacity:(NSUInteger)capacity {self = [super init];if (self == nil) return nil;_capacity = capacity;_valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);return self;
}
  • 从init中我们看出,RACReplaySubject对象持有capacity变量(用于决定valuesReceived缓存多少个sendNext:出来的value,这在区分replay和replayLast的时候特别有用)以及valuesReceived数组(用来保存sendNext:出来的value),这二者接下来会重点涉及到。
RACReplaySubject.m中:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{@synchronized (self) {for (id value in self.valuesReceived) {if (compoundDisposable.disposed) return;[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];}if (compoundDisposable.disposed) return;if (self.hasCompleted) {[subscriber sendCompleted];} else if (self.hasError) {[subscriber sendError:self.error];} else {RACDisposable *subscriptionDisposable = [super subscribe:subscriber];[compoundDisposable addDisposable:subscriptionDisposable];}}}];[compoundDisposable addDisposable:schedulingDisposable];return compoundDisposable;
}
  • 从subscribe:可以看出,RACReplaySubject对象每次subscription,都会把之前valuesReceived中buffer的value重新sendNext一遍,然后调用super把当前的subscriber加入到subscribers数组中。
RACReplaySubject.m中:
- (void)sendNext:(id)value {@synchronized (self) {[self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];[super sendNext:value];if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {[self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];}}
}

从sendNext:可以看出,RACReplaySubject对象会buffer每次sendNext的value,然后会调用super,对subscribers中的每个subscriber,调用sendNext。buffer的数量是根据self.capacity来决定的。

介绍完了RACReplaySubject之后,我们继续进行multicast的part 2部分。

在上面的例子中,我们对connection.signal进行了两次subscription,结合上面的RACReplaySubject的subscription的subscribe:,我们得到以下过程:

  1. [RACReplaySubject subject]会将这两次subscription过程中的subscriber都保存在subscribers数组中
  2. 当网络请求success后,会[subscriber sendNext:response],前面已经讲过这个subscriber就是[RACReplaySubject subject],这样,就会把sendNext:的value保存在valuesReceived数组中,供后续subscription使用(不知道你是否注意到RACReplaySubject的subscribe:中有个for循环),然后对subscribers中保存的每个subscriber执行sendNext。

后续思考

  1. 上面讨论的是RACReplaySubject对象先进行subscription,再进行sendNext,如果是先sendNext,再subscription呢?其实魅力就在于RACReplaySubject的subscribe:中的for循环。具体过程留作思考
  2. 在RACSignal+Operation中关于multicast && replay的,一共有5个操作:publish、multicast、replay、replayLast、replayLazily,他们之间有什么细微的差别呢?相信在我上面内容的基础上,他们之间的细微差别不难理解,这里推荐一篇帮助大家理解的blog

参考资料

ReactiveCocoa github主页 ReactiveCocoa Documentation ReactiveCocoa raywenderlich上的资料

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/477968.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AllenAI 发布万能问答系统 MACAW!各类题型样样精通,性能大幅超越 GPT-3!

文 | python前言GPT-3 等超大规模预训练语言模型&#xff0c;在少监督任务&#xff08;few-shot tasks&#xff09;上取得了令人瞩目的成绩。而这篇文章中&#xff0c;AllenAI的研究员提出了大规模生成式问答模型&#xff0c;MACAW。基于多角度预训练&#xff0c;MACAW可以用于…

论文浅尝 | SMBOP: Semi-autoregressive Bottom-up Semantic Parsing

笔记整理&#xff1a;陈永锐&#xff0c;东南大学博士来源&#xff1a;NAACL 2021概述近年来语义解析的事实上的标准解码方法是使用自顶向下的深度优先遍历对目标程序的抽象语法树进行自回归解码。该工作提出了一种替代方法&#xff1a;半自回归自底向上解析器&#xff08;SMBO…

美团酒店Node全栈开发实践

前后端分离的背景 “前后端分离”显然已不是什么新鲜的话题&#xff0c;Zakas在2013年10月份就曾发表过一篇博客《Node.js and the new web front-end》讨论Node背景下新时代的前端。毫无疑问&#xff0c;Node的出现给JavaScript语言带来了新的生机&#xff0c;也使得前端开发者…

统计学习方法总结

统计学习方法总结 阅读目录(Content)0. 相关知识点0x1: 监督学习1. 模型假设空间2. 生成模型与判别模型的联系与区别 3. 学习策略4. 分类问题与回归问题5. 利用模型进行预测和分析0x2&#xff1a;模型评估与模型选择1. 训练误差与测试误差2. 过拟合与模型选择0x3&#xff1a;正…

LeetCode 997. 找到小镇的法官(图的出度和入度)

1. 题目 在一个小镇里&#xff0c;按从 1 到 N 标记了 N 个人。传言称&#xff0c;这些人中有一个是小镇上的秘密法官。 如果小镇的法官真的存在&#xff0c;那么&#xff1a; 小镇的法官不相信任何人。每个人&#xff08;除了小镇法官外&#xff09;都信任小镇的法官。只有…

哈工大|NLP数据增强方法?我有15种

文 | rumor源 | 李rumor卷友们好&#xff0c;我是rumor。十一假期过的太快了&#xff0c;不知道你们缓过来没有&#xff0c;没有的话今天我们就来一起读一篇综述缓缓&#xff0c;弥补假期没学习的遗憾。这篇40多页的综述出自哈工大车万翔老师的团队&#xff0c;一共总结了15种N…

论文浅尝 | Wordly Wise(WoW) - 用于语音视觉知识问答的跨语言知识融合模型

笔记整理: 谭亦鸣&#xff0c;东南大学博士生来源&#xff1a;NAACL’21链接&#xff1a;https://aclanthology.org/2021.naacl-main.153.pdf论文提出了一种新的知识图谱问答数据集命名为FVSQA&#xff0c;这是一种语音视觉知识问答类型的任务&#xff0c;即问题形式为音频&…

美团Android DEX自动拆包及动态加载简介

概述 作为一个android开发者&#xff0c;在开发应用时&#xff0c;随着业务规模发展到一定程度&#xff0c;不断地加入新功能、添加新的类库&#xff0c;代码在急剧的膨胀&#xff0c;相应的apk包的大小也急剧增加&#xff0c; 那么终有一天&#xff0c;你会不幸遇到这个错误&a…

LeetCode 83. 删除排序链表中的重复元素(链表)

1. 题目 给定一个排序链表&#xff0c;删除所有重复的元素&#xff0c;使得每个元素只出现一次。 示例 1: 输入: 1->1->2 输出: 1->2示例 2: 输入: 1->1->2->3->3 输出: 1->2->3来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 链接&#…

用多模态信息做 prompt,解锁 GPT 新玩法

文 | 子龙编 | 小轶自多模态大火以来&#xff0c;井喷式地出现了许多工作&#xff0c;通过改造预训练语言模型&#xff0c;用图像信息来增强语义信息&#xff0c;但主要集中在几个 NLU 任务上&#xff0c;在 NLG 上的研究比较少。今天要介绍的这篇 paper Multimodal Conditiona…

论文浅尝 | 基于时序知识图谱的问答

笔记整理&#xff1a;姚云志&#xff0c;浙江大学在读博士&#xff0c;研究方向为自然语言处理。链接&#xff1a;https://arxiv.org/pdf/2106.01515.pdf时序知识图谱是一种多关系的知识图谱&#xff0c;相较于常规的知识图谱&#xff0c;时序知识图谱中的关系中会与时间段相关…

Logistic Regression 模型简介

逻辑回归&#xff08;Logistic Regression&#xff09;是机器学习中的一种分类模型&#xff0c;由于算法的简单和高效&#xff0c;在实际中应用非常广泛。本文作为美团机器学习InAction系列中的一篇&#xff0c;主要关注逻辑回归算法的数学模型和参数求解方法&#xff0c;最后也…

开源开放 | 中国近代历史人物知识图谱

OpenKG地址&#xff1a;http://openkg.cn/dataset/zgjdlsrw项目地址&#xff1a;http://www.zjuwtx.work/project/kg开放许可协议&#xff1a;CC BY-SA 4.0 &#xff08;署名相似共享&#xff09;贡献者&#xff1a;浙江大学&#xff08;王天笑&#xff09;1、引言中国近代历史…

LeetCode 1071. 字符串的最大公因子(字符串的最大公约数)

1. 题目 对于字符串 S 和 T&#xff0c;只有在 S T … T&#xff08;T 与自身连接 1 次或多次&#xff09;时&#xff0c;我们才认定 “T 能除尽 S”。 返回字符串 X&#xff0c;要求满足 X 能除尽 str1 且 X 能除尽 str2。 示例 1&#xff1a; 输入&#xff1a;str1 &q…

大模型炼丹无从下手?谷歌、OpenAI烧了几百万刀,总结出这些方法论…

文 | Yimin_饭煲都1202年了&#xff0c;不会真有深度学习炼丹侠还没有训练/推理过大模型吧“没吃过猪肉&#xff0c;还没见过猪跑吗&#xff1f;”在深度学习算力高度增长的今天&#xff0c;不论是学术界还是工业界的从业者&#xff0c;即使尚未达到从头预训练一个百亿级别参数…

文本相似度、文本匹配、文本聚类

1 1在Keras的Embedding层中使用预训练的word2vec词向量&#xff1a;https://blog.csdn.net/u012052268/article/details/90238282 import numpy as np import pandas as pd#1准备工作# graph LR # 文本-->分词 # 分词-->训练词向量 # 训练词向量-->保存词向量import …

Linux资源管理之cgroups简介

引子 cgroups 是Linux内核提供的一种可以限制单个进程或者多个进程所使用资源的机制&#xff0c;可以对 cpu&#xff0c;内存等资源实现精细化的控制&#xff0c;目前越来越火的轻量级容器 Docker 就使用了 cgroups 提供的资源限制能力来完成cpu&#xff0c;内存等部分的资源控…

会议交流 | IJCKG 2021:Keynotes released!欢迎注册参会

IJCKG 2021: The 10th International Joint Conference on Knowledge GraphsDecember 6-8, 2021 Online国际知识图谱联合会议之前是国际语义技术联合会议&#xff08;the Joint International Semantic Technology Conference (JIST)&#xff09;&#xff0c;JIST 会议的历史要…

LeetCode 1010. 总持续时间可被 60 整除的歌曲(哈希)

1. 题目 在歌曲列表中&#xff0c;第 i 首歌曲的持续时间为 time[i] 秒。 返回其总持续时间&#xff08;以秒为单位&#xff09;可被 60 整除的歌曲对的数量。形式上&#xff0c;我们希望索引的数字 i < j 且有 (time[i] time[j]) % 60 0。 示例 1&#xff1a; 输入&am…

pip加速+百度镜像|清华镜像

针对pip install 安装包下载慢的问题&#xff0c;主要pip install直接安装是从国外拉取安装包。 解决办法&#xff0c;加国内镜像&#xff0c;比如百度 https://mirror.baidu.com/pypi/simple pip install lac -i https://mirror.baidu.com/pypi/simple比如清华镜像&#xf…