Flume HTTP Source

1. Introduction

  1. Flume's built-in HTTP Source receives events through HTTP POST requests.

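  2. Use case: in some application environments the Flume SDK and its dependencies cannot be deployed, or the client code prefers to send data over HTTP rather than through Flume's RPC. In such cases the HTTP source can be used to get the data into Flume.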

  3. From the client's point of view, the HTTP source behaves like a web server that accepts Flume events.

2. Parameters

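Property           Default        Description
type               -              Must be http (class org.apache.flume.source.http.HTTPSource)
bind               -              IP address or hostname to bind to
port               -              Port number to bind to
enableSSL          false          Set to true to enable SSL
keystore           -              Path to the keystore file
keystorePassword   -              Password for the keystore
handler            JSONHandler    Handler class used by the HTTP source
                                  (org.apache.flume.source.http.JSONHandler)
handler.*          -              Any parameters for the handler class, configured
                                  as handler.<param>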
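The handler.* row works by prefix: as we understand Flume's HTTPSource, it passes its handler only the properties that begin with handler., with that prefix stripped (Flume does this via Context.getSubProperties). The sketch below imitates that behavior with plain JDK maps. The class HandlerProps and its method subProperties are hypothetical illustrations, not Flume API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: imitates how the HTTP source is understood to hand
// "handler."-prefixed properties to its handler, with the prefix removed.
final class HandlerProps {
    // Hypothetical helper, not part of Flume's API.
    static Map<String, String> subProperties(Map<String, String> sourceProps, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : sourceProps.entrySet()) {
            String key = e.getKey();
            // Keep keys that start with the prefix and have a name after it.
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Properties of source r1, with "a1.sources.r1." already removed.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("type", "http");
        props.put("port", "50000");
        props.put("handler", "org.pq.flumeDemo.sources.HTTPSourceXMLHandler");
        props.put("handler.insertTimestamp", "true");
        props.put("insertTimestamp", "true"); // no prefix: never reaches the handler
        System.out.println(subProperties(props, "handler."));
    }
}
```

Only handler.insertTimestamp survives, as insertTimestamp; a bare insertTimestamp on the source is not what the handler's configure() receives.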
  1. For secure transport, the HTTP source also supports SSL. For an SSL example, see my blog post on Flume's Avro Source.

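  2. Requests are converted into Flume events by a pluggable "handler", which must implement the HTTPSourceHandler interface. The handler takes an HttpServletRequest and returns a list of Flume events. The default handler is JSONHandler.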

    For example: xxx.handler=org.pq.flumeDemo.sources.HTTPSourceXMLHandler

  3. To pass parameters to a custom handler, use the handler.* properties,

    e.g. xxx.handler.myparam=zhangsan

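  4. If no handler is specified in the configuration, the HTTP source uses the handler bundled with Flume, JSONHandler, which handles events in JSON format. A single request can carry several events wrapped in an array, although the channel the source writes to may have a limited transaction capacity.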

    The handler accepts JSON data encoded in UTF-8, UTF-16 or UTF-32 and converts it into a list of events.

    Format:

    [ { "headers": { "key1": "value1", "key2": "value2" },
        "body": "the first event"
      },
      { "headers": { "key1": "value1", "key2": "value2" },
        "body": "the second event"
      }
    ]
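A client can assemble this array by hand. The sketch below, using only the JDK, renders one headers map and one body per event into the format above. JsonPayload, quote and toJson are hypothetical names, and the escaping is deliberately minimal (quotes, backslashes, control characters); a real client would more safely use a JSON library.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side helper producing the array format accepted by
// JSONHandler: [{"headers":{...},"body":"..."}, ...]
final class JsonPayload {
    // Minimal JSON string escaping; prefer a JSON library in real code.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // headers.get(i) and bodies.get(i) describe event i.
    static String toJson(List<Map<String, String>> headers, List<String> bodies) {
        if (headers.size() != bodies.size()) {
            throw new IllegalArgumentException("headers and bodies must have the same size");
        }
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < bodies.size(); i++) {
            if (i > 0) sb.append(',');
            sb.append("{\"headers\":{");
            boolean first = true;
            for (Map.Entry<String, String> h : headers.get(i).entrySet()) {
                if (!first) sb.append(',');
                first = false;
                sb.append(quote(h.getKey())).append(':').append(quote(h.getValue()));
            }
            sb.append("},\"body\":").append(quote(bodies.get(i))).append('}');
        }
        return sb.append(']').toString();
    }

    public static void main(String[] args) {
        Map<String, String> h = new LinkedHashMap<>();
        h.put("h1", "v1");
        h.put("h2", "v2");
        System.out.println(toJson(List.of(h), List.of("hello body")));
    }
}
```

Running main prints the same payload as the curl test in the first example of section 3.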

3. Examples

1. A simple HTTP source example

1) Add a file http_test.conf under the conf directory

 a1.sources=r1
 a1.sinks=k1
 a1.channels=c1
 
 a1.sources.r1.type=http
 a1.sources.r1.bind=192.168.1.102
 a1.sources.r1.port=50000
 a1.sources.r1.channels=c1
  
 a1.sinks.k1.type=logger
 a1.sinks.k1.channel=c1
 
 a1.channels.c1.type=memory
 a1.channels.c1.capacity=1000
 a1.channels.c1.transactionCapacity=100

2) Start the agent:

    $ bin/flume-ng agent -c conf -f conf/http_test.conf  -n a1 -Dflume.root.logger=INFO,console

3) Test:

   $ curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]'  http://192.168.1.102:50000
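The same request can be sent from Java. This sketch assumes JDK 11+ (java.net.http); PostJsonEvent and buildRequest are hypothetical names. It sets the charset in Content-Type explicitly because JSONHandler is understood to decode the body according to the request's character encoding.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sends the same JSON event as the curl command, using the JDK 11+ HTTP client.
final class PostJsonEvent {
    static HttpRequest buildRequest(String url, String json) {
        return HttpRequest.newBuilder(URI.create(url))
                // State the encoding explicitly; JSONHandler reads it from the request.
                .header("Content-Type", "application/json; charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(json)) // UTF-8 bytes
                .build();
    }

    public static void main(String[] args) throws Exception {
        String json = "[{\"headers\":{\"h1\":\"v1\",\"h2\":\"v2\"},\"body\":\"hello body\"}]";
        HttpRequest request = buildRequest("http://192.168.1.102:50000", json);
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("status=" + response.statusCode());
    }
}
```

A 200 status is expected when the source has accepted the events; the log output below should then show the event.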

4) Output on the server side

......
2015-11-30 11:34:52,451 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: }
2015-11-30 11:34:52,452 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: }
2015-11-30 11:45:14,951 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{h1=v1, h2=v2} body: 68 65 6C 6C 6F 20 62 6F 64 79                   hello body }
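LoggerSink prints the event body as hexadecimal bytes followed by a readable rendering. The sketch below reproduces only the hex column so a logged body can be checked against the text that was sent. BodyHex is a hypothetical name, and this is not Flume's own formatter.

```java
import java.nio.charset.StandardCharsets;
import java.util.StringJoiner;

// Renders a string's UTF-8 bytes as space-separated uppercase hex,
// matching the hex column LoggerSink shows for an event body.
final class BodyHex {
    static String toHex(String text) {
        StringJoiner hex = new StringJoiner(" ");
        for (byte b : text.getBytes(StandardCharsets.UTF_8)) {
            hex.add(String.format("%02X", b & 0xFF)); // mask to avoid sign extension
        }
        return hex.toString();
    }

    public static void main(String[] args) {
        System.out.println(toHex("hello body")); // 68 65 6C 6C 6F 20 62 6F 64 79
    }
}
```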


2. A custom HTTP source handler example

Assume requests arrive as XML in the following format:

<events>
  <event>
    <headers><header1>value1</header1></headers>
    <body>test</body>
  </event>
  <event>
    <headers><header1>value1</header1></headers>
    <body>test2</body>
  </event>
</events>
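On the client side, such a request body can be generated from a list of events. In this sketch, XmlPayload, escape and toXml are hypothetical names. Header names become element names, so they are assumed to be valid XML names already; only text content is escaped.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side helper that renders events in the <events> format.
final class XmlPayload {
    // Escape text content; "&" must be replaced first.
    static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    // headers.get(i) and bodies.get(i) describe event i.
    static String toXml(List<Map<String, String>> headers, List<String> bodies) {
        if (headers.size() != bodies.size()) {
            throw new IllegalArgumentException("headers and bodies must have the same size");
        }
        StringBuilder sb = new StringBuilder("<events>");
        for (int i = 0; i < bodies.size(); i++) {
            sb.append("<event><headers>");
            for (Map.Entry<String, String> h : headers.get(i).entrySet()) {
                sb.append('<').append(h.getKey()).append('>')
                  .append(escape(h.getValue()))
                  .append("</").append(h.getKey()).append('>');
            }
            sb.append("</headers><body>").append(escape(bodies.get(i))).append("</body></event>");
        }
        return sb.append("</events>").toString();
    }

    public static void main(String[] args) {
        Map<String, String> h = new LinkedHashMap<>();
        h.put("header1", "value1");
        // Same two events as the sample above, without indentation
        System.out.println(toXml(List.of(h, h), List.of("test", "test2")));
    }
}
```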

The goal is for the Flume HTTP source to handle requests in this XML format.

Steps:

1) Create a Maven project with the following pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.pq</groupId>
  <artifactId>flume-demo</artifactId>
  <packaging>jar</packaging>
  <version>1.0</version>
  <name>flume-demo Maven jar</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.8.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.6.0</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <build>
    <finalName>flume-demo</finalName>
  </build>
</project>

2) Write the custom handler class

package org.pq.flumeDemo.sources;
import com.google.common.base.Preconditions;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.http.HTTPBadRequestException;
import org.apache.flume.source.http.HTTPSourceHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

import javax.servlet.http.HttpServletRequest;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HTTPSourceXMLHandler implements HTTPSourceHandler {
    private final String ROOT = "events";
    private final String EVENT_TAG = "event";
    private final String HEADERS_TAG = "headers";
    private final String BODY_TAG = "body";

    private final String CONF_INSERT_TIMESTAMP = "insertTimestamp";
    private final String TIMESTAMP_HEADER = "timestamp";
    private final DocumentBuilderFactory documentBuilderFactory
            = DocumentBuilderFactory.newInstance();

    // Document builders are not thread-safe.
    // So make sure we have one for each thread.
    private final ThreadLocal<DocumentBuilder> docBuilder
            = new ThreadLocal<DocumentBuilder>();

    private boolean insertTimestamp;
    private static final Logger LOG = LoggerFactory.getLogger(HTTPSourceXMLHandler.class);


    public List<Event> getEvents(HttpServletRequest httpServletRequest) throws HTTPBadRequestException, Exception {
        if (docBuilder.get() == null) {
            docBuilder.set(documentBuilderFactory.newDocumentBuilder());
        }
        Document doc;
        final List<Event> events;
        try {
            doc = docBuilder.get().parse(httpServletRequest.getInputStream());            
            Element root = doc.getDocumentElement();        

            root.normalize();
            // Verify that the root element is "events"
            Preconditions.checkState(
                    ROOT.equalsIgnoreCase(root.getTagName()));

            NodeList nodes = root.getElementsByTagName(EVENT_TAG);
            LOG.info("get nodes={}",nodes);

            int eventCount = nodes.getLength();
            events = new ArrayList<Event>(eventCount);
            for (int i = 0; i < eventCount; i++) {
                Element event = (Element) nodes.item(i);
                // Get all headers. If there are multiple header sections,
                // combine them.
                NodeList headerNodes
                        = event.getElementsByTagName(HEADERS_TAG);
                Map<String, String> eventHeaders
                        = new HashMap<String, String>();
                for (int j = 0; j < headerNodes.getLength(); j++) {
                    Node headerNode = headerNodes.item(j);
                    NodeList headers = headerNode.getChildNodes();
                    for (int k = 0; k < headers.getLength(); k++) {
                        Node header = headers.item(k);

                        // Read only element nodes
                        if (header.getNodeType() != Node.ELEMENT_NODE) {
                            continue;
                        }
                        // Make sure a header is inserted only once,
                        // else the event is malformed
                        Preconditions.checkState(
                                !eventHeaders.containsKey(header.getNodeName()),
                                "Header expected only once " + header.getNodeName());
                        eventHeaders.put(
                                header.getNodeName(), header.getTextContent());
                    }
                }
                Node body = event.getElementsByTagName(BODY_TAG).item(0);
                if (insertTimestamp) {
                    eventHeaders.put(TIMESTAMP_HEADER, String.valueOf(System
                            .currentTimeMillis()));
                }
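                // getCharacterEncoding() returns null when the client sends no charset,
                // which would make getBytes(null) fail; fall back to UTF-8.
                String charset = httpServletRequest.getCharacterEncoding();
                if (charset == null) {
                    charset = "UTF-8";
                }
                events.add(EventBuilder.withBody(
                        body.getTextContent().getBytes(charset),
                        eventHeaders));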
            }
        } catch (SAXException ex) {
            throw new HTTPBadRequestException(
                    "Request could not be parsed into valid XML", ex);
        } catch (Exception ex) {
            throw new HTTPBadRequestException(
                    "Request is not in expected format. " +
                            "Please refer documentation for expected format.", ex);
        }
        return events;
    }

    public void configure(Context context) {
        insertTimestamp = context.getBoolean(CONF_INSERT_TIMESTAMP,
                false);
    }
}
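Before packaging, the XML parsing steps of getEvents() can be checked without Flume on the classpath. This sketch copies the handler's logic using only the JDK's DOM parser. XmlEventsCheck and ParsedEvent are hypothetical stand-ins (ParsedEvent replaces Flume's Event), and IllegalArgumentException plays the role the handler gives to HTTPBadRequestException.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// JDK-only copy of the parsing in HTTPSourceXMLHandler.getEvents().
final class XmlEventsCheck {
    static final class ParsedEvent {
        final Map<String, String> headers;
        final String body;
        ParsedEvent(Map<String, String> headers, String body) {
            this.headers = headers;
            this.body = body;
        }
    }

    static List<ParsedEvent> parse(String xml) {
        try {
            Element root = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                    .getDocumentElement();
            if (!"events".equalsIgnoreCase(root.getTagName())) {
                throw new IllegalStateException("root element must be <events>");
            }
            NodeList nodes = root.getElementsByTagName("event");
            List<ParsedEvent> events = new ArrayList<>();
            for (int i = 0; i < nodes.getLength(); i++) {
                Element event = (Element) nodes.item(i);
                Map<String, String> headers = new HashMap<>();
                // Merge all <headers> sections of this event.
                NodeList sections = event.getElementsByTagName("headers");
                for (int j = 0; j < sections.getLength(); j++) {
                    NodeList children = sections.item(j).getChildNodes();
                    for (int k = 0; k < children.getLength(); k++) {
                        Node h = children.item(k);
                        if (h.getNodeType() != Node.ELEMENT_NODE) {
                            continue; // skip whitespace text nodes
                        }
                        if (headers.put(h.getNodeName(), h.getTextContent()) != null) {
                            throw new IllegalStateException("Header expected only once " + h.getNodeName());
                        }
                    }
                }
                String body = event.getElementsByTagName("body").item(0).getTextContent();
                events.add(new ParsedEvent(headers, body));
            }
            return events;
        } catch (Exception ex) {
            throw new IllegalArgumentException("Request is not in expected format", ex);
        }
    }

    public static void main(String[] args) {
        String xml = "<events>"
                + "<event><headers><header1>value1</header1></headers><body>test</body></event>"
                + "<event><headers><header1>value1</header1></headers><body>test2</body></event>"
                + "</events>";
        for (ParsedEvent e : parse(xml)) {
            System.out.println(e.headers + " " + e.body);
        }
    }
}
```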

3) In the project's flume-demo directory, run mvn package. This packages the project as a jar in the generated target directory: target/flume-demo.jar.

   Note: Apache Maven must be installed and its environment variables configured.

4) Under $FLUME_HOME, create the plugin directories

$ cd $FLUME_HOME
$ mkdir -p plugins.d/pengqiang
$ cd plugins.d/pengqiang
$ mkdir lib
$ mkdir libext
$ mkdir native

5) Copy flume-demo.jar into plugins.d/pengqiang/lib

6) Configure the Flume file http_test.conf

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=http
a1.sources.r1.bind=localhost
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
a1.sources.r1.handler=org.pq.flumeDemo.sources.HTTPSourceXMLHandler
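# Handler parameters need the handler. prefix (see handler.* above); an unprefixed
# a1.sources.r1.insertTimestamp is not passed to the handler. The step 9 output was
# produced without the prefix, which is why it shows no timestamp header.
a1.sources.r1.handler.insertTimestamp=true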

a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

     

7) Start the agent

   $ bin/flume-ng agent -c conf -f conf/http_test.conf  -n a1 -Dflume.root.logger=INFO,console

8) Test

    Tool: the Firefox browser with the HttpRequester add-on, sending the XML sample above as a POST request to the source.
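As a scriptable alternative to the browser add-on, the XML sample can be posted from Java. This sketch assumes JDK 11+ (java.net.http) and the bind address and port from the configuration in step 6; PostXmlEvents and buildRequest are hypothetical names. The charset is set in Content-Type because the handler encodes event bodies using request.getCharacterEncoding().

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// POSTs the sample XML to the HTTP source configured with the custom handler.
final class PostXmlEvents {
    static final String SAMPLE =
            "<events>"
            + "<event><headers><header1>value1</header1></headers><body>test</body></event>"
            + "<event><headers><header1>value1</header1></headers><body>test2</body></event>"
            + "</events>";

    static HttpRequest buildRequest(String url, String xml) {
        return HttpRequest.newBuilder(URI.create(url))
                // The handler reads the request's encoding, so send it explicitly.
                .header("Content-Type", "application/xml; charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(xml)) // UTF-8 bytes
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest("http://localhost:50000", SAMPLE),
                      HttpResponse.BodyHandlers.ofString());
        System.out.println("status=" + response.statusCode());
    }
}
```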

     

9) Output on the server side:

.....
2015-12-02 14:35:53,809 (1214826834@qtp-1250857451-0) [INFO - org.pq.flumeDemo.sources.HTTPSourceXMLHandler.getEvents(HTTPSourceXMLHandler.java:64)] get nodes=org.apache.xerces.dom.DeepNodeListImpl@1beaed6
2015-12-02 14:35:54,490 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{header1=value1} body: 74 65 73 74                  test }
2015-12-02 14:35:54,491 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{header1=value1} body: 74 65 73 74 32                                  test2 }