小規模的流處理框架.Part 1: thread pools

原文連接 做者:Tomasz Nurkiewicz 譯者:simonwang
(譯者:強力推薦這篇文章,做者設計了一個用於小流量的流式數據處理框架,並詳細給出了每個須要注意的設計細節,對比了不一樣設計方案的優缺點,可以讓你對流處理過程,某些設計模式和設計原則以及指標度量工具備一個更深入的認識!)
GeeCON 2016上我爲個人公司準備了一個編程競賽,此次的任務是設計並實現一個可以知足如下要求的系統:
html

系統可以每秒處理1000個任務,每個Event至少有2個屬性:java

  • clientId-咱們但願每一秒有多個任務是在同一個客戶端下處理的(譯者:不一樣的clientId對應不一樣的ClientProjection,即對應不一樣的一系列操做)
  • UUID-全局惟一的

消費一個任務要花費10毫秒,爲這樣的流設計一個消費者:算法

  1. 可以實時的處理任務
  2. 和同一個客戶端有關的任務應該被有序地處理,例如你不能對擁有同一個clientId的任務序列使用並行處理
  3. 若是10秒內出現了重複的UUID,丟棄它。假設10秒後不會重複

有幾個關於以上要求的重要細節:數據庫

  1. 1000events/s的任務量,消耗一個event要10ms,1s內能消耗100個event,那麼爲了保證明時性,就須要10個併發的消費者。
  2. events擁有彙集的ID(clientId),在1s內咱們但願多個event可以被指定到同一個給定的client上,而且咱們不可以併發地或無序地處理這些event。
  3. 咱們必須以某種方式忽略重複的信息,最可能的方法就是記住最近10s內全部的ID,這就須要暫時保存一萬個UUID。

在這篇文章中,我會引導大家使用一些成功的方案並作一些小小的突破,你將要學習如何使用精確地有針對性的度量器來解決問題。編程

Naive sequential processing

咱們能夠在迭代器中處理這個問題,首先咱們能夠對API作一些假設,想象一下它會是這個樣子:設計模式

01 interface EventStream {
02  
03     void consume(EventConsumer consumer);
04  
05 }
06  
07 @FunctionalInterface
08 interface EventConsumer {
09     Event consume(Event event);
10 }
11  
12 @Value
13 class Event {
14  
15     private final Instant created = Instant.now();
16     private final int clientId;
17     private final UUID uuid;
18  
19 }

一個典型的推送式API,和JMS很像。須要注意的是EventConsumer是阻塞的,這就意味着它不會返回新的Event,除非前一個已經被處理完畢了。這僅僅是我作出的一個假設,並且它沒有太大的違反以前的要求,這也是JMS中消息偵聽者的工做機制。下面是一個簡單的實現,這個實現只是簡單的添加了一個工做間隔爲10ms的偵聽器:安全

1 class ClientProjection implements EventConsumer {
2  
3     @Override
4     public Event consume(Event event) {
5         Sleeper.randSleep(101);//譯者:這裏只是用睡眠來代替實際編程中一些耗時的操做
6         return event;
7     }
8  
9 }

固然在現實生活中這個consumer可能會在數據庫中作一些存儲操做,或者進行遠程調用等等。我在睡眠時間的分佈上添加了一些隨機性,目的是使得手動測試更加貼近實際狀況(譯者:實際狀況中耗時操做的用時不盡相同,因此要隨機化):數據結構

