【Java】【Flume】Flume-NG閱讀源代碼AvroSink

  org.apache.flume.sink.AvroSink是用來經過網絡來數據傳輸的。可以將event發送到RPCserver(比方AvroSource),使用AvroSink和AvroSource可以組成分層結構。html

它繼承自AbstractRpcSink  extends AbstractSink implements Configurable這跟其它的sink同樣都得extends AbstractSink implements Configurable。因此重點也在confgure、start、process、stop這四個方法。實現了initializeRpcClient(Properties props)方法。算法

  1、configure(Context context)方法,先獲取配置文件裏的主機hostname和端口port。設置clientProps的屬性hosts=h1,hosts.h1=hostname:port。而後將配置信息中的所有信息放入clientProps中;獲取cxnResetInterval表示反覆創建鏈接的時間間隔。默認是0就是不反覆創建鏈接。apache

  2、start()方法是調用createConnection()創建鏈接,假設出現異常就調用destroyConnection()掐斷鏈接,避免資源泄漏。createConnection()方法主要是初始化client = initializeRpcClient(clientProps)以及建立一個線程。並運行在給定延遲cxnResetInterval後運行一次銷燬連接destroyConnection(),由於默認cxnResetInterval=0。因此是不會運行這個線程的。這點不是很是明確,爲何要銷燬???initializeRpcClient(clientProps)方法會依據配置文件裏的信息進行構造對應的RpcClient:首先會獲取"client.type"參數指定的類型可用的有四種(NettyAvroRpcClient(假設沒有"client.type"則使用這個做爲默認Client)、FailoverRpcClient、LoadBalancingRpcClient、ThriftRpcClient),實例化以後需要對其在進行必要的配置運行client.configure(properties)進行配置:網絡

  (1)NettyAvroRpcClient.configure(Properties properties)方法首先會獲取鎖,檢查connState鏈接狀態要保證是沒有配置過的;其次獲取"batch-size"設置batchSize,假設配置的小於1則使用默認值100;獲取「hosts」。假設配置了多個hosts則僅僅使用第一個。獲取"hosts."前綴。假設有多個則使用第一個。再解析出hostname和port,構建一個InetSocketAddress的對象address;獲取鏈接超時時間"connect-timeout"。設置connectTimeout,假設配置的小於1000則使用默認值20000,單位是ms。獲取對應時間"request-timeout"。設置requestTimeout,假設配置的小於1000,則使用默認值20000,單位ms;獲取壓縮類型"compression-type",假設有配置壓縮還需要獲取壓縮的等級compressionLevel;最後調用connect()連接RPCserver。app

  實際的連接在connect(long timeout, TimeUnit tu)方法中,先構造一個線程池callTimeoutPool;而後依據是否有壓縮構造對應的工廠類CompressionChannelFactory(有壓縮配置)或者NioClientSocketChannelFactory(無壓縮配置);構造一個dom

NettyTransceiver(this.address,socketChannelFactory,tu.toMillis(timeout))收發器對象transceiver。依據transceiver返回一個avroClient;最後設置連接狀態爲READY。socket

  (2)FailoverRpcClient.configure(Properties properties)方法會調用configureHosts(Properties properties)方法,這種方法會獲取配置文件裏的host列表hosts。獲取最大嘗試次數"max-attempts",設置maxTries,默認是hosts的大小;獲取批量大小字體

