基於windows10的Flume+kafka+storm的集成學習筆記

這個週末基於windows10單機版學習Flume+kafka+storm的簡單集成,目的是加深對應基本概念的認識。這裏不具體介紹flume,kafka,storm的原理,只對基本概念只作簡單說明。php

1.1  準備階段

操做系統:windows 10家庭版html

在官方網站下載下載編譯後的軟件,本人學習對應的軟件版本以下:java

apache-flume-1.9.0-binnode

apache-storm-1.0.5git

kafka_2.11-1.1.1spring

zookeeper-3.4.10shell

1.2  學習目標

(1)     使用Flume基於spooling directory和netcat採集日誌數據,做爲Kafka的Producer;數據庫

(2)     使用Kafka的客戶端輸入日誌做爲Kafka的Producer;apache

(3)     使用storm消費Kafka的日誌,讀取的日誌數據保存到文件系統。bootstrap

以下圖:

 

1.3  Flume

1.3.1  基本概念

Flume是一個分佈式、可靠、高可用的海量日誌採集、聚合、傳輸的系統。核心是把數據從數據源(source)收集過來,在將收集到的數據送到指定的目的地(sink)。爲了保證輸送的過程必定成功,在送到目的地(sink)以前,會先緩存數據(channel),待數據真正到達目的地(sink)後,Flume再刪除本身緩存的數據。

基本概念:

Source:source組件是專門用來收集數據的,能夠處理各類類型、各類格式的日誌數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。

Channelsource組件把數據收集來之後,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數據的——對採集到的數據進行簡單的緩存,能夠存放在memory、jdbc、file等等。

Sinksink組件是用於把數據發送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定義。

 

Flume運行機制:

Flume的核心就是一個agent,這個agent對外有兩個進行交互的地方,一個是接受數據輸入的source,一個是數據輸出的sink,sink負責將數據發送到外部指定的目的地。source接收到數據以後,將數據發送給channel,chanel做爲一個數據緩衝區會臨時存放這些數據,隨後sink會將channel中的數據發送到指定的地方,例如HDFS等。注意:只有在sink將channel中的數據成功發送出去以後,channel纔會將臨時數據進行刪除,這種機制保證了數據傳輸的可靠性與安全性。

1.3.1  演示例子

下面的例子以spooling directory source和netcat source爲例子進行學習。

步驟一:下載官網flume1.9

將軟件解壓到本地D:\Study\codeproject\apache-flume-1.9.0-bin

步驟二:在conf目錄下複製flume-env.ps1.template改成flume-env.ps1

步驟三:檢查安裝是否成功: 

D:\Study\codeproject\apache-flume-1.9.0-bin\bin>flume-ng.cmd version

D:\Study\codeproject\apache-flume-1.9.0-bin\bin>powershell.exe -NoProfile -InputFormat none -ExecutionPolicy unrestricted -File D:\Study\codeproject\apache-flume-1.9.0-bin\bin\flume-ng.ps1 version
WARN: Config directory not set. Defaulting to D:\Study\codeproject\apache-flume-1.9.0-bin\conf
Sourcing environment configuration script D:\Study\codeproject\apache-flume-1.9.0-bin\conf\flume-env.ps1
  Running FLUME version :
    class: org.apache.flume.tools.VersionInfo
    arguments:
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
 

備註:不清楚爲啥在windows10下一直報以下錯誤:

Sourcing environment configuration script D:\Study\codeproject\apache-flume-1.9.0-bin\conf\flume-env.ps1

Test-Path : 路徑中具備非法字符。

所在位置 F:\kafka\apache-flume-1.8.0-bin\bin\flume-ng.ps1:106 字符: 56

+ ...                               ? { "$_" -ne "" -and (Test-Path $_ )} |

