CentOS6安裝各類大數據軟件 第一章:各個軟件版本介紹html
CentOS6安裝各類大數據軟件 第二章:Linux各個軟件啓動命令java
CentOS6安裝各類大數據軟件 第三章:Linux基礎軟件的安裝node
CentOS6安裝各類大數據軟件 第四章:Hadoop分佈式集羣配置數據庫
CentOS6安裝各類大數據軟件 第五章:Kafka集羣的配置apache
CentOS6安裝各類大數據軟件 第六章:HBase分佈式集羣的配置數組
CentOS6安裝各類大數據軟件 第七章:Flume安裝與配置bash
CentOS6安裝各類大數據軟件 第八章:Hive安裝和配置服務器
CentOS6安裝各類大數據軟件 第九章:Hue大數據可視化工具安裝和配置網絡
CentOS6安裝各類大數據軟件 第十章:Spark集羣安裝和部署dom
此flume安裝以用戶點擊行爲實時安裝爲例(2臺flume從日誌系統中獲取數據,並彙總到一臺flume上,並由這臺flume對數據進行分發,分別分發到Kafka和HBase等其餘應用上),安裝步驟以下所示
步驟一:上次壓縮包 步驟二:解壓到安裝目錄下 tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /export/servers/
#修改文件名稱 mv flume-env.sh.template flume-env.sh #配置Java環境變量 export JAVA_HOME=/opt/modules/jdk1.8.0_144
#步驟一:修改文件名稱 mv flume-conf.properties.template flume-conf.properties #步驟二:進行具體配置 #給三個線程起一個別名(數據來源,管道,下沉地) a2.sources = r1 a2.channels = c1 a2.sinks = k1 #設置Flume源(類型,數據來源的地址,數據經過什麼通道傳輸) a2.sources.r1.type = exec a2.sources.r1.command = tail -F /export/datas/access/access.log a2.sources.r1.channels = c1 #設置Flume通道類型 a2.channels.c1.type = memory #設置Flume通道大小 a2.channels.c1.capacity = 10000 #每次從源抓取數據的大小 a2.channels.c1.transactionCapacity = 10000 #超時事件的設定 a2.channels.c1.keep-alive = 5 #設置flume的sink(sink的類型,通道來自哪裏,往哪臺服務器發送數據,下沉的端口號) a2.sinks.k1.type = avro a2.sinks.k1.channel = c1 #下沉的服務器的ip地址 a2.sinks.k1.hostname = node01.ouyang.com #下沉的服務器的端口號 a2.sinks.k1.port = 5555
同上agent2配置便可,爲區別不一樣服務器,將a2修改a3便可(不修改不影響實際使用,但爲了區別不一樣服務器,建議修改)
說明:agent2和agent3節點的數據下沉地爲node01服務器的5555端口,因此agent1的數據來源只有一個,爲本機的5555端口,agent1的flume只須要從本機的5555端口獲取數據便可。但此flume的下沉地有2個,分別是HBase和Kafka,因此相應的此flume的通道也要有2個。
agent1具體配置請看下文的Flume整合HBase和Flume整合Kafka。
在實際工做中,Flume收集到的數據通常下層到HBase,Kafka,HDFS等應用,此處演示將Flume下層到HBase和Kafka的開發。以下圖:
從上述能夠看到咱們有兩個Flume服務器用來收集四臺WEB應用服務器的日誌信息,這兩臺服務器將數據彙總到另一臺總的Flume服務器上,也就是說另外兩臺的輸出做爲這臺總的服務器的輸入.這裏的輸入也就是avro.這臺總的服務器能夠將日誌信息直接推送到 Kafka消息系統中,或者通過清洗以後,存入HBase數據庫中.也就說咱們會在存入HBase數據庫以前進行二次開發.由於咱們的Hbase數據庫是非關係型數據庫,它的列是不固定的.因此,咱們須要對hbase sink進行自定義開發。
官網:http://flume.apache.org/download.html
請注意:源碼版本請和安裝的flume版本匹配,以下圖:
步驟一:解壓源碼壓縮包
步驟二:打開IDEA,點擊Open
步驟三:選擇導入flume-ng-sinks這個模塊
步驟四:選擇flume-ng-sinks模塊下的flume-ng-hbase-sink這個子模塊
步驟五:進入源碼
上述咱們首先將源碼導入IDEA中,待會咱們會針對Flume和Hbase的整合進行sink的自定義。
#修改文件名稱 mv flume-env.sh.template flume-env.sh #配置Java環境變量 export JAVA_HOME=/opt/modules/jdk1.8.0_144
#步驟一:修改文件名稱 mv flume-conf.properties.template flume-conf.properties #步驟二:進行具體配置 #定義三個線程的別稱(由於下沉地有2個,因此設置2個管道和2個下沉地的別稱) a1.sources = r1 a1.channels = hbaseChannel kafkaChannel a1.sinks = hbaseSink kfkSink #設置源 #設置源的格式 a1.sources.r1.type = avro #設置源接收的數據須要前往哪些管道 a1.sources.r1.channels = hbaseChannel kafkaChannel #設置數據來源的ip地址 a1.sources.r1.bind = node01.ouyang.com #設置數據來源的端口號 a1.sources.r1.port = 5555 #超時設置 a1.sources.r1.threads = 5 #設置hbaseChannel(配置hbase的管道信息) a1.channels.hbaseChannel.type = memory a1.channels.hbaseChannel.capacity = 100000 a1.channels.hbaseChannel.transactionCapacity = 100000 a1.channels.hbaseChannel.keep-alive = 20 #設置hbaseSink(配置hbase的下沉地信息) a1.sinks.hbaseSink.type = asynchbase a1.sinks.hbaseSink.table = access a1.sinks.hbaseSink.columnFamily = info a1.sinks.hbaseSink.serializer = [待定] #根據數據定義列的名稱 a1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl #設置sink來源的管道 a1.sinks.hbaseSink.channel = hbaseChannel
Flume官方提供的HbaseSink的實現是SimpleAsyncHbaseEventSerializer,這個實現不符合咱們本次項目中的要求,因此,咱們須要自定義HbaseSink.自定義仿照SimpleAsyncHbaseEventSerializer便可。
package org.apache.flume.sink.hbase; import com.google.common.base.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.FlumeException; import org.apache.flume.conf.ComponentConfiguration; import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType; import org.hbase.async.AtomicIncrementRequest; import org.hbase.async.PutRequest; import java.util.ArrayList; import java.util.List; @SuppressWarnings("all") public class HeimaAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer { //表的名稱 private byte[] table; //列簇名 private byte[] cf; //列的數據 private byte[] payload; //列的Column private byte[] payloadColumn; private byte[] incrementColumn; //rowKey的前綴 private String rowPrefix; private byte[] incrementRow; //告訴咱們的keyType是哪個 private KeyType keyType; @Override public void initialize(byte[] table, byte[] cf) { this.table = table; this.cf = cf; } @Override public List<PutRequest> getActions() { List<PutRequest> actions = new ArrayList<PutRequest>(); if (payloadColumn != null) { byte[] rowKey; try { //獲取每一列 String[] columns = String.valueOf(payloadColumn).split(","); //獲取每一列的值 String[] values = String.valueOf(payload).split(","); for (int i = 0; i < columns.length; i++) { //獲取列的字節數組 byte[] colColumns = columns[i].getBytes(); //獲取每一類的值的字節數組 byte[] colValues = values[i].getBytes(Charsets.UTF_8); //對列和值的長度進行判斷,若是兩者不一致,直接能夠跳過,也就是不會進行數據落地 if(columns.length!=values.length){ continue; } //獲取userid和datatime String datatime = values[0].toString(); String userid = values[1].toString(); rowKey = SimpleRowKeyGenerator.getHeimaRowKey(userid,datatime); PutRequest putRequest = new PutRequest(table, rowKey, cf, colColumns, colValues); actions.add(putRequest); } } catch (Exception e) { throw new FlumeException("Could not get row key!", e); } } return actions; } @Override public List<AtomicIncrementRequest> getIncrements() { List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>(); if (incrementColumn != null) { AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn); actions.add(inc); } return actions; } @Override public void cleanUp() { // TODO Auto-generated method stub } @Override public void configure(Context context) { String pCol = context.getString("payloadColumn", "pCol"); String iCol = context.getString("incrementColumn", "iCol"); rowPrefix = context.getString("rowPrefix", "default"); String suffix = context.getString("suffix", "uuid"); if (pCol != null && !pCol.isEmpty()) { if (suffix.equals("timestamp")) { keyType = KeyType.TS; } else if (suffix.equals("random")) { keyType = KeyType.RANDOM; } else if (suffix.equals("nano")) { keyType = KeyType.TSNANO; } else { keyType = KeyType.UUID; } payloadColumn = pCol.getBytes(Charsets.UTF_8); } if (iCol != null && !iCol.isEmpty()) { incrementColumn = iCol.getBytes(Charsets.UTF_8); } incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); } @Override public void setEvent(Event event) { this.payload = event.getBody(); } @Override public void configure(ComponentConfiguration conf) { // TODO Auto-generated method stub } }
public static byte[] getHeimaRowKey(String userid,String datatime) throws UnsupportedEncodingException { return (userid +"-"+ datatime +"-"+ String.valueOf(System.currentTimeMillis())).getBytes("UTF8"); }
將自定義的類放置到導入的flume-sink工程中,跟示例類SimpleAsyncHbaseEventSerializer同一個目錄下,而後打包該工程便可。
步驟一:首先打開Project Structure
步驟二:點擊Artifacts這一項,增長一個jar
步驟三:選擇須要打包的模塊
步驟四:對須要打包的模塊進行配置
步驟五:確認配置
步驟六:進行打包,選擇build這個菜單項
步驟七:獲得打包的結果
步驟一:刪除flume目錄的lib目錄下的flume-ng-hbase-sink-1.7.0.jar包
rm -rf flume-ng-hbase-sink-1.7.0.jar
步驟二:將打包好的jar包重命名爲flume-ng-hbase-sink-1.7.0.jar,並上傳到lib下
步驟三:修改flume-conf.properties,將自定義sink類的全限定名添加上去
a1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.HeimaAsyncHbaseEventSerializer
flume-conf.properties
Flume和Kafka集成只要配置flume-conf.properties配置文件便可,該配置文件的數據來源能夠參考上述flume和HBase的集合,下述爲Flume和Kafka集成的管道和下沉地的配置。
#設置kafkaChannel(配置Kafka的管道信息) a1.channels.kafkaChannel.type = memory a1.channels.kafkaChannel.capacity = 100000 a1.channels.kafkaChannel.transactionCapacity = 100000 a1.channels.kafkaChannel.keep-alive = 20 #設置kfkSink(下沉地設置) #設置下沉的數據來源(來自kafkaChannel管道) a1.sinks.kfkSink.channel = kafkaChannel #設置下沉的類型 a1.sinks.kfkSink.type = org.apache.flume.sink.kafka.KafkaSink #設置下沉到kafka中的主題名 a1.sinks.kfkSink.topic = access #設置kafka服務的ip和端口號 a1.sinks.kfkSink.brokerList = node01.ouyang.com:9092,node02.ouyang.com:9092,node03.ouyang.com:9092 #設置zookeeper的ip和端口號(由於要使用kafka需基於zookeeper) a1.sinks.kfkSink.zookeeperConnect = node01.ouyang.com:2181,node02.ouyang.com:2181,node03.ouyang.com:2181 #設置kafka的生產者消息防丟失機制(默認爲1) a1.sinks.kfkSink.requiredAcks = 1 #設置一次傳輸數據的大小 a1.sinks.kfkSink.batchSize = 1 #設置序列化類,讓數據能夠進行網絡傳輸 a1.sinks.kfkSink.serializer.class = kafka.serializer.StringEncoder
Flume啓動前請先啓動Flume所依賴的應用服務,如上述配置,需先啓動HBase和Kafka,而HBase和Kafka又依賴Hadoop和Zookeeper,因此請先將這些依賴服務啓動;Flume啓動時請先啓動分節點,再啓動聚會節點,若是上述配置,請先啓動agent2和agent3,再啓動agent1。
bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
-c conf 指定 flume 自身的配置文件所在目錄
-f conf/netcat-logger.con 指定咱們所描述的採集方案
-n a1 指定咱們這個 agent 的名字
以agent2和agent3收集數據,agent1彙總數據,並將數據分別分發到HBase和Kafka中爲例:
#步驟一:給每臺服務器的flume配置好環境變量 export FLUME_HOME=/export/servers/flume export PATH=${FLUME_HOME}/bin:$PATH #步驟二:在agent2和agent3服務器的flume的bin目錄下編寫啓動腳本: #/bin/bash echo "................flume-2 starting......................" /export/servers/flume/bin/flume-ng agent --conf /export/servers/flume/conf/ -f /export/servers/flume/conf/flume-conf.properties -n a2 -Dflume.root.logger=INFO,console agent2和agent3啓動腳本基本一致,就名字不一樣,該名字爲在配置文件中配置的別名 編寫完啓動腳本後能夠進行啓動測試 #步驟三:在agent1服務器的flume的bin目錄下編寫啓動腳本: #/bin/bash echo "................flume-1 starting......................" /export/servers/flume/bin/flume-ng agent --conf /export/servers/flume/conf/ -f /export/servers/flume/conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console 由於Flume節點1是聚合節點,因此,須要依賴其餘不少服務,因此在這裏咱們先不作任何測試驗證,待會再進行統一的測試驗證。 #步驟四:在onekye目錄下編寫一鍵啓動腳本: cat /export/onekey/slave | while read line do { echo "Flume開始啓動 --> "$line ssh $line "source /etc/profile;nohup sh ${FLUME_HOME}/bin/flume-access-start.sh >/dev/null 2>&1 &" }& wait done echo "★★★Flume啓動完成★★★" #保存退出