"batch-size"。設置batchSize。假設配置的小於1則使用默認大小100。將此client置爲活動的isActive=true。可以看出這個client可以使用多個host。

  (3)LoadBalancingRpcClient.configure(Properties properties)會獲取配置文件裏的host列表hosts,且不一樣意少於兩個,不然爆異常;獲取主機選擇器"host-selector",有兩種內置的選擇器:LoadBalancingRpcClient.RoundRobinHostSelector和LoadBalancingRpcClient.RandomOrderHostSelector。默認是ROUND_ROBIN(即RoundRobinHostSelector)輪詢的方式(也可以本身定義。要實現LoadBalancingRpcClient.HostSelector接口)。獲取"backoff"。設置backoff(是否使用推遲算法,就是sink.process出問題後對這個sink設置懲處時間,在此期間再也不以爲其可活動)的boolean值(默認false就是不啓用);獲取最大推遲時間"maxBackoff",設置maxBackoff。而後依據選擇器是ROUND_ROBIN仍是RANDOM選擇相應的類並實例化selector,最後設置主機selector.setHosts(hosts)。this

  這兩個內置選擇器:RoundRobinHostSelector實際使用的是RoundRobinOrderSelector;RandomOrderHostSelector實際使用的是RandomOrderSelector。這兩個都在Flume-NG源代碼閱讀之SinkGroups和SinkRunner 這篇文章中有介紹。這裏再也不說明。spa

  (4)ThriftRpcClient.configure(Properties properties)會獲取狀態鎖stateLock.lock()。獲取配置文件裏的host列表中的第一個。僅僅需要一個;獲取批量大小"batch-size",設置batchSize,假設配置的小於1則使用默認大小100;獲取主機名hostname和端口port。獲取響應時間requestTimeout,假設小於1000設置爲默認的20000ms;獲取鏈接池大小"maxConnections",設置connectionPoolSize,假設大小小於1則設置爲默認的值5。建立鏈接池管理對象connectionManager= new ConnectionPoolManager(connectionPoolSize);設置鏈接狀態爲READY,connState = State.READY;最後狀態鎖解鎖stateLock.unlock()。

  這四個Client都是extends AbstractRpcClient implements RpcClient。

  3、process()方法。代碼例如如下:

複製代碼
 1   public Status process() throws EventDeliveryException {
 2     Status status = Status.READY;
 3     Channel channel = getChannel();    //得到channel
 4     Transaction transaction = channel.getTransaction();    //建立事務
 5 
 6     try {
 7       transaction.begin();    //事務開始
 8 
 9       verifyConnection();    //確保存在連接且處於活動狀態,假設連接處於非活動狀態銷燬並重建連接
10 
11       List<Event> batch = Lists.newLinkedList();
12 
13       for (int i = 0; i < client.getBatchSize(); i++) {    //保證這批次的event數量不可能超過客戶端批量處理的最大處理數量
14         Event event = channel.take();
15 
16         if (event == null) {        //表示channel中沒有數據了
17           break;
18         }
19 
20         batch.add(event);    //增長event列表
21       }
22 
23       int size = batch.size();    //獲取這批次取得的event的數量
24       int batchSize = client.getBatchSize();        //獲取客戶端可以批量處理的大小
25 
26       if (size == 0) {
27         sinkCounter.incrementBatchEmptyCount();
28         status = Status.BACKOFF;
29       } else {
30         if (size < batchSize) {
31           sinkCounter.incrementBatchUnderflowCount();
32         } else {
33           sinkCounter.incrementBatchCompleteCount();
34         }
35         sinkCounter.addToEventDrainAttemptCount(size);
36         client.appendBatch(batch);        //批量處理event
37       }
38 
39       transaction.commit();        //事務提交
40       sinkCounter.addToEventDrainSuccessCount(size);
41 
42     } catch (Throwable t) {
43       transaction.rollback();    //事務回滾
44       if (t instanceof Error) {
45         throw (Error) t;
46       } else if (t instanceof ChannelException) {
47         logger.error("Rpc Sink " + getName() + ": Unable to get event from" +
48             " channel " + channel.getName() + ". Exception follows.", t);
49         status = Status.BACKOFF;
50       } else {
51         destroyConnection();        //銷燬連接
52         throw new EventDeliveryException("Failed to send events", t);
53       }
54     } finally {
55       transaction.close();    //事務關閉
56     }
57 
58     return status;
59   }
複製代碼

  即便本批次event的數量達不到client.getBatchSize()(channel中沒數據了)也會立刻發送到RPCserver。verifyConnection()方法是確保存在連接且處於活動狀態。假設連接處於非活動狀態銷燬並重建連接。

