ReactiveCocoa 中 RACSignal 是如何發送信號的

前言

ReactiveCocoa是一個(第一個?)將函數響應式編程範例帶入Objective-C的開源庫。ReactiveCocoa是由Josh AbernathyJustin Spahr-Summers 兩位大神在對GitHub for Mac的開發過程當中編寫的。Justin Spahr-Summers 大神在2011年11月13號下午12點35分進行的第一次提交,直到2013年2月13日上午3點05分發布了其1.0 release,達到了第一個重要里程碑。ReactiveCocoa社區也很是活躍,目前最新版已經完成了ReactiveCocoa 5.0.0-alpha.3,目前在5.0.0-alpha.4開發中。react

ReactiveCocoa v2.5 是公認的Objective-C最穩定的版本,所以被廣大的以OC爲主要語言的客戶端選中使用。ReactiveCocoa v3.x主要是基於Swift 1.2的版本,而ReactiveCocoa v4.x 主要基於Swift 2.x,ReactiveCocoa 5.0就全面支持Swift 3.0,也許還有之後的Swift 4.0。接下來幾篇博客先以ReactiveCocoa v2.5版本爲例子,分析一下OC版的RAC具體實現(也許分析完了RAC 5.0就到來了)。也算是寫在ReactiveCocoa 5.0正式版到來前夕的祝福吧。git

目錄

  • 1.什麼是ReactiveCocoa?
  • 2.RAC中的核心RACSignal發送與訂閱流程
  • 3.RACSignal操做的核心bind實現
  • 4.RACSignal基本操做concat和zipWith實現
  • 5.最後

一. 什麼是ReactiveCocoa?

ReactiveCocoa(其簡稱爲RAC)是由Github 開源的一個應用於iOS和OS X開發的新框架。RAC具備函數式編程(FP)和響應式編程(RP)的特性。它主要吸收了.Net的 Reactive Extensions的設計和實現。github

ReactiveCocoa 的宗旨是Streams of values over time ,隨着時間變化而不斷流動的數據流。編程

ReactiveCocoa 主要解決了如下這些問題:api

  • UI數據綁定

UI控件一般須要綁定一個事件,RAC能夠很方便的綁定任何數據流到控件上。數組

  • 用戶交互事件綁定

RAC爲可交互的UI控件提供了一系列能發送Signal信號的方法。這些數據流會在用戶交互中相互傳遞。安全

  • 解決狀態以及狀態之間依賴過多的問題

有了RAC的綁定以後,能夠不用在關心各類複雜的狀態,isSelect,isFinish……也解決了這些狀態在後期很難維護的問題。閉包

  • 消息傳遞機制的大統一

OC中編程原來消息傳遞機制有如下幾種:Delegate,Block Callback,Target-Action,Timers,KVO,objc上有一篇關於OC中這5種消息傳遞方式改如何選擇的文章Communication Patterns,推薦你們閱讀。如今有了RAC以後,以上這5種方式均可以統一用RAC來處理。框架

二. RAC中的核心RACSignal

ReactiveCocoa 中最核心的概念之一就是信號RACStream。RACRACStream中有兩個子類——RACSignal 和 RACSequence。本文先來分析RACSignal。jsp

咱們會常常看到如下的代碼:

RACSignal *signal = [RACSignal createSignal:
                     ^RACDisposable *(id<RACSubscriber> subscriber)
{
    [subscriber sendNext:@1];
    [subscriber sendNext:@2];
    [subscriber sendNext:@3];
    [subscriber sendCompleted];
    return [RACDisposable disposableWithBlock:^{
        NSLog(@"signal dispose");
    }];
}];
RACDisposable *disposable = [signal subscribeNext:^(id x) {
    NSLog(@"subscribe value = %@", x);
} error:^(NSError *error) {
    NSLog(@"error: %@", error);
} completed:^{
    NSLog(@"completed");
}];

[disposable dispose];複製代碼

這是一個RACSignal被訂閱的完整過程。被訂閱的過程當中,究竟發生了什麼?

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
 return [RACDynamicSignal createSignal:didSubscribe];
}複製代碼

RACSignal調用createSignal的時候,會調用RACDynamicSignal的createSignal的方法。

