The main idea:
All of these systems can be viewed as two parts: the actual data log system, and all kinds of query engines.
Consistency is guaranteed entirely by the log system; the various query engines do not have to worry about consistency or safety, they only need to keep syncing data from the log, and if data is lost or a query engine crashes it can recover by replaying from the log.
This shows the central position of Kafka at LinkedIn: it is not just for data aggregation, it is the core node of the whole architecture.
Definition of a log
A very simple structure; the key properties are that it is append-only and totally ordered.
A log is perhaps the simplest possible storage abstraction.
It is an append-only, totally-ordered sequence of records ordered by time.
The ordering of records defines a notion of "time" since entries to the left are defined to be older than entries to the right.
The log entry number can be thought of as the "timestamp" of the entry. Describing this ordering as a notion of time seems a bit odd at first, but it has the convenient property that it is decoupled from any particular physical clock. This property will turn out to be essential as we get to distributed systems.
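To make the abstraction concrete, here is a minimal in-memory sketch of such a log in Java (the class and method names are mine, purely for illustration): appending assigns the next offset, and that offset plays the role of the entry's logical "timestamp".

```java
import java.util.ArrayList;
import java.util.List;

// A minimal in-memory sketch of the log abstraction: append-only and totally ordered.
// The offset returned by append() plays the role of the entry's logical "timestamp".
public class SimpleLog<T> {
    private final List<T> entries = new ArrayList<>();

    // Append a record and return its offset (its position in the total order).
    public synchronized long append(T record) {
        entries.add(record);
        return entries.size() - 1;
    }

    // Read the record at a given offset; entries are never modified or reordered.
    public synchronized T read(long offset) {
        return entries.get((int) offset);
    }

    // The next offset to be assigned, i.e. the current end of the log.
    public synchronized long endOffset() {
        return entries.size();
    }
}
```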
What do logs have to do with distributed systems?
The answer is that logs have a specific purpose: they record what happened and when.
For distributed data systems this is, in many ways, the very heart of the problem.
How this differs from ordinary application logs
Application logs written with syslog or log4j are unstructured error messages or trace info.
The biggest difference is that text logs are meant to be primarily for humans to read and the "journal" or "data logs" I'm describing are built for programmatic access.
The log discussed here is the broad concept; the application logs we deal with every day are a special case of it. The biggest difference is that
everyday logs are unstructured and meant for humans to read, while the logs discussed here are structured and meant to be read programmatically.
Logs in databases
Logs were first used in databases to guarantee ACID properties, and gradually also came to be used for replicating data between databases.
The usage in databases has to do with keeping in sync the variety of data structures and indexes in the presence of crashes. To make this atomic and durable, a database uses a log to write out information about the records they will be modifying, before applying the changes to all the various data structures it maintains.
Over-time the usage of the log grew from an implementation detail of ACID to a method for replicating data between databases. It turns out that the sequence of changes that happened on the database is exactly what is needed to keep a remote replica database in sync.
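A write-ahead log in miniature, as a rough sketch of the idea (the names and the plain-text record format are my own assumptions): the change is made durable in the log before the actual data structure is touched, so recovery can replay the log.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Write-ahead logging in miniature: the change is appended and forced to the
// log file before the in-memory table is modified, so a crash in between can
// be repaired by replaying the log on restart.
public class WriteAheadSketch {
    private final Map<String, String> table = new HashMap<>();
    private final FileOutputStream log;

    WriteAheadSketch(String logPath) throws IOException {
        this.log = new FileOutputStream(logPath, true); // append mode
    }

    void put(String key, String value) throws IOException {
        String record = key + "=" + value + "\n";
        log.write(record.getBytes(StandardCharsets.UTF_8));
        log.getFD().sync();          // make the log entry durable first
        table.put(key, value);       // only then touch the actual data structure
    }
}
```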
Logs in distributed systems
The core problem a distributed system has to solve is consistency, which is essentially a total-ordering problem.
Starting from the same initial state and executing the same sequence of instructions in the same order produces the same final state; this is what guarantees consistency.
A log can be used to model this consistency problem, because the log defines a total order over its entries.
Solving distributed consensus itself still requires a protocol such as Paxos, but the outcome of the algorithm can be recorded as a log.
State Machine Replication Principle:
If two identical, deterministic processes begin in the same state and get the same inputs in the same order, they will produce the same output and end in the same state.
The purpose of the log here is to squeeze all the non-determinism out of the input stream to ensure that each replica processing this input stays in sync.
The distributed log can be seen as the data structure which models the problem of consensus. A log, after all, represents a series of decisions on the "next" value to append. You have to squint a little to see a log in the Paxos family of algorithms, though log-building is their most common practical application.
There are a multitude of ways of applying this principle in systems depending on what is put in the log.
For example, we can log the incoming requests to a service, or the state changes the service undergoes in response to request, or the transformation commands it executes. Theoretically, we could even log a series of machine instructions for each replica to execute or the method name and arguments to invoke on each replica. As long as two processes process these inputs in the same way, the processes will remain consistent across replicas.
There are two ways to use a log to replicate data in a distributed system (a toy contrast is sketched below):
the "state machine model", which logs the incoming requests; the advantage is that little data is transferred, the drawback is that if an entry is lost or corrupted there is no way to repair the downstream state.
the "primary-backup model", which logs the resulting data directly; the drawback is that more data is transferred, the advantage is that a lost update matters less, because each entry carries the latest value.
Changelog 101: Tables and Events are Dual
In database usage, you keep both the tables that store the final state and the events (the log) that record the intermediate changes.
This way you can not only replay from the initial state to get the final result, but also reconstruct every intermediate version along the way.
In this respect source control and databases are very similar, since both have to manage distributed, concurrent updates.
The magic of the log is that if it is a complete log of changes, it holds not only the contents of the final version of the table, but also allows recreating all other versions that might have existed. It is, effectively, a sort of backup of every previous state of the table.
This might remind you of source code version control. There is a close relationship between source control and databases. Version control solves a very similar problem to what distributed data systems have to solve—managing distributed, concurrent changes in state.
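A small illustration of this duality (my own sketch): replaying the changelog of key/value updates up to entry N rebuilds the table exactly as it looked at version N, much like checking out an old revision in version control.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Tables and events are dual: replaying the changelog up to entry N rebuilds
// the table exactly as it looked after the N-th change.
public class ChangelogReplay {

    record Change(String key, String value) {}

    static Map<String, String> tableAtVersion(List<Change> changelog, int version) {
        Map<String, String> table = new HashMap<>();
        for (int i = 0; i < version; i++) {
            Change c = changelog.get(i);
            table.put(c.key(), c.value());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> changelog = List.of(
                new Change("user:1", "alice"),
                new Change("user:2", "bob"),
                new Change("user:1", "alice2"));
        // version 2: user:1 -> alice,  user:2 -> bob
        System.out.println(tableAtVersion(changelog, 2));
        // version 3: user:1 -> alice2, user:2 -> bob
        System.out.println(tableAtVersion(changelog, 3));
    }
}
```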
What is data integration?
"Data integration is making all the data an organization has available in all its services and systems."
This phrase "data integration" isn't all that common, but I don't know a better one. The more recognizable term ETL usually covers only a limited part of data integration—populating a relational data warehouse. But much of what I am describing can be thought of as ETL generalized to cover real-time systems and processing flows.
The stages of working with data:
1. capturing all the relevant data, being able to put it together, and modeling it in a uniform way to make it easy to read and process
2. build infrastructure to process this data in various ways (MapReduce, real-time query systems, etc.) with reliable and complete data flow
3. good data models and consistent well understood semantics
4. better visualization, reporting, and algorithmic processing and prediction
The problem
People often skip over the reliable, complete data flow of step 2 and jump straight to steps 3 and 4, which is a sign of chasing quick results.
In my experience, most organizations have huge holes in the base of this pyramid—they lack reliable complete data flow—but want to jump directly to advanced data modeling techniques. This is completely backwards.
So the question is, how can we build reliable data flow throughout all the data systems in an organization? The answer is: through a log system.
Log-structured data flow
The log is the natural data structure for handling data flow between systems. The recipe is very simple:
Take all the organization's data and put it into a central log for real-time subscription.
At LinkedIn this central log is Kafka. We have been using Kafka for about two years, and roughly in this way, but only at this point is its role understood this deeply.
1. First, it decouples the subsystems: each subsystem only reads and writes data asynchronously against the central log and does not need to know about any of the others.
2. Kafka acts as a buffer in the middle: it absorbs the differences in produce and consume speed between subsystems and supports replay after a consumer crashes.
3. With all data flowing through Kafka, data integration becomes very effective and can happen in real time.
Previously, data integration was extra work that the producers of the data did not care about and were reluctant to help with, so it was very hard to get done.
Now that Kafka is used, all data exchange between subsystems has to go through this central log, so putting data into Kafka in a common format becomes a mandatory step rather than an afterthought.
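As a rough sketch of what putting data into the central log in a common format might look like, using the standard Kafka Java producer client (the topic name, key, and JSON fields here are made-up assumptions, not LinkedIn's actual schema):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Publish a record to a central log topic in a shared, structured format.
// Topic and field names here are invented for illustration.
public class CentralLogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A structured event; in practice a schema (e.g. Avro) would be used
            // so every subscriber can parse it the same way.
            String event = "{\"type\":\"page_view\",\"memberId\":42,\"page\":\"/jobs/123\"}";
            producer.send(new ProducerRecord<>("events.page_view", "42", event));
        }
    }
}
```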
The two figures below show the effect that having (or not having) a central log system has on the overall architecture and on the data flow.
Relationship to ETL and the Data Warehouse
A data warehouse containing clean, integrated data is a phenomenal asset, but the mechanics of getting this are a bit out of date.
The key problem for a data-centric organization is coupling the clean integrated data to the data warehouse.
In my view, ETL is really two things. First, it is an extraction and data cleanup process—essentially liberating data locked up in a variety of systems in the organization and removing any system-specific non-sense. Secondly, that data is restructured for data warehousing queries.
The clean, integrated repository of data should be available in real-time as well for low-latency processing as well as indexing in other real-time storage systems.
A better approach is to have a central pipeline, the log, with a well defined API for adding data. The responsibility of integrating with this pipeline and providing a clean, well-structured data feed lies with the producer of this data feed. This means that as part of their system design and implementation they must consider the problem of getting data out and into a well structured form for delivery to the central pipeline.
Traditional data warehousing couples the clean, integrated data to the data warehouse itself; the clean data is a great asset, but the warehouse mechanics are dated and cannot meet real-time, low-latency needs.
Moreover, ETL and building the warehouse are extra work that the teams producing the data do not care about, so ETL and data integration take a huge amount of effort.
The proposed fix is an architecture with a central pipeline, the log, with a well defined API for adding data.
Log Files and Events
Back to the benefits of this architecture: decoupling and event-driven systems.
Let's talk a little bit about a side benefit of this architecture: it enables decoupled, event-driven systems.
At LinkedIn, we have built our event data handling in a log-centric fashion.
We are using Kafka as the central, multi-subscriber event log. We have defined several hundred event types, each capturing the unique attributes about a particular type of action. This covers everything from page views, ad impressions, and searches, to service invocations and application exceptions.
An example: showing a job posting on the job page. In the original system, besides the bare display logic, there was a lot of extra logic for interacting with other systems, as follows.
To understand the advantages of this, imagine a simple event—showing a job posting on the job page. The job page should contain only the logic required to display the job.
Clearly this is a poor design and the code becomes hard to maintain. In the Kafka-based system, the page only needs to display the job posting and publish an event; all the other systems subscribe to that event and do their own processing.
The "event-driven" style provides an approach to simplifying this. The job display page now just shows a job and records the fact that a job was shown along with the relevant attributes of the job, the viewer, and any other useful facts about the display of the job. Each of the other interested systems—the recommendation system, the security system, the job poster analytics system, and the data warehouse—all just subscribe to the feed and do their processing. The display code need not be aware of these other systems, and needn't be changed if a new data consumer is added.
So far, I have only described what amounts to a fancy method of copying data from place-to-place. But shlepping bytes between storage systems is not the end of the story. It turns out that "log" is another word for "stream" and logs are at the heart of stream processing.
What is stream processing?
Stream processing has nothing to do with SQL. Nor is it limited to real-time processing.
I see stream processing as something much broader: infrastructure for continuous data processing. I think the computational model can be as general as MapReduce or other distributed processing frameworks, but with the ability to produce low-latency results.
Continuous data processing that produces low-latency results.
The motivation for stream processing
The real driver for the processing model is the method of data collection. Data which is collected in batch is naturally processed in batch. When data is collected continuously, it is naturally processed continuously.
It comes down to how data is collected. Traditionally, because tooling was primitive, data was collected periodically in batches (census data, for example), and data collected in batches naturally gets processed in batches.
LinkedIn, for example, has almost no batch data collection at all. The majority of our data is either activity data or database changes, both of which occur continuously. In fact, when you think about any business, the underlying mechanics are almost always a continuous process—events happen in real-time, as Jack Bauer would tell us. When data is collected in batches, it is almost always due to some manual step or lack of digitization or is a historical relic left over from the automation of some non-digital process. Transmitting and reacting to data used to be very slow when the mechanics were mail and humans did the processing.
As digitization and automation advance, data is increasingly collected as a stream rather than in batches, which gives stream-based processing a strong motivation.
LinkedIn, for example, collects almost no data in batches; nearly everything is collected continuously as a stream.
It turns out that the log solves some of the most critical technical problems in stream processing, which I'll describe, but the biggest problem that it solves is just making data available in real-time multi-subscriber data feeds.
Data flow graphs
Of course, in stream processing the log's most important role is still to serve as the data flow infrastructure.
The purpose of the log in the integration is two-fold.
First, it makes each dataset multi-subscriber and ordered.
Second, the log provides buffering to the processes.
Stateful Real-Time Processing
The log can also be used to maintain the intermediate state of stream processing.
Some real-time stream processing is just stateless record-at-a-time transformation, but many of the uses are more sophisticated counts, aggregations, or joins over windows in the stream.
More complex stream operations such as counts or joins need to maintain some state.
this kind of processing ends up requiring some kind of state to be maintained by the processor: for example, when computing a count, you have the count so far to maintain. How can this kind of state be maintained correctly if the processors themselves can fail?
The simplest alternative would be to keep state in memory. However if the process crashed it would lose its intermediate state.
An alternative is to simply store all state in a remote storage system and join over the network to that store. The problem with this is that there is no locality of data and lots of network round-trips.
Keeping this intermediate state in memory means it is lost on a crash; keeping it in remote storage is clearly inefficient.
The approach here is to keep the intermediate state in a local table and record the history of that local table in a changelog.
A stream processor can keep its state in a local "table" or "index"—a bdb, leveldb, or even something more unusual such as a Lucene or fastbit index. The contents of this store is fed from its input streams (after first perhaps applying arbitrary transformation). It can journal out a changelog for this local index it keeps to allow it to restore its state in the event of a crash and restart. This allows a generic mechanism for keeping co-partitioned state in arbitrary index types local with the incoming stream data.
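A simplified sketch of this pattern (my own toy code, not Samza's actual API): the running counts live in a local table, every update is also journaled to a changelog, and a restarted processor rebuilds its table by replaying that changelog.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stateful stream processing with a local table plus a changelog for recovery.
// The local table holds the running counts; every update is also journaled to
// the changelog so the state can be rebuilt after a crash and restart.
public class CountingProcessor {
    private final Map<String, Long> localTable = new HashMap<>();
    private final List<Map.Entry<String, Long>> changelog; // stands in for a changelog topic

    CountingProcessor(List<Map.Entry<String, Long>> changelog) {
        this.changelog = changelog;
        // Restore: replay the changelog to rebuild the local table.
        for (Map.Entry<String, Long> entry : changelog) {
            localTable.put(entry.getKey(), entry.getValue());
        }
    }

    // Process one input record: update local state and journal the new value.
    void process(String key) {
        long newCount = localTable.merge(key, 1L, Long::sum);
        changelog.add(Map.entry(key, newCount));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> changelog = new ArrayList<>();
        CountingProcessor p1 = new CountingProcessor(changelog);
        p1.process("page:/jobs");
        p1.process("page:/jobs");

        // Simulate a crash: a new processor restores its state from the changelog.
        CountingProcessor p2 = new CountingProcessor(changelog);
        System.out.println(p2.localTable.get("page:/jobs")); // 2
    }
}
```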
The final topic I want to discuss is the role of the log in data system design for online data systems.
Unbundling?
This is a striking perspective: you can view all the systems in an organization as a single distributed database, in which all the query-oriented systems (Redis, SOLR, Hive tables, and so on) are just different indexes on the data, and stream processing systems like Storm or Samza are just trigger and view materialization mechanisms.
You can see the whole of your organization's systems and data flows as a single distributed database. You can view all the individual query-oriented systems (Redis, SOLR, Hive tables, and so on) as just particular indexes on your data. You can view the stream processing systems like Storm or Samza as just a very well-developed trigger and view materialization mechanism.
The current explosion of data systems exists because building distributed systems is so hard that no single system can handle every case, so each system focuses on a particular scenario.
So in big data there is no one-size-fits-all answer; every data system is a set of balances and trade-offs.
There is undeniably now an explosion of types of data systems, but in reality, this complexity has always existed.
There are many motivations for segregating data into multiple systems: scale, geography, security, and performance isolation are the most common.
My take is that the explosion of different systems is caused by the difficulty of building distributed data systems.
So what might the future look like?
1. The large number of separate data systems continues to exist; in that case a log system for data integration becomes very important.
2. A single all-encompassing system emerges; this is unlikely.
3. The more realistic and attractive possibility: almost all data systems are open source, so, much like building with Lego, we can assemble our own distributed system out of different open-source pieces according to our own needs.
I see three possible directions this could follow in the future.
The first possibility is a continuation of the status quo: the separation of systems remains more or less as it is for a good deal longer. In this case, an external log that integrates data will be very important.
The second possibility is that there could be a re-consolidation in which a single system with enough generality starts to merge back in all the different functions into a single uber-system. I think the practical difficulties of building such a system make this unlikely.
Open source allows another possibility: data infrastructure could be unbundled into a collection of services and application-facing system apis. You already see this happening to a certain extent in the Java stack:
It starts to look a bit like a lego version of distributed data system engineering. You can piece these ingredients together to create a vast array of possible systems. If the implementation time for a distributed system goes from years to weeks because reliable, flexible building blocks emerge, then the pressure to coalesce into a single monolithic system disappears.
The place of the log in system architecture
Whichever way things develop, the log will play an important role.
A system that assumes an external log is present allows the individual systems to relinquish a lot of their own complexity and rely on the shared log. Here are the things I think a log can do:
The system is divided into two parts: the log and the serving layer.
All writes go to the log first, and the log guarantees consistency; think of Kafka collecting all the updates.
The serving layer stores the various indexes; it subscribes to the log and applies the updates to keep its indexes in sync.
Finally, clients read from the serving nodes with read-your-write semantics.
Here is how this works. The system is divided into two logical pieces: the log and the serving layer.
The log captures the state changes in sequential order.
The serving nodes store whatever index is required to serve queries (for example a key-value store might have something like a btree or sstable, a search system would have an inverted index). The serving nodes subscribe to the log and apply writes as quickly as possible to their local index in the order the log has stored them.
The client can get read-your-write semantics from any node by providing the timestamp of a write as part of its query—a serving node receiving such a query will compare the desired timestamp to its own index point and if necessary delay the request until it has indexed up to at least that time to avoid serving stale data.
The serving nodes can be completely without leaders, since the log is the source of truth.
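A sketch of the read-your-write check a serving node might perform (simplified, with log offsets standing in for the write "timestamp"): the node compares the offset the client requires against how far it has indexed, and delays the read until it has caught up.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of how a serving node can give read-your-write semantics: the client
// passes the log offset of its write, and the node delays the read until its
// local index has applied the log at least up to that offset.
public class ServingNode {
    private volatile long indexedUpTo = -1;   // highest log offset applied locally
    private final Map<String, String> index = new ConcurrentHashMap<>();

    // Called by the log-subscription thread as it applies writes in log order.
    void apply(long offset, String key, String value) {
        index.put(key, value);
        indexedUpTo = offset;
    }

    // Called by clients; minOffset is the offset of the client's own last write.
    String read(String key, long minOffset) throws InterruptedException {
        while (indexedUpTo < minOffset) {
            Thread.sleep(10);   // in a real system: block on a condition or redirect
        }
        return index.get(key);
    }
}
```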
By having this log system, you get a fully developed subscription API for the contents of the data store which feeds ETL into other systems. In fact, many systems can share the same log while providing different indexes, like this:
I find this view of systems as factored into a log and query API to be very revealing, as it lets you separate the query characteristics from the availability and consistency aspects of the system.
Some people feel that keeping a separate copy of the data in the log is wasteful, but it isn't really. The idea of having a separate copy of data in the log (especially if it is a complete copy) strikes many people as wasteful. First, the log can be a particularly efficient storage mechanism. We store up to 5TB on our production Kafka servers. The serving system may also use optimized hardware. In contrast, the log system just uses large multi-TB hard drives. Finally, as in the picture above, in the case where the data is served by multiple systems, the cost of the log is amortized over multiple indexes.