老版本:官方文檔(HTML)
新版本:官方文檔(Java)
Storm可集成組件:Apache Kafka
需求:給kafka數據添加日期
實際用途:可根據業務需求自定義,例如解析Nginx日誌、限制IP訪問等
<?xml version="1.0" encoding="UTF-8"?> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.zhiwei</groupId> <artifactId>data_process_experience</artifactId> <version>1.0-SNAPSHOT</version> </parent> <artifactId>storm_experience</artifactId> <packaging>jar</packaging> <properties> <version.storm>1.2.2</version.storm> <version.slf4j>1.7.2</version.slf4j> <version.lombok>1.18.4</version.lombok> <version.kafka>2.2.0</version.kafka> <version.storm-kafka>1.2.2</version.storm-kafka> </properties> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${version.storm}</version> <exclusions> <exclusion> <groupId>ring-cors</groupId> <artifactId>ring-cors</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${version.slf4j}</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${version.lombok}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>${version.kafka}</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-client</artifactId> <version>${version.storm-kafka}</version> </dependency> </dependencies> </project>
package com.zhiwei.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.time.LocalDate; import java.util.Map; /** * 數據前綴添加日期 */ @Slf4j public class TimeProcessBolt extends BaseRichBolt { private OutputCollector collector; @Override public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) { // 初始化 //collector:該bolt組件的收集器,用於把處理的數據發給下一個bolt組件 this.collector = collector; } @Override public void execute(Tuple tuple) { log.info("kafka主題:{}, 消息:{}", tuple.getValue(1), tuple.getValue(0)); if(StringUtils.isNotEmpty((String)tuple.getValue(0))) { collector.emit(getTuple(tuple)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declare) { declare.declare(new Fields("key", "message")); } private Values getTuple(Tuple tuple){ String nowTime = LocalDate.now().toString(); return new Values(nowTime, String.format("【 %s 】%s", LocalDate.now().toString(), String.valueOf(tuple.getValue(0)))); } }
package com.zhiwei.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Properties;

/**
 * Example topology: Kafka spout -> TimeProcessBolt (adds a date prefix)
 * -> KafkaBolt that forwards the result to another Kafka topic.
 *
 * <p>Run with an argument to submit to a cluster (args[0] = topology name);
 * run without arguments to execute in local mode for ~100 seconds.
 */
public class StormKafkaTopology {

    private static final String topologyName = "storm-storm-case";
    private static final String spoutTopic = "kafka-spout-topic";
    private static final String boltTopic = "kafka-bolt-topic";

    public static void main(String[] args)
            throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();

        // Translate each consumed Kafka record into a (value, topic) tuple.
        ByTopicRecordTranslator<String, String> byTopicRecordTranslator =
                new ByTopicRecordTranslator<>(
                        (record) -> new Values(record.value(), record.topic()),
                        new Fields("value", "topic"));
        // Register the translation for the consumed topic.
        byTopicRecordTranslator.forTopic(spoutTopic,
                (record) -> new Values(record.value(), record.topic()),
                new Fields("value", "topic"));

        KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig
                .builder("centos:9092", spoutTopic)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroup")
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                .setRecordTranslator(byTopicRecordTranslator)
                .build();

        builder.setSpout("kafkaSpout", new KafkaSpout<>(kafkaSpoutConfig));
        builder.setBolt("timeProcessBolt", new TimeProcessBolt()).shuffleGrouping("kafkaSpout");

        // Producer settings for the forwarding KafkaBolt.
        Properties props = new Properties();
        props.put("bootstrap.servers", "centos:9092");
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // FIX: parameterize KafkaBolt and its mapper (originally raw types,
        // producing unchecked warnings). The mapper reads the "key"/"message"
        // fields declared by TimeProcessBolt.
        KafkaBolt<String, String> bolt = new KafkaBolt<String, String>()
                .withProducerProperties(props)
                .withTopicSelector(new DefaultTopicSelector(boltTopic))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>());

        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("timeProcessBolt");

        Config config = new Config();
        config.setNumWorkers(2);
        // Acking disabled: at-most-once processing, no replay on failure.
        config.setNumAckers(0);

        if (args != null && args.length > 0) {
            // Submit to a real cluster; args[0] is the topology name.
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // Local mode: run briefly, then tear the topology down.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, config, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(topologyName);
            cluster.shutdown();
        }
    }
}