Storm應用系列之——集成Kafka

本文系原創系列,轉載請註明。html

原帖地址:http://blog.csdn.net/xeseojava

 

前言

在前面Storm系列之——基本概念一文中,提到過Storm的Spout應該是源源不斷的取數據,不能間斷。那麼,很顯然,消息隊列系統、分佈式內存系統或內存數據庫是做爲其數據源的很好的選擇。本文就如何集成Kafka進行介紹。git

Kafka的基本介紹:http://blog.csdn.net/xeseo/article/details/18311955github

準備工做

KafkaSpout其實網上已經有人寫了,在github上開源了,不用咱們本身造輪子。只是要注意版本問題:數據庫

0.7版本的Kafka,對應KafkaSpout可使用Storm-contrib下面的例子apache

源碼:https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafkaapp

Maven依賴:https://clojars.org/storm/storm-kafkamaven

0.8版本的Kafka在API上和底層Offset的處理方式上發生了重大變化,因此老的KafkaSpout再也不適用,必須使用新的KafkaAPI分佈式

源碼:https://github.com/wurstmeister/storm-kafka-0.8-pluside

Maven依賴:https://clojars.org/net.wurstmeister.storm/storm-kafka-0.8-plus

這裏由於0.8版本的Kafka必然是未來主流,因此我就不介紹0.7 的了,使用方式基本上是相似的。

PS:

是人寫的,就會有bug,況且是別人分享出來的。因此,遇到bug,還請去github上提交一個issue告訴做者修正。

2014/7/29 更新:

wurstmeister/storm-kafka-0.8-plus 如今合併到Apache Storm了,在其external/storm-kakfa目錄

Maven依賴直接更新成:

[plain] view plaincopyprint?在CODE上查看代碼片派生到個人代碼片

 

  1.               <dependency>  

  2.   <groupId>org.apache.storm</groupId>  

  3.   <artifactId>storm-kafka</artifactId>  

  4.   <version>0.9.2-incubating</version>  

  5. </dependency>  

可是storm彷佛沒有直接把external的包加載到classpath,因此使用時,還得手動把該jar包從external/storm-kafka/下拷到storm的lib目錄。

固然,也能夠在maven中加上<scope>compile</scope>,直接把該jar打到你項目一塊兒。

使用KafkaSpout

一個KafkaSpout只能去處理一個topic的內容,因此,它要求初始化時提供以下與topic相關信息:

  • Kafka集羣中的Broker地址 (IP+Port)

有兩種方法指定:

1. 使用靜態地址,即直接給定Kafka集羣中全部Broker信息

[java] view plaincopyprint?在CODE上查看代碼片派生到個人代碼片

 

  1. GlobalPartitionInformation info = new GlobalPartitionInformation();  

  2. info.addPartition(0, new Broker("10.1.110.24",9092));  

  3. info.addPartition(0, new Broker("10.1.110.21",9092));  

  4. BrokerHosts brokerHosts = new StaticHosts(info);  

 

2. 從Zookeeper動態讀取

[java] view plaincopyprint?在CODE上查看代碼片派生到個人代碼片

 

  1. BrokerHosts brokerHosts = new ZkHosts("10.1.110.24:2181,10.1.110.22:2181");  

推薦使用這種方法,由於Kafka的Broker可能會動態的增減

  • topic名字

  • 當前spout的惟一標識Id (如下代稱$spout_id)

  • zookeeper上用於存儲當前處理到哪一個Offset了 (如下代稱$zk_root)

  • 當前topic中數據如何解碼

瞭解Kafka的應該知道,Kafka中當前處理到哪的Offset是由客戶端本身管理的。因此,後面兩個的目的,實際上是在zookeeper上創建一個 $zk_root/$spout_id 的節點,其值是一個map,存放了當前Spout處理的Offset的信息。

在Topology中加入Spout的代碼:

[java] view plaincopyprint?在CODE上查看代碼片派生到個人代碼片

 

  1. String topic = "test";  

  2. String zkRoot = "kafkastorm";  

  3. String spoutId = "myKafka";  

  4.   

  5. SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);  

  6. spoutConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());  

  7.   

  8. TopologyBuilder builder = new TopologyBuilder();  

  9. builder.setSpout("spout", new KafkaSpout(spoutConfig), spoutNum);  


其中TestMessageScheme就是告訴KafkaSpout如何去解碼數據,生成Storm內部傳遞數據

