使用Reactor完成相似的Flink的操做

1、背景

Flink在處理流式任務的時候有很大的優點,其中windows等操做符能夠很方便的完成聚合任務,可是Flink是一套獨立的服務,業務流程中若是想使用須要將數據發到kafka,用Flink處理完再發到kafka,而後再作業務處理,流程很繁瑣。html

好比在業務代碼中想要實現相似Flink的window按時間批量聚合功能,若是純手動寫代碼比較繁瑣,使用Flink又過重,這種場景下使用響應式編程RxJava、Reactor等的window、buffer操做符能夠很方便的實現。react

響應式編程框架也早已有了背壓以及豐富的操做符支持,能不能用響應式編程框架處理相似Flink的操做呢,答案是確定的。git

本文使用Reactor來實現Flink的window功能來舉例,其餘操做符理論上相同。文中涉及的代碼:githubgithub

2、實現過程

Flink對流式處理作的很好的封裝,使用Flink的時候幾乎不用關心線程池、積壓、數據丟失等問題,可是使用Reactor實現相似的功能就必須對Reactor運行原理比較瞭解,而且通過不一樣場景下測試,不然很容易出問題。apache

下面列舉出實現過程當中的核心點:編程

一、建立Flux和發送數據分離

入門Reactor的時候給的示例都是建立Flux的時候同時就把數據賦值了,好比:Flux.just、Flux.range等,從3.4.0版本後先建立Flux,再發送數據可以使用Sinks完成。有兩個比較容易混淆的方法:windows

  • Sinks.many().multicast() 若是沒有訂閱者,那麼接收的消息直接丟棄
  • Sinks.many().unicast() 若是沒有訂閱者,那麼保存接收的消息直到第一個訂閱者訂閱
  • Sinks.many().replay() 無論有多少訂閱者,都保存全部消息

在此示例場景中,選擇的是Sinks.many().unicast()緩存

官方文檔:https://projectreactor.io/docs/core/release/reference/#processors多線程

二、背壓支持

上面方法的對象背壓策略支持兩種:BackpressureBuffer、BackpressureError,在此場景確定是選擇BackpressureBuffer,須要指定緩存隊列,初始化方法以下:Queues. get(queueSize).get() 架構

數據提交有兩個方法:

  • emitNext 指定提交失敗策略同步提交
  • tryEmitNext 異步提交,返回提交成功、失敗狀態

在此場景咱們不但願丟數據,可自定義失敗策略,提交失敗無限重試,固然也能夠調用異步方法本身重試。

Sinks.EmitFailureHandler ALWAYS_RETRY_HANDLER = (signalType, emitResult) -> emitResult.isFailure();

在此以後就就能夠調用Sinks.asFlux開心的使用各類操做符了。

三、窗口函數

Reactor支持兩類窗口聚合函數:

  • window類:返回Mono(Flux )
  • buffer類:返回List

在此場景中,使用buffer便可知足需求,bufferTimeout(int maxSize, Duration maxTime)支持最大個數,最大等待時間操做,Flink中的keys操做能夠用groupBy、collectMap來實現。

四、消費者處理

Reactor通過buffer後是一個一個的發送數據,若是使用publishOn或subscribeOn處理的話,只等待下游的subscribe處理完成纔會從新request新的數據,buffer操做符纔會從新發送數據。若是此時subscribe消費者耗時較長,數據流會在buffer流程阻塞,顯然並非咱們想要的。

理想的操做是消費者在一個線程池裏操做,可多線程並行處理,若是線程池滿,再阻塞buffer操做符。解決方案是自定義一個線程池,而且固然線程池若是任務滿submit支持阻塞,能夠用自定義RejectedExecutionHandler來實現:

RejectedExecutionHandler executionHandler = (r, executor) -> {
     try {
         executor.getQueue().put(r);
     } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RejectedExecutionException("Producer thread interrupted", e);
     }
 };
 
 new ThreadPoolExecutor(poolSize, poolSize,
         0L, TimeUnit.MILLISECONDS,
         new SynchronousQueue<>(),
         executionHandler);

3、總結

一、總結一下總體的執行流程

  1. 提交任務:提交數據支持同步異步兩種方式,支持多線程提交,正常狀況下響應很快,同步的方法若是隊列滿則阻塞。
  2. 豐富的操做符處理流式數據。
  3. buffer操做符產生的數據多線程處理:同步提交到單獨的消費者線程池,線程池任務滿則阻塞。
  4. 消費者線程池:支持阻塞提交,保證不丟消息,同時隊列長度設置成0,由於前面已經有隊列了。
  5. 背壓:消費者線程池阻塞後,會背壓到buffer操做符,並背壓到緩衝隊列,緩存隊列滿背壓到數據提交者。

二、和Flink的對比

實現的Flink的功能:

  • 不輸Flink的豐富操做符
  • 支持背壓,不丟數據

優點:輕量級,可直接在業務代碼中使用

劣勢:

  • 內部執行流程複雜,容易踩坑,不如Flink傻瓜化
  • 沒有watermark功能,也就意味着只支持無序數據處理
  • 沒有savepoint功能,雖然咱們用背壓解決了部分問題,可是宕機後開始會丟失緩存隊列和消費者線程池裏的數據,補救措施是添加Java Hook功能
  • 只支持單機,意味着你的緩存隊列不能設置無限大,要考慮線程池的大小,且沒有flink globalWindow等功能
  • 需考慮對上游數據源的影響,Flink的上游通常是mq,數據量大時可自動堆積,若是本文的方案上游是http、rpc調用,產生的阻塞影響就不能忽略。補償方案是每次提交數據都使用異步方法,若是失敗則提交到mq中緩衝並消費該mq無限重試。

4、附錄

本文源碼地址:https://github.com/sofn/reactor-window-like-flink

Reactor官方文檔:https://projectreactor.io/docs/core/release/reference/

Flink文檔:https://ci.apache.org/projects/flink/flink-docs-stable/

Reactive操做符:http://reactivex.io/documentation/operators.html


本文做者:木小豐,美團Java高級工程師,關注架構、軟件工程、全棧等,不按期分享軟件研發過程當中的實踐、思考。歡迎關注公共號:Java研發

本文連接:https://lesofn.com/archives/shi-yong-reactor-wan-cheng-lei-shi-de-flink-de-cao-zuo

相關文章
相關標籤/搜索