presto執行過程,sql支持及hive異同

Prestodb概述及性能測試

概述內容html

(1)簡介node

(2)Hive and Prestodb, comparison of functionalitymysql

(3)Hive and Prestodb, comparison of performancegit

 

(1)簡介github

Presto是由facebook開發的一個分佈式SQL查詢引擎, 它被設計爲用來專門進行高速、實時的數據分析。它支持標準的ANSI SQL,包括複雜查詢、聚合(aggregation)、鏈接(join)和窗口函數(window functions)。算法

Presto框架圖以下:spring



     下面的架構圖中展示了簡化的Presto系統架構。客戶端(client)將SQL查詢發送到Presto的協調員(coordinator)。協調員會進行語法檢查、分析和規劃查詢計劃。計劃員(scheduler)將執行的管道組合在一塊兒,將任務分配給那些裏數據最近的節點,而後監控執行過程。客戶端從輸出段中將數據取出,這些數據是從更底層的處理段中依次取出的。sql

     Presto的運行模型和Hive或MapReduce有着本質的區別。Hive將查詢翻譯成多階段的MapReduce任務,一個接着一個地運行。每個任務從磁盤上讀取輸入數據而且將中間結果輸出到磁盤上。然而Presto引擎沒有使用MapReduce。它使用了一個定製的查詢和執行引擎和響應的操做符來支持SQL的語法。除了改進的調度算法以外,全部的數據處理都是在內存中進行的。不一樣的處理端經過網絡組成處理的流水線。這樣會避免沒必要要的磁盤讀寫和額外的延遲。這種流水線式的執行模型會在同一時間運行多個數據處理段, 一旦數據可用的時候就會將數據從一個處理段傳入到下一個處理段。這樣的方式會大大的減小各類查詢的端到端響應時間。數據庫



 

 

(2)Hive and Prestodb, comparison of functionality數組

        √: Yes; ×: No; Blue: The main differences between hive and presto

 

 

hive 0.11.0

presto 0.56

Implement

Java

Java

DataType

 

integer

string

floating point

boolean

map

list

struct

uniontype

×

timestamp

DDL(數據定義語言)

 

create/alter/drop table

×

create view

×

truncate table

×

desc

create index

×

DML(數據操做語言)

 

load data

×

insert

explain

tablesample(基於column作bucket)

group by

order by

having

limit

inner/left/right/full join

union

sub queries

Enhanced Aggregation, Cube, Grouping and Rollup

×

lateral view

×

Function

 

UDF

×

Mathematical Functions

String Functions

Date and Time Functions

Regex

Type Conversion Functions

×

Conditional Functions

Aggregate Functions

Windowing

Distinct

Url

Json

 

功能上,Presto與Hive有幾個不一樣的地方,也能夠說是Presto功能不完善,畢竟Presto推出時間不長,詳見以下:

1. Presto完成沒有數據寫入功能,不能使用create語句建表(可經過CREATE TABLE tablename AS query),創建視圖、導數據。

2. Presto不支持UDF(用戶自定義函數)。

       3. Presto支持窗口函數,但比Hive相對較少。

 

(3)Hive and Prestodb, comparison of performance

測試環境以下:



     因爲部分機器涉及應用,暫用4臺機器做爲prestodb的集羣,prestodb全部運算都在內存,因此配置大內存有助於提升prestodb的運算速度(現配置4G)。

如下爲具體的測試結果:

記錄數:169984827

DML

Hive(s)

Prestodb(s)

limit

5.493

0.05

where

49.255

0.05

count(*)

184.974

86

group by

161.633

110

sub queries

105.686

0.09

join

657.006

177

注:prestodb查詢時間只精確到秒,後帶小數忽略



 

參考資料

Prestodb官網:http://prestodb.io/

ZOL頻道:http://jishu.zol.com.cn/78874.html

 

1 Presto概覽
1.1 presto設計思想及特色
多數據源:且支持擴展

計算方式:徹底基於內存進行計算,並無使用mapReduce。

支持標準SQL:

pipeLine設計:

這個pipeLine如何理解???

 

1.2 基礎架構及執行過程
典型的主從架構,coordinator負責調度,worker上的進程負責接受調度,執行具體的task。每一個task讀入具體的split,並進行處理。

 

