Pushing Oracle data to Kafka in real time with Flume

Versions:

RedHat 6.5   JDK 1.8   flume-1.6.0   kafka_2.11-0.8.2.1

Installing Flume

See "Installing standalone Flume 1.6 on RedHat 6.5".

Installing Kafka

See "Installing a Kafka cluster on RedHat 6.5".

1. Download flume-ng-sql-source-1.4.3.jar

CSDN download link: http://download.csdn.net/detail/chongxin1/9892184

flume-ng-sql-source-1.4.3.jar is the essential jar that Flume needs in order to connect to a database.

2. Put flume-ng-sql-source-1.4.3.jar in Flume's lib directory

 

3. Put the database driver (Oracle in this case) in Flume's lib directory

The Oracle JDBC driver ships with the Oracle installation, under the path D:\app\product\11.2.0\dbhome_1\jdbc\lib

Copy ojdbc5.jar from there into Flume's lib directory.
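Before going further, it can help to confirm that both jars from steps 2 and 3 actually landed in the lib directory. The sketch below is not part of Flume; `missing_jars` and the glob patterns are hypothetical names chosen for illustration, assuming the jars keep their usual file-name prefixes.

```python
from pathlib import Path

# Glob patterns for the two jars this guide copies into lib (assumed names).
REQUIRED = ("flume-ng-sql-source-*.jar", "ojdbc*.jar")


def missing_jars(lib_dir, patterns=REQUIRED):
    """Return the patterns that match no file in lib_dir."""
    lib = Path(lib_dir)
    return [p for p in patterns if not any(lib.glob(p))]
```

For the layout used in this guide, `missing_jars("/usr/local/flume/apache-flume-1.6.0-bin/lib")` should return an empty list once both jars are in place.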

4. Create flume-sql.conf

Create flume-sql.conf in the conf directory:

    touch /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf
    sudo gedit /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf

Enter the following in flume-sql.conf:

 

 
    # Name the components of this agent
    agentOne.channels = channelOne
    agentOne.sources = sourceOne
    agentOne.sinks = sinkOne

    ########### SQL source ###########
    # For each one of the sources, the type is defined
    agentOne.sources.sourceOne.type = org.keedio.flume.source.SQLSource
    agentOne.sources.sourceOne.hibernate.connection.url = jdbc:oracle:thin:@192.168.168.100:1521/orcl
    # Hibernate Database connection properties
    agentOne.sources.sourceOne.hibernate.connection.user = flume
    agentOne.sources.sourceOne.hibernate.connection.password = 1234
    agentOne.sources.sourceOne.hibernate.connection.autocommit = true
    agentOne.sources.sourceOne.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
    agentOne.sources.sourceOne.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
    # Delay between query runs, in milliseconds
    agentOne.sources.sourceOne.run.query.delay = 10000
    # Where the source keeps its status file
    agentOne.sources.sourceOne.status.file.path = /tmp
    agentOne.sources.sourceOne.status.file.name = sqlSource.status
    # Custom query
    agentOne.sources.sourceOne.start.from = 0
    agentOne.sources.sourceOne.custom.query = select sysdate from dual
    agentOne.sources.sourceOne.batch.size = 1000
    agentOne.sources.sourceOne.max.rows = 1000
    # c3p0 connection pool
    agentOne.sources.sourceOne.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
    agentOne.sources.sourceOne.hibernate.c3p0.min_size = 1
    agentOne.sources.sourceOne.hibernate.c3p0.max_size = 10

    ########### Memory channel ###########
    agentOne.channels.channelOne.type = memory
    agentOne.channels.channelOne.capacity = 10000
    agentOne.channels.channelOne.transactionCapacity = 10000
    agentOne.channels.channelOne.byteCapacityBufferPercentage = 20
    agentOne.channels.channelOne.byteCapacity = 800000

    ########### Kafka sink ###########
    agentOne.sinks.sinkOne.type = org.apache.flume.sink.kafka.KafkaSink
    agentOne.sinks.sinkOne.topic = test
    agentOne.sinks.sinkOne.brokerList = 192.168.168.200:9092
    agentOne.sinks.sinkOne.requiredAcks = 1
    agentOne.sinks.sinkOne.batchSize = 20

    # Bind the source and the sink to the channel
    agentOne.sinks.sinkOne.channel = channelOne
    agentOne.sources.sourceOne.channels = channelOne
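A mistyped channel name in a file like this is easy to miss, so a quick wiring check before starting the agent may save a restart. The following is a minimal sketch, not a Flume tool: `parse_properties` and `check_agent` are hypothetical helpers, and the parser only handles plain `key = value` lines and `#` comments. It checks that every source and sink refers to a declared channel and that a channel's transactionCapacity does not exceed its capacity, which the memory channel requires.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_agent(props, agent):
    """Return a list of wiring problems found for the named agent."""
    problems = []
    channels = props.get(f"{agent}.channels", "").split()

    # Each source lists one or more channels under '<source>.channels'.
    for name in props.get(f"{agent}.sources", "").split():
        used = props.get(f"{agent}.sources.{name}.channels", "").split()
        if not used:
            problems.append(f"source {name} has no channels")
        for ch in used:
            if ch not in channels:
                problems.append(f"source {name} uses undeclared channel {ch}")

    # Each sink reads from exactly one channel under '<sink>.channel'.
    for name in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{name}.channel")
        if ch is None:
            problems.append(f"sink {name} has no channel")
        elif ch not in channels:
            problems.append(f"sink {name} uses undeclared channel {ch}")

    # transactionCapacity must not be larger than capacity.
    for ch in channels:
        cap = props.get(f"{agent}.channels.{ch}.capacity")
        tx = props.get(f"{agent}.channels.{ch}.transactionCapacity")
        if cap and tx and int(tx) > int(cap):
            problems.append(f"channel {ch}: transactionCapacity > capacity")
    return problems
```

Reading flume-sql.conf into a string and calling `check_agent(parse_properties(text), "agentOne")` should return an empty list for the configuration above.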

5. Start the agent with flume-ng and test

    cd /usr/local/flume/apache-flume-1.6.0-bin
    bin/flume-ng agent --conf conf --conf-file conf/flume-sql.conf --name agentOne -Dflume.root.logger=INFO,console

On a successful start, the log looks like this:

 
    2017-07-08 00:12:55,393 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: sinkOne: Successfully registered new MBean.
    2017-07-08 00:12:55,394 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: sinkOne started
    2017-07-08 00:12:55,463 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(test)
    2017-07-08 00:12:55,528 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to localhost:9092 for producing
    2017-07-08 00:12:55,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Disconnecting from localhost:9092
    2017-07-08 00:12:55,582 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to slave2:9092 for producing

Start a Kafka console consumer to listen on the topic:

    kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

On success, the consumer prints output like this:

 
    [root@master kafka_2.11-0.9.0.0]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
    "2017-07-08 00:28:53.0"
    "2017-07-08 00:29:03.0"
    "2017-07-08 00:29:13.0"
    "2017-07-08 00:29:23.0"
    "2017-07-08 00:29:33.0"
    "2017-07-08 00:29:43.0"
    "2017-07-08 00:29:53.0"
    "2017-07-08 00:30:03.0"

Each message is the result of select sysdate from dual, and a new one arrives every 10 seconds, matching run.query.delay = 10000.

 

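6. Common errors and fixes

    2017-06-27 16:26:01,293 (C3P0PooledConnectionPoolManager[identityToken->1hgey889o1sjxqn51anc3fr|29938ba5]-AdminTaskTimer) [WARN - com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector.run(ThreadPoolAsynchronousRunner.java:759)] com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@2d6227f3 -- APPARENT DEADLOCK!!! Complete Status:

The connection timed out, which c3p0 reports as an apparent deadlock. Check carefully that the JDBC URL jdbc:oracle:thin:@192.168.168.100:1521/orcl and the username and password are correct.

If they are correct and the connection still fails, check whether a firewall is enabled on the Oracle database host; if so, add an inbound rule or simply turn the firewall off.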
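To tell a firewall problem apart from a credentials problem, one option is to test plain TCP reachability of the Oracle listener from the Flume host. The sketch below assumes the URL has the thin-driver form host:port/service (or host:port:SID) used in this guide; `parse_thin_url` and `can_connect` are hypothetical helper names, and a successful TCP connect only shows that the port is open, not that the login will succeed.

```python
import re
import socket


def parse_thin_url(url):
    """Extract (host, port, service) from jdbc:oracle:thin:@host:port/service."""
    pattern = r"jdbc:oracle:thin:@(?://)?([^:/]+):(\d+)[/:](\w[\w.]*)"
    match = re.fullmatch(pattern, url)
    if match is None:
        raise ValueError(f"unrecognised thin URL: {url}")
    return match.group(1), int(match.group(2)), match.group(3)


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False
```

With the URL from flume-sql.conf, `parse_thin_url` yields the host 192.168.168.100 and port 1521; if `can_connect` returns False for that pair, the firewall or the listener is the more likely culprit than the username and password.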
