【原創】大數據基礎之Impala(2)實現細節

一 架構

Impala is a massively-parallel query execution engine, which runs on hundreds of machines in existing Hadoop clusters. It is decoupled from the underlying storage engine, unlike traditional relational database management systems where the query processing and the underlying storage engine are components of a single tightly-coupled system.java

impala是一個大規模並行查詢引擎,能夠在現有hadoop集羣上部署運行,與底層存儲引擎解耦;node

An Impala deployment is comprised of three services.redis

一個impala集羣由三部分組成:impalad、statestored、catalogd;算法

impalad

The Impala daemon (impalad) service is dually responsible for accepting queries from client processes and orchestrating their execution across the cluster, and for executing individual query fragments on behalf of other Impala daemons. When an Impala daemon operates in the First role by managing query execution, it is said to be the coordinator for that query. However, all Impala daemons are symmetric; they may all operate in all roles. This property helps with fault-tolerance, and with load-balancing.
One Impala daemon is deployed on every machine in the cluster that is also running a datanode process - the block server for the underlying HDFS deployment - and therefore there is typically one Impala daemon on every machine. This allows Impala to take advantage of data locality, and to read blocks from the Filesystem without having to use the network.sql

全部的impalad是對等的,沒有主從結構,這樣自然具備容錯和負載均衡的好處,impalad負責:
1)接收client的query請求(做爲該query的coordinator)並將query分解到整個集羣並行執行;
2)配合其餘impalad(即其餘query的coordinator)執行query的一部分;
impalad一般和datanode部署在一塊兒,這樣能夠利用數據本地性,避免網絡開銷;數據庫

statestored

The Statestore daemon (statestored) is Impala's meta-data publish-subscribe service, which disseminates cluster-wide metadata to all Impala processes. There is a single statestored instanceexpress

statestored是impala的metadata發佈訂閱服務,會將集羣中的metadata變動推送到全部的impalad,statestore是單點;數組

catalogd

Finally, the Catalog daemon (catalogd), serves as Impala's catalog repository and metadata access gateway. Through the catalogd, Impala daemons may exe-cute DDL commands that are reected in external catalog stores such as the Hive Metastore. Changes to the system catalog are broadcast via the statestore.promise

catalogd是impala的catalog倉庫和metadata訪問網關,impalad執行ddl操做時經過catalogd同步操做外部catalog好比hive metastore;全部的catalog變動也經過statestored通知全部的impalad;網絡

實現細節

1 state

A major challenge in the design of an MPP database that is intended to run on hundreds of nodes is the coordination and synchronization of cluster-wide metadata. Impala's symmetric-node architecture requires that all nodes must be able to accept and execute queries. Therefore all nodes must have, for example, up-to-date versions of the system catalog and a recent view of the Impala cluster's membership so that queries may be scheduled correctly.

MPP(Massively Parallel Processing)數據庫設計的一個主要挑戰是在幾百個節點的集羣中協調和同步元數據;impala的對等結構要求全部的節點都能接收和執行query,因此全部的節點都必須有最新的catalog和集羣中全部節點的狀態;

We might approach this problem by deploying a separate cluster-management service, with ground-truth versions of all cluster-wide metadata. Impala daemons could then query this store lazily (i.e. only when needed), which would ensure that all queries were given up-to-date responses. However, a fundamental tenet in Impala's design has been to avoid synchronous RPCs wherever possible on the critical path of any query. Without paying close attention to these costs, we have found that query latency is often compromised by the time taken to establish a TCP connection, or load on some remote service. Instead, we have designed Impala to push updates to all interested parties, and have designed a simple publish-subscribe service called the statestore to disseminate metadata changes to a set of subscribers.

要解決這個問題能夠部署一套單獨的服務,每次須要元數據的時候主動調用服務查詢(pull),這樣能夠保證每次查詢都能拿到最新版本的元數據;可是impala設計的基本原則就是要避免在查詢的關鍵路徑上有同步rpc調用(創建鏈接和遠程服務調用的時間開銷);因此impala在設計的時候使用一個叫作statestore的發佈訂閱服務來推送全部的元數據更新(push);

The statestore maintains a set of topics, which are arrays of (key, value, version) triplets called entries where 'key' and 'value' are byte arrays, and 'version' is a 64-bit integer. A topic is defined by an application, and so the statestore has no understanding of the contents of any topic entry. Topics are persistent through the lifetime of the statestore, but are not persisted across service restarts. Processes that wish to receive updates to any topic are called subscribers, and express their interest by registering with the statestore at start-up and providing a list of topics. The statestore responds to registration by sending the subscriber an initial topic update for each registered topic, which consists of all the entries currently in that topic.

