大數據處理框架之Storm:Flume+Kafka+Storm整合

環境
  虛擬機:VMware 10
  Linux版本:CentOS-6.5-x86_64
  客戶端:Xshell4
  FTP:Xftp4
  jdk1.8
  storm-0.9
  apache-flume-1.6.0

1、Flume+Kafka+Storm架構設計

採集層:實現日誌收集,使用負載均衡策略
消息隊列:作用是解耦及爲不同速度的系統提供緩衝
實時處理單元:用Storm來進行數據處理,最終數據流入DB中
展現單元:數據可視化,使用WEB框架展現

2、案例:
通過flume客戶端向flume採集器發送日誌,flume將日誌發送到kafka集羣主題testflume,storm集羣消費kafka主題testflume中的日誌,將經過過濾處理的消息發送給kafka集羣主題LogError,實現數據清理。

Client:

package com.sxt.flume;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

/**
 * Example client taken from the official Flume developer guide:
 * http://flume.apache.org/FlumeDeveloperGuide.html
 *
 * <p>Sends ten sample log lines containing the word {@code ERROR} to a remote
 * Flume agent through {@link MyRpcClientFacade}.
 *
 * @author root
 */
public class RpcClientDemo {

    /**
     * Connects to the Flume agent at {@code node1:41414}, sends ten events and
     * closes the connection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port.
        // Done outside the try block: if the connection cannot be opened there
        // is nothing to clean up.
        client.init("node1", 41414);

        try {
            // Send 10 events to the remote Flume agent. That agent should be
            // configured to listen with an AvroSource.
            for (int i = 10; i < 20; i++) {
                String sampleData = "Hello Flume!ERROR" + i;
                client.sendDataToFlume(sampleData);
                System.out.println("發送數據:" + sampleData);
            }
        } finally {
            // Always release the RPC connection, even when sending fails.
            client.cleanUp();
        }
    }
}

/**
 * Small facade over a Flume {@link RpcClient} that remembers the agent's
 * address so the connection can be rebuilt after a delivery failure.
 *
 * <p>The default client from {@link RpcClientFactory#getDefaultInstance} is
 * used; {@code RpcClientFactory.getThriftInstance(hostname, port)} can be
 * substituted to create a Thrift client instead. No synchronization is
 * performed by this class.
 */
class MyRpcClientFacade {
    /** Charset used to encode event bodies; resolved once instead of per event. */
    private static final Charset BODY_CHARSET = Charset.forName("UTF-8");

    private RpcClient client;
    private String hostname;
    private int port;

    /**
     * Opens the RPC connection to the Flume agent. If a connection from an
     * earlier call is still open, it is closed first.
     *
     * @param hostname host of the remote Flume agent
     * @param port     port the agent's source listens on
     */
    public void init(String hostname, int port) {
        // Release any earlier connection so repeated init() calls do not leak it.
        closeClient();
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    }

    /**
     * Wraps {@code data} in a Flume event (UTF-8 body) and sends it to the agent.
     *
     * <p>If delivery fails, the failure is reported on {@code System.err}, the
     * connection is closed and recreated, and the failed event is NOT resent.
     *
     * @param data text to send as the event body
     * @throws IllegalStateException if {@link #init} has never been called
     */
    public void sendDataToFlume(String data) {
        if (client == null) {
            if (hostname == null) {
                throw new IllegalStateException(
                        "init(hostname, port) must be called before sendDataToFlume");
            }
            // No live connection (closed, or an earlier reconnect failed):
            // try to open a new one.
            client = RpcClientFactory.getDefaultInstance(hostname, port);
        }

        // Create a Flume Event object that encapsulates the data.
        Event event = EventBuilder.withBody(data, BODY_CHARSET);

        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            System.err.println("Failed to deliver event to " + hostname + ":" + port
                    + ", recreating client: " + e);
            // Clean up and recreate the client. The field is cleared before the
            // new client is created, so a failed reconnect leaves no closed
            // client behind.
            closeClient();
            client = RpcClientFactory.getDefaultInstance(hostname, port);
        }
    }

    /**
     * Closes the RPC connection. Does nothing when no connection is open.
     */
    public void cleanUp() {
        closeClient();
    }

    /** Closes the current client, if any, and clears the field even when close() throws. */
    private void closeClient() {
        if (client == null) {
            return;
        }
        RpcClient stale = client;
        client = null;
        stale.close();
    }
}

Storm處理:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.sxt.storm.logfileter;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;

/**
 * Storm topology that cleans a log stream: it consumes lines from the Kafka
 * topic {@code testflume}, keeps only the lines containing {@code ERROR}, and
 * publishes them to the Kafka topic {@code LogError}. {@link #main} submits
 * the topology to an in-process {@link LocalCluster}.
 */
