新聞實時分析系統-數據採集/存儲/分發完整流程測試

(一)idea工具開發數據生成模擬程序java

1.在idea開發工具中構建weblogs項目,編寫數據生成模擬程序。web

package main.java;shell

import java.io.*;bash

public class ReadWrite {app

      static String readFileName;ide

      static String writeFileName;工具

      public static void main(String args[]){oop

           readFileName = args[0];開發工具

           writeFileName = args[1];測試

          try {

             // readInput();

            readFileByLines(readFileName);

          }catch(Exception e){

          }

      }

 

    public static void readFileByLines(String fileName) {

        FileInputStream fis = null;

        InputStreamReader isr = null;

        BufferedReader br = null;

        String tempString = null;

        try {

            System.out.println("以行爲單位讀取文件內容,一次讀一整行:");

            fis = new FileInputStream(fileName);// FileInputStream

            // 從文件系統中的某個文件中獲取字節

            isr = new InputStreamReader(fis,"GBK");

            br = new BufferedReader(isr);

            int count=0;

            while ((tempString = br.readLine()) != null) {

                count++;

                // 顯示行號

                Thread.sleep(300);

                String str = new String(tempString.getBytes("UTF8"),"GBK");

                System.out.println("row:"+count+">>>>>>>>"+tempString);

                method1(writeFileName,tempString);

                //appendMethodA(writeFileName,tempString);

            }

            isr.close();

        } catch (IOException e) {

            e.printStackTrace();

        } catch (InterruptedException e) {

            e.printStackTrace();

        } finally {

            if (isr != null) {

                try {

                    isr.close();

                } catch (IOException e1) {

                }

            }

        }

    }

    public static void method1(String file, String conent) {

        BufferedWriter out = null;

        try {

            out = new BufferedWriter(new OutputStreamWriter(

                    new FileOutputStream(file, true)));

            out.write("\n");

            out.write(conent);

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            try {

                out.close();

            } catch (IOException e) {

                e.printStackTrace();

            }

        }

    }

}

2.參照前面idea工具項目打包方式,將該項目打成weblogs.jar包,而後上傳至bigdata-pro01.kfk.com節點的/opt/jars目錄下(目錄須要提早建立)

3.將weblogs.jar分發到另外兩個節點

1)在另外兩個節點上分別建立/opt/jars目錄

mkdir /opt/jars

2)將weblogs.jar分發到另外兩個節點

scp weblogs.jar bigdata-pro02.kfk.com:/opt/jars/

scp weblogs.jar bigdata-pro03.kfk.com:/opt/jars/

4.編寫運行模擬程序的shell腳本

1)在bigdata-pro02.kfk.com節點的/opt/datas目錄下,建立weblog-shell.sh腳本。

vi weblog-shell.sh

#/bin/bash

echo "start log......"

#第一個參數是原日誌文件,第二個參數是日誌生成輸出文件

java -jar /opt/jars/weblogs.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log

修改weblog-shell.sh可執行權限

chmod 777 weblog-shell.sh

2)將bigdata-pro02.kfk.com節點上的/opt/datas/目錄拷貝到bigdata-pro03節點.kfk.com

scp -r /opt/datas/ bigdata-pro03.kfk.com:/opt/datas/

3)修改bigdata-pro02.kfk.com和bigdata-pro03.kfk.com節點上面日誌採集文件路徑。以bigdata-pro02.kfk.com節點爲例。

vi flume-conf.properties

agent2.sources = r1

agent2.channels = c1

agent2.sinks = k1

 

agent2.sources.r1.type = exec

#修改採集日誌文件路徑,bigdata-pro03.kfk.com節點也是修改此處

agent2.sources.r1.command = tail -F /opt/datas/weblog-flume.log

agent2.sources.r1.channels = c1

 

agent2.channels.c1.type = memory

agent2.channels.c1.capacity = 10000

agent2.channels.c1.transactionCapacity = 10000

agent2.channels.c1.keep-alive = 5

 

agent2.sinks.k1.type = avro

agent2.sinks.k1.channel = c1

agent2.sinks.k1.hostname = bigdata-pro01.kfk.com

agent2.sinks.k1.port = 5555

(二)編寫啓動flume服務程序的shell腳本

1.在bigdata-pro02.kfk.com節點的flume安裝目錄下編寫flume啓動腳本。

vi flume-kfk-start.sh

#/bin/bash

echo "flume-2 start ......"

bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent2 -Dflume.root.logger=INFO,console

2.在bigdata-pro03.kfk.com節點的flume安裝目錄下編寫flume啓動腳本。

vi flume-kfk-start.sh

#/bin/bash

echo "flume-3 start ......"

bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent3 -Dflume.root.logger=INFO,console

3.在bigdata-pro01.kfk.com節點的flume安裝目錄下編寫flume啓動腳本。

vi flume-kfk-start.sh

#/bin/bash

echo "flume-1 start ......"

bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,console

(三)編寫Kafka Consumer執行腳本

1.在bigdata-pro01.kfk.com節點的Kafka安裝目錄下編寫Kafka Consumer執行腳本

vi kfk-test-consumer.sh

#/bin/bash

echo "kfk-kafka-consumer.sh start ......"

bin/kafka-console-consumer.sh --zookeeper bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181 --from-beginning --topic weblogs

2.將kfk-test-consumer.sh腳本分發另外兩個節點

scp kfk-test-consumer.sh bigdata-pro02.kfk.com:/opt/modules/kakfa_2.11-0.8.2.1/

scp kfk-test-consumer.sh bigdata-pro03.kfk.com:/opt/modules/kakfa_2.11-0.8.2.1/

(四)啓動模擬程序並測試

在bigdata-pro02.kfk.com節點啓動日誌產生腳本,模擬產生日誌是否正常。

/opt/datas/weblog-shell.sh

 

(五)啓動數據採集全部服務

1.啓動Zookeeper服務

bin/zkServer.sh start

2.啓動hdfs服務

sbin/start-dfs.sh

3.啓動HBase服務

 bin/start-hbase.sh

建立hbase業務表

bin/hbase shell

create 'weblogs','info'

4.啓動Kafka服務

bin/kafka-server-start.sh config/server.properties &

建立業務數據topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic weblogs --replication-factor 1 --partitions 1

5.配置flume相關環境變量

vi flume-env.sh

export JAVA_HOME=/opt/modules/jdk1.7.0_67

export HADOOP_HOME=/opt/modules/hadoop-2.5.0

export HBASE_HOME=/opt/modules/hbase-0.98.6-cdh5.3.0

(六)完成數據採集全流程測試

1.在bigdata-pro01.kfk.com節點上啓動flume聚合腳本,將採集的數據分發到Kafka集羣和hbase集羣。

./flume-kfk-start.sh

2.在bigdata-pro02.kfk.com節點上完成數據採集

1)使用shell腳本模擬日誌產生

cd /opt/datas/

./weblog-shell.sh

2)啓動flume採集日誌數據發送給聚合節點

./flume-kfk-start.sh

3.在bigdata-pro03.kfk.com節點上完成數據採集

1)使用shell腳本模擬日誌產生

cd /opt/datas/

./weblog-shell.sh

2)啓動flume採集日誌數據發送給聚合節點

./flume-kfk-start.sh

4.啓動Kafka Consumer查看flume日誌採集狀況

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic weblogs --from-beginning

5.查看hbase數據寫入狀況

./hbase-shell

count 'weblogs'

相關文章
相關標籤/搜索