RACDynamicSignal是RACSignal的子類。createSignal後面的參數是一個block。

(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe複製代碼

block的返回值是RACDisposable類型,block名叫didSubscribe。block的惟一一個參數是id 類型的subscriber,這個subscriber是必須遵循RACSubscriber協議的。

RACSubscriber是一個協議,其下有如下4個協議方法:

@protocol RACSubscriber <NSObject>
@required

- (void)sendNext:(id)value;
- (void)sendError:(NSError *)error;
- (void)sendCompleted;
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;

@end複製代碼

因此新建Signal的任務就所有落在了RACSignal的子類RACDynamicSignal上了。

@interface RACDynamicSignal ()
// The block to invoke for each subscriber.
@property (nonatomic, copy, readonly) RACDisposable * (^didSubscribe)(id<RACSubscriber> subscriber);
@end複製代碼

RACDynamicSignal這個類很簡單,裏面就保存了一個名字叫didSubscribe的block。

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
     RACDynamicSignal *signal = [[self alloc] init];
     signal->_didSubscribe = [didSubscribe copy];
     return [signal setNameWithFormat:@"+createSignal:"];
}複製代碼

這個方法中新建了一個RACDynamicSignal對象signal,並把傳進來的didSubscribe這個block保存進剛剛新建對象signal裏面的didSubscribe屬性中。最後再給signal命名+createSignal:。

- (instancetype)setNameWithFormat:(NSString *)format, ... {
 if (getenv("RAC_DEBUG_SIGNAL_NAMES") == NULL) return self;

   NSCParameterAssert(format != nil);

   va_list args;
   va_start(args, format);

   NSString *str = [[NSString alloc] initWithFormat:format arguments:args];
   va_end(args);

   self.name = str;
   return self;
}複製代碼

setNameWithFormat是RACStream裏面的方法,因爲RACDynamicSignal繼承自RACSignal,因此它也能調用這個方法。

RACSignal的block就這樣被保存起來了,那何時會被執行呢?

block閉包在訂閱的時候纔會被「釋放」出來。

RACSignal調用subscribeNext方法,返回一個RACDisposable。

- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
   NSCParameterAssert(nextBlock != NULL);
   NSCParameterAssert(errorBlock != NULL);
   NSCParameterAssert(completedBlock != NULL);

   RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
   return [self subscribe:o];
}複製代碼

在這個方法中會新建一個RACSubscriber對象,並傳入nextBlock,errorBlock,completedBlock。

@interface RACSubscriber ()

// These callbacks should only be accessed while synchronized on self.
@property (nonatomic, copy) void (^next)(id value);
@property (nonatomic, copy) void (^error)(NSError *error);
@property (nonatomic, copy) void (^completed)(void);
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;

@end複製代碼

RACSubscriber這個類很簡單,裏面只有4個屬性,分別是nextBlock,errorBlock,completedBlock和一個RACCompoundDisposable信號。

+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
 RACSubscriber *subscriber = [[self alloc] init];

   subscriber->_next = [next copy];
   subscriber->_error = [error copy];
   subscriber->_completed = [completed copy];

   return subscriber;
}複製代碼

subscriberWithNext方法把傳入的3個block都保存分別保存到本身對應的block中。

RACSignal調用subscribeNext方法,最後return的時候,會調用[self subscribe:o],這裏實際是調用了RACDynamicSignal類裏面的subscribe方法。

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
 NSCParameterAssert(subscriber != nil);

   RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
   subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

   if (self.didSubscribe != NULL) {
      RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
      RACDisposable *innerDisposable = self.didSubscribe(subscriber);
      [disposable addDisposable:innerDisposable];
  }];

    [disposable addDisposable:schedulingDisposable];
 }

 return disposable;
}複製代碼

RACDisposable有3個子類,其中一個就是RACCompoundDisposable。

@interface RACCompoundDisposable : RACDisposable
+ (instancetype)compoundDisposable;
+ (instancetype)compoundDisposableWithDisposables:(NSArray *)disposables;
- (void)addDisposable:(RACDisposable *)disposable;
- (void)removeDisposable:(RACDisposable *)disposable;
@end複製代碼