public class LogFilterTopology {

    /** ZooKeeper hosts used by both the Kafka spout and the Storm configuration. */
    private static final List<String> ZK_SERVERS = Arrays.asList("node1", "node2", "node3");

    /** ZooKeeper client port used for every host in {@link #ZK_SERVERS}. */
    private static final int ZK_PORT = 2181;

    /** Kafka topic the spout reads from. */
    private static final String SOURCE_TOPIC = "testflume";

    /** Kafka topic the filtered lines are written to. */
    private static final String ERROR_TOPIC = "LogError";

    /** Kafka brokers the producer connects to. */
    private static final String BROKER_LIST = "node1:9092,node2:9092,node3:9092";

    /**
     * Bolt that forwards only the lines containing {@code ERROR}.
     */
    public static class FilterBolt extends BaseBasicBolt {
        private static final long serialVersionUID = 1L;

        /**
         * Reads the first field of the tuple as a string and re-emits it when it
         * contains {@code ERROR}; every received line is echoed to stderr.
         *
         * @param tuple     incoming tuple whose first field is the log line
         * @param collector collector used to emit the retained line
         */
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String line = tuple.getString(0);
            System.err.println("Accept:  " + line);
            // Keep only lines that contain ERROR; a null line is dropped
            // instead of raising a NullPointerException.
            if (line != null && line.contains("ERROR")) {
                System.err.println("Filter:  " + line);
                collector.emit(new Values(line));
            }
        }