statestore維護一個topic的集合,每一個topic其實是(key、value、version)的數組,topic數據並無作持久化,重啓statestore後以前的topic數據就沒了;每一個impalad啓動後都會到statestored註冊,同時提供要訂閱的topic列表,statestored會把訂閱的topic中全部的數據都推送給impalad;

After registration, the statestore periodically sends two kinds of messages to each subscriber. The first kind of message is a topic update, and consists of all changes to a topic (new entries, modified entries and deletions) since the last update was successfully sent to the subscriber. Each subscriber maintains a per-topic most-recent-version identifier which allows the statestore to only send the delta between updates. In response to a topic update, each subscriber sends a list of changes it wishes to make to its subscribed topics. Those changes are guaranteed to have been applied by the time the next update is received.
The second kind of statestore message is a keepalive. The statestore uses keepalive messages to maintain the connection to each subscriber, which would otherwise time-out its subscription and attempt to re-register. Previous versions of the statestore used topic update messages for both purposes, but as the size of topic updates grew it became difficult to ensure timely delivery of updates to each subscriber, leading to false-positives in the subscriber's failure-detection process.

impalad向statestored註冊以後,statestored會週期性的向impalad發送兩種消息:1)topic更新消息;2)keepalive消息(保持長鏈接);

If the statestore detects a failed subscriber (for example, by repeated failed keepalive deliveries), it will cease sending updates. Some topic entries may be marked as 'transient', meaning that if their 'owning' subscriber should fail, they will be removed. This is a natural primitive with which to maintain liveness information for the cluster in a dedicated topic, as well as per-node load statistics.
The statestore provides very weak semantics: subscribers may be updated at diffierent rates (although the statestore tries to distribute topic updates fairly), and may therefore have very diffierent views of the content of a topic. However, Impala only uses topic metadata to make decisions locally, without any coordination across the cluster. For example, query planning is performed on a single node based on the catalog metadata topic, and once a full plan has been computed, all information required to execute that plan is distributed directly to the executing nodes. There is no requirement that an executing node should know about the same version of the catalog metadata topic.

statestore設計使得不一樣的impalad中的元數據可能在不一樣的時間被更新,儘管如此,impalad收到query以後只使用本地的topic元數據來生成執行計劃,一旦一個執行計劃生成,執行所須要的所有信息都會直接下發到執行節點,因此不要求執行節點和生成執行計劃的節點擁有一樣的元數據;

Although there is only a single statestore process in existing Impala deployments, we have found that it scales well to medium sized clusters and, with some configuration, can serve our largest deployments. The statestore does not persist any metadata to disk: all current metadata is pushed to the statestore by live subscribers (e.g. load information).

雖然statestored只部署一個,可是足以支撐中等規模的集羣;

Therefore, should a statestore restart, its state can be recovered during the initial subscriber registration phase. Or if the machine that the statestore is running on fails, a new statestore process can be started elsewhere, and subscribers may fail over to it. There is no built-in failover mechanism in Impala, instead deployments commonly use a retargetable DNS entry to force subscribers to automatically move to the new process instance.

statestore不會持久化元數據到磁盤,當一個statestore重啓時,狀態很容易恢復;即便statestore所在機器掛了,一個新的statestore也很容易在其餘機器上啓動,可是impala並無內置failover機制使得impalad可以切換到新的statestore,這裏一般使用dns服務來實現statestore切換;

2 catalog

Impala's catalog service serves catalog metadata to Impala daemons via the statestore broadcast mechanism, and executes DDL operations on behalf of Impala daemons. The catalog service pulls information from third-party metadata stores (for example, the Hive Metastore or the HDFS Namenode), and aggregates that information into an Impala-compatible catalog structure. This architecture allows Impala to be relatively agnostic of the metadata stores for the storage engines it relies upon, which allows us to add new metadata stores to Impala relatively quickly (e.g. HBase support). Any changes to the system catalog (e.g. when a new table has been loaded) are disseminated via the statestore.

catalogd經過statestored將catalog元數據(包括後續更新)推送給impalad,同時代替impalad執行DDL操做;catalogd會從第三方元數據系統中拉取信息,這種設計使得impala很容易兼容第三方元數據;

The catalog service also allows us to augment the system catalog with Impala-specfic information. For example, we register user-defined-functions only with the catalog service (without replicating this to the Hive Metastore, for example), since they are specific to Impala.

impala udf只註冊到catalogd中;

Since catalogs are often very large, and access to tables is rarely uniform, the catalog service only loads a skeleton entry for each table it discovers on startup. More detailed table metadata can be loaded lazily in the background from its third-party stores. If a table is required before it has been fully loaded, an Impala daemon will detect this and issue a prioritization request to the catalog service. This request blocks until the table is fully loaded.