RACCompoundDisposable雖然是RACDisposable的子類,可是它裏面能夠加入多個RACDisposable對象,在必要的時候能夠一口氣都調用dispose方法來銷燬信號。當RACCompoundDisposable對象被dispose的時候,也會自動dispose容器內的全部RACDisposable對象。

RACPassthroughSubscriber是一個私有的類。

@interface RACPassthroughSubscriber : NSObject <RACSubscriber>
@property (nonatomic, strong, readonly) id<RACSubscriber> innerSubscriber;
@property (nonatomic, unsafe_unretained, readonly) RACSignal *signal;
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable;
@end複製代碼

RACPassthroughSubscriber類就只有這一個方法。目的就是爲了把全部的信號事件從一個訂閱者subscriber傳遞給另外一個尚未disposed的訂閱者subscriber。

RACPassthroughSubscriber類中保存了3個很是重要的對象,RACSubscriber,RACSignal,RACCompoundDisposable。RACSubscriber是待轉發的信號的訂閱者subscriber。RACCompoundDisposable是訂閱者的銷燬對象,一旦它被disposed了,innerSubscriber就再也接受不到事件流了。

這裏須要注意的是內部還保存了一個RACSignal,而且它的屬性是unsafe_unretained。這裏和其餘兩個屬性有區別, 其餘兩個屬性都是strong的。這裏之因此不是weak,是由於引用RACSignal僅僅只是一個DTrace probes動態跟蹤技術的探針。若是設置成weak,會形成不必的性能損失。因此這裏僅僅是unsafe_unretained就夠了。

- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
   NSCParameterAssert(subscriber != nil);

   self = [super init];
   if (self == nil) return nil;

   _innerSubscriber = subscriber;
   _signal = signal;
   _disposable = disposable;

   [self.innerSubscriber didSubscribeWithDisposable:self.disposable];
   return self;
}複製代碼

回到RACDynamicSignal類裏面的subscribe方法中,如今新建好了RACCompoundDisposable和RACPassthroughSubscriber對象了。

if (self.didSubscribe != NULL) {
  RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
   RACDisposable *innerDisposable = self.didSubscribe(subscriber);
   [disposable addDisposable:innerDisposable];
  }];

  [disposable addDisposable:schedulingDisposable];
 }複製代碼

RACScheduler.subscriptionScheduler是一個全局的單例。

+ (instancetype)subscriptionScheduler {
   static dispatch_once_t onceToken;
   static RACScheduler *subscriptionScheduler;
   dispatch_once(&onceToken, ^{
    subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
   });

   return subscriptionScheduler;
}複製代碼

RACScheduler再繼續調用schedule方法。

- (RACDisposable *)schedule:(void (^)(void))block {
   NSCParameterAssert(block != NULL);
   if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
   block();
   return nil;
}複製代碼
+ (BOOL)isOnMainThread {
 return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}

+ (instancetype)currentScheduler {
 RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
 if (scheduler != nil) return scheduler;
 if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;

 return nil;
}複製代碼

在取currentScheduler的過程當中,會判斷currentScheduler是否存在,和是否在主線程中。若是都沒有,那麼就會調用後臺backgroundScheduler去執行schedule。

schedule的入參就是一個block,執行schedule的時候會去執行block。也就是會去執行:

RACDisposable *innerDisposable = self.didSubscribe(subscriber);
   [disposable addDisposable:innerDisposable];複製代碼

這兩句關鍵的語句。以前信號裏面保存的block就會在此處被「釋放」執行。self.didSubscribe(subscriber)這一句就執行了信號保存的didSubscribe閉包。

在didSubscribe閉包中有sendNext,sendError,sendCompleted,執行這些語句會分別調用RACPassthroughSubscriber裏面對應的方法。

- (void)sendNext:(id)value {
 if (self.disposable.disposed) return;
 if (RACSIGNAL_NEXT_ENABLED()) {
  RACSIGNAL_NEXT(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString([value description]));
 }
 [self.innerSubscriber sendNext:value];
}

