前言前端
現現在,大數據如火如荼。針對用戶行爲,用戶喜愛等後續大數據分析也是十分火熱。這個小項目實現了後臺數據收集的一系列完整流程。java
項目整體流程以及用到的技術git
Play ! 做爲web服務器,使用RESTful 規範編寫接口(客戶端事先埋點,而後調用接口上傳數據)github
Play !接口接受到的記錄(json形式)通過處理後,先保存到 concurrentQueue中 web
Play! 啓動後,start一個Akka schedulable actor.他每隔一段時間,讓子actor去poll queue中的數據 算法
調用flume的封裝的rpc,將數據發送到指定的端口。shell
Flume source端接收數據,按照配置重定向數據,sink到console.
json
3. 後臺實現服務器
3.1 編寫接口併發
採用RESTful編寫接口,首先在play! 的conf中routes定義接口:
#run log POST /events/runlogs controllers.RunLogs.create()
而後編寫controller
public static Result create(){ JsonNode js = request().body().asJson(); RunLog.create(js); //return ok anyway return ok(); }
而後是model
public static void create(JsonNode js) { if (js.has(LOG)) { String logBody = js.findPath(LOG).asText(); //add one log into queue QueueManager.INSTANCE.addRunLog(logBody); } }
能夠看到,這些代碼遵循MVC規範,首先讓play!知道接口的定義,前端發送過來請求的時候,知道調用哪一個controller中的哪一個方法,並返回數據。而controller將數據從請求體中剝離出來,併發送給真正處理數據的model.
3.2 Queue
看到model中,接收到數據後,添加到queue中保存。
定義爲:ConcurrentLinkedQueue<String>
3.3 Akka 定時調度
Akka負責定時從queue取出數據,而後經過rpc發送給flume。
akka的初始化,啓動是伴隨着play! 的啓動而進行的,每一個play!只有一個akka system。因此首先要編寫一個Global extends GlobalSetting(GlobalSettings is instantiated by the framework when an application starts,to let you perform specific tasks at start-up or shut-down),而後override onStart方法,在此方法中初始化akka的調度器。
代碼以下:
ActorRef dispatcher = Akka.system().actorOf(new Props(Dispatcher.class)); Akka.system().scheduler().schedule( Duration.create(200,TimeUnit.MICROSECONDS), Duration.create(2,TimeUnit.SECONDS), dispatcher, "run", Akka.system().dispatcher(), null );
能夠看到,每一個兩秒鐘,scheduler就會調用dispatcher,讓他工做。
dispatcher在這裏至關於一個master,他接到工做後,會通知本身的slave去工做(發送數據給rpc)。
代碼以下:
ActorRef workRouter = getContext().actorOf( new Props(WorkerActor.class).withRouter(new RoundRobinRouter(40)) , "transferRouter"); @Override public void onReceive(Object message) throws Exception { ConcurrentLinkedQueue<String> runLogs = QueueManager.INSTANCE.getRunLogs(); dispatch(runLogs); } private void dispatch(Queue<?> queue) { if (!queue.isEmpty()) { Object obj = queue.poll(); List<String> data = new ArrayList<>(); while (obj != null) { data.add(obj.toString()); obj = queue.poll(); } workRouter.tell(new DispatchMsg("runlogs", data), getSelf()); } }
首先定義了一個router,他負責按照輪訓算法,找到到底要讓哪一個slave去工做。
當dispatcher收到消息後,就讓router通知WorkerActor去工做,並把從queue取出的數據給他,讓他將這些數據經過rpc發送給遠端的flume。
這樣設計的目的在於:
接口接受消息,暫時保存在queue中,快速回復客戶端,不堵塞。
利用akka併發能力,從queue中取出消息,找到一個worker去進行耗時較長的rpc工做。
workeractor
@Override public void onReceive(Object message) throws Exception { if (message instanceof DispatchMsg) { DispatchMsg msg = (DispatchMsg) message; String business = msg.business; List<String> datas = (List<String>) msg.data; sendMsg.ToFlume.sendDataToFlume(datas); } }
3.4 rpc發送
flume集成了rpc
public void sendDataToFlume(List<String> datas) { List<Event> es = new LinkedList<Event>(); for (String data : datas){ // Create a Flume Event object that encapsulates the sample data Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); es.add(event); } // Send the event try { client.appendBatch(es); System.out.println("data sent"); } catch (EventDeliveryException e) { // clean up and recreate the client client.close(); client = null; client = RpcClientFactory.getDefaultInstance(hostname, port); } }
這裏接受的數據爲list,批處理。
首先新建client,這裏注意hostname跟port,是flume服務器端source的ip跟端口。
而後批量發送數據。
4. Flume
flume的配置內容以下:exemple.conf
a1.channels = c1 a1.sources = r1 a1.sinks = k1 a1.channels.c1.type = memory a1.sources.r1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414 a1.sinks.k1.channel = c1 a1.sinks.k1.type = logger
此處flume端爲簡單的單點配置,source接收41414的rpc消息,而後保存到channel中,sink到console中(數據收集通常sink到HDFS中,而且能夠多點收集)。
啓動命令以下:
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
好了,啓動flume後,啓動play!而後利用客戶端發送消息,就能夠在flume端看到消息打印到console了。
項目的全部代碼在 https://github.com/YulinGUO/collectEvents
若是有問題,請留言。
結束