Flume簡介與使用(二)——Thrift Source採集數據

Flume簡介與使用(二)——Thrift Source採集數據

  繼上一篇安裝Flume後,本篇將介紹如何使用Thrift Source採集數據。

  Thrift最初由Facebook開發(現爲Apache頂級項目),是一個用於跨語言RPC通訊的框架。它擁有功能強大的軟件堆棧和代碼生成引擎,容許定義一個簡單的IDL文件來生成不同語言的代碼,服務器端和客戶端通過共享這個IDL文件來構建並完成通訊。

  Flume的Thrift Source是其實現的衆多Source中的一個,Flume已經實現了服務器端,所以我們可以用任意自己熟悉的語言編寫自己的Thrift客戶端來採集數據,然後發送給Thrift Source服務器端。

  [一]、生成C++代碼

  下載源碼版的Flume,在apache-flume-1.6.0-src\flume-ng-sdk\src\main\thrift目錄下有Flume定義好的flume.thrift文件,現在只要用這個文件來生成我們需要的代碼就行了。

  flume.thrift文件內容如下:

 1 namespace java org.apache.flume.thrift  // only a Java namespace is declared; no cpp namespace is given
 2 
 3 struct ThriftFlumeEvent {  // one event as it travels over the wire
 4   1: required map <string, string> headers,  // string key/value pairs attached to the event
 5   2: required binary body,  // event payload as raw bytes
 6 }
 7 
 8 enum Status {  // result code returned by both RPC methods below
 9   OK,
10   FAILED,
11   ERROR,
12   UNKNOWN
13 }
14 
15 service ThriftSourceProtocol {  // RPC interface shared by client and server
16   Status append(1: ThriftFlumeEvent event),  // deliver a single event
17   Status appendBatch(1: list<ThriftFlumeEvent> events),  // deliver a list of events in one call
18 }

  一、定義了一個ThriftFlumeEvent結構體,用來封裝發送的數據;

  二、定義了一個service類ThriftSourceProtocol,服務器端具體實現ThriftSourceProtocol裏面的兩個方法,再由客戶端調用這些方法把數據傳給Thrift Source服務器端。

  三、運行下面的命令:thrift --gen cpp flume.thrift,會在當前目錄生成gen-cpp目錄,裏面是Thrift自動生成的C++頭文件和代碼。(在這之前要先安裝Thrift)

  [二]、下面是編寫本身的客戶端代碼,我這裏是接收遠程傳過來的數據,而後發送給Flume的Thrift Source服務器。

  #include <arpa/inet.h>
  #include <sys/types.h>
  #include <sys/socket.h>
  #include <pthread.h>
  #include <unistd.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>
  #include "include/MESA_prof_load.h"
  #include "include/MESA_handle_logger.h"

  #include <map>
  #include <string>
  #include <vector>
  #include <iostream>
  #include "gen-cpp/flume_constants.h"
  #include "gen-cpp/flume_types.h"
  #include "gen-cpp/ThriftSourceProtocol.h"
  #include <thrift/protocol/TBinaryProtocol.h>
  #include <thrift/protocol/TCompactProtocol.h>
  #include <thrift/transport/TSocket.h>
  #include <thrift/transport/TTransportUtils.h>
 19 using namespace std;
 20 using namespace apache::thrift;
 21 using namespace apache::thrift::protocol;
 22 using namespace apache::thrift::transport;
 23 
 24 #define LOG_PATH "/home/zjf/DFcode/trafficlog/traffic_source.log"   //runtime log file; main() stores it in log_info.path
 25 #define DATA_BUFFER 2048    //size in bytes of the buffer holding the formatted event body sent to Flume
 26 #define BUFLEN   2048       //size in bytes of the buffer receiving one UDP datagram
 27 #define BATCH_SIZE 1000     //number of buffered events that triggers one appendBatch call to Flume
 28 
 29 //defined my C++ object
 30 class ThriftClient{
 31     public:
 32         // Thrift protocol needings...
 33         boost::shared_ptr<TTransport> socket;
 34         boost::shared_ptr<TTransport> transport;
 35         boost::shared_ptr<TProtocol> protocol;
 36         ThriftSourceProtocolClient* pClient;
 37 
 38     public:
 39         ThriftClient();
 40 };
 41 //cconstruction function, init the thrift source server ip and port
 42 ThriftClient::ThriftClient():
 43     socket(new TSocket("10.208.129.12",5497)),
 44     transport(new TFramedTransport(socket)),
 45     protocol(new TCompactProtocol(transport))
 46 {
 47     pClient = new ThriftSourceProtocolClient(protocol);
 48 }
 49 
 50 //log
 51 struct log_info_t{
 52     char *path;
 53     int log_level;
 54     void * handle;
 55 };
 56 struct log_info_t log_info;
 57 const char *module = "zjf_traffic_data_collector";
 58 
 59 //類的對象
 60 ThriftClient *client = new ThriftClient();
 61 std::map<std::string, std::string>  headers;
 62 std::vector<ThriftFlumeEvent> eventbatch;
 63 unsigned long long pkt_num_tgl = 0;
 64 
 65 int RecvAndSendUDP(){
 66     MESA_handle_runtime_log(log_info.handle, RLOG_LV_INFO, module, "RecvUDP be called");
 67     int listen_socket;          //socket id
 68     struct sockaddr_in    local;    //client IP, where to recevied data
 69     struct sockaddr_in    from;      //server IP(local host)
 70     char server_addr[16] = "10.208.129.12";    //received traffic IP
 71     int server_port = 6789;                    //received traffic port
 72     char send_buf[DATA_BUFFER] = {0};          //data send to flume
 73     char Buf[BUFLEN] = {0};
 74     int fromlen;
 75     int len;
 76 
 77     //init socket
 78 reconnect:
 79     memset(&local, 0, sizeof(local));
 80     local.sin_family = AF_INET;
 81     local.sin_addr.s_addr = inet_addr(server_addr);
 82     local.sin_port = htons(server_port);
 83     listen_socket = socket(AF_INET, SOCK_DGRAM, 0); // UDP socket
 84     if(listen_socket < 0) {
 85         printf("error udp socket\n");
 86     }else{
 87         printf("listen_socket create OK\n");
 88     }
 89     if(bind(listen_socket, (struct sockaddr *)&local, sizeof(local)) < 0) {
 90         printf("error udp bind\n");
 91         return -1;
 92     }else{
 93         printf("socket bind OK\n");
 94     }
 95 
 96     while(1){
 97         char sip[16] = {0};
 98         char dip[16] = {0};
 99         char srcport[6] = {0};
100         char destport[6] = {0};
101         char url[BUFLEN] = {0};
102         memset(Buf,0,BUFLEN);
103         fromlen = sizeof(from);
104         len = recvfrom(listen_socket, (void *)Buf, (size_t)BUFLEN, 0, (struct sockaddr *)&from,(socklen_t *)&fromlen);
105         if(len == -1) {
106             printf("error udp recvfrom\n");
107             close(listen_socket);
108             goto reconnect;
109         }
110         //parse received buf, transform to key-value
111         int i;
112         int sip_loc = 0;
113         int sport_loc = 0;
114         int dip_loc = 0;
115         int dport_loc = 0;
116         int dotcount = 0;
117         for(i=0;Buf[i] != '\0';i++){
118             if(Buf[i] == '.'){
119                 dotcount++;
120                 if(dotcount == 4){
121                     sip_loc = i;
122                     memcpy(sip,Buf,i);
123                 }
124                 else if(dotcount == 8){
125                     dip_loc = i;
126                     memcpy(dip,Buf+sport_loc+1,dip_loc-sport_loc-1);
127                 }
128                 else if(dotcount == 9){
129                     dport_loc = i;
130                     memcpy(destport,Buf+dip_loc+1,dport_loc-dip_loc-1);
131                     break;
132                 }
133                 else{}
134             }
135             if(Buf[i] == '>'){
136                 sport_loc = i;
137                 memcpy(srcport,Buf+sip_loc+1,sport_loc-sip_loc-1);
138             }
139         }
140         memcpy(url,Buf+dport_loc+1,strlen(Buf)-dport_loc);
141         unsigned long src_ip = inet_addr(sip);
142         unsigned long dst_ip = inet_addr(dip);
143         sprintf(send_buf,"SrcIP=%u SrcPort=%s DestIP=%u DestPort=%s",ntohl(src_ip),srcport,ntohl(dst_ip),destport);
144         //construct an event and append to send
145         if(0 != strlen(send_buf) ){
146             pkt_num_tgl++;
147             string sBody(send_buf);
148             ThriftFlumeEvent tfEvent;
149             tfEvent.__set_headers(headers);
150             tfEvent.__set_body(sBody);
151             eventbatch.push_back(tfEvent);
152             if(eventbatch.size() >= BATCH_SIZE){
153                 if(!client->transport->isOpen())
154                     client->transport->open();
155                 Status::type res = client->pClient->appendBatch(eventbatch);
156                 if(res != Status::OK){
157                     MESA_handle_runtime_log(log_info.handle, RLOG_LV_FATAL, module, "WARNING: send event via thrift failed, return code:%d",res);
158                 }else{
159                     //printf("sended %lld event data to flume successful\n", pkt_num_tgl);
160                 }
161                 eventbatch.clear();
162             }
163         }
164         bzero(send_buf,DATA_BUFFER);
165     }
166 }
167 
168 
169 int main()
170 {
171     //create――logger
172     log_info.path = (char *)LOG_PATH;
173     log_info.log_level = 10;
174     log_info.handle = MESA_create_runtime_log_handle(log_info.path, log_info.log_level);
175     //open thrift connection
176     if(!client->transport->isOpen()){
177         client->transport->open();
178     }
179     eventbatch.clear();
180     RecvAndSendUDP();
181     return 0;
182 }

 [三]、編譯並運行

  g++ -g -DHAVE_NETINET_IN_H -I. -I/usr/local/include/thrift -L/usr/local/lib rec_send_traffic_thrift.cpp gen-cpp/flume_constants.cpp gen-cpp/flume_types.cpp gen-cpp/ThriftSourceProtocol.cpp  -o  rec_send_traffic_thrift  -lthrift   -lpcap -L/usr/lib64 -lMESA_htable -lpthread -lMESA_handle_logger

  用守護進程啓動程序:

 #!/bin/sh
 # Watchdog: keeps rec_send_traffic_thrift running. Whenever the program
 # exits, a timestamped line is appended to RESTART.log and the program is
 # started again after 10 seconds.

 while true; do
     # Lift the core-file size limit before each run.
     ulimit -c unlimited
     ./rec_send_traffic_thrift
     echo "program crashed, restart at $(date +"%w %Y/%m/%d, %H:%M:%S")" >> RESTART.log
     sleep 10
 done

 


推薦博文:【1】http://www.micmiu.com/soa/rpc/thrift-sample/

     【2】http://www.mamicode.com/info-detail-869223.html

     【3】http://blog.csdn.net/yuzx2008/article/details/50179033

     【4】http://shiyanjun.cn/archives/456.html

     【5】http://flume.apache.org/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift

轉載請註明原文出處,謝謝

相關文章
相關標籤/搜索