01 class Sleeper {
02  
03     private static final Random RANDOM = new Random();
04  
05     static void randSleep(double mean, double stdDev) {
06         final double micros = 1_000 * (mean + RANDOM.nextGaussian() * stdDev);
07         try {
08             TimeUnit.MICROSECONDS.sleep((long) micros);
09         catch (InterruptedException e) {
10             throw new RuntimeException(e);
11         }
12     }
13  
14 }
15  
16 //...
17  
18 EventStream es = new EventStream();  //some real implementation here
19 es.consume(new ClientProjection());

以上的代碼可以編譯並運行,但爲了知足設計要求咱們必需要插入一些度量器。最重要的度量器就是有關於信息消費的潛伏期,這個潛伏期指的是從信息的產生到開始處理的這段時間。咱們使用 Dropwizard Metrics來實現這個潛伏期的度量:多線程

01 class ClientProjection implements EventConsumer {
02  
03     private final ProjectionMetrics metrics;
04  
05     ClientProjection(ProjectionMetrics metrics) {
06         this.metrics = metrics;
07     }
08  
09     @Override
10     public Event consume(Event event) {
11         metrics.latency(Duration.between(event.getCreated(), Instant.now()));
12         Sleeper.randSleep(101);
13         return event;
14     }
15  
16 }

ProjectionMetrics類的功能以下(主要就是將event的潛伏期用柱狀圖的形式表現出來):併發

01 import com.codahale.metrics.Histogram;
02 import com.codahale.metrics.MetricRegistry;
03 import com.codahale.metrics.Slf4jReporter;
04 import lombok.extern.slf4j.Slf4j;
05  
06 import java.time.Duration;
07 import java.util.concurrent.TimeUnit;
08  
09 @Slf4j
10 class ProjectionMetrics {
11  
12     private final Histogram latencyHist;
13  
14     ProjectionMetrics(MetricRegistry metricRegistry) {
15         final Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry)
16                 .outputTo(log)
17                 .convertRatesTo(TimeUnit.SECONDS)
18                 .convertDurationsTo(TimeUnit.MILLISECONDS)
19                 .build();
20         reporter.start(1, TimeUnit.SECONDS);
21         latencyHist = metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics.class"latency"));
22     }
23  
24     void latency(Duration duration) {
25         latencyHist.update(duration.toMillis());
26     }
27 }

如今當你運行這個解決方案時,你很快就會發現潛伏期的中值和第99.9%的值(分別指的是第count/2個值和第99.9%*count個值)都在無限增加:

1 type=HISTOGRAM, [...] count=84,   min=0,  max=795,   mean=404.88540608274104, [...]
2     median=414.0,   p75=602.0,   p95=753.0,   p98=783.0,   p99=795.0,   p999=795.0
3 type=HISTOGRAM, [...] count=182,  min=0,  max=1688,  mean=861.1706371990878,  [...]
4     median=869.0,   p75=1285.0,  p95=1614.0,  p98=1659.0,  p99=1678.0,  p999=1688.0
5  
6 [...30 seconds later...]
7  
8 type=HISTOGRAM, [...] count=2947, min=14, max=26945, mean=15308.138585757424, [...]
9     median=16150.0, p75=21915.0, p95=25978.0, p98=26556.0, p99=26670.0, p999=26945.0

在運行了30s以後咱們的應用程序處理event會出現平均15s的延遲,所以它並不具有完整的實時性,顯然缺乏併發纔是緣由所在。咱們的ClientProjection事件消費者會花費10ms去完成事件處理,因此它每秒最多能夠處理100個event,然而咱們須要更多的處理量。咱們必需要加強ClientProjection同時不違反其餘的設計要求!

Naive thread pool

最顯而易見的解決方法是對EventConsumer使用多線程技術,最簡單的實現途徑就是利用ExecutorService:

01 import java.util.concurrent.ExecutorService;
02 import java.util.concurrent.Executors;
03  
04 class NaivePool implements EventConsumer, Closeable {
05  
06     private final EventConsumer downstream;
07     private final ExecutorService executorService;
08  
09     NaivePool(int size, EventConsumer downstream) {
10         this.executorService = Executors.newFixedThreadPool(size);
11         this.downstream = downstream;
12     }
13  
14     @Override
15     public Event consume(Event event) {
16         executorService.submit(() -> downstream.consume(event));
17         return event;
18     }
19  
20     @Override
21     public void close() throws IOException {
22         executorService.shutdown();
23     }
24 }

這裏咱們使用了裝飾者模式。最初的ClientProjection實現EventConsumer是可行的,但咱們利用加入了併發的另外一個EventConsumer實現對ClientProjection進行包裝。這就容許咱們可以將更復雜的行爲組合起來而不用更改ClientProjection自己,這種設計能夠:

  • 解耦:不一樣的EventConsumer互不影響,但它們卻能夠自由地組合在一塊兒,在同一個線程池中工做
  • 單一職責:每一個EventConsumer只作一項工做,並將本身委託給下一個組件即線程池
  • 開放/關閉原則:咱們能夠改變系統的行爲卻不用修改現有實現

開放/關閉原則一般能夠經過注入策略模式和模板方法模式來實現,這很簡單。總體的代碼以下:

