by 小閃電
0 Preface
Storm's main role is stream-oriented real-time computation: it handles a continuously produced data stream very quickly. Most real data, however, does not arrive as a uniform stream; the volume fluctuates over time. Plain batch processing is a poor fit for this situation, so Kafka is introduced as a message queue in front of Storm; the two work well together and give a stable streaming pipeline. Below is a simple example that reads data from Kafka and writes it back to Kafka, as a way to get familiar with the interaction between Storm and Kafka.
1 Block diagram
In essence, Storm's KafkaSpout acts as a Kafka consumer and the KafkaBolt acts as a producer.
The block diagram is as follows:
2 pom.xml
Create a Maven project and pull in the external dependencies for Storm, Kafka, and ZooKeeper.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.tony</groupId>
    <artifactId>storm-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.3</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.3</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>
        <!-- dependencies required by the storm-kafka module -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>central</id>
            <url>http://repo1.maven.org/maven2/</url>
            <snapshots><enabled>false</enabled></snapshots>
            <releases><enabled>true</enabled></releases>
        </repository>
        <repository>
            <id>clojars</id>
            <url>https://clojars.org/repo/</url>
            <snapshots><enabled>true</enabled></snapshots>
            <releases><enabled>true</enabled></releases>
        </repository>
        <repository>
            <id>scala-tools</id>
            <url>http://scala-tools.org/repo-releases</url>
            <snapshots><enabled>true</enabled></snapshots>
            <releases><enabled>true</enabled></releases>
        </repository>
        <repository>
            <id>conjars</id>
            <url>http://conjars.org/repo/</url>
            <snapshots><enabled>true</enabled></snapshots>
            <releases><enabled>true</enabled></releases>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>UTF-8</encoding>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
3 The KafkaSpout consumption logic. Modify the MessageScheme class, which defines two fields, key and message, so that tuples can be conveniently dispatched to the KafkaBolt. The code is as follows:
package com.tony.storm_kafka.util;

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/*
 * author: hi
 */
public class MessageScheme implements Scheme {

    @Override
    public List<Object> deserialize(byte[] arg0) {
        try {
            // decode the raw Kafka message and emit a fixed key plus the message body
            String msg = new String(arg0, "UTF-8");
            String msg_0 = "hello";
            return new Values(msg_0, msg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public Fields getOutputFields() {
        // field names consumed downstream by the KafkaBolt's default mapper
        return new Fields("key", "message");
    }
}
4 Write the topology main class: configure Kafka and submit the topology to Storm. The KafkaSpout's ZooKeeper hosts can be configured either dynamically or statically; prefer the dynamic, self-discovering approach (a static-configuration sketch is shown after the code below).
package org.tony.storm_kafka.common;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TridentKafkaState;

import java.util.Arrays;
import java.util.Properties;

import org.tony.storm_kafka.bolt.ToKafkaBolt;
import com.tony.storm_kafka.util.MessageScheme;

public class KafkaBoltTestTopology {

    // KafkaSpout configuration parameters
    public static String kafka_zk_port = null;
    public static String topic = null;
    public static String kafka_zk_rootpath = null;
    public static BrokerHosts brokerHosts;
    public static String spout_name = "spout";
    public static String kafka_consume_from_start = null;

    public static class PrinterBolt extends BaseBasicBolt {

        private static final long serialVersionUID = 9114512339402566580L;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println("-----" + (tuple.getValue(1)).toString());
        }
    }

    public StormTopology buildTopology() {
        // KafkaSpout configuration
        kafka_consume_from_start = "true";
        kafka_zk_rootpath = "/kafka08";
        String spout_id = spout_name;
        brokerHosts = new ZkHosts("192.168.201.190:2191,192.168.201.191:2191,192.168.201.192:2191",
                kafka_zk_rootpath + "/brokers");
        kafka_zk_port = "2191";

        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, "testfromkafka", kafka_zk_rootpath, spout_id);
        spoutConf.scheme = new SchemeAsMultiScheme(new MessageScheme());
        spoutConf.zkPort = Integer.parseInt(kafka_zk_port);
        spoutConf.zkRoot = kafka_zk_rootpath;
        spoutConf.zkServers = Arrays.asList(new String[] {"10.9.201.190", "10.9.201.191", "10.9.201.192"});

        // whether to start reading from the first message in the Kafka topic
        if (kafka_consume_from_start == null) {
            kafka_consume_from_start = "false";
        }
        boolean kafka_consume_frome_start_b = Boolean.valueOf(kafka_consume_from_start);
        System.out.println("kafka_consume_from_start: " + kafka_consume_frome_start_b);
        spoutConf.forceFromStart = kafka_consume_frome_start_b;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConf));
        builder.setBolt("forwardToKafka", new ToKafkaBolt<String, String>()).shuffleGrouping("spout");
        return builder.createTopology();
    }

    public static void main(String[] args) {
        KafkaBoltTestTopology kafkaBoltTestTopology = new KafkaBoltTestTopology();
        StormTopology stormTopology = kafkaBoltTestTopology.buildTopology();

        Config conf = new Config();
        // Kafka producer configuration used by the KafkaBolt
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.10.43.150:9092");
        props.put("producer.type", "async");
        props.put("request.required.acks", "0"); // possible values: 0, 1, -1
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
        conf.put("topic", "testTokafka");

        if (args.length > 0) {
            // cluster submit
            try {
                StormSubmitter.submitTopology("kafkaboltTest", conf, stormTopology);
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            new LocalCluster().submitTopology("kafkaboltTest", conf, stormTopology);
        }
    }
}
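As mentioned in step 4, the ZkHosts configuration above discovers brokers and partitions dynamically from ZooKeeper. For comparison, a static configuration enumerates every partition-to-broker mapping by hand, so any change to the Kafka cluster requires redeploying the topology. The following is only a minimal sketch assuming the StaticHosts and GlobalPartitionInformation classes shipped with storm-kafka 0.9.x; the broker addresses and partition count are placeholder values, not taken from the article.

import storm.kafka.Broker;
import storm.kafka.SpoutConfig;
import storm.kafka.StaticHosts;
import storm.kafka.trident.GlobalPartitionInformation;

// static broker configuration: hard-code which broker owns which partition
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, new Broker("192.168.201.190", 9092)); // placeholder broker
partitionInfo.addPartition(1, new Broker("192.168.201.191", 9092)); // placeholder broker
StaticHosts staticHosts = new StaticHosts(partitionInfo);

// the rest of the spout config is built the same way as with ZkHosts
SpoutConfig staticSpoutConf = new SpoutConfig(staticHosts, "testfromkafka", "/kafka08", "spout");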
5 Example results. Data in the testfromkafka topic can be generated continuously by writing a separate producer class, for example along the lines of the sketch below.
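The article does not include this producer, so here is a minimal sketch using the same kafka.javaapi.producer API (Kafka 0.8) as the bolt in section 6. The class name TestDataProducer, the broker address, and the message contents are placeholders.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestDataProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.10.43.150:9092"); // placeholder broker list
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // produce one test message per second so the topology always has input
        for (int i = 0; ; i++) {
            producer.send(new KeyedMessage<String, String>("testfromkafka", "msg-" + i));
            Thread.sleep(1000);
        }
    }
}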
Data in topic testfromkafka:
Data in topic testTokafka:
6 Supplement: the ToKafkaBolt. It extends the basic rich bolt class, mainly overriding execute and acking each input tuple.
package org.tony.storm_kafka.bolt;

import java.util.Map;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.mapper.TupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;
import storm.kafka.bolt.selector.KafkaTopicSelector;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/*
 * author: yue
 */
public class ToKafkaBolt<K, V> extends BaseRichBolt {

    private static final Logger Log = LoggerFactory.getLogger(ToKafkaBolt.class);

    public static final String TOPIC = "topic";
    public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";

    private Producer<K, V> producer;
    private OutputCollector collector;
    private TupleToKafkaMapper<K, V> Mapper;
    private KafkaTopicSelector topicselector;

    public ToKafkaBolt<K, V> withTupleToKafkaMapper(TupleToKafkaMapper<K, V> mapper) {
        this.Mapper = mapper;
        return this;
    }

    public ToKafkaBolt<K, V> withTopicSelector(KafkaTopicSelector topicSelector) {
        this.topicselector = topicSelector;
        return this;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // fall back to the defaults when no mapper / topic selector has been set
        if (Mapper == null) {
            this.Mapper = new FieldNameBasedTupleToKafkaMapper<K, V>();
        }
        if (topicselector == null) {
            this.topicselector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
        }
        // build the Kafka producer from the properties placed in the Storm Config
        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
        Properties properties = new Properties();
        properties.putAll(configMap);
        ProducerConfig config = new ProducerConfig(properties);
        producer = new Producer<K, V>(config);
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        K key = null;
        V message = null;
        String topic = null;
        try {
            key = Mapper.getKeyFromTuple(input);
            message = Mapper.getMessageFromTuple(input);
            topic = topicselector.getTopic(input);
            if (topic != null) {
                producer.send(new KeyedMessage<K, V>(topic, message));
            } else {
                Log.warn("skipping key = " + key + ", topic selector returned null.");
            }
        } catch (Exception e) {
            Log.error("Could not send message with key = " + key
                    + " and value = " + message + " to topic = " + topic, e);
        } finally {
            // ack the tuple whether or not the send succeeded
            collector.ack(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
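If you prefer not to rely on the "topic" and "kafka.broker.properties" entries resolved in prepare(), the bolt's builder-style setters can be used when wiring the topology. A sketch of how the setBolt call in section 4 could look; it assumes the FieldNameBasedTupleToKafkaMapper(String, String) constructor from storm-kafka 0.9.x, and the "key"/"message" field names match the MessageScheme above.

// explicit mapper and topic selector instead of the defaults resolved in prepare()
builder.setBolt("forwardToKafka",
        new ToKafkaBolt<String, String>()
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>("key", "message"))
                .withTopicSelector(new DefaultTopicSelector("testTokafka")))
        .shuffleGrouping("spout");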
Author: 小閃電
Source: http://www.cnblogs.com/yueyanyu/ The copyright of this article is shared by the author and cnblogs. Reposting and discussion are welcome, but this notice must be kept if reposting without the author's consent, and a clear link to the original must appear on the article page. If you found this article helpful, likes and discussion are welcome. Some material on this blog comes from the Internet; if it infringes your rights, please contact the author to have it removed.