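ClickHouse was briefly introduced in the previous (reposted) blog post. Recently, in a company project, we ran into slow queries over a large volume of database data, so I took the opportunity to try ClickHouse in practice. This post focuses on data synchronization.
Next, I will focus on how to use Flume to sync Oracle data into ClickHouse.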
wget http://www.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
tar zxvf apache-flume-1.5.2-bin.tar.gz
https://github.com/keedio/flume-ng-sql-source
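(1) To sync correctly into ClickHouse, the flume-ng-sql-source code needs a modification, as shown in the figure:
Change the default delimiter from ',' to '\t'. (Without this change, inserting the data into ClickHouse fails with an error.)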
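To illustrate why the delimiter matters: ClickHouse's TabSeparated format expects columns separated by tabs and rows terminated by newlines, with tab, newline, and backslash characters inside a value escaped by a backslash. A comma inside a value is then harmless, whereas a comma-joined row is read as a single column. The following is a minimal sketch of that row format; `TsvRow` is a hypothetical helper for illustration and is not part of flume-ng-sql-source.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of flume-ng-sql-source): formats one row as a TabSeparated line. */
public class TsvRow {

    /** Escapes the characters that have a special meaning in TabSeparated input. */
    public static String escape(String value) {
        if (value == null) {
            return "\\N"; // TabSeparated representation of NULL (only valid for Nullable columns)
        }
        StringBuilder sb = new StringBuilder();
        for (char c : value.toCharArray()) {
            switch (c) {
                case '\\': sb.append("\\\\"); break;
                case '\t': sb.append("\\t"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Joins the escaped column values with tabs and terminates the row with a newline. */
    public static String toLine(List<String> columns) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                sb.append('\t');
            }
            sb.append(escape(columns.get(i)));
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        // The comma inside the second value does not split the column.
        System.out.print(toLine(Arrays.asList("1001", "Shoes, size 42")));
    }
}
```

Whether the patched source escapes embedded tabs in this way is not stated in the post; if your data can contain tabs or newlines, it is worth checking.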
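(2) Compile and package: mvn package -Dmaven.test.skip=true
(3) Upload the resulting jar, flume-ng-sql-source-1.5.2.jar, to Flume's lib directory.
The ClickHouse sink is somewhat more troublesome, and there is no detailed material about it online. I will describe it in as much detail as I can; feel free to contact me if you have questions.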
https://github.com/camathieu/flume-ng-kafka-sink.git
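Why download flume-ng-kafka-sink? It serves mainly as a reference: we turn the project's kafka-sink into a clickhouse-sink.
(1) Comment out the entire KafkaSink class in the org.apache.flume.sink.kafka package.
(2) Modify the pom, as shown in the figure:
The content is as follows: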
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <name>Apache Flume ClickHouse Sink</name>
  <description>Kafka 0.8+ sink for Apache Flume NG</description>
  <groupId>org.apache.flume.sink.clickhouse</groupId>
  <artifactId>flume-clickhouse-sink</artifactId>
  <version>1.5.2</version>
  <packaging>jar</packaging>
  <parent>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sinks</artifactId>
    <version>1.4.0</version>
  </parent>
  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>19.0</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
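(3) Download the clickhouse-sink code
Download URL: https://reviews.apache.org/r/50692/diff/1#2
Copy the code to your local project, as shown in the figure below.
(4) Modify the code
During testing, Chinese characters in the synced data came out garbled. To fix this, modify the ClickHouseSink class so that it specifies StreamUtils.UTF_8 as the charset when building the HttpEntity, as shown in the figure: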
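The garbling is easy to reproduce in isolation. The sketch below uses only the JDK and assumes the server decodes the request body as UTF-8; `EncodingDemo` is a hypothetical class for illustration, not code from the sink. It shows that Chinese text encoded with ISO-8859-1 is lost before it ever leaves the client, while UTF-8 round-trips intact.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo (not code from the sink): effect of the request-body charset on Chinese text. */
public class EncodingDemo {

    /** Encodes text with sendCharset, then decodes the bytes as UTF-8, as the receiving side is assumed to do. */
    public static String roundTrip(String text, Charset sendCharset) {
        byte[] body = text.getBytes(sendCharset);
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String text = "\u4e2d\u6587"; // the two characters of the word "Chinese", written as escapes
        // ISO-8859-1 cannot represent these characters, so each one becomes '?'.
        System.out.println(roundTrip(text, StandardCharsets.ISO_8859_1));
        // UTF-8 preserves them.
        System.out.println(roundTrip(text, StandardCharsets.UTF_8).equals(text));
    }
}
```

If the sink builds its body with Apache HttpClient 4.x (an assumption on my part), the analogous fix would be to pass a charset explicitly, for example `new StringEntity(body, StandardCharsets.UTF_8)`, since a `StringEntity` created without a charset falls back to ISO-8859-1 as far as I know.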
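This involves three utility classes (StreamUtils, StringUtils, Utils), which can simply be copied into the project.
These three classes can be found in this project: https://github.com/yandex/clickhouse-jdbc.git
Run mvn package to produce the jar: flume-clickhouse-sink-1.5.2.jar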
Upload the resulting jar, flume-clickhouse-sink-1.5.2.jar, to Flume's lib directory.
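In Flume's conf directory, create the configuration file: oracle-clickhouse.conf
For the configuration file format, refer to the official Flume documentation; I will not go into it here and will go straight to an example. Here I sync two tables, each with its own channel, source, and sink: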
agent.channels = channelMProductPL channelMProductPP
agent.sources = sourceMProductPL sourceMProductPP
agent.sinks = sinkMProductPL sinkMProductPP

###########sql source#################
# For each Test of the sources, the type is defined
agent.sources.sourceMProductPL.type = org.keedio.flume.source.SQLSource
agent.sources.sourceMProductPL.hibernate.connection.url = jdbc:oracle:thin:@10.10.10.10:1521:orcl
# Hibernate Database connection properties
agent.sources.sourceMProductPL.hibernate.connection.user = user
agent.sources.sourceMProductPL.hibernate.connection.password = password
agent.sources.sourceMProductPL.hibernate.connection.autocommit = true
agent.sources.sourceMProductPL.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agent.sources.sourceMProductPL.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
#agent.sources.sourceMProductPL.hibernate.table = M_PRODUCT_PL
agent.sources.sourceMProductPL.run.query.delay=10000
agent.sources.sourceMProductPL.enclose.by.quotes = false
agent.sources.sourceMProductPL.status.file.path = /tmp/flume/apache-flume-1.5.2-bin
agent.sources.sourceMProductPL.status.file.name = agent.sqlSource.status.mProductPL
agent.sources.sourceMProductPL.inputCharset = UTF-8
# Custom query
agent.sources.sourceMProductPL.start.from = 0
agent.sources.sourceMProductPL.custom.query = SELECT PRODUCT_PL,TXTSH FROM M_PRODUCT_PL WHERE "TO_NUMBER"(PRODUCT_PL) > $@$
# alternative value noted by the author: 1000
agent.sources.sourceMProductPL.batch.size = 1
# alternative value noted by the author: 1000
agent.sources.sourceMProductPL.max.rows = 1
agent.sources.sourceMProductPL.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sourceMProductPL.hibernate.c3p0.min_size=1
agent.sources.sourceMProductPL.hibernate.c3p0.max_size=10

##############################
# For each Test of the sources, the type is defined
agent.sources.sourceMProductPP.type = org.keedio.flume.source.SQLSource
agent.sources.sourceMProductPP.hibernate.connection.url = jdbc:oracle:thin:@10.10.10.10:1521:orcl
# Hibernate Database connection properties
agent.sources.sourceMProductPP.hibernate.connection.user = user
agent.sources.sourceMProductPP.hibernate.connection.password = password
agent.sources.sourceMProductPP.hibernate.connection.autocommit = true
agent.sources.sourceMProductPP.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agent.sources.sourceMProductPP.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
#agent.sources.sourceMProductPP.hibernate.table = M_PRODUCT_PP
agent.sources.sourceMProductPP.run.query.delay=10000
agent.sources.sourceMProductPP.enclose.by.quotes = false
agent.sources.sourceMProductPP.status.file.path = /tmp/flume/apache-flume-1.5.2-bin
agent.sources.sourceMProductPP.status.file.name = agent.sqlSource.status.mProductPP
agent.sources.sourceMProductPP.inputCharset = UTF-8
# Custom query
agent.sources.sourceMProductPP.start.from = 0
agent.sources.sourceMProductPP.custom.query = SELECT PRODUCT_PP,TXTSH FROM M_PRODUCT_PP WHERE "TO_NUMBER"(PRODUCT_PP) > $@$
# alternative value noted by the author: 1000
agent.sources.sourceMProductPP.batch.size = 1
# alternative value noted by the author: 1000
agent.sources.sourceMProductPP.max.rows = 1
agent.sources.sourceMProductPP.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sourceMProductPP.hibernate.c3p0.min_size=1
agent.sources.sourceMProductPP.hibernate.c3p0.max_size=10

##############################
agent.channels.channelMProductPL.type = memory
agent.channels.channelMProductPL.capacity = 1000
agent.channels.channelMProductPL.transactionCapacity = 1000
agent.channels.channelMProductPL.byteCapacityBufferPercentage = 20
agent.channels.channelMProductPL.byteCapacity = 1600000

agent.channels.channelMProductPP.type = memory
agent.channels.channelMProductPP.capacity = 1000
agent.channels.channelMProductPP.transactionCapacity = 1000
agent.channels.channelMProductPP.byteCapacityBufferPercentage = 20
agent.channels.channelMProductPP.byteCapacity = 1600000

agent.sinks.sinkMProductPL.type = org.apache.flume.sink.clickhouse.ClickHouseSink
agent.sinks.sinkMProductPL.host = http://10.122.1.229
agent.sinks.sinkMProductPL.port = 8123
agent.sinks.sinkMProductPL.database = store_analysis
agent.sinks.sinkMProductPL.table = M_PRODUCT_PL
# alternative value noted by the author: 3000
agent.sinks.sinkMProductPL.batchSize = 1
agent.sinks.sinkMProductPL.format = TabSeparated

agent.sinks.sinkMProductPP.type = org.apache.flume.sink.clickhouse.ClickHouseSink
agent.sinks.sinkMProductPP.host = http://10.122.1.229
agent.sinks.sinkMProductPP.port = 8123
agent.sinks.sinkMProductPP.database = store_analysis
agent.sinks.sinkMProductPP.table = M_PRODUCT_PP
# alternative value noted by the author: 3000
agent.sinks.sinkMProductPP.batchSize = 1
agent.sinks.sinkMProductPP.format = TabSeparated

agent.sinks.sinkMProductPL.channel = channelMProductPL
agent.sources.sourceMProductPL.channels=channelMProductPL
agent.sinks.sinkMProductPP.channel = channelMProductPP
agent.sources.sourceMProductPP.channels=channelMProductPP
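To make the sink parameters more concrete: ClickHouse's HTTP interface accepts inserts as a POST whose `query` parameter holds an `INSERT INTO ... FORMAT ...` statement and whose body holds the rows. The sketch below shows how host, port, database, table, and format could combine into such a URL. `InsertUrl` is a hypothetical class, and it is an assumption that the sink from the review request builds its URL in exactly this way.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical sketch: combines the sink settings into a ClickHouse HTTP insert URL. */
public class InsertUrl {

    public static String build(String host, int port, String database, String table, String format) {
        String query = "INSERT INTO " + database + "." + table + " FORMAT " + format;
        try {
            // URLEncoder encodes spaces as '+'; use %20 so the query is a plain percent-encoded string.
            String encoded = URLEncoder.encode(query, "UTF-8").replace("+", "%20");
            return host + ":" + port + "/?query=" + encoded;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this is not expected to happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Values taken from the example configuration.
        System.out.println(build("http://10.122.1.229", 8123, "store_analysis", "M_PRODUCT_PL", "TabSeparated"));
    }
}
```

With the values from the configuration, this yields http://10.122.1.229:8123/?query=INSERT%20INTO%20store_analysis.M_PRODUCT_PL%20FORMAT%20TabSeparated, and the tab-separated rows from the channel would be sent as the request body.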
Adjust the parameters to suit your own situation.
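One parameter that deserves a closer look is the `$@$` placeholder in custom.query, which drives incremental sync. As I understand flume-ng-sql-source, the placeholder is replaced with the last synced index (initially start.from, afterwards the value kept in the status file), and the index is taken from the first column of the result. The sketch below illustrates that idea only; `IncrementalQuery` is a hypothetical class and not the plugin's actual code.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the incremental-query idea; not the actual flume-ng-sql-source code. */
public class IncrementalQuery {

    /** Substitutes the last synced index for the $@$ placeholder (literal replacement, no regex). */
    public static String render(String customQuery, String lastIndex) {
        return customQuery.replace("$@$", lastIndex);
    }

    /** Assumption: the next index is the first column of the last returned row. */
    public static String nextIndex(List<List<String>> rows, String currentIndex) {
        if (rows.isEmpty()) {
            return currentIndex; // nothing new, keep the old index
        }
        return rows.get(rows.size() - 1).get(0);
    }

    public static void main(String[] args) {
        String query = "SELECT PRODUCT_PL,TXTSH FROM M_PRODUCT_PL WHERE \"TO_NUMBER\"(PRODUCT_PL) > $@$";
        System.out.println(render(query, "0"));
        List<List<String>> rows = Arrays.asList(Arrays.asList("7", "a"), Arrays.asList("9", "b"));
        System.out.println(nextIndex(rows, "0"));
    }
}
```

If the index really is read from the last row, the query results need to arrive in ascending order of the index column, so adding an ORDER BY on that column may be worth considering.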
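In Flume's bin directory, run the command:
./flume-ng agent --conf ../conf --conf-file ../conf/oracle-clickhouse.conf --name agent -Dflume.root.logger=INFO,console
To run it in the background, prefix the command with nohup and append &.
At this point, the synchronization setup is complete!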