F1 Query: Declarative Querying at Scale

距離 Google 的上一篇 F1 論文,也就是 F1: A Distributed SQL Database That Scales 已經 5 年過去了,Google 在今年的 VLDB 上終於發佈了 F1 的新版本 F1 Query: Declarative Querying at Scale,咱們今天就來看一下這篇論文。安利一下,在 PingCAP 的 paper party 上,黃東旭,主頁連接大神對這篇論文的講解很是精彩,文章中也部分引用了他的觀點,在此鳴謝。算法

2013 年的 F1 是基於 Spanner,主要提供 OLTP 服務,而新的 F1 則定位則是大一統:旨在處理 OLTP/OLAP/ETL 等多種不一樣的 workload。可是這篇新的 F1 論文對 OLTP 的討論則是少之又少,據八卦是 Spanner 開始原生支持以前 F1 的部分功能,致使 F1 對 OLTP 的領地被吞併了。下面看一下論文的具體內容,疏漏之處歡迎指正。網絡

0. 摘要

F1 Query 是一個大一統的 SQL 查詢處理平臺,能夠處理存儲在 Google 內部不一樣存儲介質(Bigtable, Spanner, Google Spreadsheet)上面的不一樣格式文件。簡單來講,F1 Query 能夠同時支持以下功能:OLTP 查詢,低延遲 OLAP 查詢,ETL 工做流。F1 Query 的特性包括:多線程

  • 爲不一樣數據源的數據提供統一視圖
  • 利用數據中心的資源提供高吞吐和低延遲的查詢
  • High Scalability
  • Extensibility

1. 背景

在 Google 內部的數據處理和分析的 use case 很是複雜,對不少方面都有不一樣的要求,好比數據大小、延遲、數據源以及業務邏輯支持。結果致使了許多數據處理系統都只 focus 在一個點上,好比事務式查詢、OLAP 查詢、ETL 工做流。這些不一樣的系統每每具備不一樣的特性,不論是使用仍是開發上都會有極大的不便利。架構

F1 Query 就在這個背景下誕生了,用論文中的話說就是併發

we present F1 Query, an SQL query engine that is unique not because of its focus on doing one thing well, but instead because it aims to cover all corners of the requirements space for enterprise data processing and analysis.

F1 Query 旨在覆蓋數據處理和分析的全部方面。F1 Query 在內部已經應用到了多個產品線,好比 Advertising, Shopping, Analytics 和 Payment。框架

在 F1 Query 的系統設計過程當中,下面幾點考量具備很是關鍵的做用。機器學習

  • Data Fragmentation: Google 內部的數據因爲自己的特性不一樣,會被存儲到不一樣的存儲系統中。這樣會致使一個應用程序依賴的數據可能橫跨多個數據存儲系統中,甚至以不一樣的文件格式。對於這個問題,F1 Query 對於這些數據提供一個統一的數據視圖。
  • Datacenter Architecture: F1 Query 的目標是多數據中心,這個和傳統的 shared nothing 架構的數據處理系統不一樣相同。傳統的模式爲了下降延遲,每每須要考慮 locality,也就是數據和計算越近越好。因爲 Google 內部的網絡環境優點,locality 的優點顯得不是那麼重要。因此 F1 Query 更強調計算和存儲分離,這樣計算節點和存儲節點的擴展性(scalability)都會更好。畢竟 Google 內部的系統,scalability 纔是第一法則。還有一點值得一提的是,因爲使用了 GFS 的更強版本: Colossue File System,磁盤不會成爲瓶頸。
  • Scalability: 在 F1 Query 中,short query 會在單個節點上執行,larger query 會以分佈式的模式執行,largest query 以批處理 MapReduce 模式執行。對於這些模式,F1 Query 能夠經過增長運算的並行度來優化。
  • Extensibility: 對於那些沒法用 SQL 語義來表達的查詢需求,F1 經過提供 user-defined functions (UDF)、user-defined aggregate functions (UDAs) 和 table-valued functions (TVF) 來支持。

2. 架構

F1 的架構圖以下所示:異步