- (void)sendError:(NSError *)error {
 if (self.disposable.disposed) return;
 if (RACSIGNAL_ERROR_ENABLED()) {
  RACSIGNAL_ERROR(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString(error.description));
 }
 [self.innerSubscriber sendError:error];
}

- (void)sendCompleted {
 if (self.disposable.disposed) return;
 if (RACSIGNAL_COMPLETED_ENABLED()) {
  RACSIGNAL_COMPLETED(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description));
 }
 [self.innerSubscriber sendCompleted];
}複製代碼

這個時候的訂閱者是RACPassthroughSubscriber。RACPassthroughSubscriber裏面的innerSubscriber纔是最終的實際訂閱者,RACPassthroughSubscriber會把值再繼續傳遞給innerSubscriber。

- (void)sendNext:(id)value {
 @synchronized (self) {
  void (^nextBlock)(id) = [self.next copy];
  if (nextBlock == nil) return;

  nextBlock(value);
 }
}

- (void)sendError:(NSError *)e {
 @synchronized (self) {
  void (^errorBlock)(NSError *) = [self.error copy];
  [self.disposable dispose];

  if (errorBlock == nil) return;
  errorBlock(e);
 }
}

- (void)sendCompleted {
 @synchronized (self) {
  void (^completedBlock)(void) = [self.completed copy];
  [self.disposable dispose];

  if (completedBlock == nil) return;
  completedBlock();
 }
}複製代碼

innerSubscriber是RACSubscriber,調用sendNext的時候會先把本身的self.next閉包copy一份,再調用,並且整個過程仍是線程安全的,用@synchronized保護着。最終訂閱者的閉包在這裏被調用。

sendError和sendCompleted也都是同理。

總結一下:

  1. RACSignal調用subscribeNext方法,新建一個RACSubscriber。
  2. 新建的RACSubscriber會copy,nextBlock,errorBlock,completedBlock存在本身的屬性變量中。
  3. RACSignal的子類RACDynamicSignal調用subscribe方法。
  4. 新建RACCompoundDisposable和RACPassthroughSubscriber對象。RACPassthroughSubscriber分別保存對RACSignal,RACSubscriber,RACCompoundDisposable的引用,注意對RACSignal的引用是unsafe_unretained的。
  5. RACDynamicSignal調用didSubscribe閉包。先調用RACPassthroughSubscriber的相應的sendNext,sendError,sendCompleted方法。
  6. RACPassthroughSubscriber再去調用self.innerSubscriber,即RACSubscriber的nextBlock,errorBlock,completedBlock。注意這裏調用一樣是先copy一份,再調用閉包執行。

三. RACSignal操做的核心bind實現

在RACSignal的源碼裏面包含了兩個基本操做,concat和zipWith。不過在分析這兩個操做以前,先來分析一下更加核心的一個函數,bind操做。

先來講說bind函數的做用:

  1. 會訂閱原始的信號。
  2. 任什麼時候刻原始信號發送一個值,都會綁定的block轉換一次。
  3. 一旦綁定的block轉換了值變成信號,就當即訂閱,並把值發給訂閱者subscriber。
  4. 一旦綁定的block要終止綁定,原始的信號就complete。
  5. 當全部的信號都complete,發送completed信號給訂閱者subscriber。
  6. 若是中途信號出現了任何error,都要把這個錯誤發送給subscriber
- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
 NSCParameterAssert(block != NULL);

 return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
  RACStreamBindBlock bindingBlock = block();

  NSMutableArray *signals = [NSMutableArray arrayWithObject:self];

  RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

  void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) { /*這裏暫時省略*/ };
  void (^addSignal)(RACSignal *) = ^(RACSignal *signal) { /*這裏暫時省略*/ };

  @autoreleasepool {
   RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
   [compoundDisposable addDisposable:selfDisposable];

   RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
    // Manually check disposal to handle synchronous errors.
    if (compoundDisposable.disposed) return;

    BOOL stop = NO;
    id signal = bindingBlock(x, &stop);

    @autoreleasepool {
     if (signal != nil) addSignal(signal);
     if (signal == nil || stop) {
      [selfDisposable dispose];
      completeSignal(self, selfDisposable);
     }
    }
   } error:^(NSError *error) {
    [compoundDisposable dispose];
    [subscriber sendError:error];
   } completed:^{
    @autoreleasepool {
     completeSignal(self, selfDisposable);
    }
   }];

   selfDisposable.disposable = bindingDisposable;
  }

  return compoundDisposable;
 }] setNameWithFormat:@"[%@] -bind:", self.name];
}複製代碼

