做者 | 白松java
【注:本文爲原創,引用轉載需與數瀾聯繫。】node
Apache Giraph is an iterative graph processing system built for high scalability. For example, it is currently used at Facebook to analyze the social graph formed by users and their connections. Giraph originated as the open-source counterpart to Pregel, the graph processing architecture developed at Google and described in a 2010 paper. Both systems are inspired by the Bulk Synchronous Parallelmodel of distributed computation introduced by Leslie Valiant. Giraph adds several features beyond the basic Pregel model, including master computation, sharded aggregators, edge-oriented input, out-of-core computation, and more. With a steady development cycle and a growing community of users worldwide, Giraph is a natural choice for unleashing the potential of structured datasets at a massive scale.apache
Giraph基於Hadoop而建,將MapReduce中Mapper進行封裝,未使用reducer。在Mapper中進行屢次迭代,每次迭代等價於BSP模型中的SuperStep。一個Hadoop Job等價於一次BSP做業。基礎結構以下圖所示。服務器
每部分的功能以下:app
1. ZooKeeper: responsible for computation statesocket
–partition/worker mapping分佈式
–global state: #superstepide
–checkpoint paths, aggregator values, statisticsoop
2. Master: responsible for coordination測試
–assigns partitions to workers
–coordinates synchronization
–requests checkpoints
–aggregates aggregator values
–collects health statuses
3. Worker: responsible for vertices
–invokes active vertices compute() function
–sends, receives and assigns messages
–computes local aggregation values
(1)實驗環境
三臺服務器:test16五、test6二、test63。test165同時是JobTracker和TaskTracker.
測試例子:官網自帶的SSSP程序,數據是本身模擬生成。
運行命令:Hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsVertex -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/giraph/SSSP -of org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/giraph/output-sssp-debug-7 -w 5
(2)爲節約空間,下文中全部代碼均爲核心代碼片斷。
(3)core-site.xml中hadoop.tmp.dir的路徑設爲:/home/hadoop/hadooptmp
(4)寫本文是屢次調試完成的,故文中的JobID不同,讀者可理解爲同一JobID.
(5)後續文章也遵循上述規則。
Giraph中自定義org.apache.giraph.graph.GraphMapper類來繼承Hadoop中的 org.apache.hadoop.mapreduce.Mapper<Object,Object,Object,Object>類,覆寫了setup()、map()、cleanup()和run()方法。GraphMapper類的說明以下:
「This mapper that will execute the BSP graph tasks alloted to this worker. All tasks will be performed by calling the GraphTaskManager object managed by this GraphMapper wrapper classs. Since this mapper will not be passing data by key-value pairs through the MR framework, the Mapper parameter types are irrelevant, and set to Object type.」
BSP的運算邏輯被封裝在GraphMapper類中,其擁有一GraphTaskManager對象,用來管理Job的tasks。每一個GraphMapper對象都至關於BSP中的一個計算節點(compute node)。
在GraphMapper類中的setup()方法中,建立GraphTaskManager對象並調用其setup()方法進行一些初始化工做。以下:
map()方法爲空,由於全部操做都被封裝在了GraphTaskManager類中。在run()方法中調用GraphTaskManager對象的execute()方法進行BSP迭代計算。
功能:The Giraph-specific business logic for a single BSP compute node in whatever underlying type of cluster our Giraph job will run on. Owning object will provide the glue into the underlying cluster framework and will call this object to perform Giraph work.
下面講述setup()方法,代碼以下:
一、locateZookeeperClasspath(zkPathList)
找到ZK jar的本地副本,其路徑爲:/home/hadoop/hadooptmp/mapred/local/taskTracker/root/jobcache/job_201403270456_0001/jars/job.jar ,用於啓動ZooKeeper服務。
二、startZooKeeperManager(),初始化和配置ZooKeeperManager。
定義以下:
三、org.apache.giraph.zk.ZooKeeperManager 類
功能:Manages the election of ZooKeeper servers, starting/stopping the services, etc.
ZooKeeperManager類的setup()定義以下:
createCandidateStamp()方法在 HDFS上 的_bsp/_defaultZkManagerDir/job_201403301409_0006/_task 目錄下爲每一個task建立一個文件,文件內容爲空。文件名爲本機的Hostname+taskPartition,以下截圖:
運行時指定了5個workers(-w 5),再加上一個master,全部上面有6個task。
getZooKeeperServerList()方法中,taskPartition爲0的task會調用createZooKeeperServerList()方法建立ZooKeeper server List,也是建立一個空文件,經過文件名來描述Zookeeper servers。
首先獲取taskDirectory(_bsp/_defaultZkManagerDir/job_201403301409_0006/_task)目錄下文件,若是當前目錄下有文件,則把文件名(Hostname+taskPartition)中的Hostname和taskPartition存入到hostNameTaskMap中。掃描taskDirectory目錄後,若hostNameTaskMap的size大於serverCount(等於GiraphConstants.java中的ZOOKEEPER_SERVER_COUNT變量,定義爲1),就中止外層的循環。外層循環的目的是:由於taskDirectory下的文件每一個task文件時多個task在分佈式條件下建立的,有可能task 0在此建立server List時,別的task尚未生成後task文件。Giraph默認爲每一個Job啓動一個ZooKeeper服務,也就是說只有一個task會啓動ZooKeeper服務。
通過屢次測試,task 0老是被選爲ZooKeeper Server ,由於在同一進程中,掃描taskDirectory時,只有它對應的task 文件(其餘task的文件尚未生成好),而後退出for循環,發現hostNameTaskMap的size等於1,直接退出while循環。那麼此處就選了test162 0。
最後,建立了文件:_bsp/_defaultZkManagerDir/job_201403301409_0006/zkServerList_test162 0
onlineZooKeeperServers(),根據zkServerList_test162 0文件,Task 0 先生成zoo.cfg配置文件,使用ProcessBuilder來建立ZooKeeper服務進程,而後Task 0 再經過socket鏈接到ZooKeeper服務進程上,最後建立文件 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0 來標記master任務已完成。worker一直在進行循環檢測master是否生成好 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0,即worker等待直到master上的ZooKeeper服務已經啓動完成。
啓動ZooKeeper服務的命令以下:
四、determineGraphFunctions()。
GraphTaskManager類中有CentralizedServiceMaster對象和CentralizedServiceWorker 對象,分別對應於master和worker。每一個BSP compute node扮演的角色斷定邏輯以下:
a) If not split master, everyone does the everything and/or running ZooKeeper.
b) If split master/worker, masters also run ZooKeeper
c) If split master/worker == true and giraph.zkList is set, the master will not instantiate a ZK instance, but will assume a quorum is already active on the cluster for Giraph to use.
該斷定在GraphTaskManager 類中的靜態方法determineGraphFunctions()中定義,片斷代碼以下:
默認的,Giraph會區分master和worker。會在master上面啓動zookeeper服務,不會在worker上啓動ZooKeeper服務。那麼Task 0 就是master+ZooKeeper,其餘Tasks就是workers