http://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/41378.pdf
The motivation: none of the existing streaming systems can simultaneously satisfy the requirements of fault tolerance, versatility, and scalability.
Spark Streaming [34] and Sonora [32] do excellent jobs of efficient checkpointing, but limit the space of operators that are available to user code.
S4 [26] does not provide fully fault-tolerant persistent state.
Storm’s [23] exactly-once mechanism for record delivery, Trident [22], requires strict transaction ordering to operate.
Streaming SQL systems [1] [2] [5] [6] [21] [24] provide succinct and simple solutions to many streaming problems, but intuitive state abstractions and complex application logic (e.g. matrix multiplication) are more naturally expressed using the operational flow of an imperative language rather than a declarative language like SQL.
Note: on the distinction between imperative, declarative, and functional languages, see http://stackoverflow.com/questions/1784664/what-is-the-difference-between-declarative-and-imperative-programming
First, a description of the application scenario:
Google’s Zeitgeist pipeline is used to track trends in web queries.
This pipeline ingests a continuous input of search queries and performs anomaly detection, outputting queries which are spiking or dipping as quickly as possible.
That is, Google’s Zeitgeist service tracks trends in web queries: it performs anomaly detection on a continuous stream of search queries, detecting spikes and dips as quickly as possible.
The approach is as follows:
Our approach is to bucket records into one-second intervals and to compare the actual traffic for each time bucket to the expected traffic that the model predicts.
If these quantities are consistently different over a non-trivial number of buckets, then we have high confidence that a query is spiking or dipping.
In parallel, we update the model with the newly received data and store it for future use.
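To make the bucket-and-compare logic concrete, here is a minimal sketch; the thresholds, `model.predict`, and all names are illustrative assumptions, not the paper's code:

```python
from collections import defaultdict

BUCKET_SECONDS = 1      # the paper buckets records into one-second intervals
WINDOW_BUCKETS = 10     # "non-trivial number of buckets"; exact count is assumed
DEVIATION_RATIO = 0.5   # how far actual may stray from predicted; assumed value

actual_counts = defaultdict(int)   # bucket start time -> observed query count

def bucket_of(timestamp):
    """Map a record timestamp to the start of its one-second bucket."""
    return int(timestamp) // BUCKET_SECONDS * BUCKET_SECONDS

def on_query(query, timestamp):
    """Ingest one search query into its time bucket."""
    actual_counts[bucket_of(timestamp)] += 1

def is_anomalous(latest_bucket, model):
    """High confidence of a spike/dip: actual traffic consistently differs
    from the model's prediction over a run of recent buckets."""
    deviating = 0
    for i in range(WINDOW_BUCKETS):
        b = latest_bucket - i * BUCKET_SECONDS
        predicted = model.predict(b)            # expected traffic for bucket b
        if abs(actual_counts[b] - predicted) > DEVIATION_RATIO * predicted:
            deviating += 1
    return deviating == WINDOW_BUCKETS          # consistently different
```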
The key points in this scenario:
Persistent Storage: It is important to note that this implementation requires both short- and long-term storage.
A spike may only last a few seconds, and thus depend on state from a small window of time, whereas model data can correspond to months of continuous updates.
Low Watermarks:
In real-world scenarios the network environment is messy, so when a dip shows up at some time point there are two possibilities:
a genuine dip, i.e. the number of queries at that point really did drop
the data was delayed by the network or other problems and simply has not arrived yet
Which raises the natural question: how do I know whether the data for a given time point has fully arrived?
MillWheel addresses this by providing a low watermark for incoming data for each processing stage (e.g. Window Counter, Model Calculator), which indicates that all data up to a given timestamp has been received.
MillWheel provides the low watermark mechanism to tell you when the data up to a given timestamp can be considered complete.
Of course, the low watermark is usually derived heuristically, so it does not solve this problem perfectly. It only means that if no data has arrived once the low watermark has passed, we have high confidence that there is no data, rather than that it has merely been delayed.
Duplicate Prevention: For Zeitgeist, duplicate record deliveries could cause spurious spikes.
We want the platform itself to guarantee exactly-once delivery.
The detailed requirements distilled from this scenario:
• Data should be available to consumers as soon as it is published (i.e. there are no system-intrinsic barriers to ingesting inputs and providing output data). Micro-batching, for example, is one such system-intrinsic barrier.
• Persistent state abstractions should be available to user code, and should be integrated into the system’s overall consistency model.
• Out-of-order data should be handled gracefully by the system. That is, data that arrives out of timestamp order can still be processed.
• A monotonically increasing low watermark of data timestamps should be computed by the system. That is, the system itself produces the low watermark.
• Latency should stay constant as the system scales to more machines. That is, the latency guarantee holds under scaling.
• The system should provide exactly-once delivery of records. That is, exactly-once semantics are guaranteed.
Abstractly, inputs and outputs in MillWheel are represented by (key, value, timestamp) triples.
Computations (equivalent to a Bolt in Storm)
Application logic lives in computations, which encapsulate arbitrary user code.
Keys
Keys are the primary abstraction for aggregation and comparison between different records in MillWheel.
For every record in the system, the consumer specifies a key extraction function, which assigns a key to the record.
Note that in MillWheel, records with the same key are processed serially; only records with different keys can be processed in parallel.
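A sketch of the (key, value, timestamp) triple and a consumer-specified key extraction function (names are illustrative, not MillWheel's actual API):

```python
from dataclasses import dataclass

@dataclass
class Record:
    """MillWheel inputs and outputs are (key, value, timestamp) triples."""
    key: str
    value: bytes
    timestamp: float

def extract_key(raw_value: bytes) -> str:
    """Consumer-specified key extraction: here, key search queries by the
    query text so all records for the same query aggregate together."""
    return raw_value.decode("utf-8")

# Records with the same key are processed serially (per-key ordering);
# records with different keys may be processed in parallel.
```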
Streams (equivalent to streams in Storm)
Streams are the delivery mechanism between different computations in MillWheel.
Persistent State
In its most basic form, persistent state in MillWheel is an opaque byte string that is managed on a per-key basis.
The user provides serialization and deserialization routines (such as translating a rich data structure in and out of its wire format), for which a variety of convenient mechanisms (e.g. Protocol Buffers [13]) exist.
Persistent state is backed by a replicated, highly available data store (e.g. Bigtable [7] or Spanner [9]), which ensures data integrity in a way that is completely transparent to the end user.
Common uses of state include counters aggregated over windows of records and buffered data for a join.
The persistent state here can be thought of as a checkpoint. Note that MillWheel's state is managed on a per-key basis, which plays a crucial role in MillWheel.
The user must provide the serialization and deserialization logic; these checkpoints are typically stored in a distributed store such as Bigtable.
Stateful computations usually need persistent state, e.g. window-based aggregation counters, or buffered data for a stream join.
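A sketch of per-key persistent state with user-provided (de)serialization; the `store.read`/`store.write` interface stands in for the replicated backing store and is hypothetical:

```python
import json

class CounterState:
    """User state for a windowed counter, serialized to an opaque byte string.
    A real pipeline would typically use Protocol Buffers instead of JSON."""
    def __init__(self, count: int = 0):
        self.count = count

    def serialize(self) -> bytes:              # user-provided routine
        return json.dumps({"count": self.count}).encode()

    @staticmethod
    def deserialize(data: bytes) -> "CounterState":
        return CounterState(json.loads(data)["count"]) if data else CounterState()

def process_record(store, key: str, record) -> None:
    state = CounterState.deserialize(store.read(key))  # per-key fetch
    state.count += 1                                   # aggregate over the window
    store.write(key, state.serialize())                # persisted atomically by MillWheel
```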
Low Watermarks
Given a low watermark, a computation should no longer receive any data earlier than it.
Definition: We provide a recursive definition of low watermarks based on a pipeline’s data flow.
low watermark of A = min(oldest work of A, low watermark of C : C outputs to A)
Here, the oldest work of A is the timestamp of the oldest unfinished record in A.
C is an upstream (parent) node of A, so A's low watermark cannot be later than C's: A necessarily receives data later than C does, so A's low watermark must be less than or equal to C's low watermark.
Recursing like this, the low watermark is ultimately determined by the injectors (i.e. the sources). An injector's input comes from an external system, such as a Kafka-like queue or a file system, so how does an injector know its own low watermark?
It actually does not know; it can only estimate. For a file system, for example, the file creation time can be used as the low watermark, since a file cannot contain records earlier than its creation time.
So the low watermark mechanism cannot solve this problem perfectly; there will always be too-fast / too-late cases.
Timers, i.e. triggers, which solve the "when" problem
A simple implementation of dips in Zeitgeist would set a low watermark timer for the end of a given time bucket, and report a dip if the observed traffic falls well below the model’s prediction.
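That dip check as a low watermark timer, reusing names from the bucketing sketch above; the `ctx` timer/production API is illustrative:

```python
DIP_THRESHOLD = 0.5   # "falls well below the model's prediction"; assumed ratio

def on_record(ctx, record):
    bucket_end = bucket_of(record.timestamp) + BUCKET_SECONDS
    # Fire once the low watermark passes the end of this bucket, i.e. once we
    # have high confidence that all data for the bucket has arrived.
    ctx.set_low_watermark_timer(bucket_end, on_bucket_complete)

def on_bucket_complete(ctx, model, bucket_end):
    bucket = bucket_end - BUCKET_SECONDS
    if actual_counts[bucket] < DIP_THRESHOLD * model.predict(bucket):
        ctx.produce("dips", bucket)   # observed traffic well below prediction
```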
Now we reach the key part:
Exactly-Once Delivery
How does MillWheel guarantee exactly-once semantics?
Upon receipt of an input record for a computation, the MillWheel framework performs the following steps:
• The record is checked against deduplication data from previous deliveries; duplicates are discarded.
• User code is run for the input record, possibly resulting in pending changes to timers, state, and productions.
• Pending changes are committed to the backing store.
• Senders are ACKed.
• Pending downstream productions are sent.
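These five steps as a sketch; every helper here is a hypothetical stand-in:

```python
def handle_delivery(record, sender):
    # 1. Check against deduplication data from previous deliveries;
    #    duplicates are discarded (but still ACKed so the sender stops retrying).
    if is_duplicate(record.id):
        ack(sender)
        return
    # 2. Run user code; it may stage pending changes to timers, state,
    #    and productions.
    pending = run_user_computation(record)
    # 3. Commit the pending changes (plus record.id for dedup) to the
    #    backing store in one atomic write.
    backing_store.commit_atomically(record.id, pending.state,
                                    pending.timers, pending.productions)
    # 4. ACK the sender.
    ack(sender)
    # 5. Send pending downstream productions.
    for out in pending.productions:
        send_downstream(out)
```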
Two points to note:
First, it deduplicates, which is what guarantees exactly-once; how deduplication works is described below.
In general, most streaming systems can already achieve at-least-once, so achieving exactly-once only additionally requires deduplication.
You can rely on an external store for that, or do it directly inside the system.
Second, intermediate state is checkpointed.
How MillWheel deduplicates at the system level:
The system assigns unique IDs to all records at production time.
We identify duplicate records by including this unique ID for the record in the same atomic write as the state modification.
If the same record is later retried, we can compare it to the journaled ID, and discard and ACK the duplicate.
So each record is assigned a unique ID.
To quickly determine whether an ID has been seen before, a Bloom filter is used.
Since we cannot necessarily store all duplication data in-memory, we maintain a Bloom filter of known record fingerprints, to provide a fast path for records that we have provably never seen before.
If the filter misses, we have to go to the backing store to decide whether the record is a duplicate:
In the event of a filter miss, we must read the backing store to determine whether a record is a duplicate.
How is this implemented? How do we tell a filter miss from a genuinely new record? After all, duplicates do not occur that often.
To keep record IDs from growing without bound they must be garbage collected, which raises a question: after collection, does the Bloom filter need to be re-initialized, or does the filter itself support expiry?
Record IDs for past deliveries are garbage collected after MillWheel can guarantee that all internal senders have finished retrying.
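A sketch of the dedup path: the Bloom filter gives the fast path for provably-new records, and the backing store is read only on a filter hit; the `store.has_journaled` journal interface is assumed:

```python
import hashlib

class BloomFilter:
    """Tiny Bloom filter stand-in: no false negatives, rare false positives."""
    def __init__(self, size_bits: int = 1 << 20):
        self.size = size_bits
        self.bits = bytearray(size_bits // 8)

    def _positions(self, item: str):
        for salt in (b"a", b"b", b"c"):   # three hash functions
            h = int.from_bytes(hashlib.sha256(salt + item.encode()).digest()[:8], "big")
            yield h % self.size

    def add(self, item: str):
        for p in self._positions(item):
            self.bits[p // 8] |= 1 << (p % 8)

    def __contains__(self, item: str) -> bool:
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(item))

class Deduplicator:
    def __init__(self, store):
        self.store = store          # backing store journaling seen record IDs
        self.bloom = BloomFilter()  # in-memory filter of known fingerprints

    def is_duplicate(self, record_id: str) -> bool:
        if record_id not in self.bloom:
            # Fast path: a Bloom filter has no false negatives, so this
            # record has provably never been seen before.
            self.bloom.add(record_id)
            return False
        # Filter hit: either a true duplicate or a false positive, so we
        # must read the backing store to decide.
        return self.store.has_journaled(record_id)
```

As for the garbage-collection question above, the paper does not spell it out; presumably the filter is rebuilt or aged out alongside the journaled IDs.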
Strong Productions
We checkpoint produced records before delivery in the same atomic write as state modification.
We call this pattern of checkpointing before record production strong productions.
This step guarantees at-least-once. Storm achieves it by having the spout re-send on timeout; later systems rarely kept that approach because the retry cycle is too long.
MillWheel and LinkedIn's Samza both use local retransmission instead. In MillWheel, before producing a record, the checkpoint and the state modification are committed in a single atomic write, with the checkpoint typically written to Bigtable.
Once the downstream node has successfully processed the record, it sends back an ACK, at which point the checkpoint can be deleted.
If a crash happens before that, then when we come back we can read the record out of the checkpoint and re-produce it.
Without the prior checkpoint, on recovery the production would be regenerated from the current state (e.g. a counter that newly arrived data may already have updated), which would be inconsistent.
Also, unlike persistent state, the checkpoint here refers specifically to produced records.
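Strong productions as a sketch; `backing_store`, `deliver`, and friends are hypothetical:

```python
def process_with_strong_production(key, record):
    state = load_state(key)
    outputs = user_code(state, record)     # new state + records to produce
    # Strong production: checkpoint produced records BEFORE delivery, in the
    # SAME atomic write as the state modification (e.g. one Bigtable mutation).
    backing_store.commit_atomically(key, state, checkpoint=outputs)
    for out in outputs:
        deliver(out)                       # retried locally until ACKed
    backing_store.delete_checkpoint(key)   # safe once downstream has ACKed

def recover(key):
    # After a crash, replay checkpointed-but-unacked productions verbatim,
    # rather than regenerating them from state that newer data may have changed.
    for out in backing_store.read_checkpoint(key):
        deliver(out)
    backing_store.delete_checkpoint(key)
```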
Weak Productions and Idempotency
MillWheel guarantees exactly-once semantics via record IDs and strong productions, but that comes at a considerable cost. Some scenarios do not need exactly-once; at-least-once is enough, e.g. many stateless computations.
So MillWheel provides weak productions to serve this need.
If exactly-once is not required, simply skip the deduplication: "disabling exactly-once can be accomplished simply by skipping the deduplication pass".
Can the checkpointing of produced records also be dropped entirely, producing directly, waiting for the ACK, and re-sending on failure or timeout? That would be just like Storm, and with a long pipeline the retry cycle becomes very long.
The optimization MillWheel provides is weak productions.
For example, in an A → B → C pipeline:
if B's production to C has not been acknowledged within 1s,
B checkpoints that production and immediately ACKs A, so that A does not keep waiting.
B itself keeps waiting, and deletes the checkpoint only after receiving C's ACK.
If B crashes at this point, then when B restarts it replays the previous production itself, transparently to A, and deletes the checkpoint only once it succeeds.
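Weak productions for the A → B → C example as a sketch; the future-like `done` handle and the 1s threshold mirror the description above, and all names are illustrative:

```python
ACK_TIMEOUT = 1.0   # seconds B waits for C before checkpointing and ACKing A

def b_handle(record, sender_a):
    outputs = user_code(record)
    for out in outputs:
        done = send_to_c(out)             # returns a future-like handle
        if not done.wait(ACK_TIMEOUT):
            # C is slow: checkpoint the pending production so a restart of B
            # can replay it, then ACK A immediately so A stops waiting.
            checkpoint_pending(out)
            ack(sender_a)
            done.on_ack(lambda: delete_checkpoint(out))  # cleanup once C ACKs
            return
    ack(sender_a)                         # fast path: C ACKed within 1s

def b_recover():
    # On restart, B replays checkpointed productions itself, transparently to A.
    for out in pending_checkpoints():
        send_to_c(out)
```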
In implementing mechanisms to manipulate user state in MillWheel, we discuss both the 「hard」 state that is persisted to our backing store and the 「soft」 state which includes any in-memory caches or aggregates.
We must satisfy the following user-visible guarantees:
• The system does not lose data.
• Updates to state must obey exactly-once semantics.
• All persisted data throughout the system must be consistent at any given point in time.
• Low watermarks must reflect all pending state in the system.
• Timers must fire in-order for a given key.
First, to avoid inconsistency, all per-key operations, including persistence, checkpoints, and state updates, are completed in a single atomic write.
To avoid inconsistencies in persisted state (e.g. between timers, user state, and production checkpoints), we wrap all per-key updates in a single atomic operation.
Second, against zombie writers and stale writes caused by network delay, a sequencer is used: every write carries a sequence token, and stale write requests are discarded; moreover, new workers must invalidate previous sequencers before starting work.
As work may shift between machines (due to load balancing, failures, or other reasons) a major threat to our data consistency is the possibility of zombie writers and network remnants issuing stale writes to our backing store.
To address this possibility, we attach a sequencer token to each write, which the mediator of the backing store checks for validity before allowing the write to commit.
New workers invalidate any extant sequencers before starting work, so that no remnant writes can succeed thereafter.
So in MillWheel, for a given key, only a single worker has permission to execute writes at any point in time; this is the key to how MillWheel guarantees write consistency.
Thus, we can guarantee that, for a given key, only a single worker can write to that key at a particular point in time.
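A sketch of the sequencer check at the backing store's mediator; the integer-token scheme is an assumed simplification:

```python
class KeyMediator:
    """Mediator in front of the backing store for a single key: only the
    holder of the latest sequencer token may commit writes, fencing off
    zombie writers and delayed network remnants."""
    def __init__(self):
        self.current_token = 0

    def new_sequencer(self) -> int:
        # A new worker taking over this key invalidates all extant sequencers,
        # so no remnant write can succeed afterwards.
        self.current_token += 1
        return self.current_token

    def write(self, token: int, mutation) -> bool:
        if token != self.current_token:
            return False          # stale write from an old worker: discarded
        apply_mutation(mutation)  # hypothetical commit to the backing store
        return True
```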
In order to quickly recover from unplanned process failures, each computation worker in MillWheel can checkpoint its state at an arbitrarily fine granularity (in practice, sub-second or per-record granularity is standard, depending on input volume). Our use of always-consistent soft state allows us to minimize the number of occasions when we must scan these checkpoints to specific cases – machine failures or load-balancing events. When we do perform scans, these can often be asynchronous, allowing the computation to continue processing input records while the scan progresses.