設計數據密集型應用第三部分:派生數據

  《Designing Data-Intensive Applications》的第一部分,基於單點(single node)介紹了數據系統的基礎理論與知識;在第二部分,則是將視野擴展到了分佈式數據系統,主要是Partition和Repliacation。在第三部分,則聚焦於派生數據系統。html

integrating multiple different data systems, potentially with different data models and optimized for different access patterns, into one coherent application architecture.node

  對於目前日益複雜的應用,沒有哪種單一的數據系統能夠知足應用的全部需求,因此本章就是介紹如何將不一樣的數據系統整合到單一應用中。數據系統能夠分爲兩類程序員

  system of record:原始數據,source of truthweb

  derived data system:派生數據系統,即數據來自其餘數據系統。派生數據系統包括但不限於:cache、索引、視圖,本質上派生數據是原始數據的冗餘,爲了性能而作的冗餘。數據庫

  值得注意的是,原始數據系統與派生數據系統的區別並不在於對應的工具,而在於在應用中的具體使用方式。windows

The distinction between system of record and derived data system depends not on the tool, but on how you use it in your application.api

本文地址:http://www.javashuo.com/article/p-gdfduprh-hr.html緩存

Batch Processing

  在某個層面,能夠數據系統進行如下分類服務器

  • Services (online systems)

    追求response time網絡

  • Batch processing systems (offline systems)

    追求throughput

  • Stream processing systems (near-real-time systems)

    sth between online and batch system

  本章討論的是批處理系統(Batch processing),MapReduce是批處理系統的典型表明,在MapReduce的諸多設計中,均可以看到unix的一些影子。

unix哲學

  • 一個程序只作一件事,作好這件事,若是有新需求,那麼從新寫一個程序,而不是在原來的程序上修修補補
  • 儘可能讓每一個程序的輸出作其餘程序的輸入,即輸出要通用、普適性,不要堅持交互式輸入
  • 設計和構造軟件,即便是操做系統,都要及早嘗試,對於不合理的地方,要絕不猶豫的推到重構
  • 儘早藉助工具來避免重複性的勞動,自動化

automation, rapid prototyping, incremental iteration, being friendly to experimentation, and breaking down large projects into manageable chunks

Separating the input/output wiring from the program logic makes

  unix pipeline的最大缺陷在於這些組合工具只能在單個機器上運行,須要擴展到多個節點時,就須要Hadoop這樣的分佈式系統

MapReduce

  與unix區別

  • 從單機到多機
  • stdin stdout 到 file

  關於MapReduce的原理與框架,以前在《典型分佈式系統分析:MapReduce》一文中描述過。下面關注一些不是在MapReduce論文中出現的一些討論。

Joins and Grouping

  批處理中,常常也須要join操做,經過join操做來補充完整一個事件(event)。在批處理中,既能夠在Map的時候join,也能夠在Reduce的時候join,以下所示

  

  event log中只記錄uid,而其餘屬性須要從user database(profile information)讀取,這樣避免了profile數據的冗餘

  每次經過網路去讀取user profile 顯然是不切實際的,拖慢批處理速度;並且因爲profile 是可變的,致使批處理 結果不是肯定性的。一個友好的解決辦法是:冗餘一份數據,放到批處理系統中。

  

  下面是一個reduce_side join的例子。稱之爲sort-merge join,由於無論是User event 仍是 User profile都按照userID進行hash,所以都一個用戶的event 和 profile會分配到都一個reducer。

  

 

批處理使用場景

  • 搜索引擎 search index

  關於增量建立search index,寫入新的segment文件,後臺批量合併壓縮。

  new segment files and asynchronously merges and compacts segment files in the background.

  • 機器學習與推薦

  通常來講,將數據寫入到一個key value store,而後給用戶查詢

  怎麼講批處理的結果導入到kvs? 直接導入是不大可能的。寫入到一個新的db,而後切換。

  build a brand-new database inside the batch job and write it as files to the job’s output directory in the distributed filesystem

Beyond MapReduce

  MapReduce的問題

  (1)比較底層,須要寫大量代碼:using the raw MapReduce APIs is actually quite hard and laborious

  解決辦法:higher-level programming models (Pig, Hive, Cascading, Crunch) were created as abstractions on top of MapReduce.

  (2) mapreduce execution model的問題,以下

Materialization of Intermediate State

  materiallization(物化)是指:每個MapReduce的輸出都須要寫入到文件再給下一個MapReduce Task Job。

  顯然,materiallization是提早計算,而不是按需計算。而Unix pipleline 是經過stream按需計算,只佔用少許內存空間。

  MapReduce相比unix pipeline缺陷

  1.   MapReduce job完成以後才能進行下一個,而unix pipeline是同時執行的
  2.   Mapper常常是多餘的:不少時候僅僅是出去上一個reducer的輸出
  3.   中間狀態的存儲也是要冗餘的,有點浪費

  dataflow engines如Spark、Tez、Flink試圖解決Mapreduce問題的

