首先是關於flume的基礎介紹git
組件名稱 github |
功能介紹web |
Agent代理apache |
使用JVM 運行Flume。每臺機器運行一個agent,可是能夠在一個agent中包含多個sources和sinks。fetch |
Client客戶端ui |
生產數據,運行在一個獨立的線程。spa |
Source源插件 |
從Client收集數據,傳遞給Channel。線程 |
Sink接收器3d |
從Channel收集數據,進行相關操做,運行在一個獨立線程。 |
Channel通道 |
鏈接 sources 和 sinks ,這個有點像一個隊列。 |
Events事件 |
傳輸的基本數據負載。 |
目前來講flume是支持多種source
其中是支持讀取jms消息隊列消息,網上並無找到關於讀取rabbitmq的教程
雖然flume並不支持讀取rabbitMq,因此須要對flume進行二次開發
這裏主要就是flume怎麼從rabbitMq讀取數據
這裏從git上找到了一個關於flume從rabbitMq讀取數據的插件
下載地址是:https://github.com/gmr/rabbitmq-flume-plugin
上面有一些英文的描述,你們能夠看下
環境介紹
centOS 7.3 jdk1.8 cdh5.14.0
1.用 mvn 打包該項目,會生成兩個JAR包
2.由於我這邊使用的以cdh方式安裝集成flume的,因此把這兩個jar 扔到 /usr/lib 下面
若是是普通的安裝方式須要把這兩個jar包複製到 flume安裝目錄的lib下面
3.進入cdh管理頁面配置Agent
下面是詳細的配置,我這邊是直接把消息寫入kafka集羣裏 的
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
tier1.sources.source1.bind = 127.0.0.1
tier1.sources.source1.port = 5672
tier1.sources.source1.virtual-host = /
tier1.sources.source1.username = guest
tier1.sources.source1.password = guest
tier1.sources.source1.queue = test
tier1.sources.source1.prefetchCount = 10
tier1.sources.source1.channels = channel1
tier1.sources.source1.threads = 2
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
tier1.sources.source1.interceptors.i1.preserveExisting = true
tier1.channels.channel1.type = memory
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = flume_out
tier1.sinks.sink1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,27.0.0.1:9094
tier1.sinks.sink1.requiredAcks = 1
tier1.sinks.sink11.batchSize = 20
配置完成更新配置從新啓動Agent
這個是接收到rabbitMq消息
大功告成,若是配置中有疑問的能夠留言,我看到後會回覆