Kafka系列四 flume-kafka-storm整合

flume-kafka-storm

flume讀取日誌數據,而後發送至kafka。

一、flume配置文件

agent.sources = kafkaSource
agent.channels = kafkaChannel
agent.sinks = kafkaSink

agent.sources.kafkaSource.type = exec
agent.sources.kafkaSource.command = tail -F /home/hadoop/kafkaData/kafka.log
agent.sources.kafkaSource.channels = kafkaChannel

agent.sinks.kafkaSink.channel = kafkaChannel
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
# Flume 1.7+ property names; "topic" and "brokerList" are the deprecated pre-1.7 aliases
agent.sinks.kafkaSink.kafka.topic = stormTopic
agent.sinks.kafkaSink.kafka.bootstrap.servers = 192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092
agent.sinks.kafkaSink.kafka.flumeBatchSize = 20
agent.sinks.kafkaSink.kafka.producer.acks = 1
agent.sinks.kafkaSink.kafka.producer.linger.ms = 1
agent.sinks.kafkaSink.kafka.producer.compression.type = snappy
# NOTE: serializer.class is a Kafka 0.8 producer property and is ignored by the new Kafka client
agent.sinks.kafkaSink.serializer.class=kafka.serializer.StringEncoder

agent.channels.kafkaChannel.type=memory                                                                                                       
agent.channels.kafkaChannel.capacity=10000                                                                                                    
agent.channels.kafkaChannel.transactionCapacity=100

二、啓動flume

bin/flume-ng agent --conf-file  conf/flume-kafka.conf -c conf/ --name agent -Dflume.root.logger=DEBUG,console

三、須要在flume機器上修改hosts文件,添加上kafka的主機名和ip的映射。

四、在kafka上建立主題

bin/kafka-topics.sh --create --zookeeper hadoop2:2181 --replication-factor 1 --partitions 3 --topic stormTopic

五、模擬生成日誌腳本

for((i=0;i<=10000;i++));
do echo "kafka_test-"$i>>/home/hadoop/kafkaData/kafka.log;
done

六、在kafka上開啓消費者

bin/kafka-console-consumer.sh --zookeeper hadoop2:2181 --from-beginning --topic stormTopic

至此,flume->kafka的數據流走通。

七、整合Storm,將kafka做爲storm的spout,將使用KafkaSpout。

 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3     <modelVersion>4.0.0</modelVersion>
 4     <groupId>cn.itcast</groupId>
 5     <artifactId>kafkaStorm</artifactId>
 6     <version>0.0.1-SNAPSHOT</version>
 7     <packaging>jar</packaging>
 8     <dependencies>
 9         <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
10         <dependency>
11             <groupId>org.apache.storm</groupId>
12             <artifactId>storm-core</artifactId>
13             <version>1.2.1</version>
14             <scope>provided</scope>
15         </dependency>
16         <dependency>
17             <groupId>org.apache.storm</groupId>
18             <artifactId>storm-kafka</artifactId>
19             <version>1.2.1</version>
20         </dependency>
21         <dependency>
22             <groupId>org.apache.kafka</groupId>
23             <artifactId>kafka_2.12</artifactId>
24             <version>1.0.0</version>
25             <exclusions>
26                 <exclusion>
27                     <groupId>org.slf4j</groupId>
28                     <artifactId>slf4j-log4j12</artifactId>
29                 </exclusion>
30             </exclusions>
31         </dependency>
32         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
33         <dependency>
34             <groupId>org.apache.kafka</groupId>
35             <artifactId>kafka-clients</artifactId>
36             <version>1.0.0</version>
37         </dependency>
38 
39 
40     </dependencies>
41     <build>
42         <plugins>
43             <!-- 資源文件拷貝插件 -->
44             <plugin>
45                 <groupId>org.apache.maven.plugins</groupId>
46                 <artifactId>maven-resources-plugin</artifactId>
47                 <version>2.7</version>
48                 <configuration>
49                     <encoding>UTF-8</encoding>
50                 </configuration>
51             </plugin>
52             <!-- java編譯插件 -->
53             <plugin>
54                 <groupId>org.apache.maven.plugins</groupId>
55                 <artifactId>maven-compiler-plugin</artifactId>
56                 <version>3.2</version>
57                 <configuration>
58                     <source>1.8</source>
59                     <target>1.8</target>
60                     <encoding>UTF-8</encoding>
61                 </configuration>
62             </plugin>
63             <plugin>
64                 <groupId>org.apache.maven.plugins</groupId>
65                 <artifactId>maven-jar-plugin</artifactId>
66                 <version>2.4</version>
67             </plugin>
68             <plugin>
69                 <groupId>org.apache.maven.plugins</groupId>
70                 <artifactId>maven-assembly-plugin</artifactId>
71                 <version>2.4</version>
72                 <configuration>
73                     <descriptorRefs>
74                         <descriptorRef>jar-with-dependencies</descriptorRef>
75                     </descriptorRefs>
76                     <archive>
77                         <manifest>
78                             <mainClass>cn.itcast.kafka.Kafka2Storm</mainClass>
79                         </manifest>
80                     </archive>
81                 </configuration>
82                 <executions>
83                     <execution>
84                         <id>make-assembly</id>
85                         <phase>package</phase>
86                         <goals>
87                             <goal>single</goal>
88                         </goals>
89                     </execution>
90                 </executions>
91             </plugin>
92         </plugins>
93     </build>
94 </project>
pom.xml
 1 package cn.itcast.kafka;
 2 
 3 import org.apache.storm.Config;
 4 import org.apache.storm.LocalCluster;
 5 import org.apache.storm.generated.AlreadyAliveException;
 6 import org.apache.storm.generated.AuthorizationException;
 7 import org.apache.storm.generated.InvalidTopologyException;
 8 import org.apache.storm.kafka.BrokerHosts;
 9 import org.apache.storm.kafka.KafkaSpout;
