Foreman線程的run方法中的queryRequest是org.apache.drill.exec.proto.UserProtos的RunQuery
能夠認爲就是用戶輸入的查詢語句,只不過因爲是分佈式,客戶端輸入的查詢,會經過RPC在Foreman上執行
protobuffer文件的定義在drill-protocol/src/main/protobuf下,好比User.proto對應了UserProtosnode
關鍵看下run()上面的註釋. 何時被調用: 在查詢創建起來的時候
以什麼樣的方式調用: 執行線程池. 功能是什麼: 完成遠程執行
注意這個方法的結束並不表明查詢生命週期的Foreman角色的結束.express
Called by execution pool to do query setup, and kick off remote execution.
Note that completion of this function is not the end of the Foreman's role in the query's lifecycle.apache
https://tnachen.wordpress.com/2013/11/05/lifetime-of-a-query-in-drill-...
http://yangyoupeng-cn-fujitsu-com.iteye.com/blog/1974556緩存
Client服務器
The SELECT statement in particular is querying a HDFS data file, with a specific WHERE clause filtering based on the expression clause.數據結構
From the client, it submits this statement into Sqlline, which is a simple Java-based console that is able to talk to a JDBC driver, passes the SELECT statement into Optiq.app
Optiq is a library that Drill utilizes for query parsing and planning, which it provides pluggable transformation rules that can map a SQL token into any specific object you want. Optiq also embeds a cost-based query optimizer, which we utilize for it for choosing the best order of SQL operators in the statement without any other statistics. We implemented custom Optiq rules that maps specific SQL operators (WHERE, LIMIT, etc) and each rule converts its operator into our specific Logical Operator syntax that Drill understands.分佈式
This collection of Logical operators with its own configurations forms a Drill Logical plan. Drill’s logical plan sole purpose is to describe logically what work Drill needs to perform to produce the Query results, without any optimizations or distribution.ide
Once the client generates this logical plan, it looks up one of the DrillBit host/port information and passes the logical plan to that DrillBit.wordpress
Drill logical plan的惟一的目標就是Drill的數據流的工做流程,而沒有作任何的優化,和分佈式計算的分發等工做
一旦client產生了logical plan,那麼他會查詢其中一個已經配置好的DrillBit的host/port的信息,
而後將logical plan傳遞給DrillBit(這個接收查詢的DrillBit就是Foreman)
Running Logical Plan
Any DrillBit in the cluster can initiate a query, and that DrillBit becomes the Drill process that is responsible for sending back the results back to the client.
在集羣中任何一個DrillBit都能運行一個查詢,而執行查詢的DrillBit要負責將查詢結果返回給client
Once the UserServer that is listening to the User submission gets the logical plan, it passes it through Foreman that is responsible for turning this plan into an actual execution plan and submits the plan information for execution.
UserServer會監聽客戶端提交的查詢任務,一旦獲取到邏輯計劃,它會把邏輯計劃傳給Foreman.
Foreman會調優該plan,而且轉換爲實際執行的計劃,並提交該計劃的信息用於後面的執行.
Inside of Foreman, the first operation it does is to transform the logical plan into a physical plan via Drill’s Optimizer. Drill’s current version of Optimizer is very basic, which simply transforms each logical operator directly into one or more phyiscal operators without much optimization rules looking into the association of other operators.
The physical plan is simple a DAG of physical operators, and each child/parent relationship implies how data flows through the graph. As we are inspired by Google’s Dremel paper, the take away we saw that implemented which is a MPP style multi-level execution tree, where in this execution tree each node represents a different DrillBit process and they each depend on each other results for computation.
物理計劃是physical operators的有向無環圖.每一子節點或者父節點之間的關係都指明瞭數據如何在DAG圖中流動
在這個執行樹中,每個節點都表明一個不一樣的DrillBit計算過程,他們相互依賴彼此的計算結果
As we want to break this physical plan into a multi-level execution tree that involves multiple DrillBits, we first need to collect information about each physical operator. Each physical operator with the given operator configuration can return estimated stats about Network/Cpu/Memory/Disk and Row size and count. It also can return a list of preferred DrillBits that it wants to get executed on, which we call endpoint affinities.
將物理計劃分解成多個DrillBits參與的多層級的執行樹,首先要蒐集每個physical operators的信息
根據給定的操做符的配置信息,每一個physical operators會返回預估的統計信息,RowSize行的大小,Count數量
它也可以返回一個將要執行該operator的DrilBit列表,稱做距離最近/最優的端點相似HDFS中的讀操做,讀取HDFS塊時,NN會返回這個塊的DN列表,客戶端讀取離本身最近的DN的數據塊
One example Endpoint affinity is where a Parquet Scan opreator will want to initiate this query closet to where the Parquet file is stored, and it can look up the Parquet file’s Metadata information that stores the HDFS data node host and return that as a preferred endpoint if we have a DrillBit running there.
好比一個Parquet掃描操做符會在離保存着Parquet文件最近的DrillBit上面發起查詢
他能夠查詢Parquet文件的元數據信息: 元數據保存了HDFS的DN節點,並返回一個最優的endpointParquet文件是相似JSON那樣有者self-describe格式的文件,即文件自己含有schema,儘管schema是free的.
因爲Parquet保存在HDFS上,HDFS上的文件是有副本的. 而Scan操做符是要訪問文件的,
因此Scan操做符會選擇離本身這個操做符最近的DN上的Parquet文件副本時,是最優的狀況.
固然對於最優的端點的前提是這個節點安裝了DrillBit服務. 由於Drill是操做符的載體.也就是說,集羣的DrillBit服務能夠執行一個物理計劃分解出來的physical operators
physical operators能夠被集羣的多個Drillbit執行.
一般DrillBit計算節點上也運行着DN這樣的數據存儲節點,而操做符須要存儲的數據資源
因此操做符會選擇離存儲資源最近的Drillbit,這樣的Drillbit是最優的endpoint.
With the physical plan, all the stats and endpoint affinities, the Parallellizer in Foreman transforms this plan into a number of fragments. Each fragment is simply a self contained Physical plan that is designed to run on each DrillBit node. In any given Physical plan, there will be only one Root Fragment that is going to run in the initiating DrillBit, possibly one or more Leaf fragments and possibly one or more intermediate fragments.
有了物理計劃,全部的統計信息,最優端點,Foreman中的Parallellizer會將物理計劃轉換爲多個fragments.
每個Fragment自身也是一個物理計劃, 它們一樣會被分配到DrillBit節點上面運行.
任何一個物理計劃(通過Foreman轉換後的每個Fragment)只有一個RootFragment,多箇中間或Leaf Fragment.
Running Fragments
The Root fragment will be submitted to the Worker manager in its current DrillBit, the intermediate fragments will be stored in the HazelCast distributed cache, and all the leaf fragments will be sent directly to the DrillBits assigned via our BitCom through our RPC layer with Protobuf messages.
Rootfragment會被提交給它所在的當前DrillBit上的WorkerManager.中間fragment保存在Hazelcast分佈式緩存,
全部的leaf fragment會直接經過BitCom(RPC層次的東西,協議是Protobuf)發送給其餘DrillBits在WEB頁面能夠看到的是Major和MinorFragment.那麼這裏的Root,Intermediate,Leaf Fragment是怎麼YY出來的?
The Worker Manager once receives this Root Fragment starts running this plan, which always contains a Screen Operator that blocks and wait for Records to be returned. If a plan also has multiple Drillbits involved, then it will also contain a exchange operator that sets up a Receiver that waits for incoming Records from the wire.
Worker Manager一旦接受到Root Fragment就會運行這個plan,而且老是會包含一個Screen Operator,用來阻塞而且等待返回的數據.
若是該plan須要另外多個DrillBit參與,這些DrillBit組成一個wire,Worker Manager也同時會包含一個exchange operator,該exchange operator啓動了一個Receiver用以等待wire中的數據wire相似HDFS中DN組成的pipeline.當客戶端寫數據時,參與存儲的DN造成一個管道,數據包在管道中流動.
只有全部的DN節點返回ack應答給客戶端時,才認爲數據寫入成功. 這裏參與計算的DrillBit節點也同樣.
Exchange操做符相似於客戶端,只有wire中的DrillBit數據獲取完畢,返回給Receiver,才認爲計算完成.
The Leaf fragments that are sent via the wire will be executed as soon as they arrive. The fragment will be parsed into a DAG with Physical operators, and setups the execution/data flow. Each Physical operator imposes a Pull style messaging, where starting from the bottom of the tree it pulls it’s parent for Records and the parent will return an Outcome status. The operators is designed to handle each possible outcome status, which can be STOP, OK, OK_WITH_NEW_SCHEMA, NONE. Since Drill supports flexible schema, which in other words can tolerate schema changes in the same dataset, the operators needs to handle what happens when there is a new schema for this set of records. Seeing the benefits of columnar storage:http://the-paper-trail.org/blog/columnar-storage/. Drill also implemented its own in memory columnar format which we called ValueVectors. A ValueVector is simply a collection of bytes that represent a run of values within the same column. Each pull of messages in each Physical operator returns a RecordBatch that can contain one or more ValueVectors per column with schema.
經過wire被髮送的葉子Fragment一旦到達目的DrillBit就會被當即執行.葉子Fragment會被解析爲由物理操做符組成的DAG.
每個物理操做符都會利用一個Pull類型的消息機制,從樹的底部開始,operator會從他的parent operator中拉取Records,
而他的parent operator則返回一個Outcome status消息. operators被設計爲能處理每一種可能的結果狀態.
由於Drill支持動態schema,也就是說Drill容許在同一個數據集中schema發生變化,因此Drill要可以處理當schema發生變化的狀況Drill同時實現了他本身的列式內存數據結構:ValueVector,它是一個bytes集合,表明了一個column內的數據.
在每個Physical operator pull的消息中會返回一個RecordBatch: 對於每一個列都會包含一個或者多個ValueVector
At the very tip of the Leaf fragment in my slideshow example is the Scan operator, which is configured to look up a Parquet file and run it through the Parquet Storage Engine. The Storage engine is simply responsible for pulling in data from the data source, and converting them into ValueVectors and passes that back to its child as a RecordBatch.
從數據源中拉取數據,把數據轉換爲ValueVectors,而後將這些ValueVector做爲RecordBatch傳遞迴他的child
FragmentTree從底到上, 底部是Parent, 越往上就是Child. child會拉取parent的記錄.
而從上到下來看,Fragment分解爲RootFragment->Intermediate->LeafFragment.
這彷佛有點矛盾,leaf是parent,往上則是child. 而最上面又是root fragment.掃描操做符的步驟:
1.Leaf Fragment拉取數據源的數據
2.將數據轉換爲ValueVectors
3.組裝成RecordBatch
4.傳遞給它的孩子,即上一層
Eventually the Leaf fragment will take this batch, and through the Sender operator send it to the Intermediate DrillBit.
最終,全部的Leaf fragment將會接管這些batch數據,經過Sender operator發送給中間DrillBit
對於數據源,它們只暴露給物理計劃造成的全部Leaf Fragment.這些Leaf Fragment負責讀取數據.
The Intermediate DrillBit once receives a RecordBatch for the first time, will lookup the fragment from HazelCast by its fragment id from RecordBatch, and setup the Receiver and Physical opreators necessary for processing in this DrillBit.
中間fragment一旦第一次接受到一個RecordBatch,會從HazleCast中經過RecordBatch中保留的fragment id
查詢相應的fragment,而且設置Receiver以及必要的physical operator來繼續在DrillBit中進行處理計算
The intermediate fragment contains a Filtering operator, and inside this operator once it receives a RecordBatch it looks up the new schema and passes that to our CodeGeneration with the specified filter expression and type information, and generate a specific code that does the filtering. This is designed to avoid casting, run tight loops and leverage prefetching as we can eliminate function calls. This is a very common approach looking at the Hive’s new vectorized query engine via Stinger initiative or in Impala.
中間Fragment包含一個Filtering operator,在這個Filtering operator內部,一旦他接收到一個RecordBatch,他就會查找新的schema,而且將schema傳遞給CodeGeneration,同時還會傳遞一個特殊定義的filter expression,type information,藉此產生一段特殊的code來完成filter 操做。經過設計成避免casting,運行輕量級的loop,以及進行prefetching,來減小方法的調用,這種方式在Hive的新vectoried query engine(經過Stinger initiative)以及impala中很廣泛
The intermediate fragment eventually streams a batch at a time as soon as it is available to the Root DrillBit, which the Screen operator receives and returns it to the Client.
中間fragment最終會議batch爲單元,一次發送一個batch給Root DrillBit(就是Foreman),
在Root DrillBit中會由Screen operator來接收相關數據,而且返回給client(接收查詢的也負責返回查詢結果 )
DrillClient that receives the RecordBatch, simply transforms our own columnar ValueVectors into Rows and shows the result to the client.
DrillClient接收RecordBatch,簡單將ValueVector轉換成Rows而且顯示給client
This is overall what the flow looks like in our Drill alpha release, and we will be continuing to validate this architecture through diagnostic and benchmarking, and also provide more operators and storage engine so we can do much more interesting data analysis.
前面有邏輯操做符LogicalOperator接口,一樣也有物理操做符PhysicalOperator接口
咱們先看下HasAffinity有最優節點,方法getOperatorAffinity返回最優的EndPoint列表
描述了一個物理操做符對一些特定的DrillBit節點有親和性的, 用於分配決策.
//Describes a physical operator that has affinity to particular nodes. Used for assignment decisions. public interface HasAffinity extends PhysicalOperator { public List<EndpointAffinity> getOperatorAffinity(); } public class EndpointAffinity { private final DrillbitEndpoint endpoint; private double affinity = 0.0d;
EndpointAffinity captures affinity value for a given single Drillbit endpoint.
EndpointAffinity有DrillbitEndpoint的引用, 註釋中提到affinity value,因此是否是夠親和是能夠計算出來的.
初始時這個值是0,調用addAffinity()能夠給Drillbit endpoint添加一個親和力的值.
DrillBit Endpoint對象被定義爲protobuf,在Coordination.proto中:
message DrillbitEndpoint{ optional string address = 1; optional int32 user_port = 2; optional int32 control_port = 3; optional int32 data_port = 4; optional Roles roles = 5; }
Drillbit能夠認爲是Drill的計算節點. 在bin下的drillbit.sh start啓動一個Drill服務.
LogicalPlan有必定的格式:head,storageengine,query. 一樣PyhsicalPlan也有,它們的head是相同的.
PhysicalPlan的構造函數是一系列的物理操做符,而LogicalPlan的構造函數是邏輯操做符集合.目的都是構造一張DAG圖.
@JsonPropertyOrder({ "head", "graph" }) public class PhysicalPlan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlan.class); PlanProperties properties; Graph<PhysicalOperator, Root, Leaf> graph; @JsonCreator public PhysicalPlan(@JsonProperty("head") PlanProperties properties, @JsonProperty("graph") List<PhysicalOperator> operators){ this.properties = properties; this.graph = Graph.newGraph(operators, Root.class, Leaf.class); }
在官方的設計文檔中http://drill.apache.org/docs/rpc-overview/對RPC只是寥寥數語,還有待補充.
咱們根據上面的Query流程一步步分析, 首先是客戶端提交一個查詢, 通過Optiq生成邏輯計劃後會交給DrillClient運行:
/** * Submits a Logical plan for direct execution (bypasses parsing) 提交一個邏輯計劃,直接執行 * @param plan the plan to execute * @return a handle for the query result */ public List<QueryDataBatch> runQuery(QueryType type, String plan) throws RpcException { UserProtos.RunQuery query = newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build(); ListHoldingResultsListener listener = new ListHoldingResultsListener(query); client.submitQuery(listener, query); //這個client是UserClient,而不是DrillClient了 return listener.getResults(); }
邏輯計劃封裝成RunQuery協議,監聽器ListHoldingResultsListener用於當獲取到結果後通知客戶端能夠獲取數據了.
監聽器用Future來封裝查詢的結果集合,若是結果尚未出來,調用future.get()會被阻塞,這是Java的Future語法自己的特性.
private class ListHoldingResultsListener implements UserResultsListener { private Vector<QueryDataBatch> results = new Vector<>(); private SettableFuture<List<QueryDataBatch>> future = SettableFuture.create(); private UserProtos.RunQuery query ; //提交失敗時, 在從新鏈接後, 由客戶端從新鏈接 public void queryCompleted(QueryState state) { future.set(results); } public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) { results.add(result); } public List<QueryDataBatch> getResults() throws RpcException{ return future.get(); } }
UserClient做爲用戶的客戶端,會向DrillBit發送一個RUN_QUERY的請求, 發送內容在RunQuery query對象裏.
public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHandshake, BitToUserHandshake> { private final QueryResultHandler queryResultHandler = new QueryResultHandler(); public void submitQuery(UserResultsListener resultsListener, RunQuery query) { send(queryResultHandler.getWrappedListener(connection, resultsListener), RpcType.RUN_QUERY, query, QueryId.class); }
connection對象是客戶端創建的到服務器端的鏈接, 在UserClient的父類BasicClient的構造方法中建立的.
這裏用的是Netty, 客戶端在建立過程還綁定了多個Handler: 協議的編解碼,消息的編解碼,InboundHandler到達處理器
,客戶端握手Handler.
public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite> extends RpcBus<T, R> { private final Bootstrap b; protected R connection; public BasicClient(...) { b = new Bootstrap() .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { connection = initRemoteConnection(ch); ch.closeFuture().addListener(getCloseHandler(ch, connection)); final ChannelPipeline pipe = ch.pipeline(); pipe.addLast("protocol-decoder", getDecoder(connection.getAllocator())); pipe.addLast("message-decoder", new RpcDecoder("c-" + rpcConfig.getName())); pipe.addLast("protocol-encoder", new RpcEncoder("c-" + rpcConfig.getName())); pipe.addLast("handshake-handler", new ClientHandshakeHandler()); pipe.addLast("message-handler", new InboundHandler(connection)); pipe.addLast("exception-handler", new RpcExceptionHandler(connection)); } }); }
send()調用最終會調用RpcBus的同名send方法, 第一個參數listener是Rpc的輸出監聽器(相對Income到達)
其中發送內容RunQuery query是protobufBody, 最後一個參數dataBodies是可選的.
// The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a system. public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> implements Closeable { protected final CoordinationQueue queue = new CoordinationQueue(16, 16); public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType, SEND protobufBody, Class<RECEIVE> clazz, boolean allowInEventLoop, ByteBuf... dataBodies) { ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz, connection); OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, futureListener.getCoordinationId(), protobufBody, dataBodies); ChannelFuture channelFuture = connection.getChannel().writeAndFlush(m); channelFuture.addListener(futureListener); channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); }
客戶端發起的一次查詢是怎麼提交到服務端上執行: connection.getChannel().writeAndFlush(m)
客戶端經過connection的通道往服務端寫入一個Rpc消息, Rpc消息分爲到達Inboud和輸出Outbound.
OutboundRpcMessage含有protobuf協議體,以及數據部分. 協議自己只是定義了數據的格式. 真正的數據也要一塊兒發送出去.
雖然這裏是客戶端的查詢請求, dataBodies自己是沒有值的,由於在一開始調用的時候就沒有傳入這個參數.
到此爲止, 客戶端發起的一次查詢請求已經完成了, 接下去的流程交給服務端處理這個請求了.
這裏用到一個futureListener, 是爲了可以監聽服務器端是否已經把數據準備好了.
這裏的queue會將CoordinationId和RpcListener放到map中, 並在接收到數據後從map中移除.
public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz, RemoteConnection connection) { int i = circularInt.getNext(); RpcListener<V> future = new RpcListener<V>(handler, clazz, i, connection); Object old = map.put(i, future); return future; }
服務端的啓動方式和客戶端同樣都是經過Netty. 而且都註冊了一個InboundHandler.
由於在上一步客戶端發送REQUET請求, 因此服務器的InboundHandler可以接收到這個請求
protected class InboundHandler extends MessageToMessageDecoder<InboundRpcMessage> { private final C connection; public InboundHandler(C connection) { this.connection = connection; } protected void decode(final ChannelHandlerContext ctx, final InboundRpcMessage msg, final List<Object> output) throws Exception { final Channel channel = connection.getChannel(); final Stopwatch watch = new Stopwatch().start(); switch (msg.mode) { case REQUEST: { // handle message and ack. ResponseSender sender = new ResponseSenderImpl(connection, msg.coordinationId); handle(connection, msg.rpcType, msg.pBody, msg.dBody, sender); break; }
這裏InboundHandler繼承的是Netty的MessageToMessageDecoder抽象類,因此要在decode中重寫接收的邏輯
若是是繼承Netty的ChannelInboundHandlerAdapter, 則重寫的方法是channelRead, 後面這種在netty的helloworld中比較常見.
爲何須要ResponseSender, 由於服務端接收客戶端的查詢請求, 在獲取到結果後, 要負責將結果response發送send給客戶端纔算完成.
protected void handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender) throws RpcException{ sender.send(handle(connection, rpcType, pBody, dBody)); } protected abstract Response handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;
handle是個抽象方法, 這裏由於在Server中了, 因此選擇UserServer的實現方法. 客戶端傳入的rpcType=RUN_QUERY等於下面的RUN_QUERY_VALUE
public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnection> { final UserWorker worker; protected Response handle(UserClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody){ switch (rpcType) { case RpcType.RUN_QUERY_VALUE: final RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody)); final QueryId queryId = worker.submitWork(connection, query); return new Response(RpcType.QUERY_HANDLE, queryId);
DrillClient將查詢交給UserClient, UserServer則將具體的查詢工做交給了UserWorker, 它的返回值也是一個QueryId協議.
最終的返回值是Response, 對應了RpcBus的sender.send(handle(...)) --> sender.send(Response)
注意服務端接受到查詢請求RUN_QUERY後, 交給worker處理, worker會當即返回這個查詢對應的QueryId. 所以也不是當即返回結果的
看下服務端的ResponseSender的實現類, 定義在RpcBus中.
private class ResponseSenderImpl implements ResponseSender { RemoteConnection connection; int coordinationId; public ResponseSenderImpl(RemoteConnection connection, int coordinationId) { super(); this.connection = connection; this.coordinationId = coordinationId; } public void send(Response r) { OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, coordinationId, r.pBody, r.dBodies); connection.getChannel().writeAndFlush(outMessage); } }
注意這裏的coordinationId是客戶端放入queue隊列中futureListener的一個編號. 而服務端返回的QueryId是r.pBody.
服務端也將OutboundRpcMessage經過connection通道寫回去. 即寫到客戶端去, 由於服務端並無將查詢結果當即計算出來,
因此須要將QueryId返回給客戶端, 並在適當的時候若是服務端獲取到結果會通知客戶端.
服務端發送的是Response, 因此如今服務端的流程已經走完了(雖然worker尚未完成,可是對於一次RPC來講是完成了), 輪到客戶端接收響應.
仍是在RpcBus的InboundHandler中. 只不過此次是客戶端接受到服務端發送的響應請求:
case RESPONSE: MessageLite m = getResponseDefaultInstance(msg.rpcType); //這裏是QueryId RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass()); //對應一開始的queue.get(...) Parser<?> parser = m.getParserForType(); Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes())); //pBody只是協議格式 rpcFuture.set(value, msg.dBody); //dBody纔是數據內容 break;
從隊列中獲取Future, 最後調用future的set方法, 將數據設置到value中, 這樣future.get()就能獲取到value.
public <V> RpcOutcome<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) { RpcOutcome<?> rpc = map.remove(coordinationId); return (RpcOutcome<V>) rpc; }
Step | Operation |
---|---|
DrillClient.runQuery: | ListHoldingResultsListener |
UserClient.submitQuery: | 經過QueryResultHandler將ListHoldingResultsListener>>UserResultsListener封裝爲SubmissionListener>>RpcOutcomeListener |
RpcBus.send: | 將RpcOutcomeListener封裝爲ChannelListenerWithCoordinationId<>RpcOutcome |
InboundHandler.RESPONSE: | 從隊列中獲取RpcOutcome rpcFuture, 設置數據到value |
RpcListener.set: | 調用RpcOutcomeListener.success, 即SubmissionListener.success: 添加UserResultsListener負責監聽結果 |
ListHoldingResultsListener.getResults: | 從Future中獲取結果 |
Object | Why |
---|---|
ListHoldingResultsListener | 持有結果集的監聽器, 因此負責最終結果的獲取,固然獲取操做不在這裏, 它只管取數據 |
SubmissionListener | 提交監聽器, 做業提交後, 我負責執行, 可是執行的動做並不在這裏哦 |
UserResultsListener | 用戶的結果集監聽器, 這裏應該是負責結果的產生, 不過它是個接口 |
RpcListener | RPC監聽器,要和具體本次查詢的coordinationId關聯起來 |
用戶的結果集監聽器的方法表示了查詢的一個過程:
1.QueryId到達, 由服務端產生QueryId
2.數據到達, 並被監聽器成功接收到
3.查詢完畢, 監聽器不會再收到任何數據
public interface UserResultsListener { /** * QueryId is available. Called when a query is successfully submitted to the server. * @param queryId sent by the server along {@link org.apache.drill.exec.rpc.Acks.OK Acks.OK} */ void queryIdArrived(QueryId queryId); void submissionFailed(UserException ex); /** * A {@link org.apache.drill.exec.proto.beans.QueryData QueryData} message was received * @param result data batch received */ void dataArrived(QueryDataBatch result, ConnectionThrottle throttle); /** * The query has completed (successsful completion or cancellation). The listener will not receive any other * data or result message. Called when the server returns a terminal-non failing- state (COMPLETED or CANCELLED) */ void queryCompleted(QueryState state); }