爲了弄清楚bind函數究竟作了什麼,寫出測試代碼:

RACSignal *signal = [RACSignal createSignal:
                         ^RACDisposable *(id<RACSubscriber> subscriber)
    {
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        [subscriber sendCompleted];
        return [RACDisposable disposableWithBlock:^{
            NSLog(@"signal dispose");
        }];
    }];

    RACSignal *bindSignal = [signal bind:^RACStreamBindBlock{
        return ^RACSignal *(NSNumber *value, BOOL *stop){
            value = @(value.integerValue * 2);
            return [RACSignal return:value];
        };
    }];

    [bindSignal subscribeNext:^(id x) {
        NSLog(@"subscribe value = %@", x);
    }];複製代碼

因爲前面第一章節詳細講解了RACSignal的建立和訂閱的全過程,這個也爲了方法講解,建立RACDynamicSignal,RACCompoundDisposable,RACPassthroughSubscriber這些都略過,這裏着重分析一下bind的各個閉包傳遞建立和訂閱的過程。

爲了防止接下來的分析會讓讀者看暈,這裏先把要用到的block進行編號。

RACSignal *signal = [RACSignal createSignal:
                         ^RACDisposable *(id<RACSubscriber> subscriber)
    {
        // block 1
    }

    RACSignal *bindSignal = [signal bind:^RACStreamBindBlock{
        // block 2
        return ^RACSignal *(NSNumber *value, BOOL *stop){
            // block 3
        };
    }];

    [bindSignal subscribeNext:^(id x) {
        // block 4
    }];

- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
        // block 5
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        // block 6
        RACStreamBindBlock bindingBlock = block();
        NSMutableArray *signals = [NSMutableArray arrayWithObject:self];

        void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) {
        // block 7
        };

        void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
        // block 8
            RACDisposable *disposable = [signal subscribeNext:^(id x) {
            // block 9
            }];
        };

        @autoreleasepool {
            RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
                // block 10
                id signal = bindingBlock(x, &stop);

                @autoreleasepool {
                    if (signal != nil) addSignal(signal);
                    if (signal == nil || stop) {
                        [selfDisposable dispose];
                        completeSignal(self, selfDisposable);
                    }
                }
            } error:^(NSError *error) {
                [compoundDisposable dispose];
                [subscriber sendError:error];
            } completed:^{
                @autoreleasepool {
                    completeSignal(self, selfDisposable);
                }
            }];
        }
        return compoundDisposable;
    }] ;
}複製代碼

先建立信號signal,didSubscribe把block1 copy保存起來。

當信號調用bind進行綁定,會調用block5,didSubscribe把block6 copy保存起來。

當訂閱者開始訂閱bindSignal的時候,流程以下:

  1. bindSignal執行didSubscribe的block,即執行block6。
  2. 在block6 的第一句代碼,就是調用RACStreamBindBlock bindingBlock = block(),這裏的block是外面傳進來的block2,因而開始調用block2。執行完block2,會返回一個RACStreamBindBlock的對象。
  3. 因爲是signal調用的bind函數,因此bind函數裏面的self就是signal,在bind內部訂閱了signal的信號。subscribeNext因此會執行block1。
  4. 執行block1,sendNext調用訂閱者subscriber的nextBlock,因而開始執行block10。
  5. block10中會先調用bindingBlock,這個是以前調用block2的返回值,這個RACStreamBindBlock對象裏面保存的是block3。因此開始調用block3。
  6. 在block3中入參是一個value,這個value是signal中sendNext中發出來的value的值,在block3中能夠對value進行變換,變換完成後,返回一個新的信號signal'。
  7. 若是返回的signal'爲空,則會調用completeSignal,即調用block7。block7中會發送sendCompleted。若是返回的signal'不爲空,則會調用addSignal,即調用block8。block8中會繼續訂閱signal'。執行block9。
  8. block9 中會sendNext,這裏的subscriber是block6的入參,因而對subscriber調用sendNext,會調用到bindSignal的訂閱者的block4中。
  9. block9 中執行完sendNext,還會調用sendCompleted。這裏的是在執行block9裏面的completed閉包。completeSignal(signal, selfDisposable);而後又會調用completeSignal,即block7。
  10. 執行完block7,就完成了一次從signal 發送信號sendNext的全過程。