10 import org.apache.storm.kafka.SpoutConfig;
11 import org.apache.storm.kafka.ZkHosts;
12 import org.apache.storm.topology.TopologyBuilder;
13 
14 public class Kafka2Storm {
15     public static void main(String[] args)
16             throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
17         TopologyBuilder topologyBuilder = new TopologyBuilder();
18         BrokerHosts hosts = new ZkHosts("192.168.25.142:2181,192.168.25.143:2181,192.168.25.144:2181");
19         /**
20          * hosts:用以獲取Kafka broker和partition的信息,在zk上獲取,此處填寫zk的地址
21          * topic:從哪一個topic讀取消息 zkRoot:進度信息記錄於zookeeper的哪一個路徑下
22          * id:進度記錄的id,想要一個新的Spout讀取以前的記錄,應把它的id設爲跟以前的同樣
23          */
24         SpoutConfig spoutConfig = new SpoutConfig(hosts, "stormTopic", "/mykafka", "kafkaSpout");
25         KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
26         topologyBuilder.setSpout("kafkaSpout", kafkaSpout);
27         // 將一行行的文本切分紅單詞
28         topologyBuilder.setBolt("valueBolt", new ValueBolt(), 1).shuffleGrouping("kafkaSpout");
29         // 啓動topology的配置信息
30         Config config = new Config();
31         // 定義集羣分配多少個工做進程來執行這個topology
32         config.setNumWorkers(3);
33         
34          LocalCluster localCluster = new LocalCluster();
35          localCluster.submitTopology("kafkaStomrTopology", config,
36          topologyBuilder.createTopology());
37         // 集羣模式提交topology
38 //        StormSubmitter.submitTopologyWithProgressBar("kafkaStomrTopology", config, topologyBuilder.createTopology());
39     }
40 }
Kafka2Storm.java
 1 package cn.itcast.kafka;
 2 
 3 import java.util.Map;
 4 
 5 import org.apache.storm.task.OutputCollector;
 6 import org.apache.storm.task.TopologyContext;
 7 import org.apache.storm.topology.OutputFieldsDeclarer;
 8 import org.apache.storm.topology.base.BaseRichBolt;
 9 import org.apache.storm.tuple.Tuple;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12 
13 public class ValueBolt extends BaseRichBolt {
14     Logger logger = LoggerFactory.getLogger(ValueBolt.class);
15 
16     /**
17      * 
18      */
19     private static final long serialVersionUID = 1L;
20 
21     @Override
22     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
23         // TODO Auto-generated method stub
24 
25     }
26 
27     @Override
28     public void execute(Tuple input) {
29         logger.info(new String((byte[]) input.getValue(0)));
30     }
31 
32     @Override
33     public void declareOutputFields(OutputFieldsDeclarer declarer) {
34         // TODO Auto-generated method stub
35 
36     }
37 
38 }
ValueBolt.java
相關文章
相關標籤/搜索