下面的方框裏面是每一個 Datacenter 一套。關於各個組件的介紹以下:分佈式

  • 用戶經過 client libary 和 F1 Server 交互
  • F1 Master 負責 query 的狀態的運行時監控和其餘組件的管理
  • F1 Server 收到用戶請求,對於 short query 直接單機執行查詢;對於 larger query 轉發到多臺 worker 上並行執行查詢。最後再彙總結果返回給 client。
  • F1 Worker 負責具體查詢執行
  • F1 Server 和 Worker 都是無狀態的,方便擴展

2.1 query 執行

用戶經過 client libary 提交 query 到 F1 Server 上,F1 Server 首先解析和分析 SQL,而後將涉及到的數據源提取出來,若是某些數據源在當前 datacenter 不存在,則直接將 query 返回給 client 並告知哪些 F1 Server 距離哪些數據源更近。這裏直接將請求返回給業務層,由業務層去 retry,設計的也是很是的簡單。儘管前面說到要將存儲和計算分離,可是這個地方的設計仍是考慮到了 locality,datacenter 級別的 locality,畢竟 locality 對查詢延遲的影響仍是巨大的。ide

F1 Server 將 query 解析並優化成 DAG,而後由執行層來執行,具體執行模式(interactive 仍是 batch)由用戶指定。原文是: Based on a client- specified execution mode preference, F1 Query executes queries on F1 servers and workers in an interactive mode or in a batch mode.

對於交互式查詢模式(interactive mode)有單節點集中執行模式和多節點分佈式執行模式,query 優化會根據啓發式的算法來決定採用哪一種模式。集中式下,F1 Server 解析分析 query,而後在當前節點上直接執行並接收查詢結果。分佈式下,接收 query 的 F1 Server 充當一個 query coordinator 的角色,將 query 拆解並下發給 worker。交互式查詢在數據量不太大的狀況下每每具備不錯的性能和高效的資源利用率。

除了交互式查詢還有一種模式是批處理模式(batch mode)。批處理模式使用 MapReduce 框架異步提交執行執行,相比交互式這種 long-running 方式,批處理模式的可靠性(reliabitly)更高。

2.2 數據源

數據查詢支持跨 datacenter。存儲計算分離模式使得多數據源的支持更加簡單,好比 Spanner, Bigtable, CSV, columnar file 等。爲了支持多數據源,F1 Query 在他們之上抽象出了一層,讓數據看起來都是存儲在關係型表裏面。而各個數據源的元數據就存儲在 catalog service 裏面。

對於沒有存儲到 catalog service 裏面的表數據,只要提供一個DEFINE TABLE便可查詢。

DEFINE TABLE People(
      format = ‘csv’,
      path = ‘/path/to/peoplefile’,
      columns = ‘name:STRING,
                 DateOfBirth:DATE’);
    SELECT Name, DateOfBirth FROM People
    WHERE Name = ‘John Doe’;

論文中沒有提到的是單看這個 DEFINE TABLE 能夠表現力不夠,所說這些信息並不足以表現出數據的行爲:

  • 是否支持 partition?
  • 是否支持 邏輯下推?
  • 是否支持索引?
  • 是否支持多種 掃描模式?
  • 對於新數據源的支持能夠經過 Table-Valued Function (TVF) 的方式來支持。

2.3 Data Sink

query 的結果能夠直接返回給 client,也能夠插入到另一個表裏面。

2.4 SQL

SQL 2011。之因此是 2011 是由於其餘老的系統使用的是 2011。

3. 交互式查詢

交互式查詢模式是默認的查詢模式。如前所述,交互式查詢有集中式和分佈式,具體使用哪一種由優化器分析 client 的 query 而後決定。

3.1 Single Threaded Execution Kernel

集中式的查詢以下圖所示,是一種 pull-based 的單線程執行方式。

3.2 Distributed Execution

