Hadoop家族系列文章,主要介紹Hadoop家族產品,經常使用的項目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增長的項目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。java
從2011年開始,中國進入大數據風起雲涌的時代,以Hadoop爲表明的家族軟件,佔據了大數據處理的廣闊地盤。開源界及廠商,全部數據軟件,無一不向Hadoop靠攏。Hadoop也從小衆的高富帥領域,變成了大數據開發的標準。在Hadoop原有技術基礎之上,出現了Hadoop家族產品,經過「大數據」概念不斷創新,推出科技進步。node
做爲IT界的開發人員,咱們也要跟上節奏,抓住機遇,跟着Hadoop一塊兒雄起!git
關於做者:程序員
- 張丹(Conan), 程序員Java,R,PHP,Javascript
- weibo:@Conan_Z
- blog: http://blog.fens.me
- email: bsspirit@gmail.com
轉載請註明出處:
http://blog.fens.me/hadoop-zookeeper-case/github
前言數據庫
軟件系統集成一直是工業界的一個難題,像10年以上的遺留系統集成,公司收購後的多系統集成,全球性的分步式系統集成等。雖然基於SOA的軟件架構,從理論上均可以解決這些集成的問題,可是具體實施過程,有些集成項目過於複雜而失敗。服務器
隨着技術的創新和發展,對於分步式集羣應用的集成,有了更好的開源軟件的支持,像zookeeper就是一個不錯的分步式協做軟件平臺。本文將經過一個案例介紹Zookeeper的強大。架構
目錄app
- 項目背景:分佈式消息中間件
- 需求分析:業務系統升級方案
- 架構設計:搭建Zookeeper的分步式協做平臺
- 程序開發:基於Zookeeper的程序設計
- 程序運行
1. 項目背景:分佈式消息中間件
隨着Hadoop的普及,愈來愈多的公司開始構建本身的Hadoop系統。有時候,公司內部的不一樣部門或不一樣的團隊,都有本身的Hadoop集羣。這種多集羣的方式,既能讓每一個團隊擁有個性化的Hadoop,又能避免大集羣的高度其中化運維難度。當數據量不是特別巨大的時候,小型集羣會有不少適用的場合。運維
固然,多個小型集羣也有缺點,就是資源配置可能形成浪費。每一個團隊的Hadoop集羣,都要配有服務器和運維人員。有些能力強的團隊,構建的hadoop集羣,能夠達到真正的個性化要求;而有一些能力比較差的團隊,搭建的Hadoop集羣性能會比較糟糕。
還有一些時候,多個團隊須要共同完成一個任務,好比,A團隊經過Hadoop集羣計算的結果,交給B團隊繼續工做,B完成了本身任務再交給C團隊繼續作。這就有點像業務系統的工做流同樣,一環一環地傳下去,直到最後一部分完成。
在業務系統中,咱們常常會用SOA的架構來解決這種問題,每一個團隊在ESB服務器上部署本身的服務,而後經過消息中間件完成調度任務。對於分步式的多個Hadoop集羣系統的協做,一樣能夠用這種架構來作,只要把消息中間件引擎換成支持分步式的消息中間件的引擎就好了。
Zookeeper就能夠作爲 分步式消息中間件,來完成上面的說的業務需求。ZooKeeper是Hadoop家族的一款高性能的分佈式協做的產品,是一個爲分佈式應用所設計的分佈的、開源的協調服務,它主要是用來解決分佈式應用中常常遇到的一些數據管理問題,簡化分佈式應用協調及其管理的難度,提供高性能的分佈式服務。Zookeeper的安裝和使用,請參考文章 ZooKeeper僞分佈式集羣安裝及使用。
ZooKeeper提供分佈式協做服務,並不須要依賴於Hadoop的環境。
2. 需求分析:業務系統升級方案
下面我將從一個案例出發,來解釋如何進行分步式協做平臺的系統設計。
2.1 案例介紹
某大型軟件公司,從事領域爲供應鏈管理,主要業務包括了 採購管理、應付帳款管理、應收帳款管理、供應商反覆管理、退貨管理、銷售管理、庫存管理、電子商務、系統集成等。
每塊業務的邏輯都很複雜,由單獨部門進行軟件開發和維護,部門之間的系統沒有直接通訊需求,每一個部門完成本身的功能就好了,最後經過數據庫來共享數據,實現各功能之間的數據交換。
隨着業務的發展,客戶對響應速度要求愈來愈高,經過數據庫來共享數據的方式,已經達不到信息交換的要求,系統進行了第一次升級,經過企業服務總線(ESB)統一管理公司內部全部業務。經過WebServices發佈服務,經過Message Queue實現業務功能的調度。
公司業務規模繼續擴大,跨國收購了多家公司。業務系統從原來的一個機房的集中式部署,變成了全球性的多機房的分步式部署。這時,Message Queue已經不能知足多機房跨地域的業務系統的功能需求了,須要一種分步式的消息中間件解決方案,來代替原有消息中間件的服務。
系統進行了第二次升級,採用Zookeeper做爲分步式中間件調度引擎。
經過上面的描述,咱們能夠看出,當一個公司從小到大,從國內業務發展到全球性業務的時候。
爲了配合業務發展,IT系統也是愈來愈複雜的,從最先的主從數據庫設計,到ESB企業系統總線的擴展,再到分步式ESB配合分步式消息系統,每一次的升級都須要軟件技術的支撐。
2.2 功能需求
全球性採購業務和全球性銷售業務,讓公司在市場中處於競爭優點。但因爲採購和銷售分別是由不一樣部門進行的軟件開發和維護,並且業務往來也在不一樣的國家和地區。因此在每個月底結算時,工做量都特別大。
好比,計算利潤表 (請不要糾結於公式的準確性)
當月利潤 = 當月銷售金額 - 當月採購金額 - 當月其餘支出
這樣一個很是簡單的計算公式,但對於跨國公司和部門來講,一點也不簡單的。
從系統角度來看,採購部門要統計採購數據(海量數據),銷售部門統計銷售數據((海量數據),其餘部門統計的其餘費用支出(彙總的少許數據),最後系統計算獲得當月的利潤。
這裏要說明的是,採購系統是單獨的系統,銷售是另外單獨的系統,及以其餘幾十個大大小小的系統,如何能讓多個系統,配合起來作這道計算題呢??
3. 架構設計:搭建Zookeeper的分步式協做平臺
接下來,咱們基於zookeeper來構建一個分步式隊列的應用,來解決上面的功能需求。下面內容,排除了ESB的部分,只保留zookeeper進行實現。
- 採購數據,爲海量數據,基於Hadoop存儲和分析。
- 銷售數據,爲海量數據,基於Hadoop存儲和分析。
- 其餘費用支出,爲少許數據,基於文件或數據庫存儲和分析。
咱們設計一個同步隊列,這個隊列有3個條件節點,分別對應採購(purchase),銷售(sell),其餘費用(other)3個部分。當3個節點都被建立後,程序會自動觸發計算利潤,並建立利潤(profit)節點。上面3個節點的建立,無順序要求。每一個節點只能被建立一次。
系統環境
- 2個獨立的Hadoop集羣
- 2個獨立的Java應用
- 3個Zookeeper集羣節點
圖標解釋:
- Hadoop App1,Hadoop App2 是2個獨立的Hadoop集羣應用
- Java App3,Java App4 是2個獨立的Java應用
- zk1,zk2,zk3是ZooKeeper集羣的3個鏈接點
- /queue,是znode的隊列目錄,假設隊列長度爲3
- /queue/purchase,是znode隊列中,1號排對者,由Hadoop App1提交,用於統計採購金額。
- /queue/sell,是znode隊列中,2號排對者,由Hadoop App2提交,用於統計銷售金額。
- /queue/other,是znode隊列中,3號排對者,由Java App3提交,用於統計其餘費用支出金額。
- /queue/profit,當znode隊列中滿了,觸發建立利潤節點。
- 當/qeueu/profit被建立後,app4被啓動,全部zk的鏈接通知同步程序(紅色線),隊列已完成,全部程序結束。
補充說明:
- 建立/queue/purchase,/queue/sell,/queue/other目錄時,沒有先後順序,程序提交後,/queue目錄下會生成對應該子目錄
- App1能夠經過zk2提交,App2也可經過zk3提交。原則上,找最近路由最近的znode節點提交。
- 每一個應用不能重複提出,直到3個任務都提交,計算利潤的任務纔會被執行。
- /queue/profit被建立後,zk的應用會監聽到這個事件,通知應用,隊列已完成!
這裏的同步隊列的架構更詳細的設計思路,請參考文章 ZooKeeper實現分佈式隊列Queue
4. 程序開發:基於Zookeeper的程序設計
最終的功能需求:計算2013年01月的利潤。
4.1 實驗環境
在真正企業開發時,咱們的實驗環境應該與需求是一致的,但個人硬件條件有限,因些作了一個簡化的環境設置。
- 把zookeeper的徹底分步式部署的3臺服務器集羣節點的,改成一臺服務器上3個集羣節點。
- 把2個獨立Hadoop集羣,改成一個集羣的2個獨立的MapReduce任務。
開發環境:
- Win7 64bit
- JDK 1.6
- Maven3
- Juno Service Release 2
- IP:192.168.1.10
Zookeeper服務器環境:
- Linux Ubuntu 12.04 LTS 64bit
- Java 1.6.0_29
- Zookeeper: 3.4.5
- IP: 192.168.1.201
- 3個集羣節點
Hadoop服務器環境:
- Linux Ubuntu 12.04 LTS 64bit
- Java 1.6.0_29
- Hadoop: 1.0.3
- IP: 192.168.1.210
4.2 實驗數據
3組實驗數據:
- 採購數據,purchase.csv
- 銷售數據,sell.csv
- 其餘費用數據,other.csv
4.2.1 採購數據集
一共4列,分別對應 產品ID,產品數量,產品單價,採購日期。
1,26,1168,2013-01-08 2,49,779,2013-02-12 3,80,850,2013-02-05 4,69,1585,2013-01-26 5,88,1052,2013-01-13 6,84,2363,2013-01-19 7,64,1410,2013-01-12 8,53,910,2013-01-11 9,21,1661,2013-01-19 10,53,2426,2013-02-18 11,64,2022,2013-01-07 12,36,2941,2013-01-28 13,99,3819,2013-01-19 14,64,2563,2013-02-16 15,91,752,2013-02-05 16,65,750,2013-02-04 17,19,2426,2013-02-23 18,19,724,2013-02-05 19,87,137,2013-01-25 20,86,2939,2013-01-14 21,92,159,2013-01-23 22,81,2331,2013-03-01 23,88,998,2013-01-20 24,38,102,2013-02-22 25,32,4813,2013-01-13 26,36,1671,2013-01-19 //省略部分數據
4.2.2 銷售數據集
一共4列,分別對應 產品ID,銷售數量,銷售單價,銷售日期。
1,14,1236,2013-01-14 2,19,808,2013-03-06 3,26,886,2013-02-23 4,23,1793,2013-02-09 5,27,1206,2013-01-21 6,27,2648,2013-01-30 7,22,1502,2013-01-19 8,20,1050,2013-01-18 9,13,1778,2013-01-30 10,20,2718,2013-03-14 11,22,2175,2013-01-12 12,16,3284,2013-02-12 13,30,4152,2013-01-30 14,22,2770,2013-03-11 15,28,778,2013-02-23 16,22,874,2013-02-22 17,12,2718,2013-03-22 18,12,747,2013-02-23 19,27,172,2013-02-07 20,27,3282,2013-01-22 21,28,224,2013-02-05 22,26,2613,2013-03-30 23,27,1147,2013-01-31 24,16,141,2013-03-20 25,15,5343,2013-01-21 26,16,1887,2013-01-30 27,12,2535,2013-01-12 28,16,469,2013-01-07 29,29,2395,2013-03-30 30,17,1549,2013-01-30 31,25,4173,2013-03-17 //省略部分數據
4.2.3 其餘費用數據集
一共2列,分別對應 發生日期,發生金額
2013-01-02,552 2013-01-03,1092 2013-01-04,1794 2013-01-05,435 2013-01-06,960 2013-01-07,1066 2013-01-08,1354 2013-01-09,880 2013-01-10,1992 2013-01-11,931 2013-01-12,1209 2013-01-13,1491 2013-01-14,804 2013-01-15,480 2013-01-16,1891 2013-01-17,156 2013-01-18,1439 2013-01-19,1018 2013-01-20,1506 2013-01-21,1216 2013-01-22,2045 2013-01-23,400 2013-01-24,1795 2013-01-25,1977 2013-01-26,1002 2013-01-27,226 2013-01-28,1239 2013-01-29,702 2013-01-30,1396 //省略部分數據
4.3 程序設計
咱們要編寫5個文件:
- 計算採購金額,Purchase.java
- 計算銷售金額,Sell.java
- 計算其餘費用金額,Other.java
- 計算利潤,Profit.java
- Zookeeper的調度,ZookeeperJob.java
4.3.1 計算採購金額
採購金額,是基於Hadoop的MapReduce統計計算。
public class Purchase { public static final String HDFS = "hdfs://192.168.1.210:9000"; public static final Pattern DELIMITER = Pattern.compile("[\t,]"); public static class PurchaseMapper extends Mapper<longwritable, text,="" intwritable=""> { private String month = "2013-01"; private Text k = new Text(month); private IntWritable v = new IntWritable(); private int money = 0; public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException { System.out.println(values.toString()); String[] tokens = DELIMITER.split(values.toString()); if (tokens[3].startsWith(month)) {// 1月的數據 money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//單價*數量 v.set(money); context.write(k, v); } } } public static class PurchaseReducer extends Reducer<text, intwritable,="" text,="" intwritable=""> { private IntWritable v = new IntWritable(); private int money = 0; @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for (IntWritable line : values) { // System.out.println(key.toString() + "\t" + line); money += line.get(); } v.set(money); context.write(null, v); System.out.println("Output:" + key + "," + money); } } public static void run(Map<string, string=""> path) throws IOException, InterruptedException, ClassNotFoundException { JobConf conf = config(); String local_data = path.get("purchase"); String input = path.get("input"); String output = path.get("output"); // 初始化purchase HdfsDAO hdfs = new HdfsDAO(HDFS, conf); hdfs.rmr(input); hdfs.mkdirs(input); hdfs.copyFile(local_data, input); Job job = new Job(conf); job.setJarByClass(Purchase.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(PurchaseMapper.class); job.setReducerClass(PurchaseReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); job.waitForCompletion(true); } public static JobConf config() {// Hadoop集羣的遠程配置信息 JobConf conf = new JobConf(Purchase.class); conf.setJobName("purchase"); conf.addResource("classpath:/hadoop/core-site.xml"); conf.addResource("classpath:/hadoop/hdfs-site.xml"); conf.addResource("classpath:/hadoop/mapred-site.xml"); return conf; } public static Map<string,string> path(){ Map<string, string=""> path = new HashMap<string, string="">(); path.put("purchase", "logfile/biz/purchase.csv");// 本地的數據文件 path.put("input", HDFS + "/user/hdfs/biz/purchase");// HDFS的目錄 path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); // 輸出目錄 return path; } public static void main(String[] args) throws Exception { run(path()); } }
4.3.2 計算銷售金額
銷售金額,是基於Hadoop的MapReduce統計計算。
public class Sell { public static final String HDFS = "hdfs://192.168.1.210:9000"; public static final Pattern DELIMITER = Pattern.compile("[\t,]"); public static class SellMapper extends Mapper<longwritable, text,="" intwritable=""> { private String month = "2013-01"; private Text k = new Text(month); private IntWritable v = new IntWritable(); private int money = 0; public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException { System.out.println(values.toString()); String[] tokens = DELIMITER.split(values.toString()); if (tokens[3].startsWith(month)) {// 1月的數據 money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//單價*數量 v.set(money); context.write(k, v); } } } public static class SellReducer extends Reducer<text, intwritable,="" text,="" intwritable=""> { private IntWritable v = new IntWritable(); private int money = 0; @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for (IntWritable line : values) { // System.out.println(key.toString() + "\t" + line); money += line.get(); } v.set(money); context.write(null, v); System.out.println("Output:" + key + "," + money); } } public static void run(Map<string, string=""> path) throws IOException, InterruptedException, ClassNotFoundException { JobConf conf = config(); String local_data = path.get("sell"); String input = path.get("input"); String output = path.get("output"); // 初始化sell HdfsDAO hdfs = new HdfsDAO(HDFS, conf); hdfs.rmr(input); hdfs.mkdirs(input); hdfs.copyFile(local_data, input); Job job = new Job(conf); job.setJarByClass(Sell.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(SellMapper.class); job.setReducerClass(SellReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); job.waitForCompletion(true); } public static JobConf config() {// Hadoop集羣的遠程配置信息 JobConf conf = new JobConf(Purchase.class); conf.setJobName("purchase"); conf.addResource("classpath:/hadoop/core-site.xml"); conf.addResource("classpath:/hadoop/hdfs-site.xml"); conf.addResource("classpath:/hadoop/mapred-site.xml"); return conf; } public static Map<string,string> path(){ Map<string, string=""> path = new HashMap<string, string="">(); path.put("sell", "logfile/biz/sell.csv");// 本地的數據文件 path.put("input", HDFS + "/user/hdfs/biz/sell");// HDFS的目錄 path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // 輸出目錄 return path; } public static void main(String[] args) throws Exception { run(path()); } }
4.3.3 計算其餘費用金額
其餘費用金額,是基於本地文件的統計計算。
public class Other { public static String file = "logfile/biz/other.csv"; public static final Pattern DELIMITER = Pattern.compile("[\t,]"); private static String month = "2013-01"; public static void main(String[] args) throws IOException { calcOther(file); } public static int calcOther(String file) throws IOException { int money = 0; BufferedReader br = new BufferedReader(new FileReader(new File(file))); String s = null; while ((s = br.readLine()) != null) { // System.out.println(s); String[] tokens = DELIMITER.split(s); if (tokens[0].startsWith(month)) {// 1月的數據 money += Integer.parseInt(tokens[1]); } } br.close(); System.out.println("Output:" + month + "," + money); return money; } }
4.3.4 計算利潤
利潤,經過zookeeper分步式自動調度計算利潤。
public class Profit { public static void main(String[] args) throws Exception { profit(); } public static void profit() throws Exception { int sell = getSell(); int purchase = getPurchase(); int other = getOther(); int profit = sell - purchase - other; System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d\n", sell, purchase, other, profit); } public static int getPurchase() throws Exception { HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config()); return Integer.parseInt(hdfs.cat(Purchase.path().get("output") + "/part-r-00000").trim()); } public static int getSell() throws Exception { HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config()); return Integer.parseInt(hdfs.cat(Sell.path().get("output") + "/part-r-00000").trim()); } public static int getOther() throws IOException { return Other.calcOther(Other.file); } }
4.3.5 Zookeeper調度
調度,經過構建分步式隊列系統,自動化程序代替人工操做。
public class ZooKeeperJob { final public static String QUEUE = "/queue"; final public static String PROFIT = "/queue/profit"; final public static String PURCHASE = "/queue/purchase"; final public static String SELL = "/queue/sell"; final public static String OTHER = "/queue/other"; public static void main(String[] args) throws Exception { if (args.length == 0) { System.out.println("Please start a task:"); } else { doAction(Integer.parseInt(args[0])); } } public static void doAction(int client) throws Exception { String host1 = "192.168.1.201:2181"; String host2 = "192.168.1.201:2182"; String host3 = "192.168.1.201:2183"; ZooKeeper zk = null; switch (client) { case 1: zk = connection(host1); initQueue(zk); doPurchase(zk); break; case 2: zk = connection(host2); initQueue(zk); doSell(zk); break; case 3: zk = connection(host3); initQueue(zk); doOther(zk); break; } } // 建立一個與服務器的鏈接 public static ZooKeeper connection(String host) throws IOException { ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() { // 監控全部被觸發的事件 public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals(PROFIT)) { System.out.println("Queue has Completed!!!"); } } }); return zk; } public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException { System.out.println("WATCH => " + PROFIT); zk.exists(PROFIT, true); if (zk.exists(QUEUE, false) == null) { System.out.println("create " + QUEUE); zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } else { System.out.println(QUEUE + " is exist!"); } } public static void doPurchase(ZooKeeper zk) throws Exception { if (zk.exists(PURCHASE, false) == null) { Purchase.run(Purchase.path()); System.out.println("create " + PURCHASE); zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } else { System.out.println(PURCHASE + " is exist!"); } isCompleted(zk); } public static void doSell(ZooKeeper zk) throws Exception { if (zk.exists(SELL, false) == null) { Sell.run(Sell.path()); System.out.println("create " + SELL); zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } else { System.out.println(SELL + " is exist!"); } isCompleted(zk); } public static void doOther(ZooKeeper zk) throws Exception { if (zk.exists(OTHER, false) == null) { Other.calcOther(Other.file); System.out.println("create " + OTHER); zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } else { System.out.println(OTHER + " is exist!"); } isCompleted(zk); } public static void isCompleted(ZooKeeper zk) throws Exception { int size = 3; List children = zk.getChildren(QUEUE, true); int length = children.size(); System.out.println("Queue Complete:" + length + "/" + size); if (length >= size) { System.out.println("create " + PROFIT); Profit.profit(); zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); for (String child : children) {// 清空節點 zk.delete(QUEUE + "/" + child, -1); } } } }
5. 運行程序
最後,咱們運行整個的程序,包括3個部分。
- zookeeper服務器
- hadoop服務器
- 分步式隊列應用
5.1 啓動zookeeper服務
啓動zookeeper服務器集羣:
~ cd toolkit/zookeeper345 # 啓動zk集羣3個節點 ~ bin/zkServer.sh start conf/zk1.cfg ~ bin/zkServer.sh start conf/zk2.cfg ~ bin/zkServer.sh start conf/zk3.cfg ~ jps 4234 QuorumPeerMain 5002 Jps 4275 QuorumPeerMain 4207 QuorumPeerMain
查看zookeeper集羣中,各節點的狀態
# 查看zk1節點狀態 ~ bin/zkServer.sh status conf/zk1.cfg JMX enabled by default Using config: conf/zk1.cfg Mode: follower # 查看zk2節點狀態,zk2爲leader ~ bin/zkServer.sh status conf/zk2.cfg JMX enabled by default Using config: conf/zk2.cfg Mode: leader # 查看zk3節點狀態 ~ bin/zkServer.sh status conf/zk3.cfg JMX enabled by default Using config: conf/zk3.cfg Mode: follower
啓動zookeeper客戶端:
~ bin/zkCli.sh -server 192.168.1.201:2181 # 查看zk [zk: 192.168.1.201:2181(CONNECTED) 0] ls / [queue, queue-fifo, zookeeper] # /queue路徑無子目錄 [zk: 192.168.1.201:2181(CONNECTED) 1] ls /queue []
5.2 啓動Hadoop服務
~ hadoop/hadoop-1.0.3 ~ bin/start-all.sh ~ jps 25979 JobTracker 26257 TaskTracker 25576 DataNode 25300 NameNode 12116 Jps 25875 SecondaryNameNode
5.3 啓動分步式隊列ZookeeperJob
5.3.1 啓動統計採購數據程序,設置啓動參數1
只顯示用戶日誌,忽略系統日誌。
WATCH => /queue/profit /queue is exist! Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase Create: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase copy from: logfile/biz/purchase.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/purchase Output:2013-01,9609887 create /queue/purchase Queue Complete:1/3
在zk中查看queue目錄
[zk: 192.168.1.201:2181(CONNECTED) 3] ls /queue [purchase]
5.3.2 啓動統計銷售數據程序,設置啓動參數2
只顯示用戶日誌,忽略系統日誌。
WATCH => /queue/profit /queue is exist! Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/sell Create: hdfs://192.168.1.210:9000/user/hdfs/biz/sell copy from: logfile/biz/sell.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/sell Output:2013-01,2950315 create /queue/sell Queue Complete:2/3
在zk中查看queue目錄
[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue [purchase, sell]
5.3.3 啓動統計其餘費用數據程序,設置啓動參數3
只顯示用戶日誌,忽略系統日誌。
WATCH => /queue/profit /queue is exist! Output:2013-01,34193 create /queue/other Queue Complete:3/3 create /queue/profit cat: hdfs://192.168.1.210:9000/user/hdfs/biz/sell/output/part-r-00000 2950315 cat: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase/output/part-r-00000 9609887 Output:2013-01,34193 profit = sell - purchase - other = 2950315 - 9609887 - 34193 = -6693765 Queue has Completed!!!
在zk中查看queue目錄
[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue [profit]
在最後一步,統計其餘費用數據程序運行後,從日誌中看到3個條件節點都已知足要求。而後,經過同步的分步式隊列自動啓動了計算利潤的程序,並在日誌中打印了2013年1月的利潤爲-6693765。
本文介紹的源代碼,已上傳到github:https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myzk/hadoop
經過這個複雜的實驗,咱們成功地用zookeeper實現了分步式隊列,並應用到了業務中。固然,實驗中也有一些不是特別的嚴謹的地方,請同窗邊作邊思考。