ReactiveCocoa2 源码浅析
ReactiveCocoa2 源码浅析标签(空格分隔): ReactiveCocoa iOS Objective-C (鄙视下CSDN的代码样式,这里的看起来好点,可惜对目录支持得不好) ? 开车不需要知道离合器是怎么工作的,但如果知道离合器原理,那么车子可以开得更平稳。 ReactiveCocoa 是一个重型的 FRP 框架,内容十分丰富,它使用了大量内建的 block,这使得其有强大的功能的同时,内部源码也比较复杂。本文研究的版本是2.4.4,小版本间的差别不是太大,无需担心此问题。 这里只探究其核心 @protocol RACSubscriber 信号是一个异步数据流,即一个将要发生的以时间为序的事件序列,它能发射出三种不同的东西: // 用于从 RACSignal 中直接接收 values 的对象
@protocol RACSubscriber <NSObject>
@required
/// 发送下一个 value 给 subscribers。value 可以为 nil。
- (void)sendNext:(id)value;
/// 发送 error 给 subscribers。 error 可以为 nil。
///
/// 这会终结整个订阅行为,而且接下来也无法再订阅任何信号了。
- (void)sendError:(NSError *)error;
/// 发送 completed 给 subscribers。
///
/// 这会终结整个订阅行为,而且接下来也无法再订阅任何信号了。
- (void)sendCompleted;
/// 现在重要的是上面三个,先别管这个,忽略掉。
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;
@end
1、NLSubscriber咱们自己来实现这个协议看看(本文自定义的类都以 “NL” 开头,以视区别): // NLSubscriber.h
@interface NLSubscriber : NSObject <RACSubscriber>
@end
// NLSubscriber.m
@implementation NLSubscriber
- (void)sendNext:(id)value {
NSLog(@"%s value:%@",sel_getName(_cmd),value);
}
- (void)sendCompleted {
NSLog(@"%s",sel_getName(_cmd));
}
- (void)sendError:(NSError *)error {
NSLog(@"%s error:%@",error);
}
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable {
// to nothing
}
@end
现在咱们这个类只关心 @interface RACSignal (Subscription)
/* * `subscriber` 订阅 receiver 的变化。由 receiver 决定怎么给 subscriber 发送事件。 *简单来说,就是由这个被订阅的信号来给订阅者 subscriber 发送 `sendNext:` 等消息。 */
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber;
@end
用定时器信号来试试看: /** * @brief 创建一个定时器信号,每三秒发出一个当时日期值。一共发5次。 */
RACSignal *signalInterval = [RACSignal interval:3.0 onScheduler:[RACScheduler mainThreadScheduler]];
signalInterval = [signalInterval take:5];
NLSubscriber *subscriber = [[NLSubscriber alloc] init];
/** * @brief 用订阅者 subscriber 订阅定时器信号 */
[signalInterval subscribe:subscriber];
下面是输出结果: 2015-08-15 17:45:02.612 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:02 +0000
2015-08-15 17:45:05.612 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:05 +0000
2015-08-15 17:45:08.615 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:08 +0000
2015-08-15 17:45:11.613 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:11 +0000
2015-08-15 17:45:14.615 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:14 +0000
2015-08-15 17:45:14.615 RACPraiseDemo[738:59818] sendCompleted
2、改进NLSubscriber 现在的这个订阅者类 NLSubscriber 除了打印打东西外,啥也干不了,更别说复用了,如果针对所有的信号都写一个订阅者那也太痛苦了,甚至是不太可能的事。 第2点的要求可不少,那怎么才能做到这一点呢?还好,OC 中有 block !咱们可以将 - (void)sendNext:(id)value; ----> void (^next)(id value); - (void)sendError:(NSError *)error; ----> void (^error)(NSError *error); - (void)sendCompleted; ----> void (^completed)(void);
改进目标和改进方向都有了,那咱们来看看改进后的的样子: // 头文件
/** * @brief 基于 block 的订阅者 */
@interface NLSubscriber : NSObject <RACSubscriber>
/** * @brief 创建实例 */
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed;
@end
// 实现文件
@interface NLSubscriber ()
@property (nonatomic,copy) void (^next)(id value);
@property (nonatomic,copy) void (^error)(NSError *error);
@property (nonatomic,copy) void (^completed)(void);
@end
@implementation NLSubscriber
#pragma mark Lifecycle
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
NLSubscriber *subscriber = [[self alloc] init];
subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];
return subscriber;
}
#pragma mark RACSubscriber
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}
- (void)sendError:(NSError *)e {
@synchronized (self) {
void (^errorBlock)(NSError *) = [self.error copy];
if (errorBlock == nil) return;
errorBlock(e);
}
}
- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];
if (completedBlock == nil) return;
completedBlock();
}
}
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable {
// to nothing
}
@end
现在来试试看这个改进版,还是上面那个定时器的例子: /** * @brief 创建一个定时器信号,每三秒发出一个当时日期值。一共发5次。 */
RACSignal *signalInterval = [RACSignal interval:3.0 onScheduler:[RACScheduler mainThreadScheduler]];
signalInterval = [signalInterval take:5];
NLSubscriber *subscriber = [NLSubscriber subscriberWithNext:^(id x) {
NSLog(@"next:%@",x);
} error:nil completed:^{
NSLog(@"completed");
}];
/** * @brief 用订阅者 subscriber 订阅定时器信号 */
[signalInterval subscribe:subscriber];
输出结果如下: 2015-08-15 19:50:43.355 RACPraiseDemo[870:116551] next:2015-08-15 11:50:43 +0000
2015-08-15 19:50:46.358 RACPraiseDemo[870:116551] next:2015-08-15 11:50:46 +0000
2015-08-15 19:50:49.355 RACPraiseDemo[870:116551] next:2015-08-15 11:50:49 +0000
2015-08-15 19:50:52.356 RACPraiseDemo[870:116551] next:2015-08-15 11:50:52 +0000
2015-08-15 19:50:55.356 RACPraiseDemo[870:116551] next:2015-08-15 11:50:55 +0000
2015-08-15 19:50:55.356 RACPraiseDemo[870:116551] completed
输出结果没什么变化,但是订阅者的行为终于受到咱们的撑控了。再也不用为了一个信号而去实现 3、RACSignal 类别之 Subscription 有没有可能把 // .h
#import "RACSignal.h"
@interface RACSignal (nl_Subscription)
- (void)nl_subscribeNext:(void (^)(id x))nextBlock;
- (void)nl_subscribeNext:(void (^)(id x))nextBlock completed:(void (^)(void))completedBlock;
- (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock;
- (void)nl_subscribeError:(void (^)(NSError *error))errorBlock;
- (void)nl_subscribeCompleted:(void (^)(void))completedBlock;
- (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock;
- (void)nl_subscribeError:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock;
@end
// .m
#import "RACSignal+nl_Subscription.h"
#import "NLSubscriber.h"
@implementation RACSignal (nl_Subscription)
- (void)nl_subscribeNext:(void (^)(id x))nextBlock {
[self nl_subscribeNext:nextBlock error:nil completed:nil];
}
- (void)nl_subscribeNext:(void (^)(id x))nextBlock completed:(void (^)(void))completedBlock {
[self nl_subscribeNext:nextBlock error:nil completed:completedBlock];
}
- (void)nl_subscribeError:(void (^)(NSError *error))errorBlock {
[self nl_subscribeNext:nil error:errorBlock completed:nil];
}
- (void)nl_subscribeCompleted:(void (^)(void))completedBlock {
[self nl_subscribeNext:nil error:nil completed:completedBlock];
}
- (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock {
[self nl_subscribeNext:nextBlock error:errorBlock completed:nil];
}
- (void)nl_subscribeError:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
[self nl_subscribeNext:nil error:errorBlock completed:completedBlock];
}
- (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
NLSubscriber *subscriber = [NLSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
[self subscribe:subscriber];
}
@end
在这个类别中,将信号的 /** * @brief 创建一个自定义的信号。 * 这个信号在被订阅时,会发送一个当前的日期值; * 再过三秒后,再次发送此时的日期值; * 最后,再发送完成事件。 */
RACSignal *signalInterval = [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:[NSDate date]];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW,(int64_t)(3.0 * NSEC_PER_SEC)),dispatch_get_main_queue(),^{
[subscriber sendNext:[NSDate date]];
[subscriber sendCompleted];
});
return (id)nil;
}];
[signalInterval nl_subscribeNext:^(id x) {
NSLog(@"next:%@",x);
} error:^(NSError *error) {
NSLog(@"error:%@",error);
} completed:^{
NSLog(@"completed");
}];
输出如下: 2015-08-16 23:29:44.406 RACPraiseDemo[653:32675] next:2015-08-16 15:29:44 +0000
2015-08-16 23:29:47.701 RACPraiseDemo[653:32675] next:2015-08-16 15:29:47 +0000
2015-08-16 23:29:47.701 RACPraiseDemo[653:32675] completed
本例并没有采用之前的 “定时器信号”,而是自己创建的信号,当有订阅者到来时,由这个信号来决定在什么时候发送什么事件。这个例子里发送的事件的逻辑请看代码里的注释。 4、DisposableRACDisposable 针对上述两个问题,RACDisposable 应运而生。也就是说 Disposable 有两个作用: 订阅者与 Disposable 的关系: 2. 当订阅者 subscriberX 有接收来自任何一个信号的 “error” 或 “completed” 事件时,则不会再接收任何事件了。 根据 Disposable 的作用和与订阅者的关系,来总结它所需要提供的接口: 咱们为这个 Disposable 也整了一个类,如下: // .h file
/** * @brief 一个 disposable 封装了用于拆除、清理订阅的任务工作 */
@interface NLDisposable : NSObject
/** * @brief 这个 disposable 是否已经拆除过 */
@property (atomic,assign,getter = isDisposed,readonly) BOOL disposed;
+ (instancetype)disposableWithBlock:(void (^)(void))block;
/** * @brief 执行拆除工作。能多次调用这个消息,只有第一次调用有效。 */
- (void)dispose;
@end
// .m file
@interface NLDisposable () {
/** * @brief 类型类似于:@property (copy) void (^disposeBlock)(void); * 在 disposal 要执行的任务逻辑。 * 1、如果没有初妈值的话,则初始值默认为 `self` * 2、当已经 disposal 过后,值为 NULL。 */
void * volatile _disposeBlock;
}
@end
@implementation NLDisposable
#pragma mark Properties
- (BOOL)isDisposed {
return _disposeBlock == NULL;
}
#pragma mark Lifecycle
- (id)init {
self = [super init];
if (self == nil) return nil;
_disposeBlock = (__bridge void *)self;
OSMemoryBarrier();
return self;
}
- (id)initWithBlock:(void (^)(void))block {
NSCParameterAssert(block != nil);
self = [super init];
if (self == nil) return nil;
_disposeBlock = (void *)CFBridgingRetain([block copy]);
OSMemoryBarrier();
return self;
}
+ (instancetype)disposableWithBlock:(void (^)(void))block {
return [[self alloc] initWithBlock:block];
}
- (void)dealloc {
if (_disposeBlock == NULL || _disposeBlock == (__bridge void *)self) return;
CFRelease(_disposeBlock);
_disposeBlock = NULL;
}
#pragma mark Disposal
- (void)dispose {
/** * @brief 这里为了让逻辑更清晰,去掉了原子性操作。具体代码请看 RACDisposable */
if (_disposeBlock == NULL || _disposeBlock == (__bridge void *)self) return;
void (^disposeBlock)(void) = CFBridgingRelease((void *)_disposeBlock);
_disposeBlock = NULL;
disposeBlock();
}
@end
RACCompoundDisposable 那怎么组织这“多个”的关系呢?数组?Good,就是数组。OK,咱们来相像一下这个方案的初步代码。每个订阅者有一个 Disposable 数组,订阅一个一个信号,则加入一个 Disposable;当手动拆除一个订阅关系时,找到与之相关的 Disposable,发送 dispose 消息,将其从数组中移除;当订阅者不能再接收消息时(接收过
使用组合结构,我们能把相同的操作应用在组合和个别对象上。换句话说,在大多数情况下,我们可以 忽略 对象组合和个别对象之间的差别。 // .h file
#import "NLDisposable.h"
/** * @brief A disposable of disposables。当它 dispose 时,它会 dispose * 它所包含的所有的 disposables。 * * 如果本 compound disposable 已经 dispose 过后,再来调用 -addDisposable:, * 那么其参数 disposable 会立马调用 dispose 方法。 * * 本类中的方法说明请查看 RACCompoundDisposable 中的同名方法。 * 本类与真正的类 RACCompoundDisposable 代码差别较大,但本质是一样的 */
@interface NLCompoundDisposable : NLDisposable
/** * @brief 创建并返回一个新的 compound disposable。 */
+ (instancetype)compoundDisposable;
/** * @brief 创建并返回一个新的包含了 disposables 的 compound disposable。 * * @param disposables disposable 数组 * * @return 一个新的 compound disposable */
+ (instancetype)compoundDisposableWithDisposables:(NSArray *)disposables;
/** * @brief 将 disposable 加到本 compound disposable 中。如果本 compound disposable * 已经 dispose 的话,那么参数 disposable 会被立即 dispose。 * * 本方法是线程安全的。 * * @param disposable 要被加入的 disposable。如果它为 nil 的话,那么什么也不会发生。 */
- (void)addDisposable:(NLDisposable *)disposable;
/** * @brief 从本 compound disposable 移除指定的 disposable(不管这个 disposable 是什么 * 状态);如果这个 disposable 不在本 compound disposable 中,则什么也不会发生。 * * 本方法是线程安全的。 * * @param disposable 要被移除的 disposable。可以为 nil。 */
- (void)removeDisposable:(NLDisposable *)disposable;
@end
// .m file
#import "NLCompoundDisposable.h"
#import <libkern/OSAtomic.h>
@interface NLCompoundDisposable () {
/** * @brief 同步锁 */
OSSpinLock _spinLock;
/** * @brief 本 compound disposable 所包含的 disposables。 * * 在操作这个数组时,应该使用 _spinLock 进行同步。如果 * `_disposed` 为 YES,则这个数组可能为 nil。 */
NSMutableArray *_disposables;
/** * @brief 本 compound disposable 是否已经 dispose 。 * * 在操作这个变量时,应该使用 _spinLock 进行同步。 */
BOOL _disposed;
}
@end
@implementation NLCompoundDisposable
#pragma mark Properties
- (BOOL)isDisposed {
OSSpinLockLock(&_spinLock);
BOOL disposed = _disposed;
OSSpinLockUnlock(&_spinLock);
return disposed;
}
#pragma mark Lifecycle
+ (instancetype)compoundDisposable {
return [[self alloc] initWithDisposables:nil];
}
+ (instancetype)compoundDisposableWithDisposables:(NSArray *)disposables {
return [[self alloc] initWithDisposables:disposables];
}
- (id)initWithDisposables:(NSArray *)otherDisposables {
self = [self init];
if (self == nil) return nil;
if ([otherDisposables count]) {
_disposables = [NSMutableArray arrayWithArray:otherDisposables];
}
return self;
}
- (id)initWithBlock:(void (^)(void))block {
NLDisposable *disposable = [NLDisposable disposableWithBlock:block];
return [self initWithDisposables:@[ disposable ]];
}
- (void)dealloc {
_disposables = nil;
}
#pragma mark Addition and Removal
- (void)addDisposable:(NLDisposable *)disposable {
NSCParameterAssert(disposable != self);
if (disposable == nil || disposable.disposed) return;
BOOL shouldDispose = NO;
OSSpinLockLock(&_spinLock);
{
if (_disposed) {
shouldDispose = YES;
} else {
if (_disposables == nil) {
_disposables = [NSMutableArray array];
}
[_disposables addObject:disposable];
}
}
OSSpinLockUnlock(&_spinLock);
if (shouldDispose) {
[disposable dispose];
}
}
- (void)removeDisposable:(NLDisposable *)disposable {
if (disposable == nil) return;
OSSpinLockLock(&_spinLock);
{
if (!_disposed) {
if (_disposables != nil) {
[_disposables removeObject:disposable];
}
}
}
OSSpinLockUnlock(&_spinLock);
}
#pragma mark RACDisposable
- (void)dispose {
NSArray *remainingDisposables = nil;
OSSpinLockLock(&_spinLock);
{
_disposed = YES;
remainingDisposables = _disposables;
_disposables = nil;
}
OSSpinLockUnlock(&_spinLock);
if (remainingDisposables == nil) return;
[remainingDisposables makeObjectsPerformSelector:@selector(dispose)];
}
@end
RACScheduler 简介本文不打算研究 RACScheduler 源码,但其又是 RAC 中不可或缺的一个组件,在研究 RACSignal 的源码时不可避免地会遇到它,所以对其作下介绍还是有必要的。其实它的源码并不复杂,可自行研究。 ReactiveCocoa 中 RACSignal 发送的所有事件的传递交给了一个特殊的框架组件——调度器,即 RACScheduler 类簇(类簇模式稍后介绍)。调度器是为了简化 同步/异步/延迟 事件传递 以及 取消预定的任务(scheduded actions) 这两种 RAC 中常见的动作而提出来的。“事件传递” 简单而言就是些 blocks,RACScheduler 所做的就是:调度这些 blocks (schedule blokcs,还是英文的意思准确些)。我们可以通过那些调度方法所返回的 RACImmediateScheduler 这是 RAC 内部使用的私有调度器,只支持同步 scheduling。就是简单的马上执行 block。这个调试器的延迟 scheduling 是通过调用 RACQueueScheduler这个调度器使用 GCD 队列来 scheduling blocks。如果你对 GCD 有所了解的话,你会发现这个调度器的功能很简单,它只是在 GCD 队列 dispatching blocks 上的简单封装罢了。 RACSubscriptionScheduler这是另一个内部使用的私有调度器。如果当前线程有调度器(调度器可以与线程相关联起来:associated)那它就将 scheduling 转发给这个线程的调度器;否则就转发给默认的 background queue 调试器。 接口调试器有下面一些方法: - (RACDisposable *)schedule:(void (^)(void))block; - (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block; - (RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block; - (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block;
scheduling block 如下: RACDisposable *disposable = [[RACScheduler mainThreadScheduler] afterDelay:5.0 schedule:^{
// do something
}];
// 如果你想要取消 scheduling block
[disposable dispose]; // block scheduling 被取消了,不会再被执行。
5、Subscriber 和 Disposable 前面介绍了 Disposable 的来源,现在来研究下怎么使用它。还记得吗,订阅者与信号打交道的唯一方式是 - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber;
咱们来研究一下自定义信号里的这个方法的实现。这个方法实现的难处在于:“一个订阅者可以订阅多个信号,并可以手动拆除其中任何一个订阅”。针对这个问题,提出了上节讲到的 给订阅者添加一个
咱们可以用装饰模式来解决这个问题
订阅者装饰器 RACPassthroughSubscriber 在订阅者每一次订阅信号时产生一个 // .f file
@class RACCompoundDisposable;
@class RACSignal;
/** * @brief 这是一个订阅者的装饰器。在没有 dispose 时,它会把接收到的所有 * 的事件都转发给真实的订阅者。 */
@interface NLPassthroughSubscriber : NSObject <RACSubscriber>
/** * @brief 初始化方法 * * @param subscriber 被包装的真实的订阅者,本装饰器会把接收到的所有的事件都转发给这个订阅者。不能为 nil * @param signal 要发送事件给这个装饰器的信号。 * @param disposable 当这个 disposable 接收到 dispose 消息后,将不会再转发事件。不能为 nil * * @return 返回一个初始化后的 passthrough subscriber */
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable;
@end
// .m file
@interface NLPassthroughSubscriber ()
// 被转发事件的订阅者。
@property (nonatomic,strong,readonly) id<RACSubscriber> innerSubscriber;
// 要发送事件给本装饰器的信号
@property (nonatomic,unsafe_unretained,readonly) RACSignal *signal;
// 代表当前订阅关系的 disposable。当它 dispose 后,将不会再转发任何事件给 `innerSubscriber`。
@property (nonatomic,readonly) RACCompoundDisposable *disposable;
@end
@implementation NLPassthroughSubscriber
#pragma mark Lifecycle
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
NSCParameterAssert(subscriber != nil);
self = [super init];
if (self == nil) return nil;
_innerSubscriber = subscriber;
_signal = signal;
_disposable = disposable;
/** * 告诉订阅者:发生了订阅行为。并将这次订阅行为相关的 `Disposable` 传给订阅者。 */
[self.innerSubscriber didSubscribeWithDisposable:self.disposable];
return self;
}
#pragma mark RACSubscriber
- (void)sendNext:(id)value {
/** * 如果 disposable 已经 dispose 过,就不再转发事件 */
if (self.disposable.disposed) return;
/** * 转发 next 事件 */
[self.innerSubscriber sendNext:value];
}
- (void)sendError:(NSError *)error {
if (self.disposable.disposed) return;
[self.innerSubscriber sendError:error];
}
- (void)sendCompleted {
if (self.disposable.disposed) return;
[self.innerSubscriber sendCompleted];
}
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable {
if (disposable != self.disposable) {
[self.disposable addDisposable:disposable];
}
}
@end
自定义信号 RACDynamicSignal 的订阅方法 subscribe 咱们来看看 - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
/** * 本次订阅相关 disposable。本方法的返回值,起 拆除 本次订阅的作用。 */
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
/** * 订阅者装饰器。 */
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
/** * _didSubscriber 是在 `+ createSignal` 方法中进入的 block 参数。 * + (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe; * 这个 block 以 subscriber 为参数,返回一个 disposable,即 innerDisposable,而这个 innerDisposable * 的作用是在 subscriber 不再订阅本 signal 时,起回收资源的作用。 */
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}
可以看到,订阅者装饰器直接伪装成真正的订阅器,传给 didSubscribe 这个 block 使用。在这个 block 中,会有一些事件发送给订阅者装饰器,而这个订阅者装饰器则根据 disposable 的状态来来决定是否转发给真正的订阅者。disposable 作为返回值,返回给外部,也就是说能够从外部来取消这个订阅了。 RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
从这几行代码中,我们可以看到, 假设当前 6、再次改进NLSubscriber1、didSubscribeWithDisposable@protocol RACSubscriber <NSObject>
@required
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;
@end
这个 从 那为啥要 2、NLSubscriber 结合 RACDisposable 这一次改进 #import "NLSubscriber.h"
#import <ReactiveCocoa.h>
@interface NLSubscriber ()
@property (nonatomic,copy) void (^completed)(void);
/** * @brief 代表订阅者本身总体订阅行为的 disposable。 * 当有接收到 error 或 completed 事件时,应该 dispose 这个 disposable。 */
@property (nonatomic,strong,readonly) RACCompoundDisposable *disposable;
@end
@implementation NLSubscriber
#pragma mark Lifecycle
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
NLSubscriber *subscriber = [[self alloc] init];
subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];
return subscriber;
}
- (instancetype)init {
self = [super init];
if (self == nil) return nil;
@unsafeify(self);
/** * 当 _disposable 被发送 dispose 消息时,将 next、error 和 completed 这三个 * block 设置为 nil,从而间实现订阅者无法再接收任何事件的功能。 */
RACDisposable *selfDisposable = [RACDisposable disposableWithBlock:^{
@strongify(self);
@synchronized (self) {
self.next = nil;
self.error = nil;
self.completed = nil;
}
}];
_disposable = [RACCompoundDisposable compoundDisposable];
[_disposable addDisposable:selfDisposable];
return self;
}
- (void)dealloc {
[self.disposable dispose];
}
#pragma mark RACSubscriber
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}
- (void)sendError:(NSError *)e {
@synchronized (self) {
void (^errorBlock)(NSError *) = [self.error copy];
[self.disposable dispose];
if (errorBlock == nil) return;
errorBlock(e);
}
}
- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];
[self.disposable dispose];
if (completedBlock == nil) return;
completedBlock();
}
}
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)otherDisposable {
if (otherDisposable.disposed) return;
/** * 将 otherDisposable 添加到 selfDisposable 中。这样当 selfDisposable 被 dispose 时, * otherDisposable 也能被 dispose。 * * 这样,当订阅者接收到 error 或 completed 事件时,就能解除这个订阅者自身的所有订阅行为了。 */
RACCompoundDisposable *selfDisposable = self.disposable;
[selfDisposable addDisposable:otherDisposable];
@unsafeify(otherDisposable);
/** * 如果这个订阅行为被解除,就将 otherDisposable 从 selfDisposable 中移除。 * (我们给 otherDisposable 增加了行为,这也就是参数需要是 RACCompoundDisposable * 类型的原因了。当然,其它的订阅者怎么用这个参数就跟其实际的业务相关了。) */
[otherDisposable addDisposable:[RACDisposable disposableWithBlock:^{
@strongify(otherDisposable);
[selfDisposable removeDisposable:otherDisposable];
}]];
}
@end
3、改进类别 nl_Subscription 还记得么? // .h file
@interface RACSignal (nl_Subscription)
...
- (RACDisposable *)nl_subscribeError:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock;
@end
// .m file
@implementation RACSignal (nl_Subscription)
...
- (RACDisposable *)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
NLSubscriber *subscriber = [NLSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
return [self subscribe:subscriber];
}
RACSignal - Operations 本节主要研究这些操作(Operations) —— 终于看到你想看的东西了?好吧,我承认,上节的东西很无趣,可能压根不是你想看的东西。但如果没弄清上面的内容的话,直接研究 Operations 可是会比较吃力的哟~ 你以为咱们现在开始研究 Operations?哈哈,你又得失望了~ 咱得先看看这两个类: 1、两个 RACSignal 的特殊子类 RACEmptySignal 和 RACReturnSignal1、RACEmptySignal - (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
/** * 只要一订阅,就给 subscriber 发送 completed 事件。 */
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendCompleted];
}];
}
这样一个订阅者一订阅就会 2、RACReturnSignal @interface RACReturnSignal ()
// 在本信号被订阅时会发送的值。
@property (nonatomic,readonly) id value;
@end
@implementation RACReturnSignal
#pragma mark Lifecycle
+ (RACSignal *)return:(id)value {
RACReturnSignal *signal = [[self alloc] init];
signal->_value = value;
return signal;
}
#pragma mark Subscription
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendNext:self.value];
[subscriber sendCompleted];
}];
}
@end
纯吐槽:为啥要叫 那么发送一个 2、concat: 练手 - (RACSignal *)concat:(RACSignal *)signal {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
RACDisposable *concattedDisposable = [signal subscribe:subscriber];
serialDisposable.disposable = concattedDisposable;
}];
serialDisposable.disposable = sourceDisposable;
return serialDisposable;
}] setNameWithFormat:@"[%@] -concat: %@",self.name,signal];
}
基本上,对一个 这里有一个小技巧。因为很多信号的操作是针对该信号本身 RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
RACDisposable *concattedDisposable = [signal subscribe:subscriber];
serialDisposable.disposable = concattedDisposable;
}];
在订阅了 3、zipWith: 再来一个练手的玩意。 - (RACSignal *)zipWith:(RACSignal *)signal {
NSCParameterAssert(signal != nil);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block BOOL selfCompleted = NO;
NSMutableArray *selfValues = [NSMutableArray array];
__block BOOL otherCompleted = NO;
NSMutableArray *otherValues = [NSMutableArray array];
void (^sendCompletedIfNecessary)(void) = ^{
@synchronized (selfValues) {
BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
BOOL otherEmpty = (otherCompleted && otherValues.count == 0);
if (selfEmpty || otherEmpty) [subscriber sendCompleted];
}
};
void (^sendNext)(void) = ^{
@synchronized (selfValues) {
if (selfValues.count == 0) return;
if (otherValues.count == 0) return;
RACTuple *tuple = RACTuplePack(selfValues[0],otherValues[0]);
[selfValues removeObjectAtIndex:0];
[otherValues removeObjectAtIndex:0];
[subscriber sendNext:tuple];
sendCompletedIfNecessary();
}
};
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (selfValues) {
[selfValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
selfCompleted = YES;
sendCompletedIfNecessary();
}
}];
RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
@synchronized (selfValues) {
[otherValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
otherCompleted = YES;
sendCompletedIfNecessary();
}
}];
return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
[otherDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -zipWith: %@",self.name,signal];
}
@end
同样的,重点在 4、bind:1、说明 信号的很多 operations 的实现调用来调用去最后都是调用了这个 这是在 typedef RACStream * (^RACStreamBindBlock)(id value,BOOL *stop);
- (instancetype)bind:(RACStreamBindBlock (^)(void))block;
2、源码解读既然从方法说明了解不到,那直接来看其源码了。 - (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
NSCParameterAssert(block != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACStreamBindBlock bindingBlock = block();
NSMutableArray *signals = [NSMutableArray arrayWithObject:self];
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
// 三.
void (^completeSignal)(RACSignal *,RACDisposable *) = ^(RACSignal *signal,RACDisposable *finishedDisposable) {
BOOL removeDisposable = NO;
@synchronized (signals) {
[signals removeObject:signal];
if (signals.count == 0) {
[subscriber sendCompleted];
[compoundDisposable dispose];
} else {
removeDisposable = YES;
}
}
if (removeDisposable) [compoundDisposable removeDisposable:finishedDisposable];
};
// 二.
void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
@synchronized (signals) {
[signals addObject:signal];
}
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];
RACDisposable *disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(signal,selfDisposable);
}
}];
selfDisposable.disposable = disposable;
};
@autoreleasepool {
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];
// 一.
RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
// Manually check disposal to handle synchronous errors.
if (compoundDisposable.disposed) return;
BOOL stop = NO;
id signal = bindingBlock(x,&stop);
@autoreleasepool {
if (signal != nil) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose];
completeSignal(self,selfDisposable);
}
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(self,selfDisposable);
}
}];
selfDisposable.disposable = bindingDisposable;
}
return compoundDisposable;
}] setNameWithFormat:@"[%@] -bind:",self.name];
}
我们一步一步来看。先从第 一 步开始,其步骤如下: 第二步: 第三步: 好了,来总结一下这个 那从中可以玩出什么花样呢? 3、示例 咱们先用用它,再看看能怎么玩吧。 示例1:结合
|