Environment
Virtual machine: VMware 10
Linux version: CentOS-6.5-x86_64
Terminal client: Xshell 4
FTP client: Xftp 4
JDK 1.8
storm-0.9
apache-flume-1.6.0
1、Flume + Kafka + Storm architecture design
Collection layer: gathers the logs, using a load-balancing strategy (a configuration sketch follows this list)
Message queue: decouples the layers and acts as a buffer between systems that run at different speeds
Real-time processing unit: processes the data with Storm; the final results flow into a DB
Presentation unit: data visualization, served through a web framework
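The load balancing mentioned for the collection layer is usually expressed in Flume as a sink group with a load_balance sink processor. The snippet below is a minimal sketch, assuming a client-side agent named a1 with two Avro sinks k1 and k2 pointing at two collector hosts named collector1 and collector2 (these names are illustrative; sources and channels are omitted for brevity):

# Client-side agent: spread events across two collector agents
a1.sinks = k1 k2
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.backoff = true

# Each sink forwards to one collector's Avro source
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = collector1
a1.sinks.k1.port = 41414
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = collector2
a1.sinks.k2.port = 41414

With round_robin selection and backoff enabled, a collector that goes down is temporarily taken out of rotation and traffic continues to the remaining one.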
2、Example
A Flume client sends log lines to a Flume collector; Flume forwards the logs to the Kafka topic testflume; the Storm cluster consumes the testflume topic, filters the messages, and writes the matching ones to the Kafka topic LogError, implementing a simple data-cleansing step.
Client:
package com.sxt.flume;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;

/**
 * Example from the Flume documentation:
 * http://flume.apache.org/FlumeDeveloperGuide.html
 * @author root
 */
public class RpcClientDemo {

    public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        client.init("node1", 41414);

        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        for (int i = 10; i < 20; i++) {
            String sampleData = "Hello Flume!ERROR" + i;
            client.sendDataToFlume(sampleData);
            System.out.println("Sent: " + sampleData);
        }

        client.cleanUp();
    }
}

class MyRpcClientFacade {
    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        // Set up the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a Thrift client instead of the line above:
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }

    public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

        // Send the event
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // Clean up and recreate the client
            client.close();
            client = null;
            client = RpcClientFactory.getDefaultInstance(hostname, port);
            // Use the following method to create a Thrift client instead of the line above:
            // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
    }

    public void cleanUp() {
        // Close the RPC connection
        client.close();
    }
}
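For this client to reach Kafka, the collector agent on node1 needs an Avro source listening on port 41414 and a Kafka sink writing to the testflume topic. The original article does not show that configuration; the following is a minimal sketch using Flume 1.6's built-in KafkaSink, where the agent and component names (a1, r1, c1, k1) are assumptions:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Avro source matching the RpcClientDemo connection (node1:41414)
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sources.r1.channels = c1

# In-memory channel between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000

# Kafka sink publishing each event body to the testflume topic
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = testflume
a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1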
Storm processing:
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.sxt.storm.logfileter;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;

/**
 * This topology demonstrates Storm's stream groupings and multilang
 * capabilities.
 */
public class LogFilterTopology {

    public static class FilterBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String line = tuple.getString(0);
            System.err.println("Accept: " + line);
            // Keep only the lines that contain "ERROR"
            if (line.contains("ERROR")) {
                System.err.println("Filter: " + line);
                collector.emit(new Values(line));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Declare the "message" field consumed later by FieldNameBasedTupleToKafkaMapper
            declarer.declare(new Fields("message"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // https://github.com/apache/storm/tree/master/external/storm-kafka
        // Configure the Kafka spout: the topic to consume
        String topic = "testflume";
        ZkHosts zkHosts = new ZkHosts("node1:2181,node2:2181,node3:2181");
        // "/MyKafka" is the ZooKeeper root path under which consumed offsets are recorded
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack"); // consumer id for this application
        List<String> zkServers = new ArrayList<String>();
        System.out.println(zkHosts.brokerZkStr);
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }

        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 2181;
        // Whether to start consuming from the beginning of the topic
        spoutConfig.forceFromStart = true;
        spoutConfig.socketTimeoutMs = 60 * 1000;
        // StringScheme decodes the raw byte stream into strings
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // Set the Kafka spout
        builder.setSpout("kafka_spout", kafkaSpout, 3);
        // Set the filter bolt
        builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout");

        // Write the filtered data back out to Kafka
        // withTopicSelector: use the default selector to write to the topic "LogError"
        // withTupleToKafkaMapper: maps a tuple to the Kafka key and message
        KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("LogError"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

        builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filter");

        Config conf = new Config();
        // Set producer properties
        Properties props = new Properties();
        props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
        /**
         * Kafka producer ACK settings:
         *  0 : the producer does not wait for any acknowledgement from the broker before sending the next message
         *  1 : the producer waits until the leader has acknowledged the message before sending the next one
         * -1 : the producer waits until the follower replicas have acknowledged the message before sending the next one
         */
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", props);

        conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[] { "node1", "node2", "node3" }));

        // Run in local mode
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("mytopology", conf, builder.createTopology());
    }
}
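To verify that the cleansed data actually arrives in the LogError topic, a simple consumer can be attached to it. The sketch below uses the Kafka 0.8 high-level consumer API (the generation that storm-kafka 0.9.x targets); the package name, class name, and group id are illustrative choices, while the ZooKeeper addresses and topic name come from the setup above:

package com.sxt.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class LogErrorConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "node1:2181,node2:2181,node3:2181");
        props.put("group.id", "logerror-check");     // arbitrary consumer group for this check
        props.put("auto.offset.reset", "smallest");  // read the topic from the beginning

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream (thread) for the LogError topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("LogError", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);

        // Print every filtered message written by the KafkaBolt
        ConsumerIterator<byte[], byte[]> it = streams.get("LogError").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}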
References:
Meituan's log collection system
Apache Flume
Apache Flume load balancing