POM
<properties>
    <storm.version>1.1.0</storm.version>
    <kafka.version>0.10.2.0</kafka.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
        <!-- Provided by the Storm runtime, so it must not be packed into the task jar. -->
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>${storm.version}</version>
        <!-- FIX: was ${provided.scope}, a property never declared in <properties>;
             Maven would leave it uninterpolated and the build would fail. -->
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>
Topology
public class KafkaTopology { /** * storm jar /opt/storm-jar/storm-1.0-SNAPSHOT.jar com.boyoi.kafka.topology.KafkaTopology x2 */ public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new KafkaSpout<>(getKafkaSpoutConfig()),200); /** * 1)shuffleGrouping(隨機分組) * 2)fieldsGrouping(按照字段分組,在這裏便是同一個單詞只能發送給一個Bolt) * 3)allGrouping(廣播發送,即每個Tuple,每個Bolt都會收到) * 4)globalGrouping(全局分組,將Tuple分配到task id值最低的task裏面) * 5)noneGrouping(隨機分派) * 6)directGrouping(直接分組,指定Tuple與Bolt的對應發送關係) * 7)Local or shuffle Grouping * 8)customGrouping (自定義的Grouping) */ builder.setBolt("filter", new FilterBolt(), 200).shuffleGrouping("spout"); builder.setBolt("analysis", new AnalysisBlot(), 200).shuffleGrouping("filter"); builder.setBolt("HBase", new HBaseBlot(),200).shuffleGrouping("analysis"); Config conf = new Config(); // conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(5); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("x2", conf, builder.createTopology()); cluster.shutdown(); } } /** * 獲取kafka spout 配置 */ private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig() { return KafkaSpoutConfig.builder("192.168.1.9:9092,192.168.1.40:9092", "test2") .setGroupId("kafka") .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str")) .build(); } private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new JustValueFunc(); /** * Needs to be serializable */ private static class JustValueFunc implements Func<ConsumerRecord<String, String>, List<Object>>, Serializable { @Override public List<Object> apply(ConsumerRecord<String, String> record) { return new Values(record.value()); } } }
Bolt
public class FilterBolt extends BaseRichBolt{ private OutputCollector outputCollector; /** * 初始化工做 */ @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector = outputCollector; } /** * 執行邏輯,目的是過濾無用的字符串 */ @Override public void execute(Tuple tuple) { String value = tuple.getString(0); // 提交下一個 if (null != value && !"".equals(value)){ try { int val = Integer.parseInt(value); outputCollector.emit(new Values(val)); }catch (Exception e){ // ignore System.out.println(value + "不是數字!略過!!!"); } } // 返回確認 outputCollector.ack(tuple); } /** * 申明傳入到一個Bolt的字段名稱 * 經過 input.getStringByField("str");input.getIntegerByField("int"); 得到指定的 */ @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("value")); } }
其它 Bolt 略。
代碼寫好後,使用 maven 打成 jar 包。
放在集羣 Nimbus 的某個目錄下:
storm jar /opt/storm-jar/storm-1.0-SNAPSHOT.jar com.*.kafka.topology.KafkaTopology x2