+                                                         ~~~~~~~~~~~~

    + CategoryInfo          : InvalidArgument: (F:\kafka\apache-flume-1.8.0-bin\":String) [Test-Path],ArgumentException

    + FullyQualifiedErrorId : ItemExistsArgumentError,Microsoft.PowerShell.Commands.TestPathCommand

後將GetHadoopHome、GetHbaseHome、GetHiveHome相關的腳本所有註釋掉,就能夠了。

 

步驟四:在conf目錄下新增kafka_sink.conf配置文件,內容以下:

# example.conf: A single-node Flume configuration

#命名Agent a1的組件

a1.sources = r1 r2

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 12345

a1.sources.r2.type = spooldir

a1.sources.r2.spoolDir = D:/test

a1.sources.r2.fileHeader = true

#描述Sink

a1.sinks.k1.channel = c1

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = flume

a1.sinks.k1.kafka.bootstrap.servers = localhost:9092

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k1.kafka.producer.compression.type = snappy

#描述內存Channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

#爲Channle綁定Source和Sink

a1.sources.r1.channels = c1

a1.sources.r2.channels = c1

a1.sinks.k1.channel = c1

因爲KAFKA尚未配置好,這裏先不啓動Flume,等kafka部署好後啓動。

 

1.4     Kafka

1.4.1  基本概念

Kafka是一個分佈式數據流平臺,能夠運行在單臺服務器上,也能夠在多臺服務器上部署造成集羣。它提供了相似於JMS的特性,可是在設計實現上徹底不一樣,此外它並非JMS規範的實現。kafka對消息保存時根據Topic進行歸類,發送消息者成爲Producer,消息接受者成爲Consumer,此外kafka集羣有多個kafka實例組成,每一個實例(server)成爲broker。不管是kafka集羣,仍是producer和consumer都依賴於zookeeper來保證系統可用性集羣保存一些meta信息。

基本概念:

Message(消息):傳遞的數據對象,主要由四部分構成:offset(偏移量)、key、value、timestamp(插入時間); 其中offset和timestamp在kafka集羣中產生,key/value在producer發送數據的時候產生

Broker(代理者):Kafka集羣中的機器/服務被成爲broker, 是一個物理概念。

Topic(主題):維護Kafka上的消息類型被稱爲Topic,是一個邏輯概念。

Partition(分區):具體維護Kafka上的消息數據的最小單位,一個Topic能夠包含多個分區;Partition特性:ordered & immutable。(在數據的產生和消費過程當中,不須要關注數據具體存儲的Partition在那個Broker上,只須要指定Topic便可,由Kafka負責將數據和對應的Partition關聯上)

Producer(生產者):負責將數據發送到Kafka對應Topic的進程

Consumer(消費者):負責從對應Topic獲取數據的進程

Consumer Group(消費者組):每一個consumer都屬於一個特定的group組,一個group組能夠包含多個consumer,但一個組中只會有一個consumer消費數據。

1.4.2  演示例子

第一步:下載對應的軟件(依賴zookeeper)

kafka_2.11-1.1.1(目錄:D:\Study\codeproject\kafka_2.11-1.1.1)

zookeeper-3.4.10(目錄:D:\Study\codeproject\zookeeper-3.4.10)

 

第二步:運行Zookeeper

這裏zookeeper採用單機安裝,很是簡單,在執行啓動腳本以前,有幾個基本的配置項須要配置一下,Zookeeper 的配置文件在 conf 目錄下,這個目錄下有 zoo_sample.cfg 和 log4j.properties,須要將 zoo_sample.cfg 更名爲 zoo.cfg,由於 Zookeeper 在啓動時會找這個文件做爲默認配置文件。

修改zoo.cfg配置:

dataDir=D:/Study/codeproject/zookeeper-3.4.10/data

dataDir就是 Zookeeper 保存數據的目錄,默認狀況下,Zookeeper 將寫數據的日誌文件也保存在這個目錄裏。

執行啓動命令:

D:\Study\codeproject\zookeeper-3.4.10\bin>zkServer.cmd

第三步:運行Kafka

修改kafka配置:

D:\Study\codeproject\kafka_2.11-1.1.1\config>server.properties 修改該文件的配置(數據存儲的位置)

log.dirs=D:/Study/codeproject/kafka_2.11-1.1.1/data

啓動kafka,這裏注意一下是否正常運行了,若是日誌報錯則將日誌文件夾刪除後再讓其自動從新生成。

D:\Study\codeproject\kafka_2.11-1.1.1>.\bin\windows\kafka-server-start.bat .\config\server.properties

第四步:建立一個名字爲flume的Topic

D:\Study\codeproject\kafka_2.11-1.1.1\bin\windows>kafka-console-consumer.bat --zookeeper localhost:2181 --topic flume

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

第五步:打開這個Topic的一個Consumer

新打開一個窗口,建立一個KAFKA消費者

D:\Study\codeproject\kafka_2.11-1.1.1\bin\windows>kafka-console-consumer.bat --zookeeper localhost:2181 --topic flume

第六步:啓動Flume

啓動剛纔配置好的Flume :

D:\Study\codeproject\apache-flume-1.9.0-bin\bin>flume-ng agent --conf ../conf --conf-file ../conf/kafka_sink.conf --name a1 -property flume.root.logger=INFO,console

 D:\Study\codeproject\apache-flume-1.9.0-bin\bin>powershell.exe -NoProfile -InputFormat none -ExecutionPolicy unrestricted -File D:\Study\codeproject\apache-flume-1.9.0-bin\bin\flume-ng.ps1 agent --conf ../conf --conf-file ../conf/kafka_sink.conf --name a1 -property flume.root.logger=INFO,console

Sourcing environment configuration script ../conf\flume-env.ps1

  Running FLUME agent :

    class: org.apache.flume.node.Application

    arguments: -n a1 -f "D:\Study\codeproject\apache-flume-1.9.0-bin\conf\kafka_sink.conf"

 

說明:其中--conf指定配置文件路徑,--conf-file指定配置文件,--name指定配置文件裏的要啓動agent名字(一個配置文件裏能夠有多個agent的定義),-Dflume.root.logger指定Flume運行時輸出的日誌的級別和地方。

 這樣flume就鏈接到kafka了。

 

第六步:驗證Flume是否與kafka正常鏈接,使用telnet(netcat)

新建一個窗口,telnet localhost 12345

輸入:hello, I am from netcat source....

檢查kafka的消費者端口是否正常輸出,以下圖:

 

 

第七步:驗證spooling directory接入Flume是否能夠被KAFKA消費

在D:\Test目錄下放入一個文件test文本文件,內容爲:

Hello, I am from file system source.......

觀看文件處理結果(文件被增長了COMPLETLED,同時文件內容呈如今KAFKA消費者的界面上):

 

 

1.4.3  使用Elicpse開一個簡單的生產者,查看KAFKA消費者的輸出

package testdemo;
/**
 * @author 45014
 *
 */
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KDataProducer
{
 private static Producer<Integer, String> producer;
 private final Properties props = new Properties();
 public KDataProducer()
 {
  // 定義鏈接的broker list
  props.put("metadata.broker.list", "localhost:9092");
  // 定義序列化類 Java中對象傳輸以前要序列化
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  producer = new Producer<Integer, String>(new ProducerConfig(props));
 }
 public static void main(String[] args)
 {
  for (int x = 0; x < 200; x = x + 1)  //發送200次消息
  {
   KDataProducer sp = new KDataProducer();
   // 定義topic
   String topic = "flume";
   // 定義要發送給topic的消息
   String messageStr = "I am from Kafka Client based on elcipse...";
   List<KeyedMessage<Integer, String>> datalist = new ArrayList<KeyedMessage<Integer, String>>();
   // 構建消息對象
   KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
   datalist.add(data);
   // 推送消息到broker
   producer.send(data);
   producer.close();
   
   try
   {
    Thread.sleep(6000);
   }
   catch(Exception e)
   {
    e.printStackTrace();
   }
  }
 }
}

輸出結果以下:

1.5      Storm

1.5.1  基本概念

  Storm是一個分佈式實時流式計算平臺,支持水平擴展,經過追加機器就能提供併發數進而提升處理能力;同時具有自動容錯機制,能自動處理進程、機器、網絡等異常。它能夠很方便地對流式數據進行實時處理和分析,能運用在實時分析、在線數據挖掘、持續計算以及分佈式 RPC 等場景下。Storm 的實時性可使得數據從收集處處理展現在秒級別內完成,從而爲業務方決策提供實時的數據支持。

Nimbus和Supervisor之間的全部協調工做都是經過Zookeeper集羣完成。另外,Nimbus進程和Supervisor進程都是快速失敗(fail-fast)和無狀態的。全部的狀態要麼在zookeeper裏面, 要麼在本地磁盤上。這也就意味着你能夠用kill -9來殺死Nimbus和Supervisor進程, 而後再重啓它們,就好像什麼都沒有發生過。這個設計使得Storm異常的穩定。

 

Nimbus:即Storm的Master,負責資源分配和任務調度。一個Storm集羣只有一個Nimbus。

Supervisor:即Storm的Slave,負責接收Nimbus分配的任務,管理全部Worker,一個Supervisor節點中包含多個Worker進程。

Worker:工做進程,每一個工做進程中都有多個Task。

Task:任務,在 Storm 集羣中每一個 Spout 和 Bolt 都由若干個任務(tasks)來執行。每一個任務都與一個執行線程相對應。

Topology:計算拓撲,Storm 的拓撲是對實時計算應用邏輯的封裝,它的做用與 MapReduce 的任務(Job)很類似,區別在於 MapReduce 的一個 Job 在獲得結果以後總會結束,而拓撲會一直在集羣中運行,直到你手動去終止它。拓撲還能夠理解成由一系列經過數據流(Stream Grouping)相互關聯的 Spout 和 Bolt 組成的的拓撲結構。

Stream:數據流(Streams)是 Storm 中最核心的抽象概念。一個數據流指的是在分佈式環境中並行建立、處理的一組元組(tuple)的無界序列。數據流能夠由一種可以表述數據流中元組的域(fields)的模式來定義。

Spout:數據源(Spout)是拓撲中數據流的來源。通常 Spout 會從一個外部的數據源讀取元組而後將他們發送到拓撲中。根據需求的不一樣,Spout 既能夠定義爲可靠的數據源,也能夠定義爲不可靠的數據源。一個可靠的 Spout可以在它發送的元組處理失敗時從新發送該元組,以確保全部的元組都能獲得正確的處理;相對應的,不可靠的 Spout 就不會在元組發送以後對元組進行任何其餘的處理。一個 Spout能夠發送多個數據流。

Bolt:拓撲中全部的數據處理均是由 Bolt 完成的。經過數據過濾(filtering)、函數處理(functions)、聚合(aggregations)、聯結(joins)、數據庫交互等功能,Bolt 幾乎可以完成任何一種數據處理需求。一個 Bolt 能夠實現簡單的數據流轉換,而更復雜的數據流變換一般須要使用多個 Bolt 並經過多個步驟完成。

Stream grouping:爲拓撲中的每一個 Bolt 的肯定輸入數據流是定義一個拓撲的重要環節。數據流分組定義了在 Bolt 的不一樣任務(tasks)中劃分數據流的方式。在 Storm 中有八種內置的數據流分組方式。

Topology是咱們開發程序主要的用的組件。Topology則是使用Spout獲取數據,Bolt來進行計算。總的來講就是一個Topology由一個或者多個的Spout和Bolt組成。

1.5.1  演示例子

步驟一:下載官網apache-storm-1.0.5

解壓到本地D:\Study\codeproject\apache-storm-1.0.5

步驟二:修改storm.yaml的配置

storm.local.dir: "D:\\Study\\codeproject\\apache-storm-1.0.5\\data"

 

步驟三:啓動storm服務,分別

D:\Study\codeproject\apache-storm-1.0.5\bin>storm nimbus

D:\Study\codeproject\apache-storm-1.0.5\bin>storm supervisor

D:\Study\codeproject\apache-storm-1.0.5\bin>storm ui

 

登陸管理頁面查看:http://localhost:8080/index.html

 

Topology Summary展示提交到集羣中的Topology

拓撲運行模式支持:本地模式和分佈式模式下面分別進行學習:

1、本地模式

本地模式是咱們用來本地開發調試的,不須要部署到storm集羣就能運行,運行java的main函數就能夠了。

 

步驟一:將以下代碼去掉註釋,而後執行KafkaTopology的main函數:

 LocalCluster localCluster = new LocalCluster();

localCluster.submitTopology("WordCount1", config, builder.createTopology());

步驟二:執行KafkaTopology的main函數

爲演示storm獲取flume/kafka的數據,須要進行以下操做:

(1)在Flume的telnet 端口輸入:

  hello, I am from netcat source....

(2)在test文件夾下面新建一個test.txt文件

  文件內容爲:Hello, I am from file system source.......

(3)執行KDataProducer的main函數,進行kafka prducer生產數據:

 

步驟三:查看D:盤生成的文件:word-92e0042c-8a74-4343-80c6-f13c1c29c168,內容以下:

 

內容正好是輸入的信息,驗證結果OK。。。

2、集羣模式

 

步驟一:修改demo代碼:

須要將本地模式的代碼註釋掉,而後以下代碼去掉註釋。

StormSubmitter.submitTopology("storm----kafka--test5", config, builder.createTopology());

步驟二:

因爲是依賴編譯,maven工程中須要添加以下腳本:

    <plugins>

      <plugin>

        <artifactId>maven-assembly-plugin</artifactId>

        <configuration>

          <archive>

            <manifest>

              <!--在這裏本身寫MainClass -->

              <mainClass>testdemo.KafkaTopology</mainClass>

            </manifest>

          </archive>

          <descriptorRefs>

            <descriptorRef>jar-with-dependencies</descriptorRef>

          </descriptorRefs>

        </configuration>

      </plugin>

    </plugins>

步驟三:執行編譯命令(對應目錄爲eclipse工程對應的目錄):

D:\softwareTools\spring-tool-suite-3.9.4.RELEASE-e4.7.3a-win32-x86_64\mywork\testdemo> mvn clean assembly:assembly

編譯成功後結果顯示以下:

 

步驟四:查看生成的jar(注意是包含依賴,不然執行會失敗)

 

 

步驟五:將上述包複製到storm的bin對應目錄下:

 

 

步驟六:而後在storm集羣中能夠執行以下命令:

D:\Study\codeproject\apache-storm-1.0.5\bin>storm jar ./testdemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar testdemo.KafkaTopology KafkaTopology

storm jar ./testdemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar testdemo.KafkaTopology KafkaTopology

注意:KafkaTopology爲執行類的入口。

打開storm的管理頁面,咱們就能夠看到提交的Topology,Topology名字對應代碼中:

      // 集羣模式
     StormSubmitter.submitTopology("Word1", config, builder.createTopology());

 

1.5.2  對應簡單代碼

package testdemo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.topology.TopologyBuilder;
import java.util.Map;

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;

/**
 * @author 45014
 *
 */
public class KafkaTopology
{

    public KafkaTopology()
    {
    }

    public void run()
    {
        // 指定zk的地址
        BrokerHosts brokerHosts = new ZkHosts("localhost:2181");

        TopologyBuilder builder = new TopologyBuilder();
        // zookeeper連接地址
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        // KafkaSpout須要一個config,參數表明的意義1:zookeeper連接,2:消費kafka的topic,3,4:記錄消費offset的zookeeper地址
        
        // 集羣的/test/consume下面
        SpoutConfig sconfig = new SpoutConfig(hosts, "flume", "/test", "consume");
        // 消費的時候忽略offset從頭開始消費,這裏能夠註釋掉,由於消費的offset在zookeeper中能夠找到
        sconfig.ignoreZkOffsets = true;
        // sconfig.scheme = new SchemeAsMultiScheme( new StringScheme() );
        builder.setSpout("kafkaspout", new KafkaSpout(sconfig), 1);
        builder.setBolt("myWritebolt1", new MyWritebolt(), 1).shuffleGrouping("kafkaspout");
    
        Config config = new Config();
        config.setNumWorkers(1);
        try
        {
            // 集羣模式
            //StormSubmitter.submitTopology("Word1", config, builder.createTopology());
        
            // 本地模式
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("Word1", config, builder.createTopology());
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
    public static void main(String[] args)
    {
        new KafkaTopology().run();
    }
}
/**
 * 
 */
package testdemo;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import java.util.UUID;
/**
 * @author 45014
 *
 */
public class  MyWritebolt extends  BaseRichBolt
{
    private static final long serialVersionUID = 1L;
    OutputCollector collector = null;
    FileWriter writer = null;
    
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) 
    {
        this.collector = collector;
        try
        {
            writer = new FileWriter("D:/" + "word-" + UUID.randomUUID().toString());
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
    
    public void execute(Tuple input) 
    {
        try
        {
            //獲得的內容是byte數組,因此須要轉換
            String out = new String((byte[])input.getValue(0));
            System.out.println(out);
            writer.write(out);
            writer.write("\n");
            writer.flush();            
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
       // collector.ack(input);       
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
         
    }
}

關鍵依賴:

     <dependencies>
  
       <dependency>
         <groupId>org.apache.storm</groupId>
         <artifactId>storm-core</artifactId>
         <version>1.0.5</version>
         <scope>provided</scope>
 
     </dependency>
     <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka_2.9.2</artifactId>
         <version>0.8.2.2</version>
         <exclusions>
             <exclusion>
                 <groupId>org.apache.zookeeper</groupId>
                 <artifactId>zookeeper</artifactId>
             </exclusion>
             <exclusion>
                 <groupId>log4j</groupId>
                 <artifactId>log4j</artifactId>
             </exclusion>
         </exclusions>
     </dependency>
     <dependency>
         <groupId>org.apache.storm</groupId>
         <artifactId>storm-kafka</artifactId>
         <!-- version>0.9.6</version--> 
          <version>1.1.3</version> 
     </dependency>
        </dependencies>

 

-------結束-----------

相關文章
相關標籤/搜索