they handle an entire workflow as one job, rather than breaking it up into independent subjobs. 

  dataflow engines 沒有明顯的map reduce , 而是一個接一個的operator。其優點:

  • 避免了無謂的sort(mr 在map和reduce之間老是要sort)
  • 較少非必需的map task
  • 因爲知道整個流程,能夠實現locality optimizations
  • 中間狀態寫入內存或者本地文件,而不是HDFS
  • operator流水線工做,不一樣等到上一個stage徹底結束
  • 在運行新的operator時,能夠複用JVM

Stream Processing

  批處理與流處理的最大區別在於,批處理的輸入是肯定的、有限的,而流處理的輸入是源源不斷的,所以流處理系統通常比批處理系統有更好的實時性。

  流處理相關術語

  event:In a stream processing context, a record is more commonly known as an event

  producer、publisher、sender

  consumer、subscriber、recipient

  topic、stream,一組相關event

messaging system

  用於事件發生時,通知消費者,對於某個topic 通常是多生產者 多消費者。

  如何對消息系統分類:

  (1)What happens if the producers send messages faster than the consumers can process them?

  第一個問題,生產速度大於消費速度,對應的處理方式包括:丟包、緩存、流控(限制寫入速度)

  (2)What happens if nodes crash or temporarily go offline—are any messages lost?

  第二個問題,當節點crash或者臨時故障,消息會不會丟

event如何從producer到達consumer

(1)直達消息系統(沒有中間商)

  即一個event直接從producer到達consumer,如UDP廣播,brokerless : zeroMQ,這樣的系統有消息丟失的風險。

(2)message broker(message queue)

  定製化的DB

  異步過程

  保證消息可靠性

multi consumer

  shared subscriptions,一條消息任意一個consumer處理便可;負載均衡;可擴展性

  topic subscriptions 一條消息須要被不一樣的comsumer消費

  

  上圖(a)中的event只須要被任意一個consumer消費便可,而(b)中的每個event則須要被全部關注該topic的consumer處理

Acknowledgments and redelivery

  須要consumer的ack來保證消息已被消費,消息可能會被重複投遞,所以須要冪等性

  當 load balancing趕上redeliver,可能會出現messgae 亂序

logbased message brokers

  通常的消息隊列都是一次性消費,基於log的消息隊列能夠重複消費

The log-based approach trivially supports fan-out messaging, because several consumers can independently read the log without affecting each other—reading a message does not delete it from the log

  其優勢在於:持久化且immutable的日誌容許comsumer從新處理全部的事件

This aspect makes log-based messaging more like the batch processes of the last chapter, where derived data is clearly separated from input data through a repeatable transformation process. It allows more experimentation and easier recovery from errors and bugs, making it a good tool for integrating dataflows within an organization

 

Databases and Streams

  在log-based message broker有數據庫的影子,即數據在log中,那麼反過來呢,可否將message的思想應用於db,或者說db中是否自己就有message的思想?

  實際上是有的,在primary-secondary 中,primary寫oplog, produce event;secondary讀oplog, consume event。

Keep Systems in Sync

  一份數據以不一樣的形式保存多分,db、cache、search index、recommend system、OLAP

  不少都是使用full database dumps(batch process),這個速度太慢,又有滯後; 多寫(dual write)也是不現實的,增長應用層負擔、耦合嚴重。

Change Data Capture

  通常來講,應用(db_client)按照db的約束來使用db,而不是直接讀取、解析replication log。但若是能夠直接讀取,則有不少用處,例如用來建立serach index、cache、data warehouse。

  以下圖所示

  

  前面是DB(leader),中間是log-based message broker,後面是derived data system(cache, data warehouse) as followers

  這樣作的潛在問題是,日誌會愈來愈多,耗光磁盤,直接刪除就的log也是不行的,能夠週期性的log compaction:處理對一個key重複的操做,或者說已經被刪除的key。這樣也能解決新增長一個consumber,且consumber須要全部完整數據的狀況。

Event Sourcing

event sourcing involves storing all changes to the application state as a log of change events.

  CDC在數據層記錄,增刪改查,一個event可能對應多個data change;mutable

  event sourcing 在應用層記錄,immutable(不該該修改 刪除)

  event soucing 通常只記錄操做,不記錄操做後的結果,所以須要全部數據才能恢復當前的狀態

  週期性的snapshot有助於性能

  Commands and events: 兩者並不等價,Command只是意圖(好比想預約座位),只有經過檢查,執行成功,纔會生成對應的event,event 表明 fact

