Kafka java
由LinkedIn於2010年12月(https://thenewstack.io/streaming-data-at-linkedin-apache-kafka-reaches-1-1-trillion-messages-per-day/)開源出來一個消息的發佈/訂閱系統,用scala實現;版本從0.05到如今0.10.2.0(2017-02-25)node
系統中,生產者(producer)主動向集羣某個topic發送(push)消息(message);消費者(consumer)以組(group)爲單位訂閱topic,當消費者啓動消費程序以後,若是集羣中有未消費完的或者新的消息,則實時的拉取(pull)消息到消費者本地處理。數據庫
集羣是以topic爲單位管理和存儲消息的。既然是集羣,就利用集羣的優點;將topic分紅多個分片(partition),生產者發送的消息存儲在各個分片上。對應的多是不一樣節點的不一樣本地磁盤。每一個分片能夠設置多個副本(replicated)用來確保數據的容錯性。apache
並且每一個分片上的數據都是先後有序的;對應的就是後面的消息追加的文件中去;這種場景就可以利用磁盤順序讀寫的特性。api
固然消費者消費也是有序消費的;偏移量(offset)從小到大順序消費。服務器
總體效果就是架構
磁盤順序讀寫 內存隨機讀寫app
特色dom
分佈式:由zookeeper管理,能夠啓動多個broker-server;以集羣的方式給生產環境提供穩定的服務。maven
容錯性:大部分分佈式都具備的。
1.只要有一個正常的broker-server,集羣就能正常運行。
2.能夠設置爲Topic的partition設置副本,確保就算一臺機器的磁盤壞了;也不影響數據消費
負載問題:
1.生產者發送消息能夠指定規則,發送到不一樣的partition上。
2.topic中全部partition選取一個對外提供服務的leader;若是leader宕掉了,從後選中選取下一個。
可擴展性:新增broker很是方便。
生產者樣例代碼
1 import java.util.Properties 2 import kafka.producer.ProducerConfig 3 import kafka.javaapi.producer.Producer 4 import java.util.Random 5 import java.util.Date 6 import kafka.producer.KeyedMessage 7 import kafka.producer.Partitioner 8 import kafka.utils.VerifiableProperties 9 import org.apache.log4j.PropertyConfigurator 10 import java.util.concurrent.TimeUnit 11 import java.text.SimpleDateFormat 12 import org.tx.common.BIConstant 13 14 /** 15 * @date 2015-10-27 16:54:19 16 */ 17 object FirstKafkaProducer { 18 19 PropertyConfigurator.configure("etc/log4j.properties"); 20 21 def main(args: Array[String]): Unit = { 22 23 24 // val Array(interval,records) = args 25 val (interval,records) = (1,1) 26 val props = new Properties() 27 // props.put("metadata.broker.list", "own:9092,own:9093,own:9094") 28 props.put("metadata.broker.list", "hdpc1-dn003:9092") 29 props.put("serializer.class", "kafka.serializer.StringEncoder") 30 props.put("partitioner.class", "org.henry.scala.scalamvn.SimplePartitioner") 31 // props.put("request.required.acks", "-1") 32 val config = new ProducerConfig(props); 33 val producer = new Producer[String, String](config) 34 val sdf = new SimpleDateFormat(BIConstant.DATE_SDF) 35 36 for (i <- 1 to records.toInt) { 37 val rnd = new Random(); 38 val runtime = new Date(); 39 val ip = rnd.nextInt(255).toString(); 40 // val msg = runtime + ",www.example.com," + ip; 41 val msg = "1001|2|2|"+runtime.getTime 42 println(" *** "+msg) 43 val data = new KeyedMessage[String, String]("mytopic", ip, msg) 44 TimeUnit.SECONDS.sleep(interval.toInt) 45 producer.send(data) 46 } 47 producer.close 48 } 49 } 50 51 class SimplePartitioner(props:VerifiableProperties) extends Partitioner { 52 53 override def partition(key: Any, numPartitions: Int): Int = { 54 var partition = 0; 55 val stringKey = key.toString(); 56 val offset = stringKey.lastIndexOf('.'); 57 if (offset > 0) { 58 partition = Integer.parseInt( stringKey.substring(offset+1)) % numPartitions; 59 } 60 partition; 61 } 62 }
消費者樣例代碼(https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example)
1 package com.test.groups; 2 3 import kafka.consumer.ConsumerIterator; 4 import kafka.consumer.KafkaStream; 5 6 public class ConsumerTest implements Runnable { 7 private KafkaStream m_stream; 8 private int m_threadNumber; 9 10 public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { 11 m_threadNumber = a_threadNumber; 12 m_stream = a_stream; 13 } 14 15 public void run() { 16 ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 17 while (it.hasNext()) 18 System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); 19 System.out.println("Shutting down Thread: " + m_threadNumber); 20 } 21 }
flume-ng
由Cloudera於2010年5月開源出來,在2010年7月加入Cloudera Hadoop的發行版本CDH3b2(http://blog.cloudera.com/blog/2010/07/whats-new-in-cdh3b2-flume/)中,用Java開發的。
開始是flume-OG,一直到2011年10月,最後版本是0.94.0。後面對核心組件/代碼架構的進行里程碑式的重構,就有了flume-ng;並開源到apache。
flume OG:agent採集數據,而後發送到collector;collector聚集後存儲到目的地。
flume NG最小單位的架構;只有agent角色,分爲三個步驟來接收和傳輸數據
Source(數據採集): 默認實現了從不一樣數據源接受數據。如Avro/Kafka/Netcat/Http等等;也能夠根據具體需求擴展實現source
Channel(數據臨時存儲的地方): 接受source的數據,可選擇持久化到數據庫或者本地磁盤;確保sink處理完數據後,刪除;保證數據完整性。
Sink(數據存儲目的地):數據存儲介質的實現。可選擇HDFS/kafka/local file等等。
在整個流程中,接收到的每條數據被封裝成Event來進行傳遞和處理的。
特色
1.部署,配置簡單方便;通用
2.中間對數據作了臨時存儲,確保數據不丟失
整合
那麼將這幾個有關聯的開源組件爲我所用;並且還要考慮到後期開發調試方便,版本管理;部署到生產環境下的時候操做簡單,可維護性好;可以監控JVM狀態。就須要進行自動化部署的改造,並且公司有成熟可靠的解決方案。那麼一切就瓜熟蒂落了。
總體思路是:使用maven將項目依賴的jar包/啓動腳本,簡單的配置打包成tar包。具體的配置項執行啓動腳本,調用Main方法後從公共的配置中心加載。
在此次改造中,主要的任務有兩大塊。
1.理解,使用如今成熟的自動化部署步驟、過程。
2.確保功能正常的狀況下,將原flume的conf下面配置文件搬移到配置中心的事情。
基本流程是:
將代碼提交到Git進行管理,使用jekins獲取Git代碼打成tar包。tar包裏面基本包含bin,conf,lib目錄。bin下面存聽任務啓動中止腳本,conf下面存放簡單的配置參數,lib下面存放項目依賴的jar和自身jar包。發佈到私服;
服務器下載tar包,解壓;啓動bin下面start.sh腳本啓動應用。應用啓動時會將配套的日誌、JMX監控服務註冊,啓動;再從配置中心獲取詳細的參數配置;啓動目標程序。
以本次flume的使用場景做爲例子,具體作了哪些事。
因爲咱們的數據源選取爲Kafka,存儲介質是HDFS。因此數據傳輸流就是kafka(source)-->local file(channel)-->HDFS(sink)
而原生態的flume啓動入口是使用腳本調用org.apache.flume.node.Application(Main方法入口程序),因此改造的切入點就是這裏。
調用node.Application時,額外傳入參數
-Djava-container=com.appleframework.flume.node.Application
-Dconfig-factory=com.appleframework.config.PropertyConfigurerFactory
其中PropertyConfigurerFactory是讀取、加載system.properties指向配置中心的詳細配置參數的做用。Application主要是增長從公共配置中加載參數的方法。其餘地方保持統一。其中deploy.group=flume-demo,deploy.dataId=hdfs,deploy.env=dev這三個參數來識別一組配置信息。
更改的代碼詳情以下
node.Application
1 public class Application { 2 3 ... 4 public static void main(String[] args) { 5 6 7 ... 8 boolean isZkConfigured = false; 9 10 Options options = new Options(); 11 12 Option option = new Option("n", "name", true, "the name of this agent"); 13 option.setRequired(true); 14 options.addOption(option); 15 16 option = new Option("f", "conf-file", true, "specify a config file (required if -z missing)"); 17 option.setRequired(false); 18 options.addOption(option); 19 20 option = new Option(null, "no-reload-conf", false, "do not reload config file if changed"); 21 options.addOption(option); 22 23 // Options for Zookeeper 24 option = new Option("z", "zkConnString", true, "specify the ZooKeeper connection to use (required if -f missing)"); 25 option.setRequired(false); 26 options.addOption(option); 27 28 option = new Option("p", "zkBasePath", true, "specify the base path in ZooKeeper for agent configs"); 29 option.setRequired(false); 30 options.addOption(option); 31 32 option = new Option("h", "help", false, "display help text"); 33 options.addOption(option); 34 35 option = new Option("e", "env", true, "the environment of this app"); 36 option.setRequired(false); 37 options.addOption(option); 38 39 Component.init(args); 40 ... 41 } 42 ... 43 }
Component
1 package com.appleframework.flume.node; 2 3 import java.io.File; 4 import java.lang.management.ManagementFactory; 5 import java.text.SimpleDateFormat; 6 import java.util.ArrayList; 7 import java.util.Date; 8 import java.util.Hashtable; 9 import java.util.List; 10 import java.util.Properties; 11 12 import javax.management.MBeanServer; 13 import javax.management.ObjectName; 14 15 import org.apache.log4j.Logger; 16 17 import com.appleframework.boot.config.ConfigContainer; 18 import com.appleframework.boot.core.CommandOption; 19 import com.appleframework.boot.core.Container; 20 import com.appleframework.boot.core.log4j.Log4jContainer; 21 import com.appleframework.boot.core.log4j.LoggingConfig; 22 import com.appleframework.boot.core.monitor.MonitorConfig; 23 import com.appleframework.boot.core.monitor.MonitorContainer; 24 import com.appleframework.config.core.PropertyConfigurer; 25 26 public class Component { 27 28 private static Logger logger = Logger.getLogger(Component.class); 29 30 static void init(String[] args) { 31 //處理啓動參數 32 CommandOption.parser(args); 33 34 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); 35 36 final List<Container> containers = new ArrayList<Container>(); 37 containers.add(new Log4jContainer()); 38 containers.add(new MonitorContainer()); 39 40 String configContainer = System.getProperty("config-factory"); 41 if (null != configContainer) { 42 containers.add(new ConfigContainer(configContainer)); 43 } 44 45 for (Container container : containers) { 46 container.start(); 47 try { 48 49 Hashtable<String, String> properties = new Hashtable<String, String>(); 50 51 properties.put(Container.TYPE_KEY, Container.DEFAULT_TYPE); 52 properties.put(Container.ID_KEY, container.getType()); 53 54 ObjectName oname = ObjectName.getInstance("com.appleframework", properties); 55 Object mbean = null; 56 if(container instanceof Log4jContainer) { 57 mbean = new LoggingConfig(); 58 } 59 else if(container instanceof MonitorContainer) { 60 mbean = new MonitorConfig(); 61 } 62 else { 63 logger.error("The Error Container :" + container.getName()); 64 } 65 66 if (mbs.isRegistered(oname)) { 67 mbs.unregisterMBean(oname); 68 } 69 mbs.registerMBean(mbean, oname); 70 } catch (Exception e) { 71 logger.error("註冊JMX服務出錯:" + e.getMessage(), e); 72 } 73 logger.warn("服務 " + container.getType() + " 啓動!"); 74 } 75 76 logger.warn(new SimpleDateFormat("[yyyy-MM-dd HH:mm:ss]").format(new Date()) + " 全部服務啓動成功!"); 77 78 extHadoopConfToLocal(); 79 } 80 81 /** 82 * 讀取配置中心的hdfs配置到本地,而後動態加載到classpath 83 */ 84 static void extHadoopConfToLocal() { 85 86 System.setProperty("HADOOP_USER_NAME","hdfs"); 87 String abc = PropertyConfigurer.getString("abc"); 88 logger.info("load conf from center ["+abc+"]"); 89 String dir = System.getProperty("user.dir"); 90 File file = new File(dir + "/conf/core-site.xml"); 91 92 // write core-site.xml to local if not exists 93 if (!file.exists()) { 94 Properties centerProps = PropertyConfigurer.getProps(); 95 Properties hdfsProps = new Properties(); 96 for (String key : centerProps.keySet().toArray(new String[0])) { 97 if (HDFSUtil.startWith(key)) { 98 hdfsProps.put(key, centerProps.get(key)); 99 } 100 } 101 try { 102 HDFSUtil.writerXMLToLocal(file, hdfsProps); 103 } catch (Throwable t) { 104 logger.error("write hdfs conf to local errors["+hdfsProps+"]", t); 105 } 106 } 107 108 //load dynamically to classpath 109 ExtClasspathLoader.loadResourceDir(file.getParent()); 110 } 111 }
HDFSUtil
1 package com.appleframework.flume.node; 2 3 import java.io.File; 4 import java.io.FileNotFoundException; 5 import java.io.FileOutputStream; 6 import java.io.UnsupportedEncodingException; 7 import java.lang.reflect.Method; 8 import java.net.URL; 9 import java.net.URLClassLoader; 10 import java.util.HashSet; 11 import java.util.Properties; 12 import java.util.Set; 13 14 import org.dom4j.Document; 15 import org.dom4j.DocumentHelper; 16 import org.dom4j.Element; 17 import org.dom4j.io.OutputFormat; 18 import org.dom4j.io.XMLWriter; 19 20 /** 21 * @author tjuhenryli<lijia@9zhitx.com> 22 * @date 2017-02-23 10:31:14 23 * 24 **/ 25 26 public class HDFSUtil { 27 //DFS_HA_NAMENODES_KEY_PREFIX DFS_NAMENODE_RPC_ADDRESS_KEY DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX 28 public static final String FS_DEFAULT_NAME_KEY = "fs.defaultFS"; 29 public static final String DFS_NAMESERVICES = "dfs.nameservices"; 30 public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes"; 31 public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address"; 32 public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider"; 33 public static Set<String> CACHE = new HashSet<String>(); 34 static { 35 CACHE.add(FS_DEFAULT_NAME_KEY); 36 CACHE.add(DFS_NAMESERVICES); 37 CACHE.add(DFS_HA_NAMENODES_KEY_PREFIX); 38 CACHE.add(DFS_NAMENODE_RPC_ADDRESS_KEY); 39 CACHE.add(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX); 40 } 41 42 public static boolean startWith(String content) { 43 for (String key : CACHE) 44 if (content.startsWith(key)) return true; 45 return false; 46 } 47 48 public static void writerXMLToLocal(File file,Properties props) throws Throwable { 49 Element root = DocumentHelper.createElement("configuration"); 50 Document document = DocumentHelper.createDocument(root); 51 52 for (String key : props.keySet().toArray(new String[0])) { 53 Element property = root.addElement("property"); 54 Element name = property.addElement("name"); 55 Element value = property.addElement("value"); 56 name.setText(key); 57 value.setText(props.getProperty(key)); 58 } 59 60 OutputFormat format = new OutputFormat(" ", true);// 設置縮進爲4個空格,而且另起一行爲true 61 if (file.exists()) return; 62 else if (!file.getParentFile().exists()) file.getParentFile().mkdirs(); 63 XMLWriter xmlWriter = new XMLWriter(new FileOutputStream(file), format); 64 xmlWriter.write(document); 65 } 66 67 }
ExtClasspathLoader
1 package com.appleframework.flume.node; 2 3 import java.io.File; 4 import java.lang.reflect.Method; 5 import java.net.URL; 6 import java.net.URLClassLoader; 7 import java.util.ArrayList; 8 import java.util.List; 9 10 /** 11 * 根據properties中配置的路徑把jar和配置文件加載到classpath中。 12 * @author jnbzwm 13 * 14 */ 15 public final class ExtClasspathLoader { 16 /** URLClassLoader的addURL方法 */ 17 private static Method addURL = initAddMethod(); 18 19 private static URLClassLoader classloader = (URLClassLoader) ClassLoader.getSystemClassLoader(); 20 21 /** 22 * 初始化addUrl 方法. 23 * @return 可訪問addUrl方法的Method對象 24 */ 25 private static Method initAddMethod() { 26 try { 27 Method add = URLClassLoader.class.getDeclaredMethod( "addURL" , new Class[] { URL.class } ); 28 add.setAccessible( true ); 29 return add; 30 } 31 catch (Exception e) { 32 throw new RuntimeException(e); 33 } 34 } 35 36 /** 37 * 加載jar classpath。 38 */ 39 // public static void loadClasspath() { 40 // List < String > files = getJarFiles(); 41 // for (String f : files) { 42 // loadClasspath(f); 43 // } 44 // 45 // List < String > resFiles = getResFiles(); 46 // 47 // for (String r : resFiles) { 48 // loadResourceDir(r); 49 // } 50 // } 51 52 public static void loadClasspath(String filepath) { 53 File file = new File(filepath); 54 loopFiles(file); 55 } 56 57 public static void loadResourceDir(String filepath) { 58 File file = new File(filepath); 59 loopDirs(file); 60 } 61 62 /** 63 * 循環遍歷目錄,找出全部的資源路徑。 64 * @param file 當前遍歷文件 65 */ 66 private static void loopDirs(File file) { 67 // 資源文件只加載路徑 68 if (file.isDirectory()) { 69 addURL(file); 70 File[] tmps = file.listFiles(); 71 for (File tmp : tmps) { 72 loopDirs(tmp); 73 } 74 } 75 } 76 77 /** 78 * 循環遍歷目錄,找出全部的jar包。 79 * @param file 當前遍歷文件 80 */ 81 private static void loopFiles(File file) { 82 if (file.isDirectory()) { 83 File[] tmps = file.listFiles(); 84 for (File tmp : tmps) { 85 loopFiles(tmp); 86 } 87 } 88 else { 89 if (file.getAbsolutePath().endsWith( " .jar " ) || file.getAbsolutePath().endsWith( " .zip " )) { 90 addURL(file); 91 } 92 } 93 } 94 95 /** 96 * 經過filepath加載文件到classpath。 97 * @param filePath 文件路徑 98 * @return URL 99 * @throws Exception 異常 100 */ 101 private static void addURL(File file) { 102 try { 103 addURL.invoke(classloader, new Object[] { file.toURI().toURL() } ); 104 } 105 catch (Exception e) { 106 } 107 } 108 109 /** 110 * 從配置文件中獲得配置的須要加載到classpath裏的路徑集合。 111 * @return 112 */ 113 private static List < String > getJarFiles() { 114 // TODO 從properties文件中讀取配置信息略 115 return new ArrayList<String>() ; 116 } 117 118 /** 119 * 從配置文件中獲得配置的須要加載classpath裏的資源路徑集合 120 * @return 121 */ 122 private static List < String > getResFiles() { 123 // TODO 從properties文件中讀取配置信息略 124 List<String> files = new ArrayList<String>(); 125 files.add("etc"); 126 return files ; 127 } 128 129 public static void main(String[] args) { 130 } 131 }