Play! Akka Flume實現的完整數據收集

  1. 前言前端

    現現在,大數據如火如荼。針對用戶行爲,用戶喜愛等後續大數據分析也是十分火熱。這個小項目實現了後臺數據收集的一系列完整流程。java

  2. 項目整體流程以及用到的技術git

    Play ! 做爲web服務器,使用RESTful 規範編寫接口(客戶端事先埋點,而後調用接口上傳數據)github

       Play !接口接受到的記錄(json形式)通過處理後,先保存到 concurrentQueue中 web

       Play! 啓動後,start一個Akka schedulable actor.他每隔一段時間,讓子actor去poll queue中的數據 算法

       調用flume的封裝的rpc,將數據發送到指定的端口。shell

       Flume source端接收數據,按照配置重定向數據,sink到console.
json


  3. 後臺實現服務器

      3.1 編寫接口併發

       採用RESTful編寫接口,首先在play! 的conf中routes定義接口:

#run log
POST  /events/runlogs       controllers.RunLogs.create()

     而後編寫controller

    public static Result create(){

        JsonNode js = request().body().asJson();
        RunLog.create(js);
        //return ok anyway
        return ok();
    }

   而後是model

    public static void create(JsonNode js) {

        if (js.has(LOG)) {
            String logBody = js.findPath(LOG).asText();
            //add one log into queue
            QueueManager.INSTANCE.addRunLog(logBody);
        }

    }

能夠看到,這些代碼遵循MVC規範,首先讓play!知道接口的定義,前端發送過來請求的時候,知道調用哪一個controller中的哪一個方法,並返回數據。而controller將數據從請求體中剝離出來,併發送給真正處理數據的model.

  3.2 Queue

 看到model中,接收到數據後,添加到queue中保存。

 定義爲:ConcurrentLinkedQueue<String> 

 3.3 Akka 定時調度

 Akka負責定時從queue取出數據,而後經過rpc發送給flume。

 akka的初始化,啓動是伴隨着play! 的啓動而進行的,每一個play!只有一個akka system。因此首先要編寫一個Global extends GlobalSetting(GlobalSettings is instantiated by the framework when an application starts,to let you perform specific tasks at start-up or shut-down),而後override onStart方法,在此方法中初始化akka的調度器。

代碼以下:

        ActorRef dispatcher = Akka.system().actorOf(new Props(Dispatcher.class));

        Akka.system().scheduler().schedule(
                Duration.create(200,TimeUnit.MICROSECONDS),
                Duration.create(2,TimeUnit.SECONDS),
                dispatcher,
                "run",
                Akka.system().dispatcher(),
                null
        );

 能夠看到,每一個兩秒鐘,scheduler就會調用dispatcher,讓他工做。

dispatcher在這裏至關於一個master,他接到工做後,會通知本身的slave去工做(發送數據給rpc)。

代碼以下:

    ActorRef workRouter = getContext().actorOf(
            new Props(WorkerActor.class).withRouter(new RoundRobinRouter(40))
            , "transferRouter");

    @Override
    public void onReceive(Object message) throws Exception {
        ConcurrentLinkedQueue<String> runLogs = QueueManager.INSTANCE.getRunLogs();
        dispatch(runLogs);

    }

    private void dispatch(Queue<?> queue) {

        if (!queue.isEmpty()) {
            Object obj = queue.poll();
            List<String> data = new ArrayList<>();
            while (obj != null) {
                data.add(obj.toString());
                obj = queue.poll();
            }

            workRouter.tell(new DispatchMsg("runlogs", data), getSelf());
        }
    }

 首先定義了一個router,他負責按照輪訓算法,找到到底要讓哪一個slave去工做。

當dispatcher收到消息後,就讓router通知WorkerActor去工做,並把從queue取出的數據給他,讓他將這些數據經過rpc發送給遠端的flume。

這樣設計的目的在於:

    接口接受消息,暫時保存在queue中,快速回復客戶端,不堵塞。

    利用akka併發能力,從queue中取出消息,找到一個worker去進行耗時較長的rpc工做。


workeractor

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof DispatchMsg) {
            DispatchMsg msg = (DispatchMsg) message;
            String business = msg.business;
            List<String> datas = (List<String>) msg.data;

            sendMsg.ToFlume.sendDataToFlume(datas);

        }
    }

3.4 rpc發送

flume集成了rpc

    public void sendDataToFlume(List<String> datas) {

        List<Event> es = new LinkedList<Event>();
        for (String data : datas){
            // Create a Flume Event object that encapsulates the sample data
            Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
            es.add(event);
        }

        // Send the event
        try {
            client.appendBatch(es);
            System.out.println("data sent");
        } catch (EventDeliveryException e) {
            // clean up and recreate the client

            client.close();
            client = null;
            client = RpcClientFactory.getDefaultInstance(hostname, port);
        }
    }

這裏接受的數據爲list,批處理。

首先新建client,這裏注意hostname跟port,是flume服務器端source的ip跟端口。

而後批量發送數據。

4. Flume

flume的配置內容以下:exemple.conf

a1.channels = c1
a1.sources = r1
a1.sinks = k1

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414

a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

此處flume端爲簡單的單點配置,source接收41414的rpc消息,而後保存到channel中,sink到console中(數據收集通常sink到HDFS中,而且能夠多點收集)。

啓動命令以下:

bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console


好了,啓動flume後,啓動play!而後利用客戶端發送消息,就能夠在flume端看到消息打印到console了。

項目的全部代碼在 https://github.com/YulinGUO/collectEvents

若是有問題,請留言。

結束

相關文章
相關標籤/搜索