bind整個流程就完成了。

四. RACSignal基本操做concat和zipWith實現

接下來再來分析RACSignal中另外2個基本操做。

1. concat

寫出測試代碼:

RACSignal *signal = [RACSignal createSignal:
                         ^RACDisposable *(id<RACSubscriber> subscriber)
    {
        [subscriber sendNext:@1];
        [subscriber sendCompleted];
        return [RACDisposable disposableWithBlock:^{
            NSLog(@"signal dispose");
        }];
    }];


    RACSignal *signals = [RACSignal createSignal:
                         ^RACDisposable *(id<RACSubscriber> subscriber)
    {
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        [subscriber sendNext:@6];
        [subscriber sendCompleted];
        return [RACDisposable disposableWithBlock:^{
            NSLog(@"signal dispose");
        }];
    }];

    RACSignal *concatSignal = [signal concat:signals];

    [concatSignal subscribeNext:^(id x) {
        NSLog(@"subscribe value = %@", x);
    }];複製代碼

concat操做就是把兩個信號合併起來。注意合併有前後順序。

- (RACSignal *)concat:(RACSignal *)signal {
   return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];

    RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
     // 發送第一個信號的值
     [subscriber sendNext:x];
    } error:^(NSError *error) {
     [subscriber sendError:error];
    } completed:^{
     // 訂閱第二個信號
     RACDisposable *concattedDisposable = [signal subscribe:subscriber];
     serialDisposable.disposable = concattedDisposable;
  }];

    serialDisposable.disposable = sourceDisposable;
    return serialDisposable;
 }] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}複製代碼

合併前,signal和signals分別都把各自的didSubscribe保存copy起來。
合併以後,合併以後新的信號的didSubscribe會把block保存copy起來。

當合並以後的信號被訂閱的時候:

  1. 調用新的合併信號的didSubscribe。
  2. 因爲是第一個信號調用的concat方法,因此block中的self是前一個信號signal。合併信號的didSubscribe會先訂閱signal。
  3. 因爲訂閱了signal,因而開始執行signal的didSubscribe,sendNext,sendError。
  4. 當前一個信號signal發送sendCompleted以後,就會開始訂閱後一個信號signals,調用signals的didSubscribe。
  5. 因爲訂閱了後一個信號,因而後一個信號signals開始發送sendNext,sendError,sendCompleted。

這樣兩個信號就先後有序的拼接到了一塊兒。

這裏有一點須要注意的是,兩個信號concat在一塊兒以後,新的信號的結束信號在第二個信號結束的時候才結束。看上圖描述,新的信號的發送長度等於前面兩個信號長度之和,concat以後的新信號的結束信號也就是第二個信號的結束信號。

2. zipWith

寫出測試代碼:

RACSignal *concatSignal = [signal zipWith:signals];

    [concatSignal subscribeNext:^(id x) {
        NSLog(@"subscribe value = %@", x);
    }];複製代碼

源碼以下:

- (RACSignal *)zipWith:(RACSignal *)signal {
    NSCParameterAssert(signal != nil);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block BOOL selfCompleted = NO;
        NSMutableArray *selfValues = [NSMutableArray array];

        __block BOOL otherCompleted = NO;
        NSMutableArray *otherValues = [NSMutableArray array];

        void (^sendCompletedIfNecessary)(void) = ^{
            @synchronized (selfValues) {
                BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
                BOOL otherEmpty = (otherCompleted && otherValues.count == 0);

                // 若是任意一個信號完成而且數組裏面空了,就整個信號算完成
                if (selfEmpty || otherEmpty) [subscriber sendCompleted];
            }
        };

        void (^sendNext)(void) = ^{
            @synchronized (selfValues) {

                // 數組裏面的空了就返回。
                if (selfValues.count == 0) return;
                if (otherValues.count == 0) return;

                // 每次都取出兩個數組裏面的第0位的值,打包成元組
                RACTuple *tuple = RACTuplePack(selfValues[0], otherValues[0]);
                [selfValues removeObjectAtIndex:0];
                [otherValues removeObjectAtIndex:0];

                // 把元組發送出去
                [subscriber sendNext:tuple];
                sendCompletedIfNecessary();
            }
        };

        // 訂閱第一個信號
        RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            @synchronized (selfValues) {

                // 把第一個信號的值加入到數組中
                [selfValues addObject:x ?: RACTupleNil.tupleNil];
                sendNext();
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (selfValues) {

                // 訂閱完成時判斷是否要發送完成信號
                selfCompleted = YES;
                sendCompletedIfNecessary();
            }
        }];

        // 訂閱第二個信號
        RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
            @synchronized (selfValues) {

                // 把第二個信號加入到數組中
                [otherValues addObject:x ?: RACTupleNil.tupleNil];
                sendNext();
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (selfValues) {

                // 訂閱完成時判斷是否要發送完成信號
                otherCompleted = YES;
                sendCompletedIfNecessary();
            }
        }];

        return [RACDisposable disposableWithBlock:^{

            // 銷燬兩個信號
            [selfDisposable dispose];
            [otherDisposable dispose];
        }];
    }] setNameWithFormat:@"[%@] -zipWith: %@", self.name, signal];
}複製代碼

當把兩個信號經過zipWith以後,就像上面的那張圖同樣,拉鍊的兩邊被中間的拉索拉到了一塊兒。既然是拉鍊,那麼一一的位置是有對應的,上面的拉鍊第一個位置只能對着下面拉鍊第一個位置,這樣拉鍊才能拉到一塊兒去。

具體實現:

zipWith裏面有兩個數組,分別會存儲兩個信號的值。

  1. 一旦訂閱了zipWith以後的信號,就開始執行didSubscribe閉包。
  2. 在閉包中會先訂閱第一個信號。這裏假設第一個信號比第二個信號先發出一個值。第一個信號發出來的每個值都會被加入到第一個數組中保存起來,而後調用sendNext( )閉包。在sendNext( )閉包中,會先判斷兩個數組裏面是否都爲空,若是有一個數組裏面是空的,就return。因爲第二個信號尚未發送值,即第二個信號的數組裏面是空的,因此這裏第一個值發送不出來。因而第一個信號被訂閱以後,發送的值存儲到了第一個數組裏面了,沒有發出去。
  3. 第二個信號的值緊接着發出來了,第二個信號每發送一次值,也會存儲到第二個數組中,可是這個時候再調用sendNext( )閉包的時候,不會再return了,由於兩個數組裏面都有值了,兩個數組的第0號位置都有一個值了。有值之後就打包成元組RACTuple發送出去。並清空兩個數組0號位置存儲的值。
  4. 之後兩個信號每次發送一個,就先存儲在數組中,只要有「配對」的另外一個信號,就一塊兒打包成元組RACTuple發送出去。從圖中也能夠看出,zipWith以後的新信號,每一個信號的發送時刻是等於兩個信號最晚發出信號的時刻。
  5. 新信號的完成時間,是當二者任意一個信號完成而且數組裏面爲空,就算完成了。因此最後第一個信號發送的5的那個值就被丟棄了。

第一個信號依次發送的1,2,3,4的值和第二個信號依次發送的A,B,C,D的值,一一的合在了一塊兒,就像拉鍊把他們拉在一塊兒。因爲5無法配對,因此拉鍊也拉不上了。

五. 最後

原本這篇文章想把Map,combineLatest,flattenMap,flatten這些也一塊兒分析了,可是後來看到RACSingnal的操做實在有點多,因而按照源碼的文件分開了,這裏先把RACSignal文件裏面的操做都分析完了。RACSignal文件裏面的操做主要就bind,concat和zipWith三個操做。下一篇再分析分析RACSignal+Operations文件裏面的全部操做。

請你們多多指教。

相關文章
相關標籤/搜索