如前所述,由優化器分析完 query 決定是否採用分佈式模式。在分佈式這種模式下接收到 query 的 F1 Server 充當一個 coordinator 的角色,將執行 plan 推給 worker。worker 是多線程的,能夠併發執行單個 query 的無依賴的 fragment。Fragment 是執行計劃切分出來的執行計劃片斷,很是像 MR 或者 Spark 中的 stage。Fragment 之間經過 Exchange Operator (數據重分佈) 鏈接。

Fragment 的切分過程以下:優化器使用一種基於數據分佈依賴的 bottom-up 策略。具體來講每一個算子對於輸入數據的分佈都有要求,好比 hash 或者依賴其餘字段的分佈。典型的例子有 group by key 和 hash join。若是當前的數據分佈知足先後兩個算子的要求,則兩個算子就被放到一個 Fragment 裏面,不然就被分到兩個 Fragment 裏面,而後經過 Exchange Operator 來鏈接。

下一步就是計算每一個 Fragment 的並行度,Fragment 之間並行度互相獨立。葉子節點的 Fragment 的底層 table scan 決定最初的並行度,而後上層經過 width calculator 逐步計算。好比 hash-join 的底層兩個 Fragment 分別是 100-worker 和 50-worker,則 hash-join 這個 Fragment 會使用 100-worker 的並行度。下面是一個具體的例子。

SELECT Clicks.Region, COUNT(*) ClickCount
  FROM Ads JOIN Clicks USING (AdId)
  WHERE Ads.StartDate > ‘2018-05-14’ AND
        Clicks.OS = ‘Chrome OS’
  GROUP BY Clicks.Region
  ORDER BY ClickCount DESC;

上面 SQL 對應的 Fragment 和一種可能 worker 並行度以下圖所示:

3.3 Partitioning Strategy

數據重分佈也就是 Fragment 之間的 Exchange Operator,對於每條數據,數據發送者經過分區函數來計算數據的目的分區數,每一個分區數對應一個 worker。Exchange Operator 經過 RPC 調用,擴展能夠支持到每一個 Fragment 千級的 partion 併發。要求再高就須要使用 batch mode。

查詢優化器將 scan 操做做爲執行計劃的葉子節點和 N 個 worker 節點併發。爲了併發執行 scan 操做,數據必需要被併發分佈,而後由全部 worker 一塊兒產生輸出結果。有時候數據的 partition 會超過 N,而 scan 併發度爲 N,多餘的 partition 就交由空閒的 worker 去處理,這樣能夠避免數據傾斜。

3.4 Performance Considerations

F1 Query 的主要性能問題在於數據傾斜和訪問模式不佳。Hash join 對於 hot key 尤其敏感。當 hot key 被 worker 載入到內存的時候可能會由於數據量太大而寫入磁盤,從而致使性能降低。

論文中舉了一個 lookup join 的例子,這裏不打算詳述了。

對於這種數據傾斜的問題,F1 Query 的解決方案是 Dynamic Key Range,可是論文中對其描述仍是不夠詳細。

F1 Query 對於交互式查詢採用存內存計算,並且沒有 check point。由於是內存計算,因此速度很是的快,可是因爲沒有 checkpoint 等 failover 的機制,只能依賴於業務層的重試。

4. 批處理

像 ETL,都是經過 Batch Mode 來處理的。Google 之前都是經過 MapReduce 或者 FlumeJava 來開發的,開發成本通常比較高。相比 SQL 這種方式,不能有效複用 SQL 優化,因此 F1 Query 選擇使用 SQL 來作。

如前所述,交互式查詢不適合處理 worker failure,而 batch mode,也就是批處理這種模式特別適合處理 failover(每個 stage 結果落盤)。批處理模式複用交互式 SQL query 的一些特性,好比 query 優化,執行計劃生成。交互式模式和批處理模式的核心區別在於調度方式不一樣:交互式模式是同步的,而批處理模式是異步的。

4.1 Batch Execution Framework

批處理使用的框架是 MapReduce,Fragment 被抽象成 MapReduce 中的 stage,stage 的輸出結果被存儲到 Colossus file system (GFS 二代)。

在 Fragment 映射有一點值得注意的是嚴格來講,Fragment 的 DAG 映射到 mr 是 map-reduce-reduce,對這種模式作一個簡單的變通變成:map-reduce-map<identity>-reduce,以下圖:

關於 MapReduce 的更詳細信息能夠參考 Google 03 年那篇論文。

4.2 Batch Service Framework

Framework 會對 batch mode query 的執行進行編排。具體包括:query 註冊,query 分發,調度已經監控 mr 做業的執行。當 F1 Server 接收到一個 batch mode query,它會先生成執行計劃並將 query 註冊到 Query Registry,全局惟一的 Spanner db,用來 track batch mode query。Query Distributor 而後將 query 分發給 datacenter。Query Scheduler 會按期從 Registry 拿到 query,而後生成執行計劃並交給 Query Executor 來處理。

Service Framework 的健壯性很是好:Query Distributor 是選主(master-elect)模式;Query Scheduler 在每一個 datacenter 有多個。query 的全部執行狀態都是保存在 Query Registry,這就保證其餘的組件是無狀態的。容錯處理:MapReduce 的 stage 會被重試,若是 datacenter 出問題,query 會被分配到新的 datacenter 上從新執行。

5. 優化器

SQL 優化器相似 Spark Catalyst,架構以下圖,不細說了。

6. EXTENSIBILITY

對於不少複雜業務邏輯沒法用 SQL 來描述,F1 針對這種狀況提供了一種用戶自定義函數的方法,包括 UDF (user-define functions),UDA (aggrega- tion functions) 和 TVF (table-valued functions)。對於簡單的UDF需求,一般直接以SQL或者LUA的形式做爲query的一部分;對於更復雜或者性能要求高的UDF需求,則能夠用其它高級語言以UDF Server的形式實現。

UDF Server 和 F1 Query 是 RPC 調用關係,有 client 單獨部署在同一個 datacenter。udf server 徹底有 client 來控制,無狀態,基本能夠無限擴展。

6.1 Scalar Functions

UDF 並非新的概念,UDF Server 這種部署方式看上去還算新穎一點。可是 UDF Server 這種單獨部署模式一個可能的問題是延遲問題,這裏經過批量流水線的方式來減小延遲。下面是 UDF 的一個例子。

local function string2unixtime(value)
  local y,m,d = match("(%d+)%-(%d+)%-(%d+)")
  return os.time({year=y, month=m, day=d})
end

6.2 Aggregate Functions

UDA 是對多行輸入產生一個單一的輸出,要實現 UDA,用戶須要實現算子 Initialize, Accumulate, and Finalize。另外如要要對多個 UDA 的子聚合結果進行再聚合,用戶能夠實現 Reaccumulate。

6.3 Table-Valued Functions

TVF 的輸入是一個 table,輸出是另一個 table。這種在機器學習的模型訓練場景下比較有用。下面是論文中的具體的一個例子:EventsFromPastDays 就是一個 TVF。

SELECT * FROM EventsFromPastDays(
     3, TABLE Clicks);

固然 TVF 也支持用 SQL 來描述,以下。

CREATE TABLE FUNCTION EventsFromPastDays(
     num_days INT64, events ANY TABLE) AS
     SELECT * FROM events
     WHERE date >= DATE_SUB(
         CURRENT_DATE(),
         INTERVAL num_days DAY);

7. Production Metric

下面是 F1 Query 在 Production 環境下的幾個 metrics。

8. 總結

回過頭來看 F1 Query 最新的這篇論文給人最大的啓發就是大一統的思想,這個頗有多是行業發展趨勢。回想一下 MapReduce 論文由 Google 於 2003 年發表,開源實現 Hadoop 於 2005 問世。不妨期待了一下將來的 3 到 5 年的 F1 Query 的開源產品。

做者介紹:陶克路,花名敵琺,阿里巴巴技術專家。Apache Pulsar 等開源軟件 Contrijiesbutor。技術領域包括大數據和雲原生技術棧,目前致力於構建大數據領域業界領先的 APM 產品。


本文做者:陶克路

閱讀原文

本文爲阿里雲內容,未經容許不得轉載。

相關文章
相關標籤/搜索