State, Streams, and Immutability

  

  上圖很是有意思:state是event stream的累計值,積分的效果,而stream是state的瞬時值,微分的效果

  Advantages of immutable events

  • immutable event log 有利於追溯到任意時間點,也能夠更容易從錯誤中恢復
  • immutable event log 比當前狀態有更多的信息:用戶添加物品到購物籃,而後從購物籃移除;從狀態來看,什麼都沒有發生,但event log卻意義豐富
  • Deriving several views from the same event log  當有event log,很容易回放event,產生新的數據視圖,而不用冒險修改當前使用的數據視圖,作到灰度升級

Processing Streams

  數據流應用普遍:

  1. 寫到其餘數據系統:db、cache等
  2. 推送給用戶,或者實時展現
  3. 產生其餘的數據流,造成鏈路

  stream processing 一般用於監控:風控、實時交易系統、機器狀態、軍事系統

  CEP(Complex event processing)是對特定事件的監控,對於stream,設置匹配規則,知足條件則觸發 complex event

  In these systems, the relationship between queries and data is reversed compared to normal databases.

  DB持久化數據,查詢是臨時的

  而CEP持久化的是查詢語句,數據時源源不斷的

Reasoning About Time

  批處理通常使用event time,而流處理可能採用本地時間(stream processing time),這可能致使不許確的時間窗口(尤爲兩個時間差抖動的時候)

  以event time做爲時間窗口的問題:不肯定是否收到了某個window內全部的event。

  一般,須要結合使用本地時鐘與服務器時鐘,考慮一個狀況,客戶端採集日誌發送到服務器,在未聯網的時候本地緩存。若是用本地時間,本地時間可能不許,用服務器時間,不能反映事件發生的時刻(可能過了很長時間才從緩存發送到服務器),解決辦法:

  1.   用device clock記錄事件發生時間;
  2.   用device clock記錄事件上傳時間;
  3.   用server clock記錄服務器收到event的時間

  用(3)減去(2)能夠獲得時間誤差(忽略了網絡延時),在用(1)加上這個時間誤差就獲得了事件的真正發生時間。

Types of windows

  • 滾動 tumbling window

  正交的時間塊,一個5分鐘,接下來又一個5分鐘

  • 跳動 hopping window

  相交的時間塊,5分鐘,而後前進1分鐘,有一個5分鐘

  • 滑動 Sliding window

  無固定的邊界,一點點向前滑

Stream Joins

  流處理系統中也是須要一些join操做

  • stream-stream joins

  for example.click-through rate 網頁搜索、點擊事件

  • stream-table joins(stream enrichment)

  a set of user activity events and a database of user profiles

  • Time-dependence of joins

  if events on different streams happen around a similar time, in which order are they processed?

  好比跨境交易,匯率是實時變化的,那麼交易事件與事件發生時間的匯率綁定。解決辦法是 交易事件裏面維護當時的匯率id,但這致使無法作log compaction

Fault Tolerance

  stream processing system的容錯性,batch processing 能夠作到 exactly-once semantics,雖然可能會有失敗-重試。對於流處理

  • Microbatching and checkpointing 作到了可重試
  • Idempotence 保證了可重複執行

The Future of Data Systems

Data Integration

  the most appropriate choice of software tool also depends on the circumstances.

  in complex applications, data is often used in several different ways. There is unlikely to be one piece of software that is suitable for all the different circumstances in which the data is used

  複雜的應用可能須要集成多個數據系統,讓單個數據系統各司其職,那麼如何保證多個數據系統數據的一致性:以其中一個數據系統爲準(Primary),而後經過CDC或者event sourcing複製到其餘數據系統。

Derived data versus distributed transactions

  分佈式事務經過互斥鎖決定寫操做順序;而CDC 使用log來保證順序

  分佈式事務經過atomic保證只生效一次;而log based依賴於肯定性的重試與冪等

The lambda architecture

batch processing is used to reprocess historical data, and stream processing is used to process recent updates, then how do you combine the two?

  結合批處理與流處理

  •   批處理:慢可是準確
  •   流處理:快速但不必定精確

  潛在的問題:

  •   在batch、stream processing framework上維護兩份同樣的邏輯
  •   batch pipeline、stream pipeline輸出不一樣,致使讀取的時候須要merge
  •   增量batch 須要解決時間窗口、stragglers 問題

  解決辦法以下:Unifying batch and stream processing

Unbundling Databases

Unix and relational databases have approached the information management problem with very different philosophies.

  Unix 提供了是比較底層的對硬件的封裝,thin wrapper; 而relationaldb 對程序員提供high-level抽象

Composing Data Storage Technologies

