hdfs寫入數據流程總結:
================================
一、經過配置文件獲取DistributedFileSystem實例
二、初始化校驗和類型和大小 ===> 類型CRC32C,大小4byte //對每一個chunk進行校驗,chunk大小512字節
三、建立namenode元數據:
在DFSOutputStream中dfsClient.namenode.create
四、使用computePacketChunkSize方法對packet和chunk進行計算 //計算每一個packet中的chunk數量(126)
五、使用DFSPacket初始化包對象
六、writeChecksumChunks寫入數據:方法,最終使用System.arrayCopy方法:
先寫入4 x 9字節的checksum
再寫入512 x 9字節的chunknode
七、waitAndQueueCurrentPacket:將數據放入dataQueue中。接着notifyAll,喚醒DataStreamer線程編程
八、DataStreamer:設置管線,而後打開datanode的傳輸流,
底層傳輸使用的是nio的非阻塞技術
protobuf串行化技術緩存
九、數據寫入成功的時候:
dataQueue.removeFirst(); //將數據隊列中的第一個數據刪除
ackQueue.addLast(one); //將此數據移動到確認隊列的末尾
dataQueue.notifyAll(); //通知DataStreamer繼續傳輸包併發
十、將數據實例化到磁盤的過程:
先把checksum和data之間的鴻溝去掉:
移動checksum數據到data數據以前
移動header數據到checksum以前app
HDFS 上傳流程
過程解析:詳解
這裏描述的 是一個256M的文件上傳過程
① 由客戶端 向 NameNode節點節點 發出請求
②NameNode 向Client返回能夠能夠存數據的 DataNode 這裏遵循 機架感應 原則分佈式
③客戶端 首先 根據返回的信息 先將 文件分塊(Hadoop2.X版本 每個block爲 128M 而以前的版本爲 64M)
④而後經過那麼Node返回的DataNode信息 直接發送給DataNode 而且是 流式寫入 同時 會複製到其餘兩臺機器
⑤dataNode 向 Client通訊 表示已經傳完 數據塊 同時向NameNode報告
⑥依照上面(④到⑤)的原理將 全部的數據塊都上傳結束 向 NameNode 報告 代表 已經傳完全部的數據塊 ide
這樣 整個HDFS上傳流程就 走完了
DFSPacket:
========================================
/**
* buf is pointed into like follows:
* (C is checksum data, D is payload data)
*
* [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
* ^ ^ ^ ^
* | checksumPos dataStart dataPos
* checksumStart函數
*
* Right before sending, we move the checksum data to immediately precede
* the actual data, and then insert the header into the buffer immediately
* preceding the checksum data, so we make sure to keep enough space in
* front of the checksum data to support the largest conceivable header.
*/oop
DataStreamer:
=========================================
DataStreamer類是對從管線發送數據包到datanode是可響應的,
從namenode獲取新的塊id和塊位置,而後開始流式傳輸包到datanodes
每一個包都有一個序列號相關聯,當block中全部的packet被髮出且收到全部的包回執
DataStreamer就關閉當前的塊ui
DFSOutputStream:
=======================================================
客戶端程序將數據經過這個流寫入到內部緩存。
數據被分割成packet,每一個包64k
一個packet由chunk組成。每一個chunk是512字節,相應的關聯一個校驗和
當客戶端程序填滿當前的packet,會填充到dataQueue(數據隊列)。
DataStreamer 線程從dataQueue(數據隊列)抓取數據,並將其經過管線發送到第一個datanode
接着將數據從dataQueue(數據隊列)移動到ackQueue(確認隊列)。當收到全部數據節點的確認回執
ResponseProcessor(響應處理器)會將數據從ackQueue(確認隊列)中移除
若是出現錯誤,全部未完成的包將從ackQueue(確認隊列)移出
經過清除錯誤數據節點的管線,生成一個新的管線
DataStreamer開始從新傳輸數據
Datanode和Namenode的VERSION文件: namenode: ================================= #Sun Mar 18 09:36:21 CST 2018 namespaceID=133742883 clusterID=CID-126a68dc-a8c1-4517-8f28-60fb6af6c269 cTime=0 storageType=NAME_NODE blockpoolID=BP-1464761855-192.168.23.101-1520907981134 layoutVersion=-63 datanode: ================================== #Sun Mar 18 09:36:47 CST 2018 storageID=DS-6068e606-1d2d-4865-aa62-1cd326ee3e64 clusterID=CID-126a68dc-a8c1-4517-8f28-60fb6af6c269 cTime=0 datanodeUuid=705f0e4e-a50b-4448-84ed-fc6e2f8d2923 storageType=DATA_NODE layoutVersion=-56
HDFS特性:
適用於存儲超大文件
適用於流式數據訪問,不具備隨機定位文件的功能
支持構建於商業硬件
不適用於低時間延遲的數據訪問
不適用於存儲海量小文件 //har、壓縮、sequenceFile
不適用於多用戶寫入和任意位置修改文件
MapReduce:
編程模型,適用於分佈式處理海量文件
App //入口函數(main)
Mapper //map
Reducer //reduce
在集羣運行MR程序:
一、修改代碼
二、將代碼打包成jar 併發送到Linux操做系統
三、建立源文件(1.txt)
四、使用命令hadoop jar myhadoop.jar com.oldboy.mr.App /1.txt /out
一、輸入路徑能夠寫文件名和文件夾名:
文件夾名稱會將文件夾下全部文件讀取,且忽略子目錄文件
二、Map數和文件數量有關:
須要將小文件進行歸檔或壓縮
hadoop archive -archiveName temp.har -p / Temp /
partition:分區,指派某種類型的key發送到某個reduce進行計算
hash分區: (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
key.hashCode() & Integer.MAX_VALUE //保證數字爲正值
% numReduceTasks //取餘數,保證範圍在0~n-1之間
自定義分區:
public class WCPartitioner extends Partitioner<Text,IntWritable> { /** * 數字到分區0,字符到分區1 */ @Override public int getPartition(Text text, IntWritable intWritable, int numPartitions) { String key = text.toString(); try { Integer.parseInt(key); return 0; } catch (Exception e) { return 1; } } }
combiner: map端的reduce: 在map端預處理時候的聚合(預聚合)