01 MetricRegistry metricRegistry =
02         new MetricRegistry();
03 ProjectionMetrics metrics =
04         new ProjectionMetrics(metricRegistry);
05 ClientProjection clientProjection =
06         new ClientProjection(metrics);
07 NaivePool naivePool =
08         new NaivePool(10, clientProjection);
09 EventStream es = new EventStream();
10 es.consume(naivePool);

咱們寫的度量器顯示這種改良的方案確實表現的更好:

1 type=HISToOGRAM, count=838, min=1, max=422, mean=38.80768197277468, [...]
2     median=37.0, p75=45.0, p95=51.0, p98=52.0, p99=52.0, p999=422.0
3 type=HISTOGRAM, count=1814, min=1, max=281, mean=47.82642776789085, [...]
4     median=51.0, p75=57.0, p95=61.0, p98=62.0, p99=63.0, p999=65.0
5  
6 [...30 seconds later...]
7  
8 type=HISTOGRAM, count=30564, min=5, max=3838, mean=364.2904915942238, [...]
9     median=352.0, p75=496.0, p95=568.0, p98=574.0, p99=1251.0, p999=3531.0

咱們能夠看到延遲雖然也在增加但規模卻小得多,30s後潛伏期達到了364ms。這種潛伏期增加是系統問題,咱們須要更多的度量器。注意到NaivePool(你會明白爲何這裏是naive-初級的)會開啓10條線程,這應該足以處理1000個event,每一個要花費10ms。在實際狀況下,咱們須要一點額外的處理容量來避免因垃圾回收或小規模峯值負荷所帶來的問題。爲了證實線程池纔是咱們的瓶頸,咱們要監控它內部的隊列,這須要一點小小的工做量:

01 class NaivePool implements EventConsumer, Closeable {
02  
03     private final EventConsumer downstream;
04     private final ExecutorService executorService;
05  
06     NaivePool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
07         LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
08         String name = MetricRegistry.name(ProjectionMetrics.class"queue");
09         Gauge<Integer> gauge = queue::size;
10         metricRegistry.register(name, gauge);
11         this.executorService =
12                 new ThreadPoolExecutor(
13                         size, size, 0L, TimeUnit.MILLISECONDS, queue);
14         this.downstream = downstream;
15     }
16  
17     @Override
18     public Event consume(Event event) {
19         executorService.submit(() -> downstream.consume(event));
20         return event;
21     }
22  
23     @Override
24     public void close() throws IOException {
25         executorService.shutdown();
26     }
27 }

這裏使用ThreadPoolExecutor的目的是爲了可以提供自定義的LinkedBlockingQueue實例,接下來就能夠監控隊列的長度(see:ExecutorService – 10 tips and tricks)。Gauge會週期性地調用queue::size,你須要的時候就會提供隊列的長度。度量器顯示線程池的大小確實是一個問題:

1 type=GAUGE, name=[...].queue, value=35
2 type=GAUGE, name=[...].queue, value=52
3  
4 [...30 seconds later...]
5  
6 type=GAUGE, name=[...].queue, value=601

不斷增加的隊列長度進一步加重了隊列內正在等待着的task的潛伏期,將線程池的大小增長到10到20之間,最終隊列的長度顯示合理而且沒有失控。然而咱們仍然沒有解決重複ID問題,而且也沒有解決同一個clientId可能會對它的events進行併發處理的問題。

Obscure locking

讓咱們從避免對擁有相同clientId的events使用並行處理開始。若是兩個有相同clientId的event一個接一個地來,相繼進入線程池隊列,那麼NaivePool會幾乎同時將它們取出隊列實現並行處理。開始的時候咱們可能會想到對每個clientId加一個Lock:

01 @Slf4j
02 class FailOnConcurrentModification implements EventConsumer {
03  
04     private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();
05     private final EventConsumer downstream;
06  
07     FailOnConcurrentModification(EventConsumer downstream) {
08         this.downstream = downstream;
09     }
10  
11     @Override
12     public Event consume(Event event) {
13         Lock lock = findClientLock(event);
14         if (lock.tryLock()) {
15             try {
16                 downstream.consume(event);
17             finally {
18                 lock.unlock();
19             }
20         else {
21
相關文章
相關標籤/搜索