概述內容html
(1)簡介node
(2)Hive and Prestodb, comparison of functionalitymysql
(3)Hive and Prestodb, comparison of performancegit
(1)簡介github
Presto是由facebook開發的一個分佈式SQL查詢引擎, 它被設計爲用來專門進行高速、實時的數據分析。它支持標準的ANSI SQL,包括複雜查詢、聚合(aggregation)、鏈接(join)和窗口函數(window functions)。算法
Presto框架圖以下:spring
下面的架構圖中展示了簡化的Presto系統架構。客戶端(client)將SQL查詢發送到Presto的協調員(coordinator)。協調員會進行語法檢查、分析和規劃查詢計劃。計劃員(scheduler)將執行的管道組合在一塊兒,將任務分配給那些裏數據最近的節點,而後監控執行過程。客戶端從輸出段中將數據取出,這些數據是從更底層的處理段中依次取出的。sql
Presto的運行模型和Hive或MapReduce有着本質的區別。Hive將查詢翻譯成多階段的MapReduce任務,一個接着一個地運行。每個任務從磁盤上讀取輸入數據而且將中間結果輸出到磁盤上。然而Presto引擎沒有使用MapReduce。它使用了一個定製的查詢和執行引擎和響應的操做符來支持SQL的語法。除了改進的調度算法以外,全部的數據處理都是在內存中進行的。不一樣的處理端經過網絡組成處理的流水線。這樣會避免沒必要要的磁盤讀寫和額外的延遲。這種流水線式的執行模型會在同一時間運行多個數據處理段, 一旦數據可用的時候就會將數據從一個處理段傳入到下一個處理段。這樣的方式會大大的減小各類查詢的端到端響應時間。數據庫
(2)Hive and Prestodb, comparison of functionality數組
√: Yes; ×: No; Blue: The main differences between hive and presto
hive 0.11.0 |
presto 0.56 |
|
Implement |
Java |
Java |
DataType |
||
integer |
√ |
√ |
string |
√ |
√ |
floating point |
√ |
√ |
boolean |
√ |
√ |
map |
√ |
√ |
list |
√ |
√ |
struct |
√ |
√ |
uniontype |
√ |
× |
timestamp |
√ |
√ |
DDL(數據定義語言) |
||
create/alter/drop table |
√ |
× |
create view |
√ |
× |
truncate table |
√ |
× |
desc |
√ |
√ |
create index |
√ |
× |
DML(數據操做語言) |
||
load data |
√ |
× |
insert |
√ |
√ |
explain |
√ |
√ |
tablesample(基於column作bucket) |
√ |
√ |
group by |
√ |
√ |
order by |
√ |
√ |
having |
√ |
√ |
limit |
√ |
√ |
inner/left/right/full join |
√ |
√ |
union |
√ |
√ |
sub queries |
√ |
√ |
Enhanced Aggregation, Cube, Grouping and Rollup |
√ |
× |
lateral view |
√ | × |
Function |
|
|
UDF |
√ |
× |
Mathematical Functions |
√ |
√ |
String Functions |
√ |
√ |
Date and Time Functions |
√ |
√ |
Regex |
√ |
√ |
Type Conversion Functions |
√ |
× |
Conditional Functions |
√ |
√ |
Aggregate Functions |
√ |
√ |
Windowing |
√ |
√ |
Distinct |
√ |
√ |
Url |
√ |
√ |
Json |
√ |
√ |
功能上,Presto與Hive有幾個不一樣的地方,也能夠說是Presto功能不完善,畢竟Presto推出時間不長,詳見以下:
1. Presto完成沒有數據寫入功能,不能使用create語句建表(可經過CREATE TABLE tablename AS query),創建視圖、導數據。
2. Presto不支持UDF(用戶自定義函數)。
3. Presto支持窗口函數,但比Hive相對較少。
(3)Hive and Prestodb, comparison of performance
測試環境以下:
因爲部分機器涉及應用,暫用4臺機器做爲prestodb的集羣,prestodb全部運算都在內存,因此配置大內存有助於提升prestodb的運算速度(現配置4G)。
如下爲具體的測試結果:
記錄數:169984827 |
||
DML |
Hive(s) |
Prestodb(s) |
limit |
5.493 |
0.05 |
where |
49.255 |
0.05 |
count(*) |
184.974 |
86 |
group by |
161.633 |
110 |
sub queries |
105.686 |
0.09 |
join |
657.006 |
177 |
注:prestodb查詢時間只精確到秒,後帶小數忽略
參考資料
Prestodb官網:http://prestodb.io/
ZOL頻道:http://jishu.zol.com.cn/78874.html
1 Presto概覽
1.1 presto設計思想及特色
多數據源:且支持擴展
計算方式:徹底基於內存進行計算,並無使用mapReduce。
支持標準SQL:
pipeLine設計:
這個pipeLine如何理解???
1.2 基礎架構及執行過程
典型的主從架構,coordinator負責調度,worker上的進程負責接受調度,執行具體的task。每一個task讀入具體的split,並進行處理。
橫向的一條表明:不一樣階段的任務, 下面的work表明執行的實體。左邊的work負責對SqlQueryExecution進行解析,輸出的結果是SqlStageExecution。這個SqlStageExecution其實表明的是多個subPlan。其中有源數據讀取的,也有進行彙總計算的。
而後根據各個節點的狀況進行任務分發,到被分發的到節點上就是SqlTaskExecution。而後執行。
note:presto造成的邏輯執行計劃進一步進行了拆分,這個拆分是爲了基於內存的併發計算。
1.3 基本概念
模型:
connector
note: presto-main/etc/catalog/**properties
coordinator
worker
catalog:表明數據源好比tcph和hive之類
schema:表明一張二維表
查詢:
概念
實體
功能
Stagement
StagementResource
SQL語句
getQueryReuslt/createQuery
Query
QueryResource
查詢執行
getQueryReuslt/createQuery
除了語句還附加了配置信息,執行和優化信息
Stage
coordinator
ddl/dml
single
頂層聚合
返回結果給client
fixed
中間聚合
中間計算
source
讀取源數據
數據的scan/filter/project
Exchange
完成stage之間的數據交換
Exchange Client 和output Buffer
Task
包含一個或者多個Driver
stage拆分紅多個task能夠併發執行
每一個task有對應的輸入輸出,
每一個task處理一個或者多個split
Driver
Driver表示對某個split的Operator的集合,
1 一個Driver處理一個split,擁有一個輸入和輸出
2 是一個split上一系列操做的集合
3 沒有子類
Operator
Limit/Orderby/HashJoin
TableScan
表明對split的一種操做
1 好比過濾,加權,轉換
2 輸入和輸出都是Page對象
Split
數據分片
Page
表明小型二維表
根據字段數的大小肯定block對象去多少行。
一個Page不超過1M
否則MySQL上的split怎麼理解??
block
接口
數組(long/int/byte)
一個block對象存儲一個字段的若干行
1 stage是個在coordinator生成的抽象的概念,task及其如下是運行在具體的work上的。
2 task之間是流式的,task內部並非流式的
1.4 demo
https://cloud.tencent.com/developer/article/1032986
2 源碼分析
2.1 工程結構
模塊說明:
presto 客戶端代碼在presto-cli模塊 但presto服務端代碼不在presto-server,打開presto-server模塊能夠發現他採用maven的插件進行編譯打包,改模塊只有該規則文件。
presto服務端代碼在presto-main模塊,PrestoServer類啓動服務器。
presto-base-jdbc 關係型數據庫鏈接器的公共模塊
presto-mysql mysql鏈接器用到了presto-base-jdbc presto-jdbc jdbc客戶端/另外一種是cli
presto-hive-* hive鏈接器相關代碼 presto-orc hive鏈接器讀取hdfs的orc文件,
並作了一些優化 presto-ml machine learning,未實現,打算ing presto-kafka/cassadra/jmx 各類鏈接器
通常使用presto可能須要進行功能拓展以進行二次開發,好比會有一些自定義高級函數
presto_main結構:
普通函數和窗口函數集中在此。
2.2 執行過程
2.2.1 模塊概覽
module
class
method
note
client
presto
console.run
構建query,向server發起query
Console
executeCommand()
process(queryRunner, split.statement(), outputFormat, () -> {}, false)
queryRunner.startQuery(finalSql)
QueryRunner
new Query(startInternalQuery(session.get(), query), debug)
newStatementClient(client, session, query)
StatementClientFactory
new StatementClientV1(httpClient, session, query)
StatementClientV1
url = url.newBuilder().encodedPath("/v1/statement").build()
coordinate
StatementResource
coordinator上的重要類,爲client和worker提供restful服務
createQuery:
將query存入一個對象
SimpleLocalMemoryContext?
StatementResource
getQueryResults
cancalQuery
該構造函數從client來的時候貌似沒有傳參?構造函數中執行調度
PurgeQueriesRunnable
run
爲啥沒有執行query,下一步往哪執行
QueryResource
createQuery
爲啥到了QueryResource,從哪裏執行到該方法的。
SqlQueryManager
createQuery
建立QueryExecution
QueryExecution
(sqlQueryExecution)
start
1 分析查詢獲得執行計劃
2 建立調度器
3 scheduler.start();
SqlQueryScheduler
start
QueryExecution.submit
分發plan成task到worker上
TaskResource
...
ResultQuery
StatementResource
asyncQueryResults()
發起查詢以後就一直獲取查詢狀態
2.2.2 代碼執行鏈
接上圖:
Rer********************************* statementResource生成 *********************************************************
StatementResource.createQuery //建立並執行查詢
Query.create(statement,...)
Query result = new Query() //返回結果Result
QueryInfo queryInfo = queryManager.createQuery(sessionContext, query)
Statement wrappedStatement = sqlParser.createStatement(query, createParsingOptions(session));
queryExecution = queryExecutionFactory.createQueryExecution(queryId, query, session, statement, parameters);
resourceGroupManager.submit(statement, queryExecution, selectionContext, queryExecutor);
groups.get(selectionContext.getResourceGroupId()).run(queryExecution);
executor.execute(query::start);
PlanRoot plan = analyzeQuery();
metadata.beginQuery(getSession(), plan.getConnectors());
planDistribution(plan); //由plan構建task並進行分發調度計劃,這個調度計劃體如今Scheduler上
StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), stateMachine.getSession()); // 得到stage的執行計劃
SqlQueryScheduler scheduler = new SqlQueryScheduler();//完成了sqlStageExecution和stageSchedulers的建立
List<SqlStageExecution> stages = createStages(); //根據不一樣的狀況,生成對應的stage;體現爲不一樣的StageScheduler和SqlStageExecution
SqlStageExecution stage = new SqlStageExecution() //
stageSchedulers.put(stageId, SourcePartitionedScheduler||new FixedCountScheduler(stage, partitionToNode||FixedCountScheduler)); //根據handle類型
SqlQueryScheduler.start();//分發計劃體如今plan建立的Scheduler上
SqlQueryScheduler##executor.submit(this::schedule); //Async, 循環執行,直到 execution 完成.
SqlQueryScheduler##stageSchedulers.get(stageId).schedule(); //使用上述 StageScheduler 的實現類來分發 Task 到工做節點
taskScheduler.apply(entry.getValue(), entry.getKey())); //對該函數(this.taskScheduler = stage::scheduleTask;)使用這兩個參數執行
sqlStageExecution::scheduleTask; // 在new StateSchedule中構造函數中獲取remoteTasks賦值給其屬性taskScheduler
RemoteTask(HttpRemoteTask) task = remoteTaskFactory.createRemoteTask() //
task.start(); //HttpRemoteTask-%s
scheduleUpdate();
HttpRemoteTask.sendUpdate(); // 異步請求
HttpClient.executeAsync(): // 發送請求到 TaskResource.createOrUpdateTask()
asyncQueryResults
StatementResource.getQueryResutl() // 分批獲取查詢結果,client向coor不斷請求,每次得到部分結果,就是由該方法處理的。
asyncQueryResults
note:查詢請求上的token是用來保證分批查詢結果的順序的
********************************* SqlTask *********************************************************
Reponse TaskResource.createOrUpdateTask(TaskId,TaskUpdateRequest,UriInfo) // 這個request請求的中就含有Fragment信息,後續會根據Fragment信息建立SqlTaskExecution
TaskInfo taskInfo = taskManager.updateTask(session,...)
sqlTask.updateTask(session, fragment, sources, outputBuffers);
SqlTaskExecution SqlTaskExecutionFactory.create(Session session, QueryContext queryContext, TaskStateMachine taskStateMachine, OutputBuffer outputBuffer, PlanFragment fragment, List<TaskSource> sources)
localExecutionPlan = LocalExecutionPlaner.plan( taskContext, fragment.getRoot(), fragment.getSymbols(), fragment.getPartitioningScheme(), fragment.getPipelineExecutionStrategy() == GROUPED_EXECUTION, fragment.getPartitionedSources(), outputBuffer);
PhysicalOperation physicalOperation = plan.accept(new Visitor(session, planGrouped), context); //physicalOperation這個持有OperatorFactory
context.addDriverFactory(); //將physicalOperation放到DriveFactory,其實DriveFactory也就是在localExecutionPlan中
SqlTaskExecutionFactory.createSqlTaskExecution( taskStateMachine, taskContext, outputBuffer, sources, localExecutionPlan, taskExecutor, taskNotificationExecutor, queryMonitor); //physicalOperation在其中localExecutionPlan的DriveFactory中
SqlTaskExecution.createDriver(DriverContext driverContext, @Nullable ScheduledSplit partitionedSplit) // 將physicalOperation放到了driverContext中
new SqlTaskExecution() //重要重要
taskExecutor.addTask() //添加task
********************************* Task執行 *********************************************************
TaskExecutor.start() //@PostConstruct
ExecutorService.execute()
TaskRunner.run()
PrioritizedSplitRunner.process() //
DriverSplitRunner.processFor() // DriverSplitRunner是SqlTaskExe內部類
DriverSplitRunnerFactory.createDriver()
Driver.processFor()
Driver.processInternal()
Driver.processNewSources()
Operator.getOutput(); // 會根據不一樣的 sql 操做,獲得不一樣的 Operator 實現類.而後根據實現,調用對應的 connector . 該方法返回的是一個 Page, Page 至關於一張 RDBMS 的表,只不過 Page 是列存儲的. 獲取 page 的時候,會根據 [Block 類型,文件格式]等,使用相應的 Loader 來 load 取數據.
Operator.addInput(); //下一個 Operator 若是須要上一個 Operator 的輸出,則會調用該方法
SetThreadName: 是 presto 模塊開始執行的標誌,如:
- try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
- try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
- try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
- try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
2.2.3 問題說明
PlanRoot plan = analyzeQuery(); //生成了邏輯計劃
planDistribution(plan); // 劃分stage準則,創建task,task分發策略?
邏輯計劃的層次以下類結構圖所示
而後經過 plan方法處理獲得一個Stage執行計劃集合,最上層是outputStage(StageExecutionPlan),每一個StageExecutionPlan內部持有
List<StageExecutionPlan>。StageExecutionPlan的這種層級結構和前面的邏輯計劃的層次很是相似,可是是否一一對應沒有肯定。
sqlQueryExecution
scheduler = new SqlQueryScheduler;
scheduler.start();
SqlQueryScheduler:
new:
createStages 建立了5個stage(sqlStageExecution),
put 同時將對應的stage封裝在對應的(根據stage的類型決定schedule的類型)StageScheduler中
start():
StageScheduler.schedule();
NodeScheduler.createNodeSelector
NodeSelector內部維護了NodeMap,存了active節點。
2.3 重要概念結構
2.3.1 plan、Frag及Node結構
############################################################################################################
LogicalPlanner.plan()
PlanNode root = planStatement(analysis, analysis.getStatement());
這一步的解析出來的Plan只是初步的單純SQL層面的節點樹。沒有Exchange信息。在下面的這個迭代使用優化器的過程當中,對節點進行了分析,在對應的位置補上了ExchangeNode,這個過程其實也就決定了Fragment的劃分,進一步決定了後面stage的劃分。
for (PlanOptimizer optimizer : planOptimizers) {
root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);
requireNonNull(root, format("%s returned a null plan", optimizer.getClass().getName()));
}
具體的是按照什麼規則去決定了哪一個位置添加ExchangeNode????
############################################################################################################
綜上:整體的plan fragment的內部node結構以下:
---mt官方博客的node圖---->
本例示意圖
上述示意圖對應的SQL: select nationkey,sum(totalprice) from orders a left outer join customer b on a.custkey=b.custkey where nationkey > 15 group by nationkey;
planRoot表明整個邏輯計劃;
SubPlan表明邏輯計劃樹;subPlan是由Fragment和subPlan構成的。所以這條鏈條上其實都是Fragment。Fragment纔是邏輯執行計劃的核心,SubPlan是一種邏輯上的一種劃分。
PlanFragment表明一串planNode節點(通常是嵌套節點);每一個subPlan都有一個PlanFrag,所以Fragment其實表明這這個subPlan 。因此mt博客上的說的subplan的partition屬性和咱們如今的frag中的partition的屬性是一致的。Fragment表明這subPlan,本例中有五個Fragment,對應着後面五個Stage(三類,具體見下文重要概念部分),所以stage其實這創建Fragment的時候就已經肯定了。
planNode:單純的SQL解析後的節點,單純是指僅僅是SQL節點,連字段類型這種meta信息都不包括,這種meta信息在Fragment中提供了。上圖和mt官方博客的subPlan劃分在 aggr partial部分,project和Aggr node的前後順序不同。
2.3.2 Stage及StageScheduler
五個sqlStageExecutionsqlStage對應着前面的五個Fragment,Execution中的stageMachine持有的Fragment。
sqlStageExecutionsqlStage到底意味着什麼?fragment相對於planNode多了meta信息,sqlStageExecutionsqlStage和Frag存在對應關係,那麼sqlStageExecutionsqlStage在Frag基礎了有多了什麼?
location?memory?task?
stege如何建立task?task如何調度
DistributedExecutionPlanner.doPlan() //遞歸調用doPlan方法,把以前的樹形結構的Plan取出單個Plan加入到dependencies中
return new StageExecutionPlan(currentFragment, splitSources, dependencies.build()); //構造樹形的StageExecutionPlan,StageExecutionPlan是由Fragment和子StageExecutionPlan構成的,能夠看出這個StageExecutionPlan和SubPlan的結構基本是同樣的。
//createStages方法的方法體,遞歸調用CreateStages;簡單來講就是遍歷節點數把節點上的全部的Fragment都取出來,建立SqlStageExecution。即一個Frag對應一個SqlStageExecution
//在建立SqlStageExecution的同時,也把SqlStageExecution放到schedulers中,後面在schedule中啓動。
new SqlStageExecution
for (StageExecutionPlan subStagePlan : plan.getSubStages()) {
List<SqlStageExecution> subTree = createStages();
stages.addAll(subTree);
SqlStageExecution childStage = subTree.get(0);
childStagesBuilder.add(childStage);
}
以下圖所示,在new的構造函數中,執行了SqlStageExecution的scheduleTask,在這個方法使用工廠類建立了RemoteTask,並start啓動,同時將這個remoteTask返回。
2.3.3 Task
如上圖所示,咱們總共生成了和Fragment一一對應的5個RemoteTask。除了帶有session、Fragment信息,還肯定了執行節點。
PhysicalOperation physicalOperation = plan.accept(new Visitor(session, planGrouped), context);
在LocalExecutionPlanner中根據Frag經過Visitor遍歷生成Operator。可是爲什麼這個只執行了ScanFilterAndPro的生成。???
2.3.4 Driver
Driver表明着對一個split的處理,講道理這個Driver的建立代碼應該和split有所體現。
其次Driver上游的Task的內容仍是Fragment,這裏如何將Fragment轉換爲Operator,也應該有所體現。
debug 本次有46Driver
最開始主要是這種Operator,
‘
Note:最複雜的的這個最後一個僅有一個
#############
LocalExchangeSourceOperator的輸出Page 並非sourceOperator,
source只有兩個
2.3.5 Operator結構
2.3.6 page結構
2.6 代碼相關框架
airlift/airlift restful服務
Guice相似spring的輕量級框架
內存管理 https://github.com/airlift/slice
3 工程debug
A:編譯
源碼拉下來以後
build方案1 ./mvnw clean install 執行的時候出現問題
build方案2 ./mvnw clean install -DskipTests
B: 跨數據源測試
tpch及hive
tpch自帶,安裝一個hive,執行跨數據源測試。
9 參考資料
【】https://github.com/prestodb/presto
【】https://tech.meituan.com/presto.html
【】https://blog.csdn.net/lnho2015/article/details/78628433?locationnum=9&fps=1
【Operator】https://blog.csdn.net/sinat_27545249/article/details/52450689
【示例】https://cloud.tencent.com/developer/article/1032986