catalog一般很大,因此catalogd啓動時只加載全部庫表的基本信息,詳細信息會在須要的時候再加載(lazily);若是一個表在被查詢時尚未被加載,impalad會發送一個請求給catalogd,這個請求會一直卡住直到表被加載完;

二 查詢流程

一個查詢從開始到結束由6步組成

實現細節

frontend

The Impala frontend is responsible for compiling SQL text into query plans executable by the Impala backends. It is written in Java and consists of a fully-featured SQL parser and cost-based query optimizer, all implemented from scratch. In addition to the basic SQL features (select, project, join, group by, order by, limit), Impala supports inline views, uncorrelated and correlated subqueries (that are rewritten as joins), all variants of outer joins as well as explicit left/right semi- and anti-joins, and analytic window functions.

frontend負責將sql編譯爲backend能夠執行的查詢計劃,使用java編寫,包括sql解析器和查詢優化器;

The query compilation process follows a traditional division of labor: Query parsing, semantic analysis, and query planning/optimization. We will focus on the latter, most challenging, part of query compilation. The Impala query planner is given as input a parse tree together with query-global information assembled during semantic analysis (table/column identifiers, equivalence classes, etc.). An executable query plan is constructed in two phases: (1) Single node planning and (2) plan parallelization and fragmentation.

查詢編譯過程包括:查詢解析、語義分析、查詢計劃、查詢優化;一個可執行的查詢計劃的建立有兩個階段:單節點計劃和分佈式計劃;

In the first phase, the parse tree is translated into a non-executable single-node plan tree, consisting of the following plan nodes: HDFS/HBase scan, hash join, cross join, union, hash aggregation, sort, top-n, and analytic evaluation. This step is responsible for assigning predicates at the lowest possible plan node, inferring predicates based on equivalence classes, pruning table partitions, setting limits/offsets, applying column projections, as well as performing some cost-based plan optimizations such as ordering and coalescing analytic window functions and join reordering to minimize the total evaluation cost. Cost estimation is based on table/partition cardinalities plus distinct value counts for each column; histograms are currently not part of the statistics. Impala uses simple heuristics to avoid exhaustively enumerating and costing the entire join-order space in common cases.

第一階段,sql會被翻譯爲一個不可執行的單節點的計劃樹;優化的因素包括:predicate pushdown(謂詞下推,基於分區、limit、列預測等)和cost estimate(成本預估,基於全部列的基數計數);

The second planning phase takes the single-node plan as input and produces a distributed execution plan. The general goal is to minimize data movement and maximize scan locality: in HDFS, remote reads are considerably slower than local ones. The plan is made distributed by adding exchange nodes between plan nodes as necessary, and by adding extra non-exchange plan nodes to minimize data movement across the network (e.g., local aggregation nodes). During this second phase, we decide the join strategy for every join node (the join order is fixed at this point). The supported join strategies are broadcast and partitioned. The former replicates the entire build side of a join to all cluster machines executing the probe, and the latter hash-redistributes both the build and probe side on the join expressions. Impala chooses whichever strategy is estimated to minimize the amount of data exchanged over the network, also exploiting existing data partitioning of the join inputs.

第二階段,單節點計劃樹會被用來生成一個分佈式執行計劃;優化的因素包括:最小化數據移動,最大化的利用數據本地性,儘量的先在本地聚合減小後續的數據移動,決定join策略(broadcast or partitioned);

All aggregation is currently executed as a local pre-aggregation followed by a merge aggregation operation. For grouping aggregations, the pre-aggregation output is partitioned on the grouping expressions and the merge aggregation is done in parallel on all participating nodes. For non-grouping aggregations, the merge aggregation is done on a single node. Sort and top-n are parallelized in a similar fashion: a distributed local sort/top-n is followed by a single-node merge operation. Analytic expression evaluation is parallelized based on the partition-by expressions. It relies on its input being sorted on the partition-by/order-by expressions. Finally, the distributed plan tree is split up at exchange boundaries. Each such portion of the plan is placed inside a plan fragment, Impala's unit of backend execution. A plan fragment encapsulates a portion of the plan tree that operates on the same data partition on a single machine.

全部的聚合在執行時都會分解爲本地pre-aggregation和merge aggregation(須要shuffle);
最終,分佈式計劃樹根據exchange(shuffle)被拆分爲多個plan fragement(一個plan fragment是一個backend的執行單元);

backend

