kafka+flume-ng+hdfs 整合

Kafka java

   由LinkedIn於2010年12月(https://thenewstack.io/streaming-data-at-linkedin-apache-kafka-reaches-1-1-trillion-messages-per-day/)開源出來一個消息的發佈/訂閱系統,用scala實現;版本從0.05到如今0.10.2.0(2017-02-25)node

  

   系統中,生產者(producer)主動向集羣某個topic發送(push)消息(message);消費者(consumer)以組(group)爲單位訂閱topic,當消費者啓動消費程序以後,若是集羣中有未消費完的或者新的消息,則實時的拉取(pull)消息到消費者本地處理。數據庫

   集羣是以topic爲單位管理和存儲消息的。既然是集羣,就利用集羣的優點;將topic分紅多個分片(partition),生產者發送的消息存儲在各個分片上。對應的多是不一樣節點的不一樣本地磁盤。每一個分片能夠設置多個副本(replicated)用來確保數據的容錯性。apache

   並且每一個分片上的數據都是先後有序的;對應的就是後面的消息追加的文件中去;這種場景就可以利用磁盤順序讀寫的特性。api

   

   固然消費者消費也是有序消費的;偏移量(offset)從小到大順序消費。服務器

   

   總體效果就是架構

   

   磁盤順序讀寫 內存隨機讀寫app

特色dom

  分佈式:由zookeeper管理,能夠啓動多個broker-server;以集羣的方式給生產環境提供穩定的服務。maven

  容錯性:大部分分佈式都具備的。

      1.只要有一個正常的broker-server,集羣就能正常運行。

      2.能夠設置爲Topic的partition設置副本,確保就算一臺機器的磁盤壞了;也不影響數據消費

  負載問題:

      1.生產者發送消息能夠指定規則,發送到不一樣的partition上。

      2.topic中全部partition選取一個對外提供服務的leader;若是leader宕掉了,從後選中選取下一個。

  可擴展性:新增broker很是方便。

生產者樣例代碼

 1 import java.util.Properties
 2 import kafka.producer.ProducerConfig
 3 import kafka.javaapi.producer.Producer
 4 import java.util.Random
 5 import java.util.Date
 6 import kafka.producer.KeyedMessage
 7 import kafka.producer.Partitioner
 8 import kafka.utils.VerifiableProperties
 9 import org.apache.log4j.PropertyConfigurator
10 import java.util.concurrent.TimeUnit
11 import java.text.SimpleDateFormat
12 import org.tx.common.BIConstant
13 
14 /**
15  * @date    2015-10-27 16:54:19
16  */
17 object FirstKafkaProducer {
18   
19   PropertyConfigurator.configure("etc/log4j.properties");
20   
21   def main(args: Array[String]): Unit = {
22     
23     
24 //    val Array(interval,records) = args
25     val (interval,records) = (1,1)
26     val props = new Properties()
27 //        props.put("metadata.broker.list", "own:9092,own:9093,own:9094")
28         props.put("metadata.broker.list", "hdpc1-dn003:9092")
29         props.put("serializer.class", "kafka.serializer.StringEncoder")
30         props.put("partitioner.class", "org.henry.scala.scalamvn.SimplePartitioner")
31 //        props.put("request.required.acks", "-1")
32     val config = new ProducerConfig(props);
33     val producer = new Producer[String, String](config)
34     val sdf = new SimpleDateFormat(BIConstant.DATE_SDF)
35     
36     for (i <- 1 to records.toInt) {
37       val rnd = new Random();
38       val runtime = new Date();
39       val ip = rnd.nextInt(255).toString();
40 //      val msg = runtime + ",www.example.com," + ip;
41       val msg = "1001|2|2|"+runtime.getTime
42       println(" *** "+msg)
43       val data = new KeyedMessage[String, String]("mytopic", ip, msg)
44       TimeUnit.SECONDS.sleep(interval.toInt)
45       producer.send(data)
46     }
47     producer.close
48   }
49 }
50 
51 class SimplePartitioner(props:VerifiableProperties) extends Partitioner {
52   
53   override def partition(key: Any, numPartitions: Int): Int = {
54     var partition = 0;
55     val stringKey =  key.toString();
56     val offset = stringKey.lastIndexOf('.');
57     if (offset > 0) {
58        partition = Integer.parseInt( stringKey.substring(offset+1)) % numPartitions;
59     }
60     partition;
61   }
62 }
View Code

消費者樣例代碼(https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example) 

 1 package com.test.groups;
 2  
 3 import kafka.consumer.ConsumerIterator;
 4 import kafka.consumer.KafkaStream;
 5  
 6 public class ConsumerTest implements Runnable {
 7     private KafkaStream m_stream;
 8     private int m_threadNumber;
 9  
10     public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
11         m_threadNumber = a_threadNumber;
12         m_stream = a_stream;
13     }
14  
15     public void run() {
16         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
17         while (it.hasNext())
18             System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
19         System.out.println("Shutting down Thread: " + m_threadNumber);
20     }
21 }
View Code

 

