ClickHouse - Syncing Oracle Data to ClickHouse with Flume

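ClickHouse was briefly introduced in the previous (reposted) blog post. Recently, a project at work ran into slow queries over a large volume of data in the database, which was a good opportunity to try ClickHouse in practice. This post focuses on data synchronization.

The rest of the post walks through using Flume to sync Oracle data into ClickHouse.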

Installing Flume

1. Download Flume

wget http://www.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz

2. Extract the archive

tar zxvf apache-flume-1.5.2-bin.tar.gz


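Building flume-ng-sql-source

1. Download the source code from GitHub

https://github.com/keedio/flume-ng-sql-source

2. Build and package locally

(1) For rows to be inserted into ClickHouse correctly, the source code needs one change:

Change the default field delimiter from ',' to '\t'. (Without this change, inserting the data into ClickHouse fails with an error.)

(2) Build the package: mvn package -Dmaven.test.skip=true

(3) Copy the resulting jar, flume-ng-sql-source-1.5.2.jar, into Flume's lib directory.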
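
Why the delimiter matters can be seen with a small sketch. It assumes, based on the sink configuration later in this post (format = TabSeparated), that each event body reaches ClickHouse as one TabSeparated row. The sample values and the helper name count_tab_fields are made up for illustration.

```shell
#!/bin/sh
# Hypothetical sample row: a product code and a description.
# Neither value comes from the real tables.
code='1001'
text='Widgets'

# Row as the unmodified source would emit it (comma-delimited).
comma_row=$(printf '%s,%s' "$code" "$text")

# Row after switching the delimiter to a tab.
tab_row=$(printf '%s\t%s' "$code" "$text")

# Count the columns that a tab-splitting reader sees in a row.
count_tab_fields() {
  printf '%s\n' "$1" | awk -F '\t' '{ print NF }'
}

echo "comma-delimited row: $(count_tab_fields "$comma_row") column(s)"
echo "tab-delimited row:   $(count_tab_fields "$tab_row") column(s)"
```

A reader that splits on tabs sees the comma-delimited row as a single column, which does not match a two-column table. The tab-delimited row splits into two columns as intended.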


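Building flume-clickhouse-sink

This part is somewhat more involved, and there is little detailed material online, so I describe it in as much detail as I can. Feel free to contact me with questions.

1. Download flume-ng-kafka-sink

https://github.com/camathieu/flume-ng-kafka-sink.git

flume-ng-kafka-sink is downloaded mainly as a reference project: the kafka-sink in it is converted into a clickhouse-sink.

2. Modify the project

(1) Comment out the entire KafkaSink class in the org.apache.flume.sink.kafka package.

(2) Edit the pom.

Its contents are as follows: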

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  
  <name>Apache Flume ClickHouse Sink</name>
  <description>ClickHouse sink for Apache Flume NG</description>

  <groupId>org.apache.flume.sink.clickhouse</groupId>
  <artifactId>flume-clickhouse-sink</artifactId>
  <version>1.5.2</version>
  <packaging>jar</packaging>
  
  <parent>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sinks</artifactId>
    <version>1.4.0</version>
  </parent>

  <dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
    </dependency>
    <dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka_2.10</artifactId>
	    <version>0.8.0</version>
    </dependency>

    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>19.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>
  </build> 
</project>

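(3) Download the clickhouse-sink code

Download link: https://reviews.apache.org/r/50692/diff/1#2

Copy the code from that page into the local project.

(4) Modify the code

During testing, Chinese text in the synced data came out garbled. To fix this, modify the ClickHouseSink class so that it specifies the StreamUtils.UTF_8 charset when constructing the HttpEntity.

This involves three utility classes (StreamUtils, StringUtils, Utils), which can simply be copied into the project.

All three classes can be found in this project: https://github.com/yandex/clickhouse-jdbc.git

3. Build and package locally

Run mvn package to produce flume-clickhouse-sink-1.5.2.jar.

4. Use the jar

Copy the resulting jar, flume-clickhouse-sink-1.5.2.jar, into Flume's lib directory.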


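Configuration file

1. Create the configuration file

Create the configuration file oracle-clickhouse.conf in Flume's conf directory.

2. Edit the configuration file

For the file format, refer to the official Flume documentation; it is not covered in detail here. The example below syncs two tables, each with its own channel, source, and sink: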

agent.channels = channelMProductPL channelMProductPP
agent.sources = sourceMProductPL sourceMProductPP
agent.sinks = sinkMProductPL sinkMProductPP

###########sql source#################
# For each one of the sources, the type is defined
agent.sources.sourceMProductPL.type = org.keedio.flume.source.SQLSource
agent.sources.sourceMProductPL.hibernate.connection.url = jdbc:oracle:thin:@10.10.10.10:1521:orcl

# Hibernate Database connection properties
agent.sources.sourceMProductPL.hibernate.connection.user = user
agent.sources.sourceMProductPL.hibernate.connection.password = password
agent.sources.sourceMProductPL.hibernate.connection.autocommit = true
agent.sources.sourceMProductPL.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agent.sources.sourceMProductPL.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
#agent.sources.sourceMProductPL.hibernate.table = M_PRODUCT_PL
agent.sources.sourceMProductPL.run.query.delay=10000
agent.sources.sourceMProductPL.enclose.by.quotes = false

agent.sources.sourceMProductPL.status.file.path = /tmp/flume/apache-flume-1.5.2-bin
agent.sources.sourceMProductPL.status.file.name = agent.sqlSource.status.mProductPL

agent.sources.sourceMProductPL.inputCharset = UTF-8

# Custom query
agent.sources.sourceMProductPL.start.from = 0
agent.sources.sourceMProductPL.custom.query = SELECT PRODUCT_PL,TXTSH FROM M_PRODUCT_PL WHERE "TO_NUMBER"(PRODUCT_PL) > $@$

agent.sources.sourceMProductPL.batch.size = 1
#1000
agent.sources.sourceMProductPL.max.rows = 1
#1000
agent.sources.sourceMProductPL.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sourceMProductPL.hibernate.c3p0.min_size=1
agent.sources.sourceMProductPL.hibernate.c3p0.max_size=10

##############################

# For each one of the sources, the type is defined
agent.sources.sourceMProductPP.type = org.keedio.flume.source.SQLSource
agent.sources.sourceMProductPP.hibernate.connection.url = jdbc:oracle:thin:@10.10.10.10:1521:orcl
# Hibernate Database connection properties
agent.sources.sourceMProductPP.hibernate.connection.user = user
agent.sources.sourceMProductPP.hibernate.connection.password = password
agent.sources.sourceMProductPP.hibernate.connection.autocommit = true
agent.sources.sourceMProductPP.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agent.sources.sourceMProductPP.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
#agent.sources.sourceMProductPP.hibernate.table = M_PRODUCT_PP
agent.sources.sourceMProductPP.run.query.delay=10000
agent.sources.sourceMProductPP.enclose.by.quotes = false

agent.sources.sourceMProductPP.status.file.path = /tmp/flume/apache-flume-1.5.2-bin
agent.sources.sourceMProductPP.status.file.name = agent.sqlSource.status.mProductPP

agent.sources.sourceMProductPP.inputCharset = UTF-8

# Custom query
agent.sources.sourceMProductPP.start.from = 0
agent.sources.sourceMProductPP.custom.query = SELECT PRODUCT_PP,TXTSH FROM M_PRODUCT_PP WHERE "TO_NUMBER"(PRODUCT_PP) > $@$

agent.sources.sourceMProductPP.batch.size = 1
#1000
agent.sources.sourceMProductPP.max.rows = 1
#1000
agent.sources.sourceMProductPP.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sourceMProductPP.hibernate.c3p0.min_size=1
agent.sources.sourceMProductPP.hibernate.c3p0.max_size=10

##############################

agent.channels.channelMProductPL.type = memory
agent.channels.channelMProductPL.capacity = 1000
agent.channels.channelMProductPL.transactionCapacity = 1000
agent.channels.channelMProductPL.byteCapacityBufferPercentage = 20
agent.channels.channelMProductPL.byteCapacity = 1600000

agent.channels.channelMProductPP.type = memory
agent.channels.channelMProductPP.capacity = 1000
agent.channels.channelMProductPP.transactionCapacity = 1000
agent.channels.channelMProductPP.byteCapacityBufferPercentage = 20
agent.channels.channelMProductPP.byteCapacity = 1600000

agent.sinks.sinkMProductPL.type = org.apache.flume.sink.clickhouse.ClickHouseSink
agent.sinks.sinkMProductPL.host = http://10.122.1.229
agent.sinks.sinkMProductPL.port = 8123
agent.sinks.sinkMProductPL.database = store_analysis
agent.sinks.sinkMProductPL.table = M_PRODUCT_PL
agent.sinks.sinkMProductPL.batchSize = 1
#3000
agent.sinks.sinkMProductPL.format = TabSeparated

agent.sinks.sinkMProductPP.type = org.apache.flume.sink.clickhouse.ClickHouseSink
agent.sinks.sinkMProductPP.host = http://10.122.1.229
agent.sinks.sinkMProductPP.port = 8123
agent.sinks.sinkMProductPP.database = store_analysis
agent.sinks.sinkMProductPP.table = M_PRODUCT_PP
agent.sinks.sinkMProductPP.batchSize = 1
#3000
agent.sinks.sinkMProductPP.format = TabSeparated

agent.sinks.sinkMProductPL.channel = channelMProductPL
agent.sources.sourceMProductPL.channels=channelMProductPL

agent.sinks.sinkMProductPP.channel = channelMProductPP
agent.sources.sourceMProductPP.channels=channelMProductPP

Adjust the parameters to suit your own environment.
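
To check the target table independently of Flume, a row can be posted by hand to ClickHouse's HTTP interface on the port used in the sink configuration (8123). The sketch below is only an illustration: the helper names ch_insert_url and ch_insert_tsv are made up here, and it assumes curl is installed and that the table has two columns matching the sample row.

```shell
#!/bin/sh
# Build the HTTP-interface URL for an INSERT in TabSeparated format.
# Arguments: host (with scheme), port, database, table.
ch_insert_url() {
  printf '%s:%s/?query=INSERT%%20INTO%%20%s.%s%%20FORMAT%%20TabSeparated\n' \
    "$1" "$2" "$3" "$4"
}

# Post the rows read from stdin as the request body.
# Arguments: same as ch_insert_url.
ch_insert_tsv() {
  curl --fail --silent --show-error --data-binary @- "$(ch_insert_url "$@")"
}

# Example (not run automatically; the row values are made up):
#   printf '1001\tWidgets\n' |
#     ch_insert_tsv http://10.122.1.229 8123 store_analysis M_PRODUCT_PL
```

If the manual insert fails, the problem lies with the table definition or the ClickHouse endpoint rather than with the Flume configuration.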

Running the sync

From Flume's bin directory, run:

./flume-ng agent --conf ../conf --conf-file ../conf/oracle-clickhouse.conf --name agent -Dflume.root.logger=INFO,console

To run it in the background, prefix the command with nohup and append &.
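
One way to do that is sketched below. The helper name start_bg and the log file name are assumptions for illustration; the Flume command in the example is the same agent invocation as above.

```shell
#!/bin/sh
# Start a command in the background, immune to the terminal's hangup
# signal, with stdout and stderr appended to a log file.
# Arguments: log file path, then the command and its arguments.
start_bg() {
  log_file=$1
  shift
  nohup "$@" >> "$log_file" 2>&1 &
  echo "started PID $! (log: $log_file)"
}

# Example, run from Flume's bin directory (not executed automatically):
#   start_bg flume-oracle-clickhouse.log ./flume-ng agent --conf ../conf \
#     --conf-file ../conf/oracle-clickhouse.conf --name agent \
#     -Dflume.root.logger=INFO,console
```

Because the console logger writes to standard output, redirecting it to a file keeps the agent's log available after the terminal is closed.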

At this point, the sync setup is complete!
