版權聲明:本文爲博主原創文章,未經博主容許不得轉載html
本文是基於hadoop 2.7.1,以及kafka 0.11.0.0。kafka-connect是以單節點模式運行,即standalone。java
一. 首先,先對kafka和kafka connect作一個簡單的介紹node
kafka:Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。比較直觀的解釋就是其有一個生產者(producer)和一個消費者(consumer)。能夠將kafka想象成一個數據容器,生產者負責發送數據到這個容器中,而消費者從容器中取出數據,在將數據作處理,如存儲到hdfs。算法
kafka connect:Kafka Connect是一種用於在Kafka和其餘系統之間可擴展的、可靠的流式傳輸數據的工具。它使得可以快速定義將大量數據集合移入和移出Kafka的鏈接器變得簡單。即適合批量數據導入導出操做。apache
二. 下面將介紹如何用kafka connect將數據寫入到hdfs中。包括在這個過程當中可能碰到的一些問題說明。json
首先啓動kafka-connect:多線程
bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties
這個命令後面兩個參數,
第一個是指定啓動的模式,有分佈式和單節點兩種,這裏是單節點。kafka自帶,放於config目錄下。
第二個參數指向描述connector的屬性的文件,能夠有多個,這裏只有一個connector用來寫入到hdfs。須要本身建立。
接下來看看connector1.properties的內容,
name="test" #該connector的名字
#將本身按connect接口規範編寫的代碼打包後放在kafka/libs目錄下,再根據項目結構引用對應connector
connector.class=hdfs.HdfsSinkConnector
#Task是導入導出的具體實現,這裏是指定多少個task來並行運行導入導出做業,由多線程實現。因爲hdfs中一個文件每次只能又一個文件操做,因此這裏只能是1
tasks.max=1
#指定從哪一個topic讀取數據,這些實際上是用來在connector或者task的代碼中讀取的。
topics=test
#指定key以那種方式轉換,需和Producer發送方指定的序列化方式一致
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.json.JsonConverter #同上
hdfs.url=hdfs://127.0.0.1:9000 #hdfs的url路徑,在Connector中會被讀取
hdfs.path=/test/file #hdfs文件路徑,一樣Connector中被讀取
key.converter.schemas.enable=true #稍後介紹,能夠true也能夠false,影響傳輸格式
value.converter.schemas.enable=true #稍後介紹,能夠true也能夠false
三. 接下來看代碼,connect主要是導入導出兩個概念,導入是source,導出時Sink。這裏只使用Sink,不過Source和Sink的實現其實基本相同。
實現Sink其實不難,實現對應的接口,即SinkConnector和SinkTask兩個接口,再打包放到kafka/libs目錄下便可。其中SinkConnector只有一個,而Task能夠有多
先是Connector
public class HdfsSinkConnector extends SinkConnector { //這兩項爲配置hdfs的urlh和路徑的配置項,須要在connector1.properties中指定 public static final String HDFS_URL = "hdfs.url"; public static final String HDFS_PATH = "hdfs.path"; private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(HDFS_URL, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs url") .define(HDFS_PATH, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs path"); private String hdfsUrl; private String hdfsPath; @Override public String version() { return AppInfoParser.getVersion(); }
//start方法會再初始的時候執行一次,這裏主要用於配置 @Override public void start(Map<String, String> props) { hdfsUrl = props.get(HDFS_URL); hdfsPath = props.get(HDFS_PATH); } //這裏指定了Task的類 @Override public Class<? extends Task> taskClass() { return HdfsSinkTask.class; } //用於配置Task的config,這些都是會在Task中用到 @Override public List<Map<String, String>> taskConfigs(int maxTasks) { ArrayList<Map<String, String>> configs = new ArrayList<>(); for (int i = 0; i < maxTasks; i++) { Map<String, String> config = new HashMap<>(); if (hdfsUrl != null) config.put(HDFS_URL, hdfsUrl); if (hdfsPath != null) config.put(HDFS_PATH, hdfsPath); configs.add(config); } return configs; } //關閉時的操做,通常是關閉資源。 @Override public void stop() { // Nothing to do since FileStreamSinkConnector has no background monitoring. } @Override public ConfigDef config() { return CONFIG_DEF; } }
接下來是Taskapp
public class HdfsSinkTask extends SinkTask { private static final Logger log = LoggerFactory.getLogger(HdfsSinkTask.class); private String filename; public static String hdfsUrl; public static String hdfsPath; private Configuration conf; private FSDataOutputStream os; private FileSystem hdfs; public HdfsSinkTask(){ } @Override public String version() { return new HdfsSinkConnector().version(); } //Task開始會執行的代碼,可能有多個Task,因此每一個Task都會執行一次 @Override public void start(Map<String, String> props) { hdfsUrl = props.get(HdfsSinkConnector.HDFS_URL); hdfsPath = props.get(HdfsSinkConnector.HDFS_PATH); System.out.println("----------------------------------- start--------------------------------"); conf = new Configuration(); conf.set("fs.defaultFS", hdfsUrl); //這兩個是與hdfs append相關的設置 conf.setBoolean("dfs.support.append", true); conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER"); try{ hdfs = FileSystem.get(conf); // connector.hdfs = new Path(HDFSPATH).getFileSystem(conf); os = hdfs.append(new Path(hdfsPath)); }catch (IOException e){ System.out.println(e.toString()); } } //核心操做,put就是將數據從kafka中取出,存放到其餘地方去 @Override public void put(Collection<SinkRecord> sinkRecords) { for (SinkRecord record : sinkRecords) { log.trace("Writing line to {}: {}", logFilename(), record.value()); try{ System.out.println("write info------------------------" + record.value().toString() + "-----------------"); os.write((record.value().toString()).getBytes("UTF-8")); os.hsync(); }catch(Exception e){ System.out.print(e.toString()); } } } @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { try{ os.hsync(); }catch (Exception e){ System.out.print(e.toString()); } }
//一樣是結束時候所執行的代碼,這裏用於關閉hdfs資源 @Override public void stop() { try { os.close(); }catch(IOException e){ System.out.println(e.toString()); } } private String logFilename() { return filename == null ? "stdout" : filename; } }
這裏重點提一下,由於在connector1.propertise中設置了key.converter=org.apache.kafka.connect.converters.ByteArrayConverter,因此不能用命令行形式的
producer發送數據,而是要用程序的方式,而且在producer總也要設置key的序列化形式爲org.apache.kafka.common.serialization.ByteArraySerializer。
編碼完成,先用idea以開發程序與依賴包分離的形式打包成jar包,而後將程序對應的jar包(通常就是「項目名.jar」)放到kafka/libs目錄下面,這樣就能被找到。
四. 接下來對這個過程的問題作一個彙總。
1.在connector1.properties中的key.converter.schemas.enable=false和value.converter.schemas.enable=false的問題。
這個選項默認在connect-standalone.properties中是true的,這個時候發送給topic的Json格式是須要使用avro格式,具體狀況能夠百度,這裏給出一個樣例。
{
"schema": {
"type": "struct",
"fields": [{
"type": "int32",
"optional": true,
"field": "c1"
}, {
"type": "string",
"optional": true,
"field": "c2"
}, {
"type": "int64",
"optional": false,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "create_ts"
}, {
"type": "int64",
"optional": false,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "update_ts"
}],
"optional": false,
"name": "foobar"
},
"payload": {
"c1": 10000,
"c2": "bar",
"create_ts": 1501834166000,
"update_ts": 1501834166000
}
}
主要就是schema和payload這兩個,不按照這個格式會報錯以下分佈式
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
若是想發送普通的json格式而不是avro格式的話,很簡單key.converter.schemas.enable和value.converter.schemas.enable設置爲false就行。這樣就能發送普通的json格式數據。ide
2.在啓動的過程當中出現各類各樣的java.lang.ClassNotFoundException。
在啓動connector的時候,一開始老是會報各個各樣的ClassNotFoundException,不是這個包就是那個包,查找問題一直說要麼缺乏包要麼是包衝突。這個是什麼緣由呢?
其實歸根結底仍是依賴衝突的問題,由於kafka程序自定義的類加載器加載類的目錄是在kafka/libs中,而寫到hdfs須要hadoop的包。
我一開始的作法是將hadoop下的包路徑添加到CLASSPATH中,這樣子問題就來了,由於kafka和hadoop的依賴包是有衝突的,好比hadoop是guava-11.0.2.jar,而kafka是guava-20.0.jar,兩個jar包版本不一樣,而咱們是在kafka程序中調用hdfs,因此當jar包衝突時應該優先調用kafka的。可是注意kafka用的是程序自定義的類加載器,其優先級是低於CLASSPATH路徑下的類的,就是說加載類時會優先加載CLASSPATH下的類。這樣子就有問題了。
個人解決方案時將kafka和hadoop加載的jar包路徑都添加到CLASSPATH中,而且kafka的路徑寫在hadoop前面,這樣就能夠啓動connector成功。
---
推薦閱讀:
大數據存儲的進化史 --從 RAID 到 Hdfs
貝葉斯分類算法實例 --根據姓名推測男女
從分治算法到 MapReduce