在使用ReactiveCocoa 過程當中,Josh Abernathy和Justin Spahr-Summers 兩位大神爲了能讓RAC的使用者更暢快的在沉浸在FRP的世界裏,更好的進行併發編程,因而就對GCD進行了一次封裝,並與RAC的各大組件進行了完美的整合。git
自從有了RACScheduler之後,使整個RAC併發編程的代碼裏面更加和諧統一,更加順手,更加「ReactiveCocoa」。github
RACScheduler在ReactiveCocoa中究竟是幹嗎的呢?處於什麼地位呢?官方給出的定義以下:編程
Schedulers are used to control when and where work is performed複製代碼
RACScheduler在ReactiveCocoa中是用來控制一個任務,什麼時候何地被執行。它主要是用來解決ReactiveCocoa中併發編程的問題的。vim
RACScheduler的實質是對GCD的封裝,底層就是GCD實現的。安全
要分析RACScheduler,先來回顧一下GCD。多線程
衆所周知,在GCD中,Dispatch Queue主要分爲2類,Serial Dispatch Queue 和 Concurrent Dispatch Queue 。其中Serial Dispatch Queue是等待如今執行中處理結束的隊列,Concurrent Dispatch Queue是不等待如今執行中處理結束的隊列。閉包
生成Dispatch Queue的方法也有2種,第一種方式是經過GCD的API生成Dispatch Queue。併發
生成Serial Dispatch Queue異步
dispatch_queue_t serialDispatchQueue = dispatch_queue_create("com.gcd.SerialDispatchQueue", DISPATCH_QUEUE_SERIAL);複製代碼
生成Concurrent Dispatch Queuejsp
dispatch_queue_t concurrentDispatchQueue = dispatch_queue_create("com.gcd.ConcurrentDispatchQueue", DISPATCH_QUEUE_CONCURRENT);複製代碼
第二種方法是直接獲取系統提供的Dispatch Queue。系統提供的也分爲2類,Main Dispatch Queue 和 Global Dispatch Queue。Main Dispatch Queue 對應着是Serial Dispatch Queue,Global Dispatch Queue 對應着是Concurrent Dispatch Queue。
Global Dispatch Queue主要分爲8種。
首先是如下4種,分別是優先級對應Qos的狀況。
- DISPATCH_QUEUE_PRIORITY_HIGH: QOS_CLASS_USER_INITIATED
- DISPATCH_QUEUE_PRIORITY_DEFAULT: QOS_CLASS_DEFAULT
- DISPATCH_QUEUE_PRIORITY_LOW: QOS_CLASS_UTILITY
- DISPATCH_QUEUE_PRIORITY_BACKGROUND: QOS_CLASS_BACKGROUND複製代碼
其次是,是否支持 overcommit。加上上面4個優先級,因此一共8種Global Dispatch Queue。帶有 overcommit 的隊列表示每當有任務提交時,系統都會新開一個線程處理,這樣就不會形成某個線程過載(overcommit)。
回到RACScheduler中來,RACScheduler既然是對GCD的封裝,那麼上述說的這些類型也都有其一一對應的封裝。
typedef enum : long {
RACSchedulerPriorityHigh = DISPATCH_QUEUE_PRIORITY_HIGH,
RACSchedulerPriorityDefault = DISPATCH_QUEUE_PRIORITY_DEFAULT,
RACSchedulerPriorityLow = DISPATCH_QUEUE_PRIORITY_LOW,
RACSchedulerPriorityBackground = DISPATCH_QUEUE_PRIORITY_BACKGROUND,
} RACSchedulerPriority;複製代碼
首先是RACScheduler中的優先級,這裏只封裝了4種,也是分別對應GCD中的DISPATCH_QUEUE_PRIORITY_HIGH,DISPATCH_QUEUE_PRIORITY_DEFAULT,DISPATCH_QUEUE_PRIORITY_LOW,DISPATCH_QUEUE_PRIORITY_BACKGROUND。
RACScheduler有6個類方法,都是用來生成一個queue的。
+ (RACScheduler *)immediateScheduler;
+ (RACScheduler *)mainThreadScheduler;
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name;
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority;
+ (RACScheduler *)scheduler;
+ (RACScheduler *)currentScheduler;複製代碼
接下來依次分析一下它們的底層實現。
+ (instancetype)immediateScheduler {
static dispatch_once_t onceToken;
static RACScheduler *immediateScheduler;
dispatch_once(&onceToken, ^{
immediateScheduler = [[RACImmediateScheduler alloc] init];
});
return immediateScheduler;
}複製代碼
immediateScheduler底層實現就是生成了一個RACImmediateScheduler的單例。
RACImmediateScheduler 是繼承自RACScheduler。
@interface RACImmediateScheduler : RACScheduler
@end複製代碼
在RACScheduler中,每一個種類的RACScheduler都會有一個name屬性,名字也算是他們的標示。RACImmediateScheduler的name是@"com.ReactiveCocoa.RACScheduler.immediateScheduler"
RACImmediateScheduler的做用和它的名字同樣,是當即執行閉包裏面的任務。
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
block();
return nil;
}
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(block != NULL);
[NSThread sleepUntilDate:date];
block();
return nil;
}複製代碼
在schedule:方法中,直接調用執行入參block( )閉包。在after: schedule:方法中,線程先睡眠,直到date的時刻,再醒過來執行入參block( )閉包。
- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
NSCAssert(NO, @"+[RACScheduler immediateScheduler] does not support %@.", NSStringFromSelector(_cmd));
return nil;
}複製代碼
固然RACImmediateScheduler是不可能支持after: repeatingEvery: withLeeway: schedule:方法的。由於它的定義就是當即執行的,不該該repeat。
- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {
for (__block NSUInteger remaining = 1; remaining > 0; remaining--) {
recursiveBlock(^{
remaining++;
});
}
return nil;
}複製代碼
RACImmediateScheduler的scheduleRecursiveBlock:方法中只要recursiveBlock閉包存在,就會無限遞歸調用執行,除非recursiveBlock不存在了。
mainThreadScheduler也是一個類型是RACTargetQueueScheduler的單例。
+ (instancetype)mainThreadScheduler {
static dispatch_once_t onceToken;
static RACScheduler *mainThreadScheduler;
dispatch_once(&onceToken, ^{
mainThreadScheduler = [[RACTargetQueueScheduler alloc] initWithName:@"com.ReactiveCocoa.RACScheduler.mainThreadScheduler" targetQueue:dispatch_get_main_queue()];
});
return mainThreadScheduler;
}複製代碼
mainThreadScheduler的名字是@"com.ReactiveCocoa.RACScheduler.mainThreadScheduler"。
RACTargetQueueScheduler繼承自RACQueueScheduler
@interface RACTargetQueueScheduler : RACQueueScheduler
- (id)initWithName:(NSString *)name targetQueue:(dispatch_queue_t)targetQueue;
@end複製代碼
在RACTargetQueueScheduler中,只有一個初始化方法。
- (id)initWithName:(NSString *)name targetQueue:(dispatch_queue_t)targetQueue {
NSCParameterAssert(targetQueue != NULL);
if (name == nil) {
name = [NSString stringWithFormat:@"com.ReactiveCocoa.RACTargetQueueScheduler(%s)", dispatch_queue_get_label(targetQueue)];
}
dispatch_queue_t queue = dispatch_queue_create(name.UTF8String, DISPATCH_QUEUE_SERIAL);
if (queue == NULL) return nil;
dispatch_set_target_queue(queue, targetQueue);
return [super initWithName:name queue:queue];
}複製代碼
先新建了一個queue,name是@"com.ReactiveCocoa.RACScheduler.mainThreadScheduler",類型是Serial Dispatch Queue 類型的,而後調用了dispatch_set_target_queue方法。
因此重點就在dispatch_set_target_queue方法裏面了。
dispatch_set_target_queue方法主要有兩個目的:一是設置dispatch_queue_create建立隊列的優先級,二是創建隊列的執行階層。
舉個例子:
dispatch_queue_t serialQueue = dispatch_queue_create("serialQueue", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0);
//注意:被設置優先級的隊列是第一個參數。
dispatch_set_target_queue(serialQueue, globalQueue);複製代碼
經過上面的代碼,就把將serailQueue設置成DISPATCH_QUEUE_PRIORITY_HIGH。
舉個例子:
dispatch_queue_t targetQueue = dispatch_queue_create("targetQueue", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t queue1 = dispatch_queue_create("queue1", DISPATCH_QUEUE_SERIAL);
dispatch_queue_t queue2 = dispatch_queue_create("queue2", DISPATCH_QUEUE_CONCURRENT);
dispatch_set_target_queue(queue1, targetQueue);
dispatch_set_target_queue(queue2, targetQueue);
dispatch_async(queue1, ^{
NSLog(@"queue1 1");
});
dispatch_async(queue1, ^{
NSLog(@"queue1 2");
});
dispatch_async(queue2, ^{
NSLog(@"queue2 1");
});
dispatch_async(queue2, ^{
NSLog(@"queue2 2");
});
dispatch_async(targetQueue, ^{
NSLog(@"target queue");
});複製代碼
若是targetQueue爲Serial Dispatch Queue,那麼輸出結果一定以下:
queue1 1
queue1 2
queue2 1
queue2 2
target queue複製代碼
若是targetQueue爲Concurrent Dispatch Queue,那麼輸出結果可能以下:
queue1 1
queue2 1
queue1 2
target queue
queue2 2複製代碼
回到RACTargetQueueScheduler中來,在這裏傳進來的入參是dispatch_get_main_queue( ),這是一個Serial Dispatch Queue,這裏再調用dispatch_set_target_queue方法,至關於把queue的優先級設置的和main_queue一致。
如下三個方法實質是同一個方法。
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name;
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority;
+ (RACScheduler *)scheduler;複製代碼
+ (instancetype)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name {
return [[RACTargetQueueScheduler alloc] initWithName:name targetQueue:dispatch_get_global_queue(priority, 0)];
}
+ (instancetype)schedulerWithPriority:(RACSchedulerPriority)priority {
return [self schedulerWithPriority:priority name:@"com.ReactiveCocoa.RACScheduler.backgroundScheduler"];
}
+ (instancetype)scheduler {
return [self schedulerWithPriority:RACSchedulerPriorityDefault];
}複製代碼
經過源碼咱們能知道,scheduler這一系列的三個方法,是建立了一個 Global Dispatch Queue,對應的屬於Concurrent Dispatch Queue。
schedulerWithPriority: name:方法能夠指定線程的優先級和名字。
schedulerWithPriority:方法只能執行優先級,名字爲默認的@"com.ReactiveCocoa.RACScheduler.backgroundScheduler"。
scheduler方法建立出來的queue的優先級是默認的,名字也是默認的@"com.ReactiveCocoa.RACScheduler.backgroundScheduler"。
注意,scheduler和mainThreadScheduler,immediateScheduler這兩個單例不一樣的是,scheduler每次都會建立一個新的Concurrent Dispatch Queue。
+ (instancetype)currentScheduler {
RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
if (scheduler != nil) return scheduler;
if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;
return nil;
}複製代碼
首先,在ReactiveCocoa 中定義了這麼一個key,@"RACSchedulerCurrentSchedulerKey",這個用來從線程字典裏面存取出對應的RACScheduler。
NSString * const RACSchedulerCurrentSchedulerKey = @"RACSchedulerCurrentSchedulerKey";複製代碼
在currentScheduler這個方法裏面看到的是從線程字典裏面取出一個RACScheduler。至於何時存的,下面會解釋到。
若是能從線程字典裏面取出一個RACScheduler,就返回取出的RACScheduler。若是字典裏面沒有,再判斷當前的scheduler是不是在主線程上。
+ (BOOL)isOnMainThread {
return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}複製代碼
判斷方法如上,只要是NSOperationQueue在mainQueue上,或者NSThread是主線程,都算是在主線程上。
若是是在主線程上,就返回mainThreadScheduler。
若是既不在主線程上,線程字典裏面也找不到對應key值對應的value,那麼就返回nil。
RACScheduler除了有6個類方法,還有4個實例方法:
- (RACDisposable *)schedule:(void (^)(void))block;
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block;
- (RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block;
- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block;複製代碼
這4個方法其實從名字上就知道是用來幹嗎的。
schedule:是爲RACScheduler添加一個任務,入參是一個閉包。
after: schedule:是爲RACScheduler添加一個定時任務,在date時間以後才執行任務。
afterDelay: schedule:是爲RACScheduler添加一個延時執行的任務,延時delay時間以後才執行任務。
after: repeatingEvery: withLeeway: schedule:是爲RACScheduler添加一個定時任務,在date時間以後纔開始執行,而後每隔interval秒執行一次任務。
這四個方法會分別在RACScheduler的各個子類裏面進行重寫。
好比以前提到的immediateScheduler,schedule:方法中會直接當即執行閉包。after: schedule:方法中添加一個定時任務,在date時間以後才執行任務。after: repeatingEvery: withLeeway: schedule:這個方法在RACImmediateScheduler中就直接返回nil。
還有其餘子類在下面會分析這4個方法的實現。
另外還有最後3個方法
- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock;
- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable
- (void)performAsCurrentScheduler:(void (^)(void))block;複製代碼
前兩個方法是實現RACSequence中signalWithScheduler:方法的,具體分析見這篇文章
performAsCurrentScheduler:方法是在RACQueueScheduler中使用到了,在下面子類分析裏面詳細分析。
RACSequence總共有如下5個子類。
這個類主要是一個測試類,主要用在單元測試中,它是用來驗證異步調用沒有花費大量的時間等待。RACTestScheduler也能夠用在多線程當中,當時一次只能在排隊的方法隊列中選擇一個方法執行。
@interface RACTestSchedulerAction : NSObject
@property (nonatomic, copy, readonly) NSDate *date;
@property (nonatomic, copy, readonly) void (^block)(void);
@property (nonatomic, strong, readonly) RACDisposable *disposable;
- (id)initWithDate:(NSDate *)date block:(void (^)(void))block;
@end複製代碼
在單元測試中,ReactiveCocoa爲了方便比較每一個方法的調用,新建了一個RACTestSchedulerAction對象,用來更加方便的比較和描述測試的全過程。RACTestSchedulerAction的定義如上。如今再來解釋一下參數。
date是一個時間,時間主要是用來比較和決定下一次該輪到哪一個閉包要開始執行了。
void (^block)(void)閉包是RACScheduler中的一個任務。
disposable是控制一個action是否能夠執行的。一旦disposed了,那麼這個action就不會被執行。
initWithDate: block: 方法是初始化一個新的action。
在單元測試過程當中,須要調用step方法來進行查看每次調用閉包的狀況。
- (void)step {
[self step:1];
}
- (void)stepAll {
[self step:NSUIntegerMax];
}複製代碼
step和stepAll方法都是調用step:方法。step只是執行一次RACScheduler中的任務,stepAll是執行全部的RACScheduler中的任務。既然都是調用step:,那接下來分析一下step:的具體實現。
- (void)step:(NSUInteger)ticks {
@synchronized (self) {
for (NSUInteger i = 0; i < ticks; i++) {
const void *actionPtr = NULL;
if (!CFBinaryHeapGetMinimumIfPresent(self.scheduledActions, &actionPtr)) break;
RACTestSchedulerAction *action = (__bridge id)actionPtr;
CFBinaryHeapRemoveMinimumValue(self.scheduledActions);
if (action.disposable.disposed) continue;
RACScheduler *previousScheduler = RACScheduler.currentScheduler;
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = self;
action.block();
if (previousScheduler != nil) {
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = previousScheduler;
} else {
[NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
}
}
}
}複製代碼
step:的實現主要就是一個for循環。循環的次數就是入參ticks決定的。首先const void *actionPtr是一個指向函數的指針。在上述實現中有一個很重要的函數——CFBinaryHeapGetMinimumIfPresent。該函數的原型以下:
Boolean CFBinaryHeapGetMinimumIfPresent(CFBinaryHeapRef heap, const void **value)複製代碼
這個函數的主要做用的是在二分堆heap中查找一個最小值。
static CFComparisonResult RACCompareScheduledActions(const void *ptr1, const void *ptr2, void *info) {
RACTestSchedulerAction *action1 = (__bridge id)ptr1;
RACTestSchedulerAction *action2 = (__bridge id)ptr2;
return CFDateCompare((__bridge CFDateRef)action1.date, (__bridge CFDateRef)action2.date, NULL);
}複製代碼
比較規則如上,就是比較二者的date的值。從二分堆中找出這樣一個最小值,對應的就是scheduler中的任務。若是最小值有幾個相等最小值,就隨機返回一個最小值。返回的函數放在actionPtr中。整個函數的返回值是一個BOOL值,若是二分堆不爲空,能找到最小值就返回YES,若是二分堆爲空,就找不到最小值了,就返回NO。
stepAll方法裏面傳入了NSUIntegerMax,這個for循環也不會死循環,由於到堆中全部的任務都執行完成以後,CFBinaryHeapGetMinimumIfPresent返回NO,就會執行break,跳出循環。
這裏會把currentScheduler保存到線程字典裏面。接着會執行action.block,執行任務。
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != nil);
@synchronized (self) {
NSDate *uniqueDate = [NSDate dateWithTimeIntervalSinceReferenceDate:self.numberOfDirectlyScheduledBlocks];
self.numberOfDirectlyScheduledBlocks++;
RACTestSchedulerAction *action = [[RACTestSchedulerAction alloc] initWithDate:uniqueDate block:block];
CFBinaryHeapAddValue(self.scheduledActions, (__bridge void *)action);
return action.disposable;
}
}
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(block != nil);
@synchronized (self) {
RACTestSchedulerAction *action = [[RACTestSchedulerAction alloc] initWithDate:date block:block];
CFBinaryHeapAddValue(self.scheduledActions, (__bridge void *)action);
return action.disposable;
}
}複製代碼
schedule:方法裏面會累加numberOfDirectlyScheduledBlocks值,這個值也會初始化成時間,以便比較各個方法該調度的時間。numberOfDirectlyScheduledBlocks最終會表明總共有多少個block任務產生了。而後用CFBinaryHeapAddValue加入到堆中。
after:schedule:就是直接新建RACTestSchedulerAction對象,而後再用CFBinaryHeapAddValue把block閉包加入到堆中。
after: repeatingEvery: withLeeway: schedule:一樣也是新建RACTestSchedulerAction對象,而後再用CFBinaryHeapAddValue把block閉包加入到堆中。
RACSubscriptionScheduler是RACScheduler最後一個單例。RACScheduler中惟一的三個單例如今就齊全了:RACImmediateScheduler,RACTargetQueueScheduler ,RACSubscriptionScheduler。
+ (instancetype)subscriptionScheduler {
static dispatch_once_t onceToken;
static RACScheduler *subscriptionScheduler;
dispatch_once(&onceToken, ^{
subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
});
return subscriptionScheduler;
}複製代碼
RACSubscriptionScheduler 的名字是@"com.ReactiveCocoa.RACScheduler.subscriptionScheduler"
- (id)init {
self = [super initWithName:@"com.ReactiveCocoa.RACScheduler.subscriptionScheduler"];
if (self == nil) return nil;
_backgroundScheduler = [RACScheduler scheduler];
return self;
}複製代碼
RACSubscriptionScheduler初始化的時候會新建一個Global Dispatch Queue。
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
block();
return nil;
}複製代碼
若是RACScheduler.currentScheduler爲nil就用backgroundScheduler去調用block閉包,不然就執行block閉包。
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
RACScheduler *scheduler = RACScheduler.currentScheduler ?: self.backgroundScheduler;
return [scheduler after:date schedule:block];
}複製代碼
- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
RACScheduler *scheduler = RACScheduler.currentScheduler ?: self.backgroundScheduler;
return [scheduler after:date repeatingEvery:interval withLeeway:leeway schedule:block];
}複製代碼
兩個after方法都有取出RACScheduler.currentScheduler,若是爲空就用self.backgroundScheduler去調用各自的after的方法。
RACSubscriptionScheduler中的backgroundScheduler的意義就在此,當RACScheduler.currentScheduler不存在的時候就會替換成self.backgroundScheduler。
這個子類在分析immediateScheduler方法的時候,詳細分析過了,這裏再也不贅述。
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_async(self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}複製代碼
schedule:會調用performAsCurrentScheduler:方法。
- (void)performAsCurrentScheduler:(void (^)(void))block {
NSCParameterAssert(block != NULL);
RACScheduler *previousScheduler = RACScheduler.currentScheduler;
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = self;
@autoreleasepool {
block();
}
if (previousScheduler != nil) {
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = previousScheduler;
} else {
[NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
}
}複製代碼
performAsCurrentScheduler:方法會先在調用block( )以前,把當前的scheduler存入線程字典中。
試想,若是如今在一個Concurrent Dispatch Queue中,在執行block( )以前須要先切換線程,切換到當前scheduler中。當執行完block閉包以後,previousScheduler若是不爲nil,那麼就還原現場,線程字典裏面再存回原來的scheduler,反之previousScheduler爲nil,那麼就移除掉線程字典裏面的key。
這裏須要值得注意的是:
scheduler本質實際上是一個quene,並非一個線程。它只能保證裏面的線程都是串行執行的,可是它不能保證每一個線程不必定都是在同一個線程裏面執行。
如上面這段performAsCurrentScheduler:的實現所表現的那樣。因此
在scheduler使用Core Data很容易崩潰,極可能跑到子線程上面去了。一旦寫數據的時候到了子線程上,很容易就Crash了。必定要記得回到main queue上。
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_after([self.class wallTimeWithDate:date], self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}複製代碼
在after中調用dispatch_after方法,通過date時間以後再調用performAsCurrentScheduler:。
wallTimeWithDate:的實現以下:
+ (dispatch_time_t)wallTimeWithDate:(NSDate *)date {
NSCParameterAssert(date != nil);
double seconds = 0;
double frac = modf(date.timeIntervalSince1970, &seconds);
struct timespec walltime = {
.tv_sec = (time_t)fmin(fmax(seconds, LONG_MIN), LONG_MAX),
.tv_nsec = (long)fmin(fmax(frac * NSEC_PER_SEC, LONG_MIN), LONG_MAX)
};
return dispatch_walltime(&walltime, 0);
}複製代碼
dispatch_walltime函數是由POSIX中使用的struct timespec類型的時間獲得dispatch_time_t類型的值。dispatch_time函數一般用於計算相對時間,而dispatch_walltime函數用於計算絕對時間。
這段代碼其實很簡單,就是把date的時間轉換成一個dispatch_time_t類型的。由NSDate類對象獲取能傳遞給dispatch_after函數的dispatch_time_t類型的值。
- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(interval > 0.0 && interval < INT64_MAX / NSEC_PER_SEC);
NSCParameterAssert(leeway >= 0.0 && leeway < INT64_MAX / NSEC_PER_SEC);
NSCParameterAssert(block != NULL);
uint64_t intervalInNanoSecs = (uint64_t)(interval * NSEC_PER_SEC);
uint64_t leewayInNanoSecs = (uint64_t)(leeway * NSEC_PER_SEC);
dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, self.queue);
dispatch_source_set_timer(timer, [self.class wallTimeWithDate:date], intervalInNanoSecs, leewayInNanoSecs);
dispatch_source_set_event_handler(timer, block);
dispatch_resume(timer);
return [RACDisposable disposableWithBlock:^{
dispatch_source_cancel(timer);
}];
}複製代碼
after: repeatingEvery: withLeeway: schedule:方法裏面的實現就是用GCD在self.queue上建立了一個Timer,時間間隔是interval,修正時間是leeway。
leeway這個參數是爲dispatch source指定一個指望的定時器事件精度,讓系統可以靈活地管理並喚醒內核。例如系統可使用leeway值來提早或延遲觸發定時器,使其更好地與其它系統事件結合。建立本身的定時器時,應該儘可能指定一個leeway值。不過就算指定leeway值爲0,也不能完徹底全指望定時器可以按照精確的納秒來觸發事件。
這個定時器在interval執行入參閉包。在取消任務的時候調用dispatch_source_cancel取消定時器timer。
這個子類在分析mainThreadScheduler方法的時候,詳細分析過了,這裏再也不贅述。
既然RACScheduler是對GCD的封裝,那麼在GCD的上層能夠實現一些GCD所沒法完成的「特性」。這裏的「特性」是打引號的,由於底層是GCD,上層的特性只能經過一些特殊手段來實現看似是新的特性。在這一點上,RACScheduler就實現了GCD沒有的特性——「取消」任務。
Operation Queues :
相對 GCD來講,使用 Operation Queues 會增長一點點額外的開銷,可是卻換來了很是強大的靈活性和功能,它能夠給 operation 之間添加依賴關係、取消一個正在執行的 operation 、暫停和恢復 operation queue 等;
GCD:
是一種更輕量級的,以 FIFO的順序執行併發任務的方式,使用 GCD時咱們能夠並不關心任務的調度狀況,而讓系統幫咱們自動處理。可是 GCD的缺陷也是很是明顯的,想要給任務之間添加依賴關係、取消或者暫停一個正在執行的任務時就會變得很是棘手。
既然GCD不方便取消一個任務,那麼RACScheduler是怎麼作到的呢?
這就體如今RACQueueScheduler上。回頭看看RACQueueScheduler的schedule:實現 和 after: schedule:實現。
最核心的代碼:
dispatch_async(self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});複製代碼
在調用performAsCurrentScheduler:以前,加了一個判斷,判斷當前是否取消了任務,若是取消了任務,就return,不會調用block閉包。這樣就實現了取消任務的「假象」。
在整個ReactiveCocoa中,利用RACScheduler實現了不少操做,和RAC是深度整合的。這裏就來總結總結ReactiveCocoa中總共有哪些地方用到了RACScheduler。
在ReactiveCocoa 中全局搜索RACScheduler,遍歷完全部庫,RACScheduler就用在如下10個類中。下面就來看看是如何用在這些地方的。
從下面這些地方使用了Scheduler中,咱們就能夠了解到哪些操做是在子線程,哪些是在主線程。區分出了這些,對於線程不安全的操做,咱們就能心有成足的處理好它們,讓它們回到主線程中去操做,這樣就能夠減小不少莫名的Crash。這些Crash都是由於線程問題致使的。
- (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock複製代碼
這個方法十分複雜,裏面用到了RACScheduler.immediateScheduler,deliverOn:RACScheduler.mainThreadScheduler。具體的源碼分析會在下一篇RACCommand源碼分析裏面詳細分析。
- (RACSignal *)execute:(id)input複製代碼
在這個方法中,會調用subscribeOn:RACScheduler.mainThreadScheduler。
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}複製代碼
在RACDynamicSignal的subscribe:訂閱過程當中會用到subscriptionScheduler。因而對這個scheduler調用schedule:就會執行下面這段代碼:
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
block();
return nil;
}複製代碼
若是currentScheduler不爲空,閉包會在currentScheduler中執行,若是currentScheduler爲空,閉包就會在backgroundScheduler中執行,這是一個Global Dispatch Queue,優先級是RACSchedulerPriorityDefault。
同理,在RACEmptySignal,RACErrorSignal,RACReturnSignal,RACSignal的相關的signal的訂閱中也都會調用subscriptionScheduler。
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
[subscriber sendNext:self.currentValue];
}
}];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
[schedulingDisposable dispose];
}];
}複製代碼
在RACBehaviorSubject的subscribe:訂閱過程當中會用到subscriptionScheduler。因而對這個scheduler調用schedule:,代碼在上面分析過了。
同理,若是currentScheduler不爲空,閉包會在currentScheduler中執行,若是currentScheduler爲空,閉包就會在backgroundScheduler中執行,這是一個Global Dispatch Queue,優先級是RACSchedulerPriorityDefault。
它的訂閱也同上面信號的訂閱同樣,會調用subscriptionScheduler。
因爲RACReplaySubject是在子線程上,因此建議在使用Core Data這些不安全庫的時候必定要記得加上deliverOn。
在RACSequence中,如下兩個方法用到了RACScheduler:
- (RACSignal *)signal {
return [[self signalWithScheduler:[RACScheduler scheduler]] setNameWithFormat:@"[%@] -signal", self.name];
}複製代碼
- (RACSignal *)signalWithScheduler:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block RACSequence *sequence = self;
return [scheduler scheduleRecursiveBlock:^(void (^reschedule)(void)) {
if (sequence.head == nil) {
[subscriber sendCompleted];
return;
}
[subscriber sendNext:sequence.head];
sequence = sequence.tail;
reschedule();
}];
}] setNameWithFormat:@"[%@] -signalWithScheduler: %@", self.name, scheduler];
}複製代碼
上面兩個方法會調用RACScheduler中的scheduleRecursiveBlock:方法。關於這個方法的源碼分析能夠看RACSequence的源碼分析。
這裏總共有9個方法用到了Scheduler。
第一個方法:
static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *))複製代碼
在上面這個方法裏面用到了
RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];複製代碼
取出currentScheduler或者一個Global Dispatch Queue,而後調用scheduleRecursiveBlock:。
第二個方法:
- (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate複製代碼
在上面這個方法中會調用
RACScheduler *scheduler = [RACScheduler scheduler];
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler複製代碼
在delayScheduler中調用afterDelay: schedule:方法,這也是throttle:valuesPassingTest:方法實現的很重要的一步。
第三個方法:
- (RACSignal *)delay:(NSTimeInterval)interval複製代碼
因爲這是一個延遲方法,確定是會調用Scheduler的after方法。
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];複製代碼
RACScheduler.currentScheduler ?: scheduler 這個判斷在上述幾個時間相關的方法都用到了。
因此,這裏給一個建議:
delay因爲不必定會回到當前線程中,因此delay以後再去訂閱可能就在子線程中去執行。因此使用delay的時候最好追加一個deliverOn。
第四個方法:
- (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler複製代碼
在這個方法中理所固然的須要調用[scheduler afterDelay:interval schedule:flushValues]這個方法,來達到延遲的目的,從而實現緩衝buffer的效果。
第五個方法:
+ (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler複製代碼
第六個方法:
+ (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler withLeeway:(NSTimeInterval)leeway { }複製代碼
第五個方法 和 第六個方法都用傳進去的入參scheduler去調用after: repeatingEvery: withLeeway: schedule:方法。
第七個方法:
- (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler { }複製代碼
在這個方法中會用入參scheduler調用afterDelay: schedule:,延遲一段時候後,執行[disposable dispose],從而也實現了超時發送sendError:。
第八個方法:
- (RACSignal *)deliverOn:(RACScheduler *)scheduler { }複製代碼
第九個方法:
- (RACSignal *)subscribeOn:(RACScheduler *)scheduler { }複製代碼
第八個方法 和 第九個方法都是根據入參scheduler去調用schedule:方法。入參是什麼類型的scheduler決定了schedule:執行在哪一個queue上。
在RACSignal也有積極計算和惰性求值的信號。
+ (RACSignal *)startEagerlyWithScheduler:(RACScheduler *)scheduler block:(void (^)(id<RACSubscriber> subscriber))block {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(block != NULL);
RACSignal *signal = [self startLazilyWithScheduler:scheduler block:block];
[[signal publish] connect];
return [signal setNameWithFormat:@"+startEagerlyWithScheduler: %@ block:", scheduler];
}複製代碼
startEagerlyWithScheduler中會調用startLazilyWithScheduler產生一個信號signal,而後緊接着轉換成熱信號。經過startEagerlyWithScheduler產生的信號就直接是一個熱信號。
+ (RACSignal *)startLazilyWithScheduler:(RACScheduler *)scheduler block:(void (^)(id<RACSubscriber> subscriber))block {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(block != NULL);
RACMulticastConnection *connection = [[RACSignal
createSignal:^ id (id<RACSubscriber> subscriber) {
block(subscriber);
return nil;
}]
multicast:[RACReplaySubject subject]];
return [[[RACSignal
createSignal:^ id (id<RACSubscriber> subscriber) {
[connection.signal subscribe:subscriber];
[connection connect];
return nil;
}]
subscribeOn:scheduler]
setNameWithFormat:@"+startLazilyWithScheduler: %@ block:", scheduler];
}複製代碼
上述是startLazilyWithScheduler:的源碼實現,在這個方法中和startEagerlyWithScheduler最大的區別就出來了,connect方法在return的信號中,因此Lazily就體如今,經過startLazilyWithScheduler創建出來的信號,只有訂閱它以後才能調用到connect,轉變成熱信號。
在這裏調用了subscribeOn:scheduler,這裏用到了scheduler。
+ (RACSignal *)rac_readContentsOfURL:(NSURL *)URL options:(NSDataReadingOptions)options scheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
RACReplaySubject *subject = [RACReplaySubject subject];
[subject setNameWithFormat:@"+rac_readContentsOfURL: %@ options: %lu scheduler: %@", URL, (unsigned long)options, scheduler];
[scheduler schedule:^{
NSError *error = nil;
NSData *data = [[NSData alloc] initWithContentsOfURL:URL options:options error:&error];
if (data == nil) {
[subject sendError:error];
} else {
[subject sendNext:data];
[subject sendCompleted];
}
}];
return subject;
}複製代碼
在這個方法中,會傳入RACQueueScheduler或者RACTargetQueueScheduler的RACScheduler。那麼調用schedule方法就會執行到這裏:
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_async(self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}複製代碼
+ (RACSignal *)rac_readContentsOfURL:(NSURL *)URL usedEncoding:(NSStringEncoding *)encoding scheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
RACReplaySubject *subject = [RACReplaySubject subject];
[subject setNameWithFormat:@"+rac_readContentsOfURL: %@ usedEncoding:scheduler: %@", URL, scheduler];
[scheduler schedule:^{
NSError *error = nil;
NSString *string = [NSString stringWithContentsOfURL:URL usedEncoding:encoding error:&error];
if (string == nil) {
[subject sendError:error];
} else {
[subject sendNext:string];
[subject sendCompleted];
}
}];
return subject;
}複製代碼
同NSData+RACSupport中的rac_readContentsOfURL: options: scheduler:同樣,也會傳入RACQueueScheduler或者RACTargetQueueScheduler的RACScheduler。
RACScheduler *scheduler = [RACScheduler scheduler];複製代碼
在這個方法中也會新建RACTargetQueueScheduler,一個Global Dispatch Queue。優先級是RACSchedulerPriorityDefault。
關於RACScheduler底層實現分析都已經分析完成。最後請你們多多指教。
本次徵文活動的連接:
gold.xitu.io/post/58522d…