flume採集數據實時存儲hive兩種解決方案

說明:本文不只提供兩種方案,還詳細的記錄了一些相關信息。html

方案一

        本方案的核心是flume採集數據後,按照hive表的結構,將採集數據輸送到對應的地址中,達到數據實時存儲的目的,這種實時其實是一種準實時。java

        假設hadoop集羣已經正常啓動,hive也已經正常啓動,而且hive的文件地址是/hive/warehouse,而後hive裏存在一張由如下建表語句建立的表apache

create table flume_test(uuid string);

可推斷,表flume_test地址在/hive/warehouse/flume_test,下面介紹flume:bash

        flume安裝步驟app

#下載
cd /opt
mkdir flume
wget http://archive.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
tar xvzf apache-flume-1.6.0-bin.tar.gz
cd apache-flume-1.6.0-bin/conf
cp flume-env.sh.template flume-env.sh

        打開flume-env文件,添加java變量dom

export JAVA_HOME=/usr/java/jdk1.8.0_111

        而後添加環境變量,爲了一次過,分別在profile和bashrc末尾添加eclipse

export FLUME_HOME=/opt/flume/apache-flume-1.6.0-bin 
export FLUME_CONF_DIR=$FLUME_HOME/conf  
export PATH=$PATH:$FLUME_HOME/bin

        而後maven

source /etc/profile

        到此flume安裝完畢,下面進行配置,切換到conf文件夾複製flume-conf.properties.template爲agent.conf,而後編輯ide

#定義活躍列表
agent.sources=avroSrc
agent.channels=memChannel
agent.sinks=hdfsSink

#定義source
agent.sources.avroSrc.type=avro
agent.sources.avroSrc.channels=memChannel
agent.sources.avroSrc.bind=0.0.0.0
agent.sources.avroSrc.port=4353
agent.sources.avroSrc.interceptors=timestampinterceptor
agent.sources.avroSrc.interceptors.timestampinterceptor.type=timestamp
agent.sources.avroSrc.interceptors.timestampinterceptor.preserveExisting=false

#定義channel
agent.channels.memChannel.type=memory
agent.channels.memChannel.capacity = 1000
agent.channels.memChannel.transactionCapacity = 100

#定義sink
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.channel=memChannel
#agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-n:9000/flume/test/%{topic}/%Y%m%d%H
agent.sinks.hdfsSink.hdfs.path=hdfs://hadoop-n:9000/hive/warehouse/flume_test
agent.sinks.hdfsSink.hdfs.filePrefix=stu-flume
agent.sinks.hdfsSink.hdfs.inUsePrefix=inuse-stu-flume
agent.sinks.hdfsSink.hdfs.inUseSuffix=.temp
agent.sinks.hdfsSink.hdfs.rollInterval=0
agent.sinks.hdfsSink.hdfs.rollSize=10240000
agent.sinks.hdfsSink.hdfs.rollCount=0
agent.sinks.hdfsSink.hdfs.idleTimeout=0
agent.sinks.hdfsSink.hdfs.batchSize=100
agent.sinks.hdfsSink.hdfs.minBlockReplicas=1
# agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream

        具體的每一項配置可參照下面這篇博客http://lxw1234.com/archives/2015/10/527.htm,須要警戒的是rollInterval、rollSize、rollCount、idleTimeout這四個屬性,若是進行了配置發現不起做用,就要檢查一下minBlockReplicas這個屬性是否配置,而且值是不是1,下面這個鏈接是緣由http://doc.okbase.net/chiweitree/archive/126197.htmloop

        配置完畢後能夠啓動,啓動命令

./flume-ng agent -f ../conf/agent.conf -n agent -c conf -Dflume.monitoring.type=http \-Dflume.monitoring.port=5653 -Dflume.root.logger=DEBUG,console

        注意:-n 指的是agent的名稱,須要對應到配置文件的第一個值,本啓動命令還開啓了監控,監控地址http://host:5653/metrics;-f 指的是配置文件的路徑及名稱。flume的conf修改後不用重啓,默認30秒刷新一次,自動裝載最新的配置。

        flume安裝並啓動完畢後,編寫測試程序。打開eclipse,建立maven項目

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>scc</groupId>
	<artifactId>stu-flume</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>war</packaging>
	<name>stu-flume</name>
	<dependencies>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.9</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flume.flume-ng-clients</groupId>
			<artifactId>flume-ng-log4jappender</artifactId>
			<version>1.6.0</version>
		</dependency>
	</dependencies>
</project>

測試servlet

public class GenerLogServlet extends HttpServlet {
    private static final Logger LOGGER = Logger.getLogger(GenerLogServlet.class);
    private static final long serialVersionUID = 1L;
    
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        for (;;) {
            LOGGER.info(UUID.randomUUID().toString());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    @Override
    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        this.doGet(request, response);
    }

}

log4j.properties

#log4j settings
#log4j.rootLogger=debug, CONSOLE
log4j.logger.scc.stu_flume.GenerLogServlet=debug,GenerLogServlet
#log4j.rootLogger=INFO

log4j.appender.GenerLogServlet=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.GenerLogServlet.Hostname=10.5.3.100
log4j.appender.GenerLogServlet.Port=4353
log4j.appender.GenerLogServletUnsafeMode=false

        啓動項目,訪問http://localhost:8080/log開始生產數據。須要注意的是,若是flume配置基於時間戳作文件分組(此種狀況能夠匹配hive根據時間進行分區),那麼須要agent.conf中的source必定要配置

agent.sources.avroSrc.interceptors=timestampinterceptor
agent.sources.avroSrc.interceptors.timestampinterceptor.type=timestamp
agent.sources.avroSrc.interceptors.timestampinterceptor.preserveExisting=false

不然flume的sink會報找不到timestamp錯誤,由於源碼org.apache.flume.clients.log4jappender.Log4jAvroHeaders中定義timestamp的key是flume.client.log4j.timestamp而不是timestamp,因此須要手動添加一個timestamp,若是對這個timestamp要求必須是數據生產的時間,能夠修改源碼或者爲source添加攔截器手動配置。

        flume具備很是靈活的使用方式,能夠自定義source、sink、攔截器、channel選擇器等等,適應絕大部分採集、數據緩衝等場景。

        觀察hadoop目錄,發現flume已經按配置將數據移動到相應的hive表目錄中,以下圖:

        打開hive客戶端,數據查詢命令,發現數據已可被查詢!而且針對hive的分區表和桶表flume均可以實現按照hive表數據規則寫入,進而達到數據實時插入,至此,方案一結束。

        本方案缺點:

            因爲flume在寫入文件的時候,獨佔正在寫入的文件資源,致使hive不能讀取正在被寫入的文件的內容,也就是說假如每5分鐘生成一個文件,那麼正在寫的文件不會被hive讀取到內容,也就意味了hive存在最大5分鐘的延遲。而若是把時間變小,那麼延遲就會下降,可是哪怕是設置30分鐘或1個小時,flume流量不大的狀況下,也會生成許多零散的小文件,這點與hive的特長相悖,hive擅長處理大文件,對於零散小文件hive性能會下降不少。

方案二

       對比方案一,測試程序、source不變,sink改爲hbase-sink,數據實時插入到hbase中,而後在hive創建一張hbase映射表,hive從hbase中讀取數據,這樣可達到實時插入的效果。因爲字數限制,方案二記錄在以下博客鏈接中:

        https://my.oschina.net/shyloveliyi/blog/790227

相關文章
相關標籤/搜索