[java] view plaincopyprint?在CODE上查看代碼片派生到個人代碼片

 

  1. public class TestMessageScheme implements Scheme {  

  2.   

  3.     private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageScheme.class);  

  4.       

  5.     @Override  

  6.     public List<Object> deserialize(byte[] bytes) {  

  7.         try {  

  8.             String msg = new String(bytes, "UTF-8");  

  9.             return new Values(msg);  

  10.         } catch (InvalidProtocolBufferException e) {  

  11.             LOGGER.error("Cannot parse the provided message!");  

  12.         }  

  13.           

  14.         //TODO: what happend if returns null?  

  15.         return null;  

  16.     }  

  17.   

  18.     @Override  

  19.     public Fields getOutputFields() {  

  20.         return new Fields("msg");  

  21.     }  

  22.   

  23. }  

這個解碼方式是與Producer端生成時塞入數據的編碼方式配套的。這裏我Producer端塞入的是String的byte,因此這裏也還原成String,定義輸出爲一個名叫"msg"的field。

後面就能夠本身添加Bolt處理tuple中該field的數據了。

使用TransactionalTridentKafkaSpout

TransactionalTridentKafkaSpout是爲事務性的Trident而用的。用法與KafkaSpout有所不一樣。

[java] view plaincopyprint?在CODE上查看代碼片派生到個人代碼片

 

  1. TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, topic, spoutId);  

  2. kafkaConfig.scheme = new SchemeAsMultiScheme(new TestMessageScheme());  

  3.   

  4. TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);  

  5.   

  6. TridentTopology topology = new TridentTopology();  

  7. topology.newStream("test_str", kafkaSpout).shuffle().each(new Fields("msg", new PrintFunction());  

看到它並無要求咱們提供zkRoot,由於直接代碼裏面寫死了…… -_-T

地址是 /transactional/<STREAM_NAME>/<Spout_Id>,在上面的例子中,就是  /transactional/test_str/myKafaka

常見問題

1. 本地模式沒法保存Offset

KafkaSpout初始化時,會去取spoutConfig.zkServers 和 spoutConfig.zkPort 變量的值,而該值默認是沒塞的,因此是空,那麼它就會去取當前運行的Storm所配置的zookeeper地址和端口,而本地運行的Storm,是一個臨時的zookeeper實例,並不會真正持久化。因此,每次關閉後,數據就沒了。

本地模式,要顯示的去配置

[java] view plaincopyprint?在CODE上查看代碼片派生到個人代碼片

 

  1. spoutConfig.zkServers = new ArrayList<String>(){{  

  2.                 add("10.1.110.20");  

  3.                 add("10.1.110.21");  

  4.                 add("10.1.110.24");  

  5.             }};  

  6. spoutConfig.zkPort = 2181;  



2. 用Maven導入時,運行中SLF4J打印MutipleBinding 錯誤,致使無log輸出。

緣由是在這個KafkaSpout的pom.xml裏依賴了kafka_2.9.2,而這貨帶了一個slf4j-simple的SLF4J綁定,修復這個問題

[html] view plaincopyprint?在CODE上查看代碼片派生到個人代碼片

 

  1. <del><dependency>  

  2.   <groupId>net.wurstmeister.storm</groupId>  

  3.   <artifactId>storm-kafka-0.8-plus</artifactId>  

  4.   <version>0.2.0</version>  

  5.   <exclusion>  

  6.     <groupId>org.slf4j</groupId>  

  7.     <artifactId>slf4j-simple</artifactId>  

  8.   </exclusion>  

  9. </dependency></del>  


3. 若是在topology第一次啓動前,往kafka裏面寫數據,啓動Storm後,這部分數據讀不出來

緣由是第一次啓動topology時,在zookeeper上並未建立出保存Offset信息的節點,因此默認它會取當前partition最新的Offset(Kafka本身維護的單個partition上遞增序號)。

理論上,若是找不到保存的Offset信息,應該從-1的Offset讀起。

這個問題我給做者提出來了,但做者認爲這樣能夠避免重複處理,我沒有想通爲什麼會有重複處理。但好在做者說會在後續版本加入參數來控制。

剛去看了下,彷佛做者已經在提交 8b764cd fix掉了。有興趣的能夠去試下。我是本身本地改了他的代碼。

以上問題已修復併合並。

相關文章
相關標籤/搜索