flume-ng

   由Cloudera於2010年5月開源出來,在2010年7月加入Cloudera Hadoop的發行版本CDH3b2(http://blog.cloudera.com/blog/2010/07/whats-new-in-cdh3b2-flume/)中,用Java開發的。

   開始是flume-OG,一直到2011年10月,最後版本是0.94.0。後面對核心組件/代碼架構的進行里程碑式的重構,就有了flume-ng;並開源到apache。

   flume OG:agent採集數據,而後發送到collector;collector聚集後存儲到目的地。

   

   flume NG最小單位的架構;只有agent角色,分爲三個步驟來接收和傳輸數據

     Source(數據採集): 默認實現了從不一樣數據源接受數據。如Avro/Kafka/Netcat/Http等等;也能夠根據具體需求擴展實現source

     Channel(數據臨時存儲的地方): 接受source的數據,可選擇持久化到數據庫或者本地磁盤;確保sink處理完數據後,刪除;保證數據完整性。

     Sink(數據存儲目的地):數據存儲介質的實現。可選擇HDFS/kafka/local file等等。

   在整個流程中,接收到的每條數據被封裝成Event來進行傳遞和處理的。

     

   

特色

   1.部署,配置簡單方便;通用

   2.中間對數據作了臨時存儲,確保數據不丟失

 

整合

   那麼將這幾個有關聯的開源組件爲我所用;並且還要考慮到後期開發調試方便,版本管理;部署到生產環境下的時候操做簡單,可維護性好;可以監控JVM狀態。就須要進行自動化部署的改造,並且公司有成熟可靠的解決方案。那麼一切就瓜熟蒂落了。

   總體思路是:使用maven將項目依賴的jar包/啓動腳本,簡單的配置打包成tar包。具體的配置項執行啓動腳本,調用Main方法後從公共的配置中心加載。

   在此次改造中,主要的任務有兩大塊。

     1.理解,使用如今成熟的自動化部署步驟、過程。

     2.確保功能正常的狀況下,將原flume的conf下面配置文件搬移到配置中心的事情。

   基本流程是:

     將代碼提交到Git進行管理,使用jekins獲取Git代碼打成tar包。tar包裏面基本包含bin,conf,lib目錄。bin下面存聽任務啓動中止腳本,conf下面存放簡單的配置參數,lib下面存放項目依賴的jar和自身jar包。發佈到私服;

     服務器下載tar包,解壓;啓動bin下面start.sh腳本啓動應用。應用啓動時會將配套的日誌、JMX監控服務註冊,啓動;再從配置中心獲取詳細的參數配置;啓動目標程序。

   

    以本次flume的使用場景做爲例子,具體作了哪些事。

    因爲咱們的數據源選取爲Kafka,存儲介質是HDFS。因此數據傳輸流就是kafka(source)-->local file(channel)-->HDFS(sink)

    而原生態的flume啓動入口是使用腳本調用org.apache.flume.node.Application(Main方法入口程序),因此改造的切入點就是這裏。

    調用node.Application時,額外傳入參數

-Djava-container=com.appleframework.flume.node.Application
-Dconfig-factory=com.appleframework.config.PropertyConfigurerFactory

    其中PropertyConfigurerFactory是讀取、加載system.properties指向配置中心的詳細配置參數的做用。Application主要是增長從公共配置中加載參數的方法。其餘地方保持統一。其中deploy.group=flume-demo,deploy.dataId=hdfs,deploy.env=dev這三個參數來識別一組配置信息。

    更改的代碼詳情以下

    node.Application

 1 public class Application {
 2 
 3     ...
 4     public static void main(String[] args) {
 5 
 6 
 7     ...
 8     boolean isZkConfigured = false;
 9 
10             Options options = new Options();
11 
12             Option option = new Option("n", "name", true, "the name of this agent");
13             option.setRequired(true);
14             options.addOption(option);
15 
16             option = new Option("f", "conf-file", true, "specify a config file (required if -z missing)");
17             option.setRequired(false);
18             options.addOption(option);
19 
20             option = new Option(null, "no-reload-conf", false, "do not reload config file if changed");
21             options.addOption(option);
22 
23             // Options for Zookeeper
24             option = new Option("z", "zkConnString", true, "specify the ZooKeeper connection to use (required if -f missing)");
25             option.setRequired(false);
26             options.addOption(option);
27 
28             option = new Option("p", "zkBasePath", true, "specify the base path in ZooKeeper for agent configs");
29             option.setRequired(false);
30             options.addOption(option);
31 
32             option = new Option("h", "help", false, "display help text");
33             options.addOption(option);
34             
35             option = new Option("e", "env", true, "the environment of this app");
36             option.setRequired(false);
37             options.addOption(option);
38 
39                         Component.init(args);
40     ...              
41     }
42     ...    
43 }
View Code

    Component

  1 package com.appleframework.flume.node;
  2 
  3 import java.io.File;
  4 import java.lang.management.ManagementFactory;
  5 import java.text.SimpleDateFormat;
  6 import java.util.ArrayList;
  7 import java.util.Date;
  8 import java.util.Hashtable;
  9 import java.util.List;
 10 import java.util.Properties;
 11 
 12 import javax.management.MBeanServer;
 13 import javax.management.ObjectName;
 14 
 15 import org.apache.log4j.Logger;
 16 
 17 import com.appleframework.boot.config.ConfigContainer;
 18 import com.appleframework.boot.core.CommandOption;
 19 import com.appleframework.boot.core.Container;
 20 import com.appleframework.boot.core.log4j.Log4jContainer;
 21 import com.appleframework.boot.core.log4j.LoggingConfig;
 22 import com.appleframework.boot.core.monitor.MonitorConfig;
 23 import com.appleframework.boot.core.monitor.MonitorContainer;
 24 import com.appleframework.config.core.PropertyConfigurer;
 25 
 26 public class Component {
 27 
 28     private static Logger logger = Logger.getLogger(Component.class);
 29 
 30     static void init(String[] args) {
 31         //處理啓動參數
 32         CommandOption.parser(args);
 33                     
 34         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 35         
 36         final List<Container> containers = new ArrayList<Container>();
 37         containers.add(new Log4jContainer());
 38         containers.add(new MonitorContainer());
 39         
 40         String configContainer = System.getProperty("config-factory");
 41         if (null != configContainer) {
 42             containers.add(new ConfigContainer(configContainer));
 43         }
 44         
 45         for (Container container : containers) {
 46             container.start();
 47             try {
 48                 
 49                 Hashtable<String, String> properties = new Hashtable<String, String>();
 50 
 51                 properties.put(Container.TYPE_KEY, Container.DEFAULT_TYPE);
 52                 properties.put(Container.ID_KEY, container.getType());
 53                 
 54                 ObjectName oname = ObjectName.getInstance("com.appleframework", properties);
 55                 Object mbean = null;
 56                 if(container instanceof Log4jContainer) {
 57                     mbean = new LoggingConfig();
 58                 }
 59                 else if(container instanceof MonitorContainer) {
 60                     mbean = new MonitorConfig();
 61                 }
 62                 else {
 63                     logger.error("The Error Container :" + container.getName());
 64                 }
 65                 
 66                 if (mbs.isRegistered(oname)) {
 67                     mbs.unregisterMBean(oname);
 68                 }
 69                 mbs.registerMBean(mbean, oname);
 70             } catch (Exception e) {
 71                 logger.error("註冊JMX服務出錯:" + e.getMessage(), e);
 72             }
 73             logger.warn("服務 " + container.getType() + " 啓動!");
 74         }
 75         
 76         logger.warn(new SimpleDateFormat("[yyyy-MM-dd HH:mm:ss]").format(new Date()) + " 全部服務啓動成功!");
 77         
 78         extHadoopConfToLocal();
 79     }
 80     
 81     /**
 82      * 讀取配置中心的hdfs配置到本地,而後動態加載到classpath
 83      */
 84     static void extHadoopConfToLocal() {
 85         
 86         System.setProperty("HADOOP_USER_NAME","hdfs");
 87         String abc = PropertyConfigurer.getString("abc");
 88         logger.info("load conf from center ["+abc+"]");
 89         String dir = System.getProperty("user.dir");
 90         File file = new File(dir + "/conf/core-site.xml");
 91         
 92         // write core-site.xml to local if not exists
 93         if (!file.exists()) {
 94             Properties centerProps = PropertyConfigurer.getProps();
 95             Properties hdfsProps = new Properties();
 96             for (String key : centerProps.keySet().toArray(new String[0])) {
 97                 if (HDFSUtil.startWith(key)) {
 98                     hdfsProps.put(key, centerProps.get(key));
 99                 }
100             }
101             try {
102                 HDFSUtil.writerXMLToLocal(file, hdfsProps);
103             } catch (Throwable t) {
104                 logger.error("write hdfs conf to local errors["+hdfsProps+"]", t);
105             }
106         }
107         
108         //load dynamically to classpath
109         ExtClasspathLoader.loadResourceDir(file.getParent());
110     }
111 }
View Code

    HDFSUtil

 1 package com.appleframework.flume.node;
 2 
 3 import java.io.File;
 4 import java.io.FileNotFoundException;
 5 import java.io.FileOutputStream;
 6 import java.io.UnsupportedEncodingException;
 7 import java.lang.reflect.Method;
 8 import java.net.URL;
 9 import java.net.URLClassLoader;
10 import java.util.HashSet;
11 import java.util.Properties;
12 import java.util.Set;
13 
14 import org.dom4j.Document;
15 import org.dom4j.DocumentHelper;
16 import org.dom4j.Element;
17 import org.dom4j.io.OutputFormat;
18 import org.dom4j.io.XMLWriter;
19 
20 /**
21  * @author tjuhenryli<lijia@9zhitx.com>
22  * @date 2017-02-23 10:31:14
23  *
24  **/
25 
26 public class HDFSUtil {
27     //DFS_HA_NAMENODES_KEY_PREFIX DFS_NAMENODE_RPC_ADDRESS_KEY DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX
28     public static final String FS_DEFAULT_NAME_KEY = "fs.defaultFS";
29     public static final String DFS_NAMESERVICES = "dfs.nameservices";
30     public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
31     public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
32     public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider";
33     public static Set<String> CACHE = new HashSet<String>();
34     static {
35         CACHE.add(FS_DEFAULT_NAME_KEY);
36         CACHE.add(DFS_NAMESERVICES);
37         CACHE.add(DFS_HA_NAMENODES_KEY_PREFIX);
38         CACHE.add(DFS_NAMENODE_RPC_ADDRESS_KEY);
39         CACHE.add(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX);
40     }
41     
42     public static boolean startWith(String content) {
43         for (String key : CACHE)
44             if (content.startsWith(key)) return true;
45         return false;
46     }
47     
48     public static void writerXMLToLocal(File file,Properties props) throws Throwable {
49         Element root = DocumentHelper.createElement("configuration");
50         Document document = DocumentHelper.createDocument(root);
51         
52         for (String key : props.keySet().toArray(new String[0])) {
53             Element property = root.addElement("property");
54             Element name = property.addElement("name");
55             Element value = property.addElement("value");
56             name.setText(key);
57             value.setText(props.getProperty(key));
58         }
59         
60         OutputFormat format = new OutputFormat("    ", true);// 設置縮進爲4個空格,而且另起一行爲true
61         if (file.exists()) return;
62         else if (!file.getParentFile().exists()) file.getParentFile().mkdirs();
63         XMLWriter xmlWriter = new XMLWriter(new FileOutputStream(file), format);
64         xmlWriter.write(document);
65     }
66     
67 }
View Code

    ExtClasspathLoader

  1 package com.appleframework.flume.node;
  2 
  3 import java.io.File;
  4 import java.lang.reflect.Method;
  5 import java.net.URL;
  6 import java.net.URLClassLoader;
  7 import java.util.ArrayList;
  8 import java.util.List;
  9 
 10 /** 
 11  * 根據properties中配置的路徑把jar和配置文件加載到classpath中。
 12  *  @author  jnbzwm
 13  *
 14   */ 
 15 public   final   class  ExtClasspathLoader  {
 16      /**  URLClassLoader的addURL方法  */ 
 17      private   static  Method addURL  =  initAddMethod();
 18 
 19      private   static  URLClassLoader classloader  =  (URLClassLoader) ClassLoader.getSystemClassLoader();
 20 
 21      /** 
 22      * 初始化addUrl 方法.
 23      *  @return  可訪問addUrl方法的Method對象
 24       */ 
 25      private   static  Method initAddMethod()  {
 26          try   {
 27             Method add  =  URLClassLoader.class.getDeclaredMethod( "addURL" ,  new  Class[]  { URL.class  } );
 28             add.setAccessible( true );
 29              return  add;
 30         } 
 31          catch  (Exception e)  {
 32              throw   new  RuntimeException(e);
 33         } 
 34     } 
 35 
 36      /** 
 37      * 加載jar classpath。
 38       */ 
 39 //     public   static   void  loadClasspath()  {
 40 //        List < String >  files  =  getJarFiles();
 41 //         for  (String f : files)  {
 42 //            loadClasspath(f);
 43 //        } 
 44 //
 45 //        List < String >  resFiles  =  getResFiles();
 46 //
 47 //         for  (String r : resFiles)  {
 48 //            loadResourceDir(r);
 49 //        } 
 50 //    } 
 51 
 52      public   static   void  loadClasspath(String filepath)  {
 53         File file  =   new  File(filepath);
 54         loopFiles(file);
 55     } 
 56 
 57      public   static   void  loadResourceDir(String filepath)  {
 58         File file  =   new  File(filepath);
 59         loopDirs(file);
 60     } 
 61 
 62      /**     
 63      * 循環遍歷目錄,找出全部的資源路徑。
 64      *  @param  file 當前遍歷文件
 65       */ 
 66      private   static   void  loopDirs(File file)  {
 67          //  資源文件只加載路徑 
 68          if  (file.isDirectory())  {
 69             addURL(file);
 70             File[] tmps  =  file.listFiles();
 71              for  (File tmp : tmps)  {
 72                 loopDirs(tmp);
 73             } 
 74         } 
 75     } 
 76 
 77      /**     
 78      * 循環遍歷目錄,找出全部的jar包。
 79      *  @param  file 當前遍歷文件
 80       */ 
 81      private   static   void  loopFiles(File file)  {
 82          if  (file.isDirectory())  {
 83             File[] tmps  =  file.listFiles();
 84              for  (File tmp : tmps)  {
 85                 loopFiles(tmp);
 86             } 
 87         } 
 88          else   {
 89              if  (file.getAbsolutePath().endsWith( " .jar " )  ||  file.getAbsolutePath().endsWith( " .zip " ))  {
 90                 addURL(file);
 91             } 
 92         } 
 93     } 
 94 
 95      /** 
 96      * 經過filepath加載文件到classpath。
 97      *  @param  filePath 文件路徑
 98      *  @return  URL
 99      *  @throws  Exception 異常
100       */ 
101      private   static   void  addURL(File file)  {
102          try   {
103             addURL.invoke(classloader,  new  Object[]  { file.toURI().toURL() } );
104         } 
105          catch  (Exception e)  {
106         } 
107     } 
108 
109      /** 
110      * 從配置文件中獲得配置的須要加載到classpath裏的路徑集合。
111      *  @return 
112       */ 
113      private   static  List < String >  getJarFiles()  {
114          //  TODO 從properties文件中讀取配置信息略 
115          return   new ArrayList<String>() ;
116     } 
117 
118      /** 
119      * 從配置文件中獲得配置的須要加載classpath裏的資源路徑集合
120      *  @return 
121       */ 
122      private   static  List < String >  getResFiles()  {
123          // TODO 從properties文件中讀取配置信息略
124          List<String> files = new ArrayList<String>();
125          files.add("etc");
126          return   files ;
127     } 
128 
129      public   static   void  main(String[] args)  {
130     } 
131 }
View Code
相關文章
相關標籤/搜索