        /**
         * Declares the single output field {@code message}, the field name the
         * downstream FieldNameBasedTupleToKafkaMapper reads.
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("message"));
        }
    }

    /**
     * Wires kafka_spout -> filter -> kafka_bolt and submits the topology, named
     * {@code mytopology}, to a local cluster.
     *
     * @param args unused
     * @throws Exception if building or submitting the topology fails
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // https://github.com/apache/storm/tree/master/external/storm-kafka
        builder.setSpout("kafka_spout", new KafkaSpout(buildSpoutConfig()), 3);
        builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout");

        // Write the filtered data out to Kafka.
        // withTopicSelector: the default selector names the target topic.
        // withTupleToKafkaMapper: maps a tuple to the Kafka key and message.
        KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>()
                .withTopicSelector(new DefaultTopicSelector(ERROR_TOPIC))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>());
        builder.setBolt("kafka_bolt", kafkaBolt, 2).shuffleGrouping("filter");

        Config conf = new Config();
        conf.put("kafka.broker.properties", buildProducerProperties());
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, new ArrayList<String>(ZK_SERVERS));

        // Run in local mode. The cluster is never shut down here, so the
        // topology keeps running until the process is stopped.
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("mytopology", conf, builder.createTopology());
    }

    /** Builds the Kafka spout configuration for {@link #SOURCE_TOPIC}. */
    private static SpoutConfig buildSpoutConfig() {
        // Assemble "host:port,host:port,..." from the single host list.
        StringBuilder connect = new StringBuilder();
        for (String host : ZK_SERVERS) {
            if (connect.length() > 0) {
                connect.append(',');
            }
            connect.append(host).append(':').append(ZK_PORT);
        }
        ZkHosts zkHosts = new ZkHosts(connect.toString());
        System.out.println(zkHosts.brokerZkStr);

        // "/MyKafka" is the ZooKeeper root under which consumed offsets are
        // recorded; "MyTrack" identifies this consuming application.
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, SOURCE_TOPIC, "/MyKafka", "MyTrack");
        spoutConfig.zkServers = new ArrayList<String>(ZK_SERVERS);
        spoutConfig.zkPort = ZK_PORT;
        // Consume the topic from the beginning.
        spoutConfig.forceFromStart = true;
        spoutConfig.socketTimeoutMs = 60 * 1000;
        // StringScheme decodes the raw message bytes into a string.
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        return spoutConfig;
    }

    /** Builds the Kafka producer properties handed to the Kafka bolt. */
    private static Properties buildProducerProperties() {
        Properties props = new Properties();
        props.put("metadata.broker.list", BROKER_LIST);
        // Kafka producer ACK modes:
        //   0  - do not wait for any broker acknowledgement before sending the next message
        //   1  - wait until the leader has acknowledged the message
        //   -1 - wait until the follower replicas have acknowledged the message
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        return props;
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.sxt.storm.logfileter;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;

/**
 * Storm topology that cleans a log stream: it consumes lines from the Kafka
 * topic {@code testflume}, keeps only the lines containing {@code ERROR}, and
 * publishes them to the Kafka topic {@code LogError}. {@link #main} submits
 * the topology to an in-process {@link LocalCluster}.
 */
public class LogFilterTopology {

    /** ZooKeeper hosts used by both the Kafka spout and the Storm configuration. */
    private static final List<String> ZK_SERVERS = Arrays.asList("node1", "node2", "node3");

    /** ZooKeeper client port used for every host in {@link #ZK_SERVERS}. */
    private static final int ZK_PORT = 2181;

    /** Kafka topic the spout reads from. */
    private static final String SOURCE_TOPIC = "testflume";

    /** Kafka topic the filtered lines are written to. */
    private static final String ERROR_TOPIC = "LogError";

    /** Kafka brokers the producer connects to. */
    private static final String BROKER_LIST = "node1:9092,node2:9092,node3:9092";

    /**
     * Bolt that forwards only the lines containing {@code ERROR}.
     */
    public static class FilterBolt extends BaseBasicBolt {
        private static final long serialVersionUID = 1L;

        /**
         * Reads the first field of the tuple as a string and re-emits it when it
         * contains {@code ERROR}; every received line is echoed to stderr.
         *
         * @param tuple     incoming tuple whose first field is the log line
         * @param collector collector used to emit the retained line
         */
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String line = tuple.getString(0);
            System.err.println("Accept:  " + line);
            // Keep only lines that contain ERROR; a null line is dropped
            // instead of raising a NullPointerException.
            if (line != null && line.contains("ERROR")) {
                System.err.println("Filter:  " + line);
                collector.emit(new Values(line));
            }
        }

        /**
         * Declares the single output field {@code message}, the field name the
         * downstream FieldNameBasedTupleToKafkaMapper reads.
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("message"));
        }
    }

    /**
     * Wires kafka_spout -> filter -> kafka_bolt and submits the topology, named
     * {@code mytopology}, to a local cluster.
     *
     * @param args unused
     * @throws Exception if building or submitting the topology fails
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // https://github.com/apache/storm/tree/master/external/storm-kafka
        builder.setSpout("kafka_spout", new KafkaSpout(buildSpoutConfig()), 3);
        builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout");

        // Write the filtered data out to Kafka.
        // withTopicSelector: the default selector names the target topic.
        // withTupleToKafkaMapper: maps a tuple to the Kafka key and message.
        KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>()
                .withTopicSelector(new DefaultTopicSelector(ERROR_TOPIC))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>());
        builder.setBolt("kafka_bolt", kafkaBolt, 2).shuffleGrouping("filter");

        Config conf = new Config();
        conf.put("kafka.broker.properties", buildProducerProperties());
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, new ArrayList<String>(ZK_SERVERS));

        // Run in local mode. The cluster is never shut down here, so the
        // topology keeps running until the process is stopped.
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("mytopology", conf, builder.createTopology());
    }

    /** Builds the Kafka spout configuration for {@link #SOURCE_TOPIC}. */
    private static SpoutConfig buildSpoutConfig() {
        // Assemble "host:port,host:port,..." from the single host list.
        StringBuilder connect = new StringBuilder();
        for (String host : ZK_SERVERS) {
            if (connect.length() > 0) {
                connect.append(',');
            }
            connect.append(host).append(':').append(ZK_PORT);
        }
        ZkHosts zkHosts = new ZkHosts(connect.toString());
        System.out.println(zkHosts.brokerZkStr);

        // "/MyKafka" is the ZooKeeper root under which consumed offsets are
        // recorded; "MyTrack" identifies this consuming application.
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, SOURCE_TOPIC, "/MyKafka", "MyTrack");
        spoutConfig.zkServers = new ArrayList<String>(ZK_SERVERS);
        spoutConfig.zkPort = ZK_PORT;
        // Consume the topic from the beginning.
        spoutConfig.forceFromStart = true;
        spoutConfig.socketTimeoutMs = 60 * 1000;
        // StringScheme decodes the raw message bytes into a string.
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        return spoutConfig;
    }

    /** Builds the Kafka producer properties handed to the Kafka bolt. */
    private static Properties buildProducerProperties() {
        Properties props = new Properties();
        props.put("metadata.broker.list", BROKER_LIST);
        // Kafka producer ACK modes:
        //   0  - do not wait for any broker acknowledgement before sending the next message
        //   1  - wait until the leader has acknowledged the message
        //   -1 - wait until the follower replicas have acknowledged the message
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        return props;
    }
}

 


參考:
美團日誌收集系統
Apache Flume
Apache Flume負載均衡

相關文章
相關標籤/搜索