it seems that there are parallels between the features that are built into databases and the derived data systems that people are building with batch and stream processors.

  數據庫的feature(secondary index、view、replication log, full-text search index)與 derived data system有一些相似之處

  以建立新索引爲例:

  • 快照,而後處理已有數據
  • 處理在上一步過程當中新加入的數據
  • 索引建立完畢,處理後續數據

  這個過程相似於

  • 增長新的secondary(follower)
  • 在流處理系統中增長新的消費者

Observing Derived State

  

  write path:precomputed; eager evaluation

   whenever some piece of information is written to the system, it may go through multiple stages of batch and stream processing, and eventually every derived dataset is updated to incorporate the data that was written

  read path:lazy evaluation

   when serving a user request you read from the derived dataset, perhaps perform some more processing on the results, and construct the response to the user.

  The derived dataset is the place where the write path and the read path meet, as illustrated in Figure 12-1. It represents a trade-off between the amount of work that needs to be done at write time and the mount that needs to be done at read time.

  derived dataset是write path與read path鏈接的地方,是寫入時處理與讀取時工做量的折中。caches, indexes, and materialized views 都是在write path上多作一些工做,減輕read path負擔

  寫 與 讀的折中;twinter的例子,名人 普通人策略不同

Stateful, offline-capable clients

  當前互聯網應用都是client server模式,client無狀態,數據都在server;但single path application或者mobile app在斷網的時候也能使用,提供更好的用戶體驗;並且,web-socket等技術提供了server主動向client推送的能力,這就是的write-path 進一步擴展到了客戶端

  大多數的db,lib,framework、protocol都是按照staleless and request/response的思想來設計的,根深蒂固

In order to extend the write path all the way to the end user, we would need to fundamentally rethink the way we build many of these systems: moving away from request response interaction and toward publish/subscribe dataflow

Aiming for Correctness

  build applications that are reliable and correct

  In this section I will suggest some ways of thinking about correctness in the context of dataflow architectures.

The End-to-End Argument for Databases

  即便使用了強事務性,也不能保證數據不會有問題,由於因爲代碼bug、人爲錯誤,致使數據的損壞 丟失,immutable and append-only data helps

  考慮一個複雜的問題 exactly-once

  其中一個解決辦法:Idempotent(冪等)。 但須要額外的一些工做,並且須要很是細心的實現

 

  TCP的seq number也是爲了保證excat once, 但這隻對單個TCP鏈接生效

In many databases, a transaction is tied to a client connection。 If the client suffers a network interruption and connection timeout after sending the COMMIT, but before hearing back from the database server, it does not know whether the transaction has been committed or aborted。

  2pc break the 1:1 mapping between a TCP connection and a transaction,所以 suppress duplicate transactions between the database client and server;可是end-user與 application server之間呢

  終極解決辦法,Unique Operation ID

  

  Besides suppressing duplicate requests, the requests table in Example 12-2 acts as a kind of event log, hinting in the direction of event sourcing

  除了保證點到點的約束,也充當了event log,能夠用於event sourcing

Enforcing Constraints

  約束:如unique constraint,帳戶餘額不能爲負等

  經過consume log來實現約束:

  Its fundamental principle is that any writes that may conflict are routed to the same partition and processed sequentially

Timeliness and Integrity

  consistency可能包含兩重意義

  及時性(Timeliness ):user讀取到的是實時狀態

  完整性(Integrity):user讀取到的是完整的狀態

violations of timeliness are 「eventual consistency,」 whereas violations of integrity are 「perpetual inconsistency.」

 

  ACID同時保證及時性與完整性,但基於時間的數據流通常不保證及時性,exactly-once保證完整性

  在數據流系統如何保證完整性?

  (1) 寫操做是一個單一的message,原子性寫入

  (2)derived datasystem從單一消息肯定性地提取狀態

  (3)客戶端生成reqid,在整個流程用整個reqid保證冪等性

  (4)單一消息不可變,持久化,容許derived datasystem從新處理全部消息

 

  儘可能避免協調的數據系統 Coordination-avoiding data systems

  (1)數據流系統經過derived data,無需原子性提交、線性、跨分片的同步協調就能保證完整性

  (2)雖然嚴格的unique 約束要求實時性(timeliness)和協調,不少應用能夠經過過後補償放鬆約束

Trust, but Verify

  對數據完整性的校驗,防止數據默默出錯silent corruption,多副本不能解決這個問題

  基於事件的系統提供了更好的可審計性, 如記錄A給B轉帳,比記錄A扣錢,B加錢更好

  Checking the integrity of data systems is best done in an end-to-end fashion

Doing the Right Thing

  軟件和數據大大影響了咱們生存的世界,對於咱們這些工程師,須要承擔起責任:建立一個充滿人文關懷和尊重的世界。

  科技是放大鏡:放大了善與惡

相關文章
相關標籤/搜索