之前的文章已经介绍过Publisher和Subscriber,对于概念类的东西这里就不多介绍了,在介绍Publisher和Subscriber的交互流程之前,先补充一下前面没有提到过的Subscription。
Subscription
Subscription是一个协议,实现该协议的对象负责将订阅者链接到发布者。只要它在内存中,订阅者就会继续接收值。它只包含一个方法:
public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {/// Tells a publisher that it may send more values to the subscriber.func request(_ demand: Subscribers.Demand)
}
当订阅者在Publisher中接收到subscription对象后,便开始调用request方法,demand参数决定了订阅者要从发布者那里获取多少个值。
demand参数有几个可选的参数值:
none:表示订阅者一个值都不会收到。max(value): 表示订阅者要接收value个值。unlimited:表示订阅者要接受无限个值。
Subscription的实例对象中包含了一个Subscriber的引用,以使其保持最新状态。
Subscription协议有继承了Cancellable协议,所以有了cancel方法,而在自定义Subscription的时候,request和cancel方法都是必须实现的。
Publisher和Subscriber的交互流程
介绍完了Subscription协议,现在看看Publisher和Subscriber是如何建立的联系。
- 由
Publisher调用subscribe(_:)方法开启链接申请,同时参数传入Subscriber实例对象。 - 在第一步调用
subscribe(_:)方法后,即触发Publisher内部调用receive(subscriber:)方法,在该方法中创建一个连接Publisher和Subscriber的Subscription对象,然后调用Subscriber的receive(subscription:)方法,将Subscription对象传给Subscriber。 - 在
Subscriber的receive(subscription:)方法中,使用传进来的subscription对象调用request方法,并设置Subscriber的请求次数。 - 在
Subscription的request方法中,知道了Subscriber的请求次数,经过相关的逻辑处理后,在此方法中给Subscriber发送数据。 - 通过
Subscriber的receive(_:)方法向Subscriber发送数据。 - 通过
Subscriber的receive(completion:)方法向Subscriber发送结束或者失败信息。
因为Subscription是起了一个桥梁的作用,属于幕后,所以上面第5条、第6条从语义上来说相当于Publisher通过receive(_:)方法或receive(completion:)方法向Subscriber发送数据或者结束信息。实际上Subscription替Publisher做了向下游发送数据的事情。
自定义Subscriber
首先看一下Subscriber协议的定义:
public protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible {associatedtype Inputassociatedtype Failure : Errorfunc receive(subscription: any Subscription)func receive(_ input: Self.Input) -> Subscribers.Demandfunc receive(completion: Subscribers.Completion<Self.Failure>)
}
协议中有两个类型,三个方法。自定义的Subscriber需要使用class定义,而非struct,否则会报错,另外struct是值类型,Subscription没有持有最初的那个Subscriber对象。
// 自定义Subscriber
class CustomSubscriber: Subscriber {// 确定输入类型,需要和Publisher的输出类型一致。typealias Input = Int// 确定失败类型,需要和Publisher的失败类型一致,永远不会失败就定义为Never。typealias Failure = Never/** 交互流程中第3步* 接收subscription对象的方法。* 方法内subscription对象调用request方法,设置请求次数。*/func receive(subscription: any Subscription) {debugPrint("CustomSubscriber subscription.request")subscription.request(.max(5))}/** 交互流程中第5步* 接收Publisher发送数据的方法。* 该方法返回`Subscribers.Demand`,用于在request方法中计算请求次数。*/func receive(_ input: Int) -> Subscribers.Demand {print("New value \(input)")return .none}/** 交互流程中第6步* 接收Publisher发送结束的方法,或者正常结束,或者失败。*/func receive(completion: Subscribers.Completion<Never>) {print("Completion: \(completion)")}
}
自定义Publisher
在自定义Publisher前,再看一下Publisher协议的定义:
public protocol Publisher<Output, Failure> {associatedtype Outputassociatedtype Failure : Errorfunc receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}
自定义的Publisher需要继承这个协议,比如:
// 自定义Publisher
class CustomPublisher: Publisher {// 确定输出类型,需要和Subscriber的输入类型一致。typealias Output = Int// 确定失败类型,需要和Subscriber的失败类型一致,永远不会失败就定义为Never。typealias Failure = Never/** 交互流程中第2步* 接收subscriber对象的方法。方法传入Subscriber实例对象,开始建立联系。* 方法内创建Subscription对象,然后调用Subscriber的receive(subscription:)方法,将Subscription对象传给Subscriber。*/func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {// 创建Subscription对象let subscription = CustomSubscription(subscriber: subscriber)debugPrint("CustomPublisher subscriber.receive")// 将Subscription对象传给Subscribersubscriber.receive(subscription: subscription)}
}
自定义Subscription
先看一下Subscription协议:
public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {/// Tells a publisher that it may send more values to the subscriber.func request(_ demand: Subscribers.Demand)
}
该协议中规定了要实现request方法,因为继承了Cancellable,所以还需要实现一个cancel方法。
public protocol Cancellable {func cancel()
}
同Subscriber一样,自定义的Subscription需要使用class定义,而非struct,否则会报错,另外创建的Subscription实例对象需要在内存中保持,否则订阅就失效了。
下面是自定义Subscription:
// 自定义Subscription
class CustomSubscription<S: Subscriber>: Subscription where S.Input == Int, S.Failure == Never {// 持有传入进来的Subscriber对象。private var subscriber: Sprivate var counter = 0private var isCompleted = false// 初始化的时候将Subscriber对象传入进来,并持有,待后续发送数据使用。init(subscriber: S) {self.subscriber = subscriber}/** 交互流程中第4步* 该方法传入请求数据的次数,并给Subscriber发送数据。*/func request(_ demand: Subscribers.Demand) {debugPrint("CustomSubscription request")guard !isCompleted else { return }for _ in 0..<(demand.max ?? 10) {_ = subscriber.receive(counter) // 给Subscriber发送数据counter += 1}if counter >= 5 {subscriber.receive(completion: .finished) // 通知Subscriber结束。isCompleted = true}}// 该方法中执行一些取消订阅的操作。func cancel() {isCompleted = true}
}
如何使用
定义完了上面的,现在看看怎么使用吧。还是依托SwiftUI的界面,我们在对应的ViewModel中添加方法,使用上面自定义的类。
首先定义一个ViewModel。
class CustomCombineViewModel: ObservableObject {var subscription: AnyCancellable?func testMethod1() {// 创建自定义的Publisherlet publisher = CustomPublisher()// 创建自定义的Subscriberlet subscriber = CustomSubscriber()debugPrint("Begin subscribe")/** 交互流程中第1步,申请订阅。* 由Publisher对象调用subscribe方法,传入Subscriber对象开始。*/publisher.subscribe(subscriber)}func testMethod2() {// 创建自定义的Publisherlet publisher = CustomPublisher()// 通过sink方法申请订阅,并将创建的subscription持有,否则订阅失败,sink方法返回的时AnyCancellable,这里做了类型抹除。subscription = publisher.sink { completion inprint("sink completion: \(completion)")} receiveValue: { value inprint("sink new value \(value)")}}
}
在上面代码中的testMethod1方法中,分别创建了Publisher和Subscriber,并用Publisher对象调用subscribe方法开启订阅,这也是订阅的开启入口。
当执行testMethod1时候,输出打印:
"Begin subscribe"
"CustomPublisher subscriber.receive"
"CustomSubscriber subscription.request"
"CustomSubscription request"
New value 0
New value 1
New value 2
New value 3
New value 4
Completion: finished
上面的输出也反应了从开始订阅到发送数据结束的过程。打印了5个数据是应为我们在Subscriber类中调用request方法的时候传入了.max(5),最多发送5个数据。
再看一下第二个方法testMethod2(),这个方法中没有明确的Publisher调用subscribe方法呢?
Subscribers有两个内置的Subscriber,分别为Subscribers.Sink和Subscribers.Assign。当调用sink或者assign方法的时候,就开启了订阅流程。
当执行testMethod2时候,输出打印:
"CustomPublisher subscriber.receive"
"CustomSubscription request"
sink new value 0
sink new value 1
sink new value 2
sink new value 3
sink new value 4
sink new value 5
sink new value 6
sink new value 7
sink new value 8
sink new value 9
sink completion: finished
因为sink请求的是无限次数数据,所以将我们在Subscription中的数据都打印出来了。
Subscribers.Sink
Sink 创建的时候会立即调用 Subscription 对象的 request(.unlimited)。
Publisher 有两个 sink 扩展方法:
sink(receiveCompletion:receiveValue:)sink(receiveValue:)
Subscribers.Assign
Assign 会将接收到的值赋值给一个类对象的属性或者一个另一个 @Published publisher 上,它对 publisher 的 demand 也是 .unlimited。
Publisher 有两个 assign 扩展方法:
assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Self.Output>, on object: Root)assign(to published: inout Published<Self.Output>.Publisher)
写在最后
现在我们完全理解了Combine订阅交互流程,是不是对Combine框架有了进一步的认识呢?
在实际开发过程中,不建议我们自己去实现Publisher,Subscriber和Subscription,因为一个逻辑错误可能会破坏发布者和订阅者之间的所有连接,这可能会导致意想不到的结果。
最后,希望能够帮助到有需要的朋友,如果觉得有帮助,还望点个赞,添加个关注,笔者也会不断地努力,写出更多更好用的文章。