假設本批次沒有event,則不會想RPC發送不論什麼數據。client.appendBatch(batch)方法是批量發送event。

  (1)NettyAvroRpcClient.appendBatch(batch)方法會調用appendBatch(events, requestTimeout, TimeUnit.MILLISECONDS)方法,該方法會首先確認連接處於READY狀態,不然報錯;而後將每個event又一次封裝成AvroFlumeEvent,放入avroEvents列表中;而後構造一個CallFuture和avroEvents一同封裝成一個Callable放入線程池 handshake = callTimeoutPool.submit(callable)中去運行,其call方法內容是avroClient.appendBatch(avroEvents, callFuture)就是在此批量提交到RPCserver。而後handshake.get(connectTimeout, TimeUnit.MILLISECONDS)在規定時間等待運行的返回結果以及等待append的完畢waitForStatusOK(callFuture, timeout, tu)。具體的可看這裏Flume的Avro Sink和Avro Source研究之二 : Avro Sink 。有對於這兩個future更深刻的分析。

一個批次傳輸的event的數量是min(batchSize,events.size())

  (2)FailoverRpcClient.appendBatch(batch)方法會作最多maxTries次嘗試直到獲取到可以正確發送events的Client,經過localClient=getClient()--》getNextClient()來獲取client,這種方法每次會獲取hosts中的下一個HostInfo,並使用NettyAvroRpcClient來做爲RPC Client,這就又回到了(1)中。這種方法另外一個要注意的就是會先從當前的lastCheckedhost+1位置向後找可以使用的Client,假設不行會再從開始到到lastCheckedhost再找,再找不到就報錯。使用localClient.appendBatch(events)來處理events。可參考(1)。

  (3)LoadBalancingRpcClient.appendBatch(batch)方法,首先會獲取可以發送到的RPCserver的迭代器Iterator<HostInfo> it = selector.createHostIterator()。而後取一個HostInfo,RpcClient client = getClient(host)這個Client和(2)同樣都是NettyAvroRpcClient。但是getClient方法會設置一個保存名字和client映射的clientMap;client.appendBatch(events)運行以後就會跳出循環,下一次appendBatch會選擇下一個client運行。

  (4)ThriftRpcClient.appendBatch(batch)方法,從connectionManager.checkout()獲取一個client。ConnectionPoolManager類主要維護倆對象availableClients用來存放可用的client(是一個ClientWrapper。維護一個ThriftSourceProtocol.Client client 是用來批量處理event的)、checkedOutClients用來存儲從availableClients中拿出的Client表示正在使用的Client;ConnectionPoolManager.checkout()用於從availableClients中remove出client並放入checkedOutClients中,返回這個client。ConnectionPoolManager.checkIn(ClientWrapper client)方法用於將指定的Client從checkedOutClient中remove出並放入availableClients中;ConnectionPoolManager.destroy(ClientWrapper client)用於將checkedOutClients中的指定Client   remove並close。appendBatch方法中得到client後,會每次封裝min(batchSize,events.size())個event,把他們封裝成ThriftFlumeEvent增長thriftFlumeEvents列表,而後假設thriftFlumeEvents>0則運行doAppendBatch(client, thriftFlumeEvents).get(requestTimeout,TimeUnit.MILLISECONDS)堵塞等待傳輸完成。

doAppendBatch方法會構建一個Callable其call方法運行client.client.appendBatch(e)。將這個Callable放入線程池callTimeoutPool中運行並返回運行結果Future。

  以上四種RpcClient的append(Event event)方法也比較easy理解,再也不講述。

  4、stop()方法主要是銷燬連接,關閉cxnResetExecutor。

  

  事實上flume支持avro和thrift兩種(眼下)傳輸,上面的(2)和(3)僅僅只是是對(1)的上層業務作了一次封裝而已,本質上還它們是相同的avro(基於netty)。還記得在同一時間avrosink它支持壓縮。

相關文章
相關標籤/搜索