Spring Reactor介紹

Reactor - A Foundation for Reactive FastData Appli該文簡單介紹了Spring reactor 1.0的基本特性。
目前reactor是做爲Spring.io核心包下面項目。
Reactor 是一個基礎性庫包
–定位在用戶級和低級之間的灰色區域的抽象。
– 可以在Reactor上創建組件和應用核心
– 驅動器 服務器和數據整合庫,領域整合庫,事件驅動架構
Reactor的應用是reactive的。
– 屬於Reactive Extensions in .NET
– 相似Netflix RxJava
– 觀察者模式Observer pattern
Reactor應用基於一個Selector發送事件。
– 象一個消息系統中routing topic, 可是它是一個對象
– 支持Regex, URI template, Class.isAssingableFrom, 定製邏輯
Reactor Core內部封裝了LMAX Disruptor的RingBuffer,再經過Reactor-Spring等支持支持各類Spring應用,以下圖:


Reactor演示代碼

Environment env =  Environment();
Reactor reactor = Reactors.reactor()
                               .env(env)
                               .dispatcher(RING_BUFFER)
                               .get();

reactor.on($(「topic」), (Event<String> ev) → {
                             System.out.println(「Hello 「 + ev.getData());
                  });

reactor.notify(「topic」, Event.wrap(「John Doe」));
RING_BUFFER是Disruptor的RingBuffer操做,熟悉Disruptor的應該知道。
reactor.notify發送一個事件,而reactor.on可以接受到這個事件即時響應。
Reactor 的分發器 Dispatchers 相似Akka的分發器
● 分發器管理任務執行,有下面幾種:
– ThreadPoolExecutorDispatcher
● 標準的 ThreadPoolExecutor
– BlockingQueueDispatcher
● 可以進行事件輪詢
– RingBufferDispatcher
● LMAX Disruptor RingBuffer
– SynchronousDispatcher
Reactor的 Selectors
● Selectors 是一個等式的左邊。
– 一個Selector可以被任何對象使用$(obj)建立
(或者: Selectors.object(obj))
– 一個Selector可以從匹配的key中釋放數據
– Predicate<T> Selectors 可以建立匹配特定領域準則
(domain-specific criteria)

好比RegexSelector:
reactor.on(R(「some.(.+)」), (Event<String> ev) → {
// s will be 'topic'
String s = ev.getHeaders().get(「group1」);
});

reactor.notify(「some.topic」, Event.wrap(「John Doe」));

其中R(「some.(.*)」)匹配事件發送者「some.topic」。

UriTemplateSelector可以從URI匹配字符串:
reactor.on(U(「/some/{topic}」), (Event<String> ev) → {
// s will be 'topic'
String s = ev.getHeaders().get(「topic」);
});
reactor.notify(「/some/topic」, Event.wrap(「John Doe」));

Reactor 的Stream
● Streams容許基於數據的函數組合composition
– Callback++
– 相似Netflix RxJava Observable, JDK 8 Stream


Stream<String> str;
str.map(String::toUpperCase)
     .filter( Predicate<String>() {
                 test(String s) { … }
     })
    .consume(s → log.info(「consumed string {}」, s));
Reactor 的 Promise
容許在Stream之間分享函數

Promise<String> p;
String s = p
        .onSuccess(s → log.info(「consumed string {}」, s))
        .onFailure(t → log.error(t.getMessage(), t))
        .onComplete(t → log.info(「complete」))
        .await(5, SECONDS);

p.map(String::toUpperCase).consume(s → log.info(「UC: {}」, s));
Reactor 的 Processor
乾脆直接將Disruptor API轉爲Reactor API
對於#UberFastData有超級快性能


Processor<Buffer> proc;
Operation<Buffer> op = proc.prepare();
op.get().append(data).flip();
op.commit();
proc.batch(512, buff → buff.append(data).flip());
與Spring整合:
首先使用@EnableReactor 激活reactor

@Configuration
@EnableReactor
  ReactorConfiguration {

  @Bean
    Reactor input(Environment env) {
         Reactors.reactor().env(env)
                  .dispatcher(RING_BUFFER).get();
   }

   @Bean
    Reactor output(Environment env) {
         Reactors.reactor().env(env)
                  .dispatcher(RING_BUFFER).get();
}
而後在監聽者或觀察者寫入:
@Component
  SimpleHandler {
    @Autowired
     Reactor reactor;

    @Selector(「test.topic」)
      onTestTopic(String s) {
reactor的groovy整合:
@CompileStatic
def welcome(){
    reactor.on('greetings') { String s ->
            reply 「hello $s」
            reply 「how are you?」
}
reactor.notify 'greetings', 'Jon'
           reactor.send('greetings', 'Stephane'){
                  println it
            cancel()
           }
}

相關文章
相關標籤/搜索