Flume 讀取RabbitMq消息隊列消息,並將消息寫入kafka

首先是關於flume的基礎介紹git

組件名稱    github

功能介紹web

Agent代理apache

使用JVM 運行Flume。每臺機器運行一個agent,可是能夠在一個agent中包含多個sources和sinks。fetch

Client客戶端ui

生產數據,運行在一個獨立的線程。spa

Source源插件

從Client收集數據,傳遞給Channel。線程

Sink接收器3d

從Channel收集數據,進行相關操做,運行在一個獨立線程。

Channel通道

鏈接 sources 和 sinks ,這個有點像一個隊列。

Events事件

傳輸的基本數據負載。

目前來講flume是支持多種source

其中是支持讀取jms消息隊列消息,網上並無找到關於讀取rabbitmq的教程

雖然flume並不支持讀取rabbitMq,因此須要對flume進行二次開發

這裏主要就是flume怎麼從rabbitMq讀取數據

這裏從git上找到了一個關於flume從rabbitMq讀取數據的插件

  下載地址是:https://github.com/gmr/rabbitmq-flume-plugin

上面有一些英文的描述,你們能夠看下

環境介紹

centOS 7.3   jdk1.8   cdh5.14.0

1.用 mvn 打包該項目,會生成兩個JAR包

 

 

2.由於我這邊使用的以cdh方式安裝集成flume的,因此把這兩個jar  扔到  /usr/lib   下面

  若是是普通的安裝方式須要把這兩個jar包複製到 flume安裝目錄的lib下面

 

 

3.進入cdh管理頁面配置Agent

 

下面是詳細的配置,我這邊是直接把消息寫入kafka集羣裏 的

tier1.sources  = source1

tier1.channels = channel1

tier1.sinks    = sink1

tier1.sources.source1.type     = com.aweber.flume.source.rabbitmq.RabbitMQSource

tier1.sources.source1.bind     = 127.0.0.1

tier1.sources.source1.port     = 5672

tier1.sources.source1.virtual-host = /

tier1.sources.source1.username = guest

tier1.sources.source1.password = guest

tier1.sources.source1.queue = test

tier1.sources.source1.prefetchCount = 10

tier1.sources.source1.channels = channel1

tier1.sources.source1.threads = 2

tier1.sources.source1.interceptors = i1

tier1.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

tier1.sources.source1.interceptors.i1.preserveExisting = true

tier1.channels.channel1.type   = memory

tier1.sinks.sink1.channel      = channel1

tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink 

tier1.sinks.sink1.topic = flume_out 

tier1.sinks.sink1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,27.0.0.1:9094

tier1.sinks.sink1.requiredAcks = 1 

tier1.sinks.sink11.batchSize = 20 

配置完成更新配置從新啓動Agent

 

 

 

這個是接收到rabbitMq消息

 

 

大功告成,若是配置中有疑問的能夠留言,我看到後會回覆

相關文章
相關標籤/搜索