既然RACScheduler是对GCD的封装,那么在GCD的上层可以实现一些GCD所无法完成的“特性”。这里的“特性”是打引号的,因为底层是GCD,上层的特性只能通过一些特殊手段来实现看似是新的特性。在这一点上,RACScheduler就实现了GCD没有的特性——“取消”任务。
在调用performAsCurrentScheduler:之前,加了一个判断,判断当前是否取消了任务,如果取消了任务,就return,不会调用block闭包。这样就实现了取消任务的“假象”。
在整个ReactiveCocoa中,利用RACScheduler实现了很多操作,和RAC是深度整合的。这里就来总结总结ReactiveCocoa中总共有哪些地方用到了RACScheduler。
在ReactiveCocoa 中全局搜索RACScheduler,遍历完所有库,RACScheduler就用在以下10个类中。下面就来看看是如何用在这些地方的。
从下面这些地方使用了Scheduler中,我们就可以了解到哪些操作是在子线程,哪些是在主线程。区分出了这些,对于线程不安全的操作,我们就能心有成足的处理好它们,让它们回到主线程中去操作,这样就可以减少很多莫名的Crash。这些Crash都是因为线程问题导致的。
1. 在RACCommand中
id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock
这个方法十分复杂,里面用到了RACScheduler.immediateScheduler,deliverOn:RACScheduler.mainThreadScheduler。具体的源码分析会在下一篇RACCommand源码分析里面详细分析。
- (RACSignal *)execute:(id)input
在这个方法中,会调用subscribeOn:RACScheduler.mainThreadScheduler。
2. 在RACDynamicSignal中
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
在RACDynamicSignal的subscribe:订阅过程中会用到subscriptionScheduler。于是对这个scheduler调用schedule:就会执行下面这段代码:
NSCParameterAssert
(block != NULL);
self.backgroundScheduler schedule:block];
block();
如果currentScheduler不为空,闭包会在currentScheduler中执行,如果currentScheduler为空,闭包就会在backgroundScheduler中执行,这是一个Global Dispatch Queue,优先级是RACSchedulerPriorityDefault。
同理,在RACEmptySignal,RACErrorSignal,RACReturnSignal,RACSignal的相关的signal的订阅中也都会调用subscriptionScheduler。
3. 在RACBehaviorSubject中
id
<RACSubscriber>)subscriber {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
self) {
[subscriber sendNext:self.currentValue];
}
}];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
[schedulingDisposable dispose];
}];
}
在RACBehaviorSubject的subscribe:订阅过程中会用到subscriptionScheduler。于是对这个scheduler调用schedule:,代码在上面分析过了。
同理,如果currentScheduler不为空,闭包会在currentScheduler中执行,如果currentScheduler为空,闭包就会在backgroundScheduler中执行,这是一个Global Dispatch Queue,优先级是RACSchedulerPriorityDefault。
4. 在RACReplaySubject中
它的订阅也同上面信号的订阅一样,会调用subscriptionScheduler。
由于RACReplaySubject是在子线程上,所以建议在使用Core Data这些不安全库的时候一定要记得加上deliverOn。
5. 在RACSequence中
在RACSequence中,以下两个方法用到了RACScheduler:
- (RACSignal *)signal {
return [[self signalWithScheduler:[RACScheduler scheduler]] setNameWithFormat:@"[%@] -signal",0)">self.name];
}
- (RACSignal *)signalWithScheduler:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block RACSequence *sequence = self;
return [scheduler scheduleRecursiveBlock:^(void (^reschedule)(void)) {
if (sequence.head == nil) {
[subscriber sendCompleted];
return;
}
[subscriber sendNext:sequence.head];
sequence = sequence.tail;
reschedule();
}];
}] setNameWithFormat:@"[%@] -signalWithScheduler: %@",0)">self.name,scheduler];
}
上面两个方法会调用RACScheduler中的scheduleRecursiveBlock:方法。关于这个方法的源码分析可以看RACSequence的源码分析。
6. 在RACSignal+Operations中
这里总共有9个方法用到了Scheduler。
第一个方法:
static RACDisposable *subscribeForever (RACSignal *signal,0)">void (^next)(id),0)">void (^error)(NSError *,RACDisposable *),0)">void (^completed)(RACDisposable *))
在上面这个方法里面用到了
RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];
取出currentScheduler或者一个Global Dispatch Queue,然后调用scheduleRecursiveBlock:。
第二个方法:
- (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate
在上面这个方法中会调用
RACScheduler *scheduler = [RACScheduler scheduler];
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler
在delayScheduler中调用afterDelay: schedule:方法,这也是throttle:valuesPassingTest:方法实现的很重要的一步。
第三个方法:
- (RACSignal *)delay:(NSTimeInterval)interval
由于这是一个延迟方法,肯定是会调用Scheduler的after方法。
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
RACScheduler.currentScheduler ?: scheduler 这个判断在上述几个时间相关的方法都用到了。
所以,这里给一个建议:
delay由于不一定会回到当前线程中,所以delay之后再去订阅可能就在子线程中去执行。所以使用delay的时候最好追加一个deliverOn。
第四个方法:
- (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler
在这个方法中理所当然的需要调用[scheduler afterDelay:interval schedule:flushValues]这个方法,来达到延迟的目的,从而实现缓冲buffer的效果。
第五个方法:
+ (RACSignal *)interval:( 第六个方法:
NSTimeInterval
)interval onScheduler:(RACScheduler *)scheduler withLeeway:(NSTimeInterval)leeway { }
第五个方法 和 第六个方法都用传进去的入参scheduler去调用after: repeatingEvery: withLeeway: schedule:方法。
第七个方法:
- (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler { }
在这个方法中会用入参scheduler调用afterDelay: schedule:,延迟一段时候后,执行[disposable dispose],从而也实现了超时发送sendError:。
第八个方法:
- (RACSignal *)deliverOn:(RACScheduler *)scheduler { }
第九个方法:
- (RACSignal *)subscribeOn:(RACScheduler *)scheduler { }
第八个方法 和 第九个方法都是根据入参scheduler去调用schedule:方法。入参是什么类型的scheduler决定了schedule:执行在哪个queue上。
7. 在RACSignal中
在RACSignal也有积极计算和惰性求值的信号。
+ (RACSignal *)startEagerlyWithScheduler:(RACScheduler *)scheduler block:(id<RACSubscriber> subscriber))block {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(block != NULL);
RACSignal *signal = [self startLazilyWithScheduler:scheduler block:block];
[[signal publish] connect];
return [signal setNameWithFormat:@"+startEagerlyWithScheduler: %@ block:",47); font-size:16px"> startEagerlyWithScheduler中会调用startLazilyWithScheduler产生一个信号signal,然后紧接着转换成热信号。通过startEagerlyWithScheduler产生的信号就直接是一个热信号。
+ (RACSignal *)startLazilyWithScheduler:(RACScheduler *)scheduler block:(NSCParameterAssert(block != NULL);
RACMulticastConnection *connection = [[RACSignal
createSignal:^ id (id<RACSubscriber> subscriber) {
block(subscriber);
return nil;
}]
multicast:[RACReplaySubject subject]];
return [[[RACSignal
createSignal:^ id<RACSubscriber> subscriber) {
[connection.signal subscribe:subscriber];
[connection connect];
return nil;
}]
subscribeOn:scheduler]
setNameWithFormat:@"+startLazilyWithScheduler: %@ block:",47); font-size:16px"> 上述是startLazilyWithScheduler:的源码实现,在这个方法中和startEagerlyWithScheduler最大的区别就出来了,connect方法在return的信号中,所以Lazily就体现在,通过startLazilyWithScheduler建立出来的信号,只有订阅它之后才能调用到connect,转变成热信号。
在这里调用了subscribeOn:scheduler,这里用到了scheduler。
8. 在NSData+RACSupport中
+ (RACSignal *)rac_readContentsOfURL:(NSURL *)URL options:(NSDataReadingOptions)options scheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
RACReplaySubject *subject = [RACReplaySubject subject];
[subject setNameWithFormat:@"+rac_readContentsOfURL: %@ options: %lu scheduler: %@",URL,(unsigned long)options,scheduler];
[scheduler schedule:^{
NSError *error = nil;
NSData *data = [[NSData alloc] initWithContentsOfURL:URL options:options error:&error];
if (data == nil) {
[subject sendError:error];
} else {
[subject sendNext:data];
[subject sendCompleted];
}
}];
return subject;
}
在这个方法中,会传入RACQueueScheduler或者RACTargetQueueScheduler的RACScheduler。那么调用schedule方法就会执行到这里:
return disposable;
}
9. 在NSString+RACSupport中
NSURL *)URL usedEncoding:(NSStringEncoding *)encoding scheduler:(RACScheduler *)scheduler {
@"+rac_readContentsOfURL: %@ usedEncoding:scheduler: %@",210)">NSString *string = [NSString stringWithContentsOfURL:URL usedEncoding:encoding error:&error];
if (string == nil) {
[subject sendError:error];
} else {
[subject sendNext:string];
[subject sendCompleted];
}
}];
同NSData+RACSupport中的rac_readContentsOfURL: options: scheduler:一样,也会传入RACQueueScheduler或者RACTargetQueueScheduler的RACScheduler。
10. 在NSUserDefaults+RACSupport中
RACScheduler *scheduler = [RACScheduler scheduler];
在这个方法中也会新建RACTargetQueueScheduler,一个Global Dispatch Queue。优先级是RACSchedulerPriorityDefault。
最后
关于RACScheduler底层实现分析都已经分析完成。最后请大家多多指教。
(编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!