Flume(3)source組件之NetcatSource使用介紹

1、概述:

本節首先提供一個基於netcat的source+channel(memory)+sink(logger)的數據傳輸過程。而後剖析一下NetcatSource中的代碼執行邏輯。java

2、flume配置文件:

下面的配置文件netcat.conf中定義了source使用netcat,它會監聽44444端口。node

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = locahost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、命令行啓動:

切換到flume的安裝目錄下,執行下述代碼:apache

bin/flume-ng agent --conf conf --conf-file study/netcat.conf --name a1 -Dflume.root.logger=INFO,console

4、利用telnet來直接訪問且發送數據:

在命令行中鍵入如下代碼:其中node5是flume所在的主機名。編程

telnet node5 44444

在telnet命令行輸入信息:eclipse

image

在flume的啓動界面就會輸出接收到的數據:socket

image

由此,使用netcat做爲source的功能即演示成功了。this

除了利用telnet來發送數據之外,也能夠本身實現一個socket編程來向node5主機的44444端口發送數據。spa

固然,咱們發現了一個問題,明明在telnet中發送的數據是:This is flume netcat source!,接收到的數據倒是This is flume ne。數據不完整。後面經過分析一下源碼,看能不能找到緣由。命令行

出現上述的顯示不完整的狀況,是由於咱們使用的是LoggerSink組件,它內部的實現邏輯致使了僅打印了16個字符。線程

image

 

5、agent啓動的基本步驟:

image

6、NetcatSource源碼剖析:

該類的全路徑爲org.apache.flume.source.NetcatSource,繼承了AbstractSource 並實現了Configurable接口。

因爲NetcatSource一個監聽服務,因此它是經過EventDrivenSourceRunner來啓動一個線程,調用其start()方法的。

image

首先在正式啓動source以前,會首先執行configure方法,初始化配置文件中提供的參數:bind\port\ack-every-event\max-line-length。

start()方法以下:

image

該方法內建立一個AcceptHandler內部類實例,實際的監聽工做就是在該類的run方法中來實現的。

image

相關文章
相關標籤/搜索