橫向的一條表明:不一樣階段的任務, 下面的work表明執行的實體。左邊的work負責對SqlQueryExecution進行解析,輸出的結果是SqlStageExecution。這個SqlStageExecution其實表明的是多個subPlan。其中有源數據讀取的,也有進行彙總計算的。

而後根據各個節點的狀況進行任務分發,到被分發的到節點上就是SqlTaskExecution。而後執行。

note:presto造成的邏輯執行計劃進一步進行了拆分,這個拆分是爲了基於內存的併發計算。

1.3 基本概念
模型:

connector

note: presto-main/etc/catalog/**properties

coordinator

worker

catalog:表明數據源好比tcph和hive之類

schema:表明一張二維表

 

查詢:

概念

實體

功能

 

Stagement

StagementResource

SQL語句

getQueryReuslt/createQuery

Query

QueryResource

查詢執行

getQueryReuslt/createQuery

除了語句還附加了配置信息,執行和優化信息

Stage

coordinator

ddl/dml

 

single

頂層聚合

返回結果給client

fixed

中間聚合

中間計算

source

讀取源數據

數據的scan/filter/project

Exchange

 

完成stage之間的數據交換

Exchange Client 和output Buffer

Task

 

包含一個或者多個Driver

stage拆分紅多個task能夠併發執行

每一個task有對應的輸入輸出,

每一個task處理一個或者多個split

Driver

 

Driver表示對某個split的Operator的集合,

1 一個Driver處理一個split,擁有一個輸入和輸出

2 是一個split上一系列操做的集合

3 沒有子類

Operator

Limit/Orderby/HashJoin

TableScan

表明對split的一種操做

1 好比過濾,加權,轉換

2 輸入和輸出都是Page對象

Split

 

數據分片

 

Page

 

表明小型二維表

根據字段數的大小肯定block對象去多少行。

一個Page不超過1M

否則MySQL上的split怎麼理解??

block

 

接口

數組(long/int/byte)

一個block對象存儲一個字段的若干行

1 stage是個在coordinator生成的抽象的概念,task及其如下是運行在具體的work上的。

2 task之間是流式的,task內部並非流式的

1.4 demo
https://cloud.tencent.com/developer/article/1032986

2 源碼分析
2.1 工程結構


模塊說明:

presto 客戶端代碼在presto-cli模塊 但presto服務端代碼不在presto-server,打開presto-server模塊能夠發現他採用maven的插件進行編譯打包,改模塊只有該規則文件。

 presto服務端代碼在presto-main模塊,PrestoServer類啓動服務器。

 presto-base-jdbc 關係型數據庫鏈接器的公共模塊 

presto-mysql mysql鏈接器用到了presto-base-jdbc presto-jdbc jdbc客戶端/另外一種是cli 

presto-hive-* hive鏈接器相關代碼 presto-orc hive鏈接器讀取hdfs的orc文件,

並作了一些優化 presto-ml machine learning,未實現,打算ing presto-kafka/cassadra/jmx 各類鏈接器

通常使用presto可能須要進行功能拓展以進行二次開發,好比會有一些自定義高級函數

presto_main結構:

普通函數和窗口函數集中在此。

 

2.2 執行過程
2.2.1 模塊概覽
module

class

method

note

client

presto

console.run

構建query,向server發起query

Console

executeCommand()

process(queryRunner, split.statement(), outputFormat, () -> {}, false)

queryRunner.startQuery(finalSql)

QueryRunner

new Query(startInternalQuery(session.get(), query), debug)

newStatementClient(client, session, query)

StatementClientFactory

new StatementClientV1(httpClient, session, query)

StatementClientV1

url = url.newBuilder().encodedPath("/v1/statement").build()

coordinate

StatementResource

coordinator上的重要類,爲client和worker提供restful服務

createQuery:

將query存入一個對象

SimpleLocalMemoryContext?

StatementResource

getQueryResults

cancalQuery

該構造函數從client來的時候貌似沒有傳參?構造函數中執行調度

PurgeQueriesRunnable

run

爲啥沒有執行query,下一步往哪執行

QueryResource

createQuery

爲啥到了QueryResource,從哪裏執行到該方法的。

SqlQueryManager

createQuery

建立QueryExecution

QueryExecution

(sqlQueryExecution)

start

1 分析查詢獲得執行計劃

2 建立調度器

3 scheduler.start();

SqlQueryScheduler

start

QueryExecution.submit

分發plan成task到worker上

 

TaskResource

 

 

 

...

 

 

ResultQuery

StatementResource

asyncQueryResults()

發起查詢以後就一直獲取查詢狀態

 

2.2.2 代碼執行鏈
接上圖:

Rer*********************************  statementResource生成    *********************************************************
StatementResource.createQuery    //建立並執行查詢
    Query.create(statement,...)
        Query result = new Query()  //返回結果Result
            QueryInfo queryInfo = queryManager.createQuery(sessionContext, query)
                Statement wrappedStatement = sqlParser.createStatement(query, createParsingOptions(session));
                queryExecution = queryExecutionFactory.createQueryExecution(queryId, query, session, statement, parameters);
                resourceGroupManager.submit(statement, queryExecution, selectionContext, queryExecutor);
                    groups.get(selectionContext.getResourceGroupId()).run(queryExecution);
                        executor.execute(query::start);
                            PlanRoot plan = analyzeQuery();
                            metadata.beginQuery(getSession(), plan.getConnectors());
                            planDistribution(plan); //由plan構建task並進行分發調度計劃,這個調度計劃體如今Scheduler上
                                                                StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), stateMachine.getSession()); // 得到stage的執行計劃
                                SqlQueryScheduler scheduler = new SqlQueryScheduler();//完成了sqlStageExecution和stageSchedulers的建立
                                    List<SqlStageExecution> stages = createStages();  //根據不一樣的狀況,生成對應的stage;體現爲不一樣的StageScheduler和SqlStageExecution
                                                                                SqlStageExecution stage = new SqlStageExecution() //
                                        stageSchedulers.put(stageId, SourcePartitionedScheduler||new FixedCountScheduler(stage, partitionToNode||FixedCountScheduler)); //根據handle類型
                            SqlQueryScheduler.start();//分發計劃體如今plan建立的Scheduler上
                                SqlQueryScheduler##executor.submit(this::schedule); //Async, 循環執行,直到 execution 完成.
                                    SqlQueryScheduler##stageSchedulers.get(stageId).schedule();  //使用上述 StageScheduler 的實現類來分發 Task 到工做節點
                                                                            taskScheduler.apply(entry.getValue(), entry.getKey())); //對該函數(this.taskScheduler = stage::scheduleTask;)使用這兩個參數執行
                                                                                sqlStageExecution::scheduleTask; // 在new StateSchedule中構造函數中獲取remoteTasks賦值給其屬性taskScheduler
                                          RemoteTask(HttpRemoteTask) task = remoteTaskFactory.createRemoteTask() //
                                          task.start(); //HttpRemoteTask-%s
                                            scheduleUpdate(); 
                                                HttpRemoteTask.sendUpdate(); // 異步請求
                                                     HttpClient.executeAsync(): // 發送請求到 TaskResource.createOrUpdateTask()                            
    asyncQueryResults    
StatementResource.getQueryResutl()    //  分批獲取查詢結果,client向coor不斷請求,每次得到部分結果,就是由該方法處理的。
    asyncQueryResults
note:查詢請求上的token是用來保證分批查詢結果的順序的
 
*********************************  SqlTask    *********************************************************
Reponse TaskResource.createOrUpdateTask(TaskId,TaskUpdateRequest,UriInfo) // 這個request請求的中就含有Fragment信息,後續會根據Fragment信息建立SqlTaskExecution
    TaskInfo taskInfo = taskManager.updateTask(session,...)
        sqlTask.updateTask(session, fragment, sources, outputBuffers);
            SqlTaskExecution SqlTaskExecutionFactory.create(Session session, QueryContext queryContext, TaskStateMachine taskStateMachine, OutputBuffer outputBuffer, PlanFragment fragment, List<TaskSource> sources)
                localExecutionPlan = LocalExecutionPlaner.plan( taskContext, fragment.getRoot(), fragment.getSymbols(), fragment.getPartitioningScheme(), fragment.getPipelineExecutionStrategy() == GROUPED_EXECUTION, fragment.getPartitionedSources(), outputBuffer);
                    PhysicalOperation physicalOperation = plan.accept(new Visitor(session, planGrouped), context); //physicalOperation這個持有OperatorFactory
                    context.addDriverFactory(); //將physicalOperation放到DriveFactory,其實DriveFactory也就是在localExecutionPlan中
                SqlTaskExecutionFactory.createSqlTaskExecution( taskStateMachine, taskContext, outputBuffer, sources, localExecutionPlan, taskExecutor, taskNotificationExecutor, queryMonitor); //physicalOperation在其中localExecutionPlan的DriveFactory中
                    SqlTaskExecution.createDriver(DriverContext driverContext, @Nullable ScheduledSplit partitionedSplit)   // 將physicalOperation放到了driverContext中
                        new SqlTaskExecution() //重要重要
                            taskExecutor.addTask() //添加task
 
*********************************  Task執行    *********************************************************
TaskExecutor.start() //@PostConstruct 
    ExecutorService.execute()
        TaskRunner.run()
            PrioritizedSplitRunner.process() // 
                DriverSplitRunner.processFor() // DriverSplitRunner是SqlTaskExe內部類
                    DriverSplitRunnerFactory.createDriver()
                    Driver.processFor()
                        Driver.processInternal()
                            Driver.processNewSources()
                            Operator.getOutput(); // 會根據不一樣的 sql 操做,獲得不一樣的 Operator 實現類.而後根據實現,調用對應的 connector . 該方法返回的是一個 Page, Page 至關於一張 RDBMS 的表,只不過 Page 是列存儲的. 獲取 page 的時候,會根據 [Block 類型,文件格式]等,使用相應的 Loader 來 load 取數據. 
                             Operator.addInput(); //下一個 Operator 若是須要上一個 Operator 的輸出,則會調用該方法
 

SetThreadName: 是 presto 模塊開始執行的標誌,如:

 

- try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {

 

- try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {

 

- try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {

 

- try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {

2.2.3 問題說明
PlanRoot plan = analyzeQuery(); //生成了邏輯計劃

planDistribution(plan); // 劃分stage準則,創建task,task分發策略?

 

 

 

邏輯計劃的層次以下類結構圖所示

而後經過 plan方法處理獲得一個Stage執行計劃集合,最上層是outputStage(StageExecutionPlan),每一個StageExecutionPlan內部持有

List<StageExecutionPlan>。StageExecutionPlan的這種層級結構和前面的邏輯計劃的層次很是相似,可是是否一一對應沒有肯定。

 

sqlQueryExecution

scheduler = new SqlQueryScheduler;

scheduler.start();

 

SqlQueryScheduler:

new:

createStages 建立了5個stage(sqlStageExecution),

put 同時將對應的stage封裝在對應的(根據stage的類型決定schedule的類型)StageScheduler中

start():

StageScheduler.schedule();

 

NodeScheduler.createNodeSelector

NodeSelector內部維護了NodeMap,存了active節點。

 

 

2.3 重要概念結構
2.3.1 plan、Frag及Node結構


############################################################################################################

LogicalPlanner.plan()

PlanNode root = planStatement(analysis, analysis.getStatement());

這一步的解析出來的Plan只是初步的單純SQL層面的節點樹。沒有Exchange信息。在下面的這個迭代使用優化器的過程當中,對節點進行了分析,在對應的位置補上了ExchangeNode,這個過程其實也就決定了Fragment的劃分,進一步決定了後面stage的劃分。

for (PlanOptimizer optimizer : planOptimizers) {
                root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);
                requireNonNull(root, format("%s returned a null plan", optimizer.getClass().getName()));
            }
具體的是按照什麼規則去決定了哪一個位置添加ExchangeNode????

 

############################################################################################################

綜上:整體的plan fragment的內部node結構以下:

 

---mt官方博客的node圖---->

 本例示意圖

上述示意圖對應的SQL: select nationkey,sum(totalprice) from orders a left outer join customer b on a.custkey=b.custkey where nationkey > 15 group by nationkey;

planRoot表明整個邏輯計劃;

SubPlan表明邏輯計劃樹;subPlan是由Fragment和subPlan構成的。所以這條鏈條上其實都是Fragment。Fragment纔是邏輯執行計劃的核心,SubPlan是一種邏輯上的一種劃分。

PlanFragment表明一串planNode節點(通常是嵌套節點);每一個subPlan都有一個PlanFrag,所以Fragment其實表明這這個subPlan 。因此mt博客上的說的subplan的partition屬性和咱們如今的frag中的partition的屬性是一致的。Fragment表明這subPlan,本例中有五個Fragment,對應着後面五個Stage(三類,具體見下文重要概念部分),所以stage其實這創建Fragment的時候就已經肯定了。

planNode:單純的SQL解析後的節點,單純是指僅僅是SQL節點,連字段類型這種meta信息都不包括,這種meta信息在Fragment中提供了。上圖和mt官方博客的subPlan劃分在 aggr partial部分,project和Aggr node的前後順序不同。

 

2.3.2 Stage及StageScheduler


五個sqlStageExecutionsqlStage對應着前面的五個Fragment,Execution中的stageMachine持有的Fragment。

sqlStageExecutionsqlStage到底意味着什麼?fragment相對於planNode多了meta信息,sqlStageExecutionsqlStage和Frag存在對應關係,那麼sqlStageExecutionsqlStage在Frag基礎了有多了什麼?

location?memory?task?

stege如何建立task?task如何調度

DistributedExecutionPlanner.doPlan()     //遞歸調用doPlan方法,把以前的樹形結構的Plan取出單個Plan加入到dependencies中
 
return new StageExecutionPlan(currentFragment, splitSources, dependencies.build());  //構造樹形的StageExecutionPlan,StageExecutionPlan是由Fragment和子StageExecutionPlan構成的,能夠看出這個StageExecutionPlan和SubPlan的結構基本是同樣的。
 

 

//createStages方法的方法體,遞歸調用CreateStages;簡單來講就是遍歷節點數把節點上的全部的Fragment都取出來,建立SqlStageExecution。即一個Frag對應一個SqlStageExecution
//在建立SqlStageExecution的同時,也把SqlStageExecution放到schedulers中,後面在schedule中啓動。    
new SqlStageExecution
for (StageExecutionPlan subStagePlan : plan.getSubStages()) {
            List<SqlStageExecution> subTree = createStages();
            stages.addAll(subTree);
            SqlStageExecution childStage = subTree.get(0);
            childStagesBuilder.add(childStage);
        }
以下圖所示,在new的構造函數中,執行了SqlStageExecution的scheduleTask,在這個方法使用工廠類建立了RemoteTask,並start啓動,同時將這個remoteTask返回。

 

2.3.3 Task


如上圖所示,咱們總共生成了和Fragment一一對應的5個RemoteTask。除了帶有session、Fragment信息,還肯定了執行節點。

PhysicalOperation physicalOperation = plan.accept(new Visitor(session, planGrouped), context);

在LocalExecutionPlanner中根據Frag經過Visitor遍歷生成Operator。可是爲什麼這個只執行了ScanFilterAndPro的生成。???

 

2.3.4 Driver


Driver表明着對一個split的處理,講道理這個Driver的建立代碼應該和split有所體現。

其次Driver上游的Task的內容仍是Fragment,這裏如何將Fragment轉換爲Operator,也應該有所體現。

debug 本次有46Driver

最開始主要是這種Operator,

Note:最複雜的的這個最後一個僅有一個

#############

LocalExchangeSourceOperator的輸出Page 並非sourceOperator,

source只有兩個

2.3.5 Operator結構


2.3.6 page結構


 

2.6 代碼相關框架
airlift/airlift restful服務

Guice相似spring的輕量級框架

內存管理 https://github.com/airlift/slice

 

3 工程debug
A:編譯

源碼拉下來以後

build方案1 ./mvnw clean install 執行的時候出現問題

build方案2 ./mvnw clean install -DskipTests

B: 跨數據源測試

tpch及hive

tpch自帶,安裝一個hive,執行跨數據源測試。

9 參考資料
【】https://github.com/prestodb/presto

【】https://tech.meituan.com/presto.html

【】https://blog.csdn.net/lnho2015/article/details/78628433?locationnum=9&fps=1

【Operator】https://blog.csdn.net/sinat_27545249/article/details/52450689

【示例】https://cloud.tencent.com/developer/article/1032986

相關文章
相關標籤/搜索