Impala's backend receives query fragments from the frontend and is responsible for their fast execution. It is designed to take advantage of modern hardware. The backend is written in C++ and uses code generation at runtime to produce efficient codepaths (with respect to instruction count) and small memory overhead, especially compared to other engines implemented in Java.

backend從frontend接收query fragment,而後負責query fragment的快速執行;它被設計爲儘量的利用現代硬件;使用C++編寫,使用運行時代碼生成來產生高效的代碼路徑(根據cpu指令數量來衡量高效)和更少的內存佔用;

Impala leverages decades of research in parallel databases. The execution model is the traditional Volcano-style with Exchange operators. Processing is performed batch-at-a-time: each GetNext() call operates over batches of rows. With the exception of "stop-and-go" operators (e.g. sorting), the execution is fully pipeline-able, which minimizes the memory consumption for storing intermediate results. When processed in memory, the tuples have a canonical in-memory row-oriented format.

impala執行模型是Volcano;處理過程是一次一批,每次調用GetNext()都會在一批數據上進行操做;除了‘stop-and-go’類型的operator,執行過程是管道化的(多個operator一塊兒執行),這樣避免存儲中間結果能夠最小化內存佔用;

Operators that may need to consume lots of memory are designed to be able to spill parts of their working set to disk if needed. The operators that are spillable are the hash join, (hash-based) aggregation, sorting, and analytic function evaluation.

一些可能消耗不少內存的operator被設計爲能夠根據須要將部分中間結果輸出磁盤;

Impala employs a partitioning approach for the hash join and aggregation operators. That is, some bits of the hash value of each tuple determine the target partition and the remaining bits for the hash table probe. During normal operation, when all hash tables fit in memory, the overhead of the partitioning step is minimal, within 10% of the performance of a non-spillable non-partitioning-based implementation. When there is memory-pressure, a \victim" partition may be spilled to disk, thereby freeing memory for other partitions to complete their processing. When building the hash tables for the hash joins and there is reduction in cardinality of the build-side relation, we construct a Bloom filter which is then passed on to the probe side scanner, implementing a simple version of a semi-join.

針對hash join和aggregation操做,impala使用一種分區方法:每一行記錄的hash值中的一些位用於肯定目標分區,其餘位用於hash table探查;當hash table數據所有在內存中時,分區的開銷很小,大概只有10%;當有內存壓力時,一些分區可能被輸出到磁盤,這樣釋放內存使得其餘分區能夠完成計算;

 

1 Runtime Code Generation

Runtime code generation using LLVM is one of the techniques employed extensively by Impala's backend to improve execution times. Performance gains of 5x or more are typical for representative workloads.

基於LLVM的運行時代碼生成能夠提高5倍性能;

2 I/O Management

Efficiently retrieving data from HDFS is a challenge for all SQL-on-Hadoop systems. In order to perform data scans from both disk and memory at or near hardware speed, Impala uses an HDFS feature called short-circuit local reads to bypass the DataNode protocol when reading from local disk. Impala can read at almost disk bandwidth (approx.100MB/s per disk) and is typically able to saturate all available disks. We have measured that with 12 disks, Impala is capable of sustaining I/O at 1.2GB/sec. Furthermore, HDFS caching allows Impala to access memory-resident data at memory bus speed and also saves CPU cycles as there is no need to copy data blocks and/or checksum them.
The effectiveness of Impala's I/O manager was that Impala's read throughput is from 4x up to 8x higher than the other tested systems.

impala使用short-circuit local reads來繞過DataNode來直接讀取本地磁盤,這使得impala能夠用磁盤帶寬(每一個磁盤100M/s)來讀;
impala高效的IO管理能夠將讀性能提高4到8倍;

3 Storage Formats

Impala supports most popular file formats: Avro, RC, Sequence, plain text, and Parquet. These formats can be combined with diffierent compression algorithms, such as snappy, gzip, bz2.
In most use cases we recommend using Apache Parquet, a state-of-the-art, open-source columnar file format offering both high compression and high scan efficiency. It was co-developed by Twitter and Cloudera with contributions from Criteo, Stripe, Berkeley AMPlab, and LinkedIn. In addition to Impala, most Hadoop-based processing frameworks including Hive, Pig, MapReduce and Cascading are able to process Parquet.
Parquet consistently outperforms by up to 5x all the other formats.

impala支持大部分常見的文件格式以及壓縮算法;
Parquet能夠提高5倍性能;

 

與其餘engine的對比

 

volcano:https://paperhub.s3.amazonaws.com/dace52a42c07f7f8348b08dc2b186061.pdf

 

參考:http://cidrdb.org/cidr2015/Papers/CIDR15_Paper28.pdf

相關文章
相關標籤/搜索