提要:本文中項目代碼運行在windows的eclipse開發環境下。html
注意事項:項目中須要引用Flume提供的jar包,最新版的Flume-1.6.0的jar包須要JDK1.7的環境,JDK1.8會報 ClassNotFoundException:org.apache.flume.clients.log4jappender.Log4jAppender 錯誤。java
一、將flume安裝路徑下的tools文件下的jar包:flume-ng-log4jappender-1.6.0-jar-with-dependencies.jarnode
引入項目。(這個包是包含了全部依賴包的包。有的示例裏說能夠引用flume-ng-core-1.6.0.jar之類的等等,那種運行會報錯,會報不少類找不到,還須要引用好多jar包,應用不便)web
如圖:apache
OS:做爲一個小新手,以前查了好多資料,不一樣版本包名不同,好多隻說引入包,就不寫路徑或者路徑真的不太同樣,真是神煩。windows
二、 配置log4j,如下爲配置文件: app
log4j.rootLogger=INFO,console,file,flume log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%-5p %d [<%t>%F,%L] - %m%n log4j.appender.file=org.apache.log4j.DailyRollingFileAppender log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.datePattern=yyyy-MM-dd-HH'.log' log4j.appender.file.append=true log4j.appender.file.layout.ConversionPattern=%-5p %d [%F,%L] - %m%n log4j.appender.file.File=D:/logs/logs.log log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.Hostname=192.xxx.x.xxx(此處爲flume配置文件中配置的IP) log4j.appender.flume.Port=44444 log4j.appender.flume.UnsafeMode=true
三、寫一個測試項目用於測試flume:eclipse
1)新建一個Dynamic Web Projectjsp
2)引入 flume-ng-log4jappender-1.6.0-jar-with-dependencies.jar 和slf4j-log4j12-1.7.21.jaride
3)配置log4j.properties,具體配置如2。
4)寫一個servlet
package com.flume.test; import java.util.Date; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class FlumeTest implements ServletContextListener{ protected static final Logger logger = LoggerFactory.getLogger(FlumeTest.class); @Override public void contextDestroyed(ServletContextEvent arg0) { } @SuppressWarnings("static-access") @Override public void contextInitialized(ServletContextEvent arg0) { while (true) { //每隔兩秒log輸出一下當前系統時間戳 logger.info(""+new Date().getTime()); Thread t = new Thread(); try { t.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
5)配置一下web.xml
<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0"> <display-name>FlumeTest</display-name> <welcome-file-list> <welcome-file>index.html</welcome-file> <welcome-file>index.htm</welcome-file> <welcome-file>index.jsp</welcome-file> </welcome-file-list> <listener> <listener-class>com.flume.test.FlumeTest</listener-class> </listener> </web-app>
6)配置寫入hdfs的flume代理文件
# example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = avro a1.sources.r1.bind = 192.xxx.x.xxx #上面IP根據我的配置 a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path=hdfs://localhost:9000/user/hadoop/flume #hdfs://localhost:9000/user/hadoop/flume 根據我的的hdfs配置更改 a1.sinks.k1.hdfs.fileType=DataStream a1.sinks.k1.hdfs.writeFormat=Text a1.sinks.k1.hdfs.rollInterval=0 a1.sinks.k1.hdfs.rollSize=10240 a1.sinks.k1.hdfs.rollCount=0 a1.sinks.k1.hdfs.idleTimeout=60 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
此時啓動hdfs,flume代理、web項目便可實現flume寫入hdfs。