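Flume's built-in HTTP Source can receive events via HTTP POST.
Use case: in some application environments it may not be possible to deploy the Flume SDK and its dependencies, or the client code may prefer to send data over HTTP rather than Flume's RPC. In these cases the HTTP Source can be used to receive data into Flume.
From the client's point of view, the HTTP Source behaves like a web server that accepts Flume events.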
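Parameter | Default | Description |
type | | Component type; must be http (org.apache.flume.source.http.HTTPSource) |
bind | | IP address or hostname to bind to |
port | | Port number to bind to |
enableSSL | false | Set to true to enable SSL |
keystore | | Path to the keystore file to use |
keystorePassword | | Password for the keystore |
handler | JSONHandler | Handler class used by the HTTP Source |
handler.* | | Any parameters to pass to the handler class can be configured under this prefix |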
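For secure transport, the HTTP Source also supports SSL. For an SSL example, see my blog post on the Flume Avro Source.
The HTTP request is converted into Flume events by a pluggable "handler", which must implement the HTTPSourceHandler interface. The handler takes an HttpServletRequest and returns a list of Flume events. The default is JSONHandler.
For example: xxx.handler=org.pq.demo.HTTPSourceXmlHandler
To pass parameters to a custom handler, use the handler.* configuration keys.
For example: xxx.handler.myparam=zhangsan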
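To make the handler.* convention concrete, here is a minimal sketch in plain Java. It assumes the source strips the "handler." prefix and hands only the remaining keys to the handler's configure method, similar in spirit to Flume's Context.getSubProperties. The class HandlerParams and the method subProperties are hypothetical names used only for illustration; they are not part of Flume.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HandlerParams {
    // Returns the entries whose key starts with the prefix, with the prefix removed.
    // Illustrative stand-in only; this is not Flume's own implementation.
    public static Map<String, String> subProperties(Map<String, String> props, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Properties of one source, with the "xxx." part already removed.
        Map<String, String> sourceProps = new LinkedHashMap<>();
        sourceProps.put("type", "http");
        sourceProps.put("port", "50000");
        sourceProps.put("handler", "org.pq.demo.HTTPSourceXmlHandler");
        sourceProps.put("handler.myparam", "zhangsan");

        // Only keys under "handler." would reach the handler.
        System.out.println(subProperties(sourceProps, "handler."));
        // prints: {myparam=zhangsan}
    }
}
```

Under this assumption, a key set directly on the source (without the handler. prefix) is not visible to the handler.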
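If no handler is specified in the configuration, the HTTP Source uses the handler bundled with Flume, JSONHandler, which handles events in JSON format. Each request can contain several events wrapped in an array, although the channel the Source writes to may have a limited transaction capacity.
The handler accepts JSON data encoded as UTF-8, UTF-16, or UTF-32 and converts it into a list of events.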
Format:
[
  {
    "headers": {"":"","":""},
    "body": "the first event"
  },
  {
    "headers": {"":"","":""},
    "body": "the second event"
  }
]
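As a rough illustration of the format above, the following Java sketch builds a one-event request body and, optionally, posts it with the JDK's HttpURLConnection. The class JsonEventClient and its methods quote, toJson, and post are hypothetical helpers written for this example; the target URL is whatever address the HTTP Source is bound to and is passed in by the caller.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonEventClient {
    // Wraps a string in double quotes, escaping quotes, backslashes and control characters.
    public static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Builds a JSON array holding a single event: [{"headers":{...},"body":"..."}]
    public static String toJson(Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder("[{\"headers\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : headers.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        return sb.append("},\"body\":").append(quote(body)).append("}]").toString();
    }

    // Sends the payload with HTTP POST and returns the response status code.
    public static int post(String url, String json) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("h1", "v1");
        headers.put("h2", "v2");
        String json = toJson(headers, "hello body");
        System.out.println(json);
        // Post only when a target URL is given as the first argument.
        if (args.length > 0) {
            System.out.println("HTTP status: " + post(args[0], json));
        }
    }
}
```

Run without arguments, it only prints the payload; run with the source's URL as the first argument, it also sends the request.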
1) Add a file named http_test.conf under the conf directory
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=http
a1.sources.r1.bind=192.168.1.102
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
2) Start the agent:
$ bin/flume-ng agent -c conf -f conf/http_test.conf -n a1 -Dflume.root.logger=INFO,console
3) Test:
$ curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]' http://192.168.1.102:50000
4) Server-side output
......
2015-11-30 11:34:52,451 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: }
2015-11-30 11:34:52,452 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: }
2015-11-30 11:45:14,951 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{h1=v1, h2=v2} body: 68 65 6C 6C 6F 20 62 6F 64 79 hello body }
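In the last log entry the body appears first as hexadecimal byte values and then as text. The small Java sketch below reproduces the hex part for the string "hello body" encoded as UTF-8. The class HexBody is a hypothetical helper for this example and is not the logger sink's own formatting code.

```java
import java.nio.charset.StandardCharsets;

public class HexBody {
    // Formats each byte as two uppercase hex digits, separated by single spaces.
    public static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < bytes.length; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02X", bytes[i] & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] body = "hello body".getBytes(StandardCharsets.UTF_8);
        System.out.println(toHex(body));
        // prints: 68 65 6C 6C 6F 20 62 6F 64 79
    }
}
```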
Suppose requests arrive as XML, in the following expected format:
<events>
  <event>
    <headers><header1>value1</header1></headers>
    <body>test</body>
  </event>
  <event>
    <headers><header1>value1</header1></headers>
    <body>test2</body>
  </event>
</events>
The Flume HTTP Source now needs to handle requests in this XML format.
The steps are as follows:
1) Create a Maven project with the following pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.pq</groupId>
  <artifactId>flume-demo</artifactId>
  <packaging>jar</packaging>
  <version>1.0</version>
  <name>flume-demo Maven jar</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.8.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.6.0</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <build>
    <finalName>flume-demo</finalName>
  </build>
</project>
2) Write the code: a custom handler class
package org.pq.flumeDemo.sources;

import com.google.common.base.Preconditions;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.http.HTTPBadRequestException;
import org.apache.flume.source.http.HTTPSourceHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

import javax.servlet.http.HttpServletRequest;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HTTPSourceXMLHandler implements HTTPSourceHandler {
    private final String ROOT = "events";
    private final String EVENT_TAG = "event";
    private final String HEADERS_TAG = "headers";
    private final String BODY_TAG = "body";

    private final String CONF_INSERT_TIMESTAMP = "insertTimestamp";
    private final String TIMESTAMP_HEADER = "timestamp";
    private final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();

    // Document builders are not thread-safe.
    // So make sure we have one for each thread.
    private final ThreadLocal<DocumentBuilder> docBuilder = new ThreadLocal<DocumentBuilder>();

    private boolean insertTimestamp;
    private static final Logger LOG = LoggerFactory.getLogger(HTTPSourceXMLHandler.class);

    public List<Event> getEvents(HttpServletRequest httpServletRequest) throws HTTPBadRequestException, Exception {
        if (docBuilder.get() == null) {
            docBuilder.set(documentBuilderFactory.newDocumentBuilder());
        }
        // The request may not declare a character encoding; fall back to UTF-8
        // so that getBytes() below is never called with null.
        String charset = httpServletRequest.getCharacterEncoding();
        if (charset == null) {
            charset = "UTF-8";
        }
        Document doc;
        final List<Event> events;
        try {
            doc = docBuilder.get().parse(httpServletRequest.getInputStream());
            Element root = doc.getDocumentElement();
            root.normalize();
            // Verify that the root element is "events"
            Preconditions.checkState(ROOT.equalsIgnoreCase(root.getTagName()));

            NodeList nodes = root.getElementsByTagName(EVENT_TAG);
            LOG.info("get nodes={}", nodes);

            int eventCount = nodes.getLength();
            events = new ArrayList<Event>(eventCount);
            for (int i = 0; i < eventCount; i++) {
                Element event = (Element) nodes.item(i);
                // Get all headers. If there are multiple header sections,
                // combine them.
                NodeList headerNodes = event.getElementsByTagName(HEADERS_TAG);
                Map<String, String> eventHeaders = new HashMap<String, String>();
                for (int j = 0; j < headerNodes.getLength(); j++) {
                    Node headerNode = headerNodes.item(j);
                    NodeList headers = headerNode.getChildNodes();
                    for (int k = 0; k < headers.getLength(); k++) {
                        Node header = headers.item(k);

                        // Read only element nodes
                        if (header.getNodeType() != Node.ELEMENT_NODE) {
                            continue;
                        }
                        // Make sure a header is inserted only once,
                        // else the event is malformed
                        Preconditions.checkState(
                                !eventHeaders.containsKey(header.getNodeName()),
                                "Header expected only once " + header.getNodeName());
                        eventHeaders.put(header.getNodeName(), header.getTextContent());
                    }
                }
                Node body = event.getElementsByTagName(BODY_TAG).item(0);
                if (insertTimestamp) {
                    eventHeaders.put(TIMESTAMP_HEADER, String.valueOf(System.currentTimeMillis()));
                }
                events.add(EventBuilder.withBody(
                        body.getTextContent().getBytes(charset), eventHeaders));
            }
        } catch (SAXException ex) {
            throw new HTTPBadRequestException(
                    "Request could not be parsed into valid XML", ex);
        } catch (Exception ex) {
            throw new HTTPBadRequestException(
                    "Request is not in expected format. "
                            + "Please refer documentation for expected format.", ex);
        }
        return events;
    }

    public void configure(Context context) {
        insertTimestamp = context.getBoolean(CONF_INSERT_TIMESTAMP, false);
    }
}
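3) Run mvn package in the project's flume-demo directory. This packages the project as a jar and creates a target directory, which contains flume-demo.jar.
Note: Apache Maven must be installed on the system, with the corresponding environment variables configured.
4) Create the plugin directories under $FLUME_HOME
$ mkdir plugins.d
$ mkdir plugins.d/pengqiang
$ cd plugins.d/pengqiang
$ mkdir lib
$ mkdir libext
$ mkdir native
5) Copy flume-demo.jar into plugins.d/pengqiang/lib
6) Configure the Flume file http_test.conf
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=http
a1.sources.r1.bind=localhost
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
a1.sources.r1.handler=org.pq.flumeDemo.sources.HTTPSourceXMLHandler
a1.sources.r1.insertTimestamp=true
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
Note: handler parameters are passed through the handler.* prefix, so for the handler to receive this setting the key should be a1.sources.r1.handler.insertTimestamp=true. With the key written as above, insertTimestamp most likely stays false inside the handler, which is consistent with the output in step 9 showing no timestamp header.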
7) Start the agent
$ bin/flume-ng agent -c conf -f conf/http_test.conf -n a1 -Dflume.root.logger=INFO,console
8) Test
Tool: Firefox browser + HttpRequester plugin
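If a browser plugin is not at hand, the same request can be sent from a small Java program. The sketch below builds the XML body in the expected format and posts it with the JDK's HttpURLConnection. The class XmlEventClient and its methods escape, event, events, and post are hypothetical helpers for this example, and header names are assumed to be valid XML element names. Because the handler reads the request's character encoding to convert the body to bytes, the sketch declares the charset explicitly in the Content-Type header.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class XmlEventClient {
    // Escapes the characters that are not allowed in XML text content.
    public static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    // Builds one <event> element with a single header and a body.
    public static String event(String headerName, String headerValue, String body) {
        return "<event><headers><" + headerName + ">" + escape(headerValue)
                + "</" + headerName + "></headers><body>" + escape(body) + "</body></event>";
    }

    // Wraps the given <event> elements in the <events> root element.
    public static String events(String... eventXml) {
        return "<events>" + String.join("", eventXml) + "</events>";
    }

    // Sends the XML with HTTP POST and returns the response status code.
    public static int post(String url, String xml) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/xml; charset=UTF-8");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(xml.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    public static void main(String[] args) throws Exception {
        String xml = events(
                event("header1", "value1", "test"),
                event("header1", "value1", "test2"));
        System.out.println(xml);
        // Post only when a target URL is given, for example http://localhost:50000
        if (args.length > 0) {
            System.out.println("HTTP status: " + post(args[0], xml));
        }
    }
}
```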
9) Server-side output:
.....
2015-12-02 14:35:53,809 (1214826834@qtp-1250857451-0) [INFO - org.pq.flumeDemo.sources.HTTPSourceXMLHandler.getEvents(HTTPSourceXMLHandler.java:64)] get nodes=org.apache.xerces.dom.DeepNodeListImpl@1beaed6
2015-12-02 14:35:54,490 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{header1=value1} body: 74 65 73 74 test }
2015-12-02 14:35:54,491 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{header1=value1} body: 74 65 73 74 32 test2 }