Writing a Custom Flume Sink: an Elasticsearch Sink

The job of a Flume Sink is to take data from a Flume Channel and write it out to storage or forward it to another Flume Source. When a Flume Agent starts, it creates a SinkRunner object for each Sink, and SinkRunner.start() spawns a new thread to manage that Sink's lifecycle. Every Sink has to implement the start(), stop() and process() methods. You can initialize the Sink's parameters and state in start() and clean up the Sink's resources in stop(). The crucial method is process(), which handles the data taken out of the Channel. In addition, if the Sink has configuration of its own, it needs to implement the Configurable interface.

The Sinks that ship with Flume often do not meet real requirements, so we write a custom Sink for our specific needs. This post uses Elasticsearch as the example and implements a simple insert of documents in the Sink. The example uses Flume 1.7.

1. Writing the code

First, create a class ElasticSearchSink that extends AbstractSink. Because we also want the Sink to have its own configuration, it implements the Configurable interface as well.

public class ElasticSearchSink extends AbstractSink implements Configurable

The Elasticsearch host and the index name can be placed in the configuration file, which is just Flume's regular conf file. Override the configure() method of Configurable to read the settings, as shown below:

@Override
    public void configure(Context context)
    {
        esHost = context.getString("es_host");
        esIndex = context.getString("es_index");
    }

Note the syntax for the "es_host" and "es_index" properties in the conf file:

agent.sinks = sink1
agent.sinks.sink1.type = nick.test.flume.ElasticSearchSink
agent.sinks.sink1.es_host = 192.168.50.213
agent.sinks.sink1.es_index = vehicle_event_test
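
The configure() method above assumes both properties are present. If you want the sink to fail fast on a missing required setting, or to fall back to a default, org.apache.flume.Context also offers a getString overload that takes a default value. A minimal sketch (the default index name and the error message are placeholders, not part of the original sink):

@Override
    public void configure(Context context)
    {
        // es_host is required: fail fast if it is missing from the agent's conf file
        esHost = context.getString("es_host");
        if (esHost == null || esHost.trim().isEmpty())
        {
            throw new IllegalArgumentException("es_host must be set for ElasticSearchSink");
        }

        // es_index falls back to a placeholder default when it is not configured
        esIndex = context.getString("es_index", "flume_events");
    }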

Next comes the process() method, in which you need to get hold of the Channel, because all data comes from it. Before taking a message you must first obtain a Transaction from the Channel, and once the message has been handled you must commit and close that transaction. Only then does the Channel know that the message has been consumed and can remove it from its internal queue. If processing fails and the message should be consumed again, you can roll the transaction back instead. This transaction mechanism is the key to Flume's reliability guarantee for messages.

process() must return one of the Status enum values: READY or BACKOFF. If you took a message and handled it normally, return READY. If the message you took was null, return BACKOFF. Backoff means that when the Sink cannot get a message, the Sink's PollingRunner thread waits for a backoff interval before polling again, giving the Channel time to receive more data.

The complete code is as follows:

import java.net.InetAddress;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import com.alibaba.fastjson.JSONObject;

public class ElasticSearchSink extends AbstractSink implements Configurable
{
    private String esHost;
    private String esIndex;

    private TransportClient client;

    @Override
    public Status process() throws EventDeliveryException
    {
        Status status = null;

        // Every take() from the channel has to happen inside a transaction
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try
        {
            Event event = ch.take();

            if (event != null)
            {
                String body = new String(event.getBody(), "UTF-8");

                BulkRequestBuilder bulkRequest = client.prepareBulk();

                JSONObject obj = JSONObject.parseObject(body);

                String vehicleId = obj.getString("vehicle_id");
                String eventBeginCode = obj.getString("event_begin_code");
                String eventBeginTime = obj.getString("event_begin_time");

                // Document id in the index
                String id = (vehicleId + "_" + eventBeginTime + "_" + eventBeginCode).trim();

                // Only vehicle_id is indexed in this simple example
                JSONObject json = new JSONObject();
                json.put("vehicle_id", vehicleId);

                // The index name is reused as the type name here
                bulkRequest.add(client.prepareIndex(esIndex, esIndex).setId(id).setSource(json));

                BulkResponse bulkResponse = bulkRequest.get();
                if (bulkResponse.hasFailures())
                {
                    // Let the catch block roll the transaction back so the event is retried
                    throw new EventDeliveryException(bulkResponse.buildFailureMessage());
                }

                status = Status.READY;
            }
            else
            {
                // Nothing in the channel: tell the runner to back off for a while
                status = Status.BACKOFF;
            }

            txn.commit();
        }
        catch (Throwable t)
        {
            txn.rollback();
            t.printStackTrace();

            status = Status.BACKOFF;
        }
        finally
        {
            txn.close();
        }

        return status;
    }

    @Override
    public void configure(Context context)
    {
        esHost = context.getString("es_host");
        esIndex = context.getString("es_index");
    }

    @Override
    public synchronized void stop()
    {
        // Release the client connection when the sink is stopped
        if (client != null)
        {
            client.close();
        }
        super.stop();
    }

    @Override
    public synchronized void start()
    {
        try
        {
            // Connect to the cluster through the transport client on port 9300
            Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
            client = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHost), 9300));
            super.start();

            System.out.println("finish start");
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }
    }
}
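
The process() implementation above indexes one event per transaction, which keeps the example easy to follow but pays the transaction and bulk-request overhead for every single event. A common variant is to take several events per transaction and send them to Elasticsearch in one bulk request. The following is only a sketch of that idea, assuming a batchSize field that is not part of the original sink (it could be read in configure() like the other properties):

@Override
    public Status process() throws EventDeliveryException
    {
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try
        {
            BulkRequestBuilder bulkRequest = client.prepareBulk();

            // Drain up to batchSize events from the channel inside one transaction
            int count = 0;
            while (count < batchSize)
            {
                Event event = ch.take();
                if (event == null)
                {
                    break;
                }

                JSONObject json = JSONObject.parseObject(new String(event.getBody(), "UTF-8"));
                bulkRequest.add(client.prepareIndex(esIndex, esIndex).setSource(json));
                count++;
            }

            // Only talk to Elasticsearch when the batch actually contains something
            if (count > 0)
            {
                BulkResponse bulkResponse = bulkRequest.get();
                if (bulkResponse.hasFailures())
                {
                    throw new EventDeliveryException(bulkResponse.buildFailureMessage());
                }
            }

            txn.commit();
            return count > 0 ? Status.READY : Status.BACKOFF;
        }
        catch (Throwable t)
        {
            txn.rollback();
            t.printStackTrace();
            return Status.BACKOFF;
        }
        finally
        {
            txn.close();
        }
    }

Because the whole batch is taken inside one transaction, a failed bulk request rolls everything back and the events stay in the channel to be retried.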

2. Packaging, configuration and running

Since this is a custom Sink, it must be packaged as a jar and copied into Flume's lib folder. Then configure the agent's conf file and finally start Flume (a sample launch command is shown after the configuration). In this example I use a Kafka source, a memory channel and the custom sink; the complete configuration file is as follows:

agent.sources = source1
agent.channels = channel1
agent.sinks = sink1

agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.source1.channels = channel1
agent.sources.source1.batchSize = 1
agent.sources.source1.batchDurationMillis = 2000
agent.sources.source1.kafka.bootstrap.servers = 192.168.50.116:9092,192.168.50.117:9092,192.168.50.118:9092,192.168.50.226:9092
agent.sources.source1.kafka.topics = iov-vehicle-event
agent.sources.source1.kafka.consumer.group.id = flume-vehicle-event-nick


agent.sinks.sink1.type = nick.test.flume.ElasticSearchSink
agent.sinks.sink1.es_host = 192.168.50.213
agent.sinks.sink1.es_index = vehicle_event_test

agent.sinks.sink1.channel = channel1

agent.channels.channel1.type = memory
agent.channels.channel1.capacity = 1000
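
Assuming the sink jar is already under Flume's lib directory and the configuration above is saved, for example, as conf/es-sink.conf (the file name is arbitrary), the agent can be started with the usual flume-ng command. Note that the agent name passed with --name must match the prefix used in the configuration file ("agent" here):

bin/flume-ng agent --conf conf --conf-file conf/es-sink.conf --name agent -Dflume.root.logger=INFO,console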