站在DataNode的視角,看看pipeline寫的流程,本文不分析客戶端部分,從客戶端寫數據以前拿到了3個可寫的block位置提及。node
每一個datanode會建立一個線程DataXceiverServer,接收上游過來的TCP鏈接,對於每一個新建的TCP鏈接,都會建立一個叫作DataXceiver的線程處理這個鏈接. 這個線程不斷的從TCP鏈接中讀op,而後調用processOp(op)處理這個op,這裏以write block 這個op爲例.數組
對於datanode來講,write block操做由DataXceiver的writeBlock函數實現.app
大致步驟以下:ide
new 一個BlockReceiver對象,隨後用於接收上游(client或者datanode)的block數據.函數
根據傳進來的DatanodeInfo數組,向數組的第一個元素表明的datanode創建TCP鏈接,targets參數是從上游的TCP鏈接中解析出來的,邏輯在Receiver的opWriteBlock方法中,Receiver是DataXceiver的基類.而後調用Sender的writeBlock方法給下游datanode發送write block相關元信息,包括DatanodeInfo數組(刨去第一個元素),clientname,block的當前gs,minBytesRcvd,maxBytesRcvd(對於append,recovery操做有用)等。而後讀取下游的回覆封裝在BlockOpResponseProto對象中,能夠經過內部成員firstBadLink知道建pipeline中第一個失敗的datanode節點。接着將BlockOpResponseProto回覆給上游
(datanode或者client),最後調用第一步new的BlockReceiver的receiveBlock方法用於接收一個完整的block.以下:oop
receiveBlock內部根據clientname發現是一個客戶端在寫block,建立一個PacketResponder線程用於處理下游datanode對packet的ack.PacketResponder後面分析。接着,不斷的調用receivePacket()方法從上游(datanode或者client)接收一個個的packet,接收一個完整的packet的邏輯是由內部的PacketReceiver來處理的.
對於一個接收到的packet,寫入block file文件,同時checksum信息寫meta文件,而後放入PacketResponder的ack queue隊列,而後將packet寫給下游的datanode。最後調用PacketResponder的 close方法,這個方法會等到ack queue爲空,即全部packet都已經從下游收到,而且已經給上游ack.線程
receiveBlock()結束後,關掉和上下游的鏈接.對象
清空ack queue的邏輯由專門處理下游ack包的PacketResponder線程處理,邏輯以下:接口
若是datanode是pipeline的中間node(經過PacketResponder的type屬性來決定,LAST_IN_PIPELINE和HAS_DOWNSTREAM_IN_PIPELINE),
那麼從下游讀一個PipelineAck,從ack中拿到seqno,而後從ack queue中get(不刪除)第一個packet,拿出seqno,記做expected_seq_no,而後比較是否相等,若是不相等,說明寫出錯. 若是seqno相同,往下.隊列
若是從ack queue中get的packet是block的最後一個packet,說明一個block接收完成.那麼調用finalizeBlock方法.finalizeBlock方法邏輯以下:
關閉block file和meta file文件,調用FsDatasetImpl的finalizeBlock(block)將block文件以及對應的meta文件移動到對應的block pool下的finalized目錄下,而後生成一個FinalizedReplica對象,將bpid->FinalizedReplica的映射關係記錄在內存中的volumnMap中,對象位於FsDatasetImpl下的ReplicaMap volumnMap(從ReplicaMap中定位一個ReplicaInfo,須要拿着bpid和block id去找)最後調用datanode的closeBlock()方法,將block回報給namenode,該方法邏輯以下:
拿着block的bpid從BlockPoolManager中拿到相應的BPOfferService,通知namenode這個block。在data node這邊,data node和每一個namenode的接口由一
個BPServiceActor來承擔,這是一個線程, 這個線程會向namenode彙報received block或者指示namenode去刪除block.最後調用DatanodeProtocolClientSideTranslatorPB bpNamenode的blockReceivedAndDeleted()將block信息彙報上去.
將packet從ack queue的頭部刪除。
能夠看出,一個block的寫操做對於每一個data node來講,由兩個線程參與,一個是DataXceiver,用於接收上游的數據,一個是PacketResponder,用於處理下游回來的ack。尚未接收到下游的ack而且沒有給上游回覆ack的packet都存在在ack queue中。
hadoop-hdfs-2.4.1.jar