(一)idea工具開發數據生成模擬程序java
1.在idea開發工具中構建weblogs項目,編寫數據生成模擬程序。web
package main.java;shell
import java.io.*;bash
public class ReadWrite {app
static String readFileName;ide
static String writeFileName;工具
public static void main(String args[]){oop
readFileName = args[0];開發工具
writeFileName = args[1];測試
try {
// readInput();
readFileByLines(readFileName);
}catch(Exception e){
}
}
public static void readFileByLines(String fileName) {
FileInputStream fis = null;
InputStreamReader isr = null;
BufferedReader br = null;
String tempString = null;
try {
System.out.println("以行爲單位讀取文件內容,一次讀一整行:");
fis = new FileInputStream(fileName);// FileInputStream
// 從文件系統中的某個文件中獲取字節
isr = new InputStreamReader(fis,"GBK");
br = new BufferedReader(isr);
int count=0;
while ((tempString = br.readLine()) != null) {
count++;
// 顯示行號
Thread.sleep(300);
String str = new String(tempString.getBytes("UTF8"),"GBK");
System.out.println("row:"+count+">>>>>>>>"+tempString);
method1(writeFileName,tempString);
//appendMethodA(writeFileName,tempString);
}
isr.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (isr != null) {
try {
isr.close();
} catch (IOException e1) {
}
}
}
}
public static void method1(String file, String conent) {
BufferedWriter out = null;
try {
out = new BufferedWriter(new OutputStreamWriter(
new FileOutputStream(file, true)));
out.write("\n");
out.write(conent);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2.參照前面idea工具項目打包方式,將該項目打成weblogs.jar包,而後上傳至bigdata-pro01.kfk.com節點的/opt/jars目錄下(目錄須要提早建立)
3.將weblogs.jar分發到另外兩個節點
1)在另外兩個節點上分別建立/opt/jars目錄
mkdir /opt/jars
2)將weblogs.jar分發到另外兩個節點
scp weblogs.jar bigdata-pro02.kfk.com:/opt/jars/
scp weblogs.jar bigdata-pro03.kfk.com:/opt/jars/
4.編寫運行模擬程序的shell腳本
1)在bigdata-pro02.kfk.com節點的/opt/datas目錄下,建立weblog-shell.sh腳本。
vi weblog-shell.sh
#/bin/bash
echo "start log......"
#第一個參數是原日誌文件,第二個參數是日誌生成輸出文件
java -jar /opt/jars/weblogs.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log
修改weblog-shell.sh可執行權限
chmod 777 weblog-shell.sh
2)將bigdata-pro02.kfk.com節點上的/opt/datas/目錄拷貝到bigdata-pro03節點.kfk.com
scp -r /opt/datas/ bigdata-pro03.kfk.com:/opt/datas/
3)修改bigdata-pro02.kfk.com和bigdata-pro03.kfk.com節點上面日誌採集文件路徑。以bigdata-pro02.kfk.com節點爲例。
vi flume-conf.properties
agent2.sources = r1
agent2.channels = c1
agent2.sinks = k1
agent2.sources.r1.type = exec
#修改採集日誌文件路徑,bigdata-pro03.kfk.com節點也是修改此處
agent2.sources.r1.command = tail -F /opt/datas/weblog-flume.log
agent2.sources.r1.channels = c1
agent2.channels.c1.type = memory
agent2.channels.c1.capacity = 10000
agent2.channels.c1.transactionCapacity = 10000
agent2.channels.c1.keep-alive = 5
agent2.sinks.k1.type = avro
agent2.sinks.k1.channel = c1
agent2.sinks.k1.hostname = bigdata-pro01.kfk.com
agent2.sinks.k1.port = 5555
(二)編寫啓動flume服務程序的shell腳本
1.在bigdata-pro02.kfk.com節點的flume安裝目錄下編寫flume啓動腳本。
vi flume-kfk-start.sh
#/bin/bash
echo "flume-2 start ......"
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent2 -Dflume.root.logger=INFO,console
2.在bigdata-pro03.kfk.com節點的flume安裝目錄下編寫flume啓動腳本。
vi flume-kfk-start.sh
#/bin/bash
echo "flume-3 start ......"
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent3 -Dflume.root.logger=INFO,console
3.在bigdata-pro01.kfk.com節點的flume安裝目錄下編寫flume啓動腳本。
vi flume-kfk-start.sh
#/bin/bash
echo "flume-1 start ......"
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,console
(三)編寫Kafka Consumer執行腳本
1.在bigdata-pro01.kfk.com節點的Kafka安裝目錄下編寫Kafka Consumer執行腳本
vi kfk-test-consumer.sh
#/bin/bash
echo "kfk-kafka-consumer.sh start ......"
bin/kafka-console-consumer.sh --zookeeper bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181 --from-beginning --topic weblogs
2.將kfk-test-consumer.sh腳本分發另外兩個節點
scp kfk-test-consumer.sh bigdata-pro02.kfk.com:/opt/modules/kakfa_2.11-0.8.2.1/
scp kfk-test-consumer.sh bigdata-pro03.kfk.com:/opt/modules/kakfa_2.11-0.8.2.1/
(四)啓動模擬程序並測試
在bigdata-pro02.kfk.com節點啓動日誌產生腳本,模擬產生日誌是否正常。
/opt/datas/weblog-shell.sh
(五)啓動數據採集全部服務
1.啓動Zookeeper服務
bin/zkServer.sh start
2.啓動hdfs服務
sbin/start-dfs.sh
3.啓動HBase服務
bin/start-hbase.sh
建立hbase業務表
bin/hbase shell
create 'weblogs','info'
4.啓動Kafka服務
bin/kafka-server-start.sh config/server.properties &
建立業務數據topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic weblogs --replication-factor 1 --partitions 1
5.配置flume相關環境變量
vi flume-env.sh
export JAVA_HOME=/opt/modules/jdk1.7.0_67
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
export HBASE_HOME=/opt/modules/hbase-0.98.6-cdh5.3.0
(六)完成數據採集全流程測試
1.在bigdata-pro01.kfk.com節點上啓動flume聚合腳本,將採集的數據分發到Kafka集羣和hbase集羣。
./flume-kfk-start.sh
2.在bigdata-pro02.kfk.com節點上完成數據採集
1)使用shell腳本模擬日誌產生
cd /opt/datas/
./weblog-shell.sh
2)啓動flume採集日誌數據發送給聚合節點
./flume-kfk-start.sh
3.在bigdata-pro03.kfk.com節點上完成數據採集
1)使用shell腳本模擬日誌產生
cd /opt/datas/
./weblog-shell.sh
2)啓動flume採集日誌數據發送給聚合節點
./flume-kfk-start.sh
4.啓動Kafka Consumer查看flume日誌採集狀況
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic weblogs --from-beginning
5.查看hbase數據寫入狀況
./hbase-shell
count 'weblogs'