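(1) Install MySQL
(2) Enable MySQL binlog in ROW mode and start MySQL
(3) Install the JDK
(4) Install and start Elasticsearch (I installed 6.4.0, mainly because canal adapter 1.1.3 does not yet support 7.0.0)
(5) Install and start Kibana
(6) Install and start canal-server
(7) Install and start canal-adapter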
https://dev.mysql.com/downloa...
wget http://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm
Newer versions are available by now, but I am using 5.7.
Install the MySQL repository:
yum -y install mysql57-community-release-el7-11.noarch.rpm
Check the result:
yum repolist enabled | grep "mysql.*"
yum install mysql-community-server
systemctl start mysqld.service
Check the status of the MySQL service:
systemctl status mysqld.service
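Find the temporary root password that MySQL 5.7 writes to its log on first start:
grep "password" /var/log/mysqld.log
Log in:
mysql -u root -p
At this point the database has no remote grants and only accepts local connections from localhost, so grant remote access:
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;
FLUSH PRIVILEGES;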
User name: root
Password: 123456
Host: % means all IPs; you can also enter a specific IP here to restrict access to that address
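Locate the my.cnf file; on my machine it is /etc/my.cnf.
Add the following lines under the [mysqld] section:
log-bin=mysql-bin
binlog-format=ROW
server-id=1
Then restart MySQL and check that binlog is enabled correctly: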
show variables like '%log_bin%';
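Before restarting, the three settings can also be checked directly in the config file. The following is a minimal sketch, not part of the original setup: the function name check_binlog_cnf is hypothetical, it only looks for uncommented lines anywhere in the file (it does not verify that they sit under [mysqld]), and it assumes log-bin is written with an explicit value.

```shell
#!/bin/sh
# check_binlog_cnf FILE
# Hypothetical helper: succeeds only if FILE contains uncommented
# log-bin, binlog-format=ROW and server-id settings.
check_binlog_cnf() {
    cnf="$1"
    # MySQL accepts both '-' and '_' in option names, so match either.
    grep -Eq '^[[:space:]]*log[-_]bin[[:space:]]*=' "$cnf" || return 1
    # The format must be ROW (case-insensitive) with nothing else on the line.
    grep -Eiq '^[[:space:]]*binlog[-_]format[[:space:]]*=[[:space:]]*ROW[[:space:]]*$' "$cnf" || return 1
    # server-id must be followed by a number.
    grep -Eq '^[[:space:]]*server[-_]id[[:space:]]*=[[:space:]]*[0-9]+' "$cnf" || return 1
    return 0
}

# Example:
#   check_binlog_cnf /etc/my.cnf && echo "binlog settings present"
```

The SQL check above remains the authoritative one, since it reports what the running server actually uses.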
The JDK version I installed is 1.8.0_202.
Download URL:
https://www.oracle.com/techne...
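(1) Put jdk-8u202-linux-x64.tar.gz in the /usr/local directory
(2) Extract the archive and clean up
tar -xzvf jdk-8u202-linux-x64.tar.gz
mv jdk1.8.0_202 jdk
rm -rf jdk-8u202-linux-x64.tar.gz
Once these commands finish, a jdk directory exists under /usr/local.
(3) Configure the environment variables
vi /etc/profile
Add:
export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATH
Then run source /etc/profile (or log in again) so the variables take effect.
(4) Check that the JDK is installed correctly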
java -version
Official site: https://www.elastic.co/downlo...
Run the following commands; you can also download the package manually and upload it to the server.
cd /usr/local
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.0-linux-x86_64.tar.gz
tar -xzvf elasticsearch-6.4.0-linux-x86_64.tar.gz
mv elasticsearch-6.4.0-linux-x86_64 elasticsearch
rm -rf elasticsearch-6.4.0-linux-x86_64.tar.gz
Once these commands finish, an elasticsearch directory exists under /usr/local.
Elasticsearch cannot be started as the root account, so run the following commands:
useradd elasticsearch
chown -R elasticsearch /usr/local/elasticsearch
su elasticsearch
Use the elasticsearch user to start ES.
vim /etc/security/limits.conf
Add:
* soft nofile 65536
* hard nofile 65536
* soft nproc 2048
* hard nproc 4096
# To lock memory and avoid swapping, add two more lines to this file
elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited
vim /etc/sysctl.conf
Add:
vm.max_map_count=655360
fs.file-max=655360
Note: afterwards run sysctl -p as root so the kernel settings take effect. The limits.conf changes apply to new login sessions.
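To confirm that the new limits are actually in effect before starting Elasticsearch, the live values can be compared against the minimums that the Elasticsearch bootstrap checks ask for (65536 open files and a vm.max_map_count of 262144). The helper names below are hypothetical and the script is only a sketch of that comparison.

```shell
#!/bin/sh
# at_least ACTUAL MINIMUM
# Succeeds when the integer ACTUAL is greater than or equal to MINIMUM.
at_least() {
    [ "$1" -ge "$2" ]
}

# check_setting NAME ACTUAL MINIMUM
# Prints one line for the setting and returns non-zero when it is too low.
# A non-numeric ACTUAL (for example "unlimited") is reported as LOW.
check_setting() {
    if at_least "$2" "$3"; then
        echo "OK $1=$2 (minimum $3)"
    else
        echo "LOW $1=$2 (minimum $3)"
        return 1
    fi
}

# Example (run on the Elasticsearch host, as the elasticsearch user):
#   check_setting vm.max_map_count "$(sysctl -n vm.max_map_count)" 262144
#   check_setting nofile "$(ulimit -n)" 65536
```

The values configured above (655360 and 65536) satisfy both minimums once they have been applied.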
vim /usr/local/elasticsearch/config/elasticsearch.yml
# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /usr/local/elasticsearch-6.4.0/data
#
# Path to log files:
#
path.logs: /usr/local/elasticsearch-6.4.0/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 192.168.254.131
#
# Set a custom port for HTTP:
#
http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.zen.ping.unicast.hosts: ["192.168.254.131"]
#
# Prevent the "split brain" by configuring the majority of nodes (total number of master-eligible nodes / 2 + 1):
#
#discovery.zen.minimum_master_nodes:
#
# For more information, consult the zen discovery module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
transport.tcp.port: 9300
transport.tcp.compress: true
http.cors.enabled: true
http.cors.allow-origin: "*"
(3) Start Elasticsearch
cd /usr/local/elasticsearch
./bin/elasticsearch -d
Check that it started successfully:
curl http://192.168.254.131:9200
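Because elasticsearch -d returns immediately while the node is still starting, a single curl right after launch may fail even though nothing is wrong. One way to handle that is a small retry loop; wait_for below is a hypothetical helper written for this walkthrough, and the attempt count and delay are arbitrary choices.

```shell
#!/bin/sh
# wait_for ATTEMPTS DELAY COMMAND [ARGS...]
# Hypothetical helper: runs COMMAND up to ATTEMPTS times, sleeping DELAY
# seconds between failed tries. Returns 0 as soon as COMMAND succeeds,
# or 1 if every attempt fails.
wait_for() {
    attempts="$1"
    delay="$2"
    shift 2
    i=1
    while [ "$i" -le "$attempts" ]; do
        if "$@" >/dev/null 2>&1; then
            return 0
        fi
        # Do not sleep after the final failed attempt.
        if [ "$i" -lt "$attempts" ]; then
            sleep "$delay"
        fi
        i=$((i + 1))
    done
    return 1
}

# Example: poll Elasticsearch for up to about a minute.
#   wait_for 30 2 curl -sf http://192.168.254.131:9200 && echo "Elasticsearch is up"
```

The same helper can be pointed at Kibana or any other service in this setup, since it only cares about the exit status of the command it is given.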
Official site: https://www.elastic.co/downlo...
Run the following commands; you can also download the package manually and upload it to the server.
cd /usr/local
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.4.0-linux-x86_64.tar.gz
tar -xzvf kibana-6.4.0-linux-x86_64.tar.gz
mv kibana-6.4.0-linux-x86_64 kibana
rm -rf kibana-6.4.0-linux-x86_64.tar.gz
Once these commands finish, a kibana directory exists under /usr/local.
vim /usr/local/kibana/config/kibana.yml
# Kibana is served by a back end server. This setting specifies the port to use.
server.port: 5601

# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "192.168.254.131"

# Enables you to specify a path to mount Kibana at if you are running behind a proxy.
# Use the `server.rewriteBasePath` setting to tell Kibana if it should remove the basePath
# from requests it receives, and to prevent a deprecation warning at startup.
# This setting cannot end in a slash.
#server.basePath: ""

# Specifies whether Kibana should rewrite requests that are prefixed with
# `server.basePath` or require that they are rewritten by your reverse proxy.
# This setting was effectively always `false` before Kibana 6.3 and will
# default to `true` starting in Kibana 7.0.
#server.rewriteBasePath: false

# The maximum payload size in bytes for incoming server requests.
#server.maxPayloadBytes: 1048576

# The Kibana server's name. This is used for display purposes.
#server.name: "your-hostname"

# The URL of the Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://192.168.254.131:9200"

# When this setting's value is true Kibana uses the hostname specified in the server.host
# setting. When the value of this setting is false, Kibana uses the hostname of the host
# that connects to this Kibana instance.
#elasticsearch.preserveHost: true

# Kibana uses an index in Elasticsearch to store saved searches, visualizations and
# dashboards. Kibana creates a new index if the index doesn't already exist.
kibana.index: ".kibana6"

# The default application to load.
#kibana.defaultAppId: "home"

# If your Elasticsearch is protected with basic authentication, these settings provide
# the username and password that the Kibana server uses to perform maintenance on the Kibana
# index at startup. Your Kibana users still need to authenticate with Elasticsearch, which
# is proxied through the Kibana server.
#elasticsearch.username: "user"
#elasticsearch.password: "pass"

# Enables SSL and paths to the PEM-format SSL certificate and SSL key files, respectively.
# These settings enable SSL for outgoing requests from the Kibana server to the browser.
#server.ssl.enabled: false
#server.ssl.certificate: /path/to/your/server.crt
#server.ssl.key: /path/to/your/server.key

# Optional settings that provide the paths to the PEM-format SSL certificate and key files.
# These files validate that your Elasticsearch backend uses the same key files.
#elasticsearch.ssl.certificate: /path/to/your/client.crt
#elasticsearch.ssl.key: /path/to/your/client.key

# Optional setting that enables you to specify a path to the PEM file for the certificate
# authority for your Elasticsearch instance.
#elasticsearch.ssl.certificateAuthorities: [ "/path/to/your/CA.pem" ]

# To disregard the validity of SSL certificates, change this setting's value to 'none'.
#elasticsearch.ssl.verificationMode: full

# Time in milliseconds to wait for Elasticsearch to respond to pings. Defaults to the value of
# the elasticsearch.requestTimeout setting.
#elasticsearch.pingTimeout: 1500

# Time in milliseconds to wait for responses from the back end or Elasticsearch. This value
# must be a positive integer.
#elasticsearch.requestTimeout: 30000

# List of Kibana client-side headers to send to Elasticsearch. To send *no* client-side
# headers, set this value to [] (an empty list).
#elasticsearch.requestHeadersWhitelist: [ authorization ]

# Header names and values that are sent to Elasticsearch. Any custom headers cannot be overwritten
# by client-side headers, regardless of the elasticsearch.requestHeadersWhitelist configuration.
#elasticsearch.customHeaders: {}

# Time in milliseconds for Elasticsearch to wait for responses from shards. Set to 0 to disable.
#elasticsearch.shardTimeout: 30000

# Time in milliseconds to wait for Elasticsearch at Kibana startup before retrying.
#elasticsearch.startupTimeout: 5000

# Logs queries sent to Elasticsearch. Requires logging.verbose set to true.
#elasticsearch.logQueries: false

# Specifies the path where Kibana creates the process ID file.
#pid.file: /var/run/kibana.pid

# Enables you specify a file where Kibana stores log output.
#logging.dest: stdout

# Set the value of this setting to true to suppress all logging output.
#logging.silent: false

# Set the value of this setting to true to suppress all logging output other than error messages.
#logging.quiet: false

# Set the value of this setting to true to log all events, including system usage information
# and all requests.
#logging.verbose: false

# Set the interval in milliseconds to sample system and process performance
# metrics. Minimum is 100ms. Defaults to 5000.
#ops.interval: 5000

# The default locale. This locale can be used in certain circumstances to substitute any missing
# translations.
#i18n.defaultLocale: "en"
cd /usr/local/kibana
nohup ./bin/kibana &
Check that it started successfully:
Open http://192.168.254.131:5601 in a browser.
For details, see the official documentation:
https://github.com/alibaba/ca...
Download directly
Visit https://github.com/alibaba/canal/releases , which lists every historical release package. To download, taking version 1.1.3 as an example:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
or build it yourself
git clone git@github.com:alibaba/canal.git
cd canal
mvn clean install -Dmaven.test.skip -Denv=release
When the build finishes, target/canal.deployer-$version.tar.gz is produced under the project root.
mkdir /usr/local/canal
tar zxvf canal.deployer-$version.tar.gz -C /usr/local/canal
cd /usr/local/canal
vim conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=192.168.254.131:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
cd /usr/local/canal
./bin/startup.sh
cat logs/canal/canal.log
2019-05-03 10:58:31.938 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2019-05-03 10:58:32.106 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-05-03 10:58:32.120 [main] INFO c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
2019-05-03 10:58:32.143 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
2019-05-03 10:58:32.277 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.254.131:11111]
2019-05-03 10:58:34.235 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2019-05-03 10:58:35.470 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2019-05-03 10:58:36.317 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2019-05-03 10:58:36.317 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2019-05-03 10:58:37.106 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
2019-05-03 10:58:37.239 [destination = example , address = /192.168.254.131:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-05-03 10:58:37.241 [destination = example , address = /192.168.254.131:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position by switch ::1556597413000
2019-05-03 10:58:39.239 [destination = example , address = /192.168.254.131:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4450,serverId=1,gtid=,timestamp=1556596874000] cost : 1915ms , the next step is binlog dump
cat logs/example/example.log
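Rather than reading the whole log by eye, the startup can be confirmed by searching for the "the canal server is running now" line that appears in the output above. This is a rough sketch with hypothetical function names; it assumes the log layout shown in this article, where the level appears as a space-delimited word.

```shell
#!/bin/sh
# canal_started LOGFILE
# Succeeds when the log contains the "running now" startup message.
canal_started() {
    grep -q 'the canal server is running now' "$1"
}

# canal_errors LOGFILE
# Prints every ERROR-level line; returns non-zero when there are none.
canal_errors() {
    grep ' ERROR ' "$1"
}

# Example:
#   cd /usr/local/canal
#   canal_started logs/canal/canal.log && echo "canal-server is running"
#   canal_errors logs/example/example.log
```

Not every ERROR line is fatal: in the log above the server still reports that it is running after the DruidDataSource message.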
Visit https://github.com/alibaba/canal/releases , which lists every historical release package. To download, taking version 1.1.3 as an example:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz
mkdir /usr/local/canal-adapter
tar zxvf canal.adapter-1.1.3.tar.gz -C /usr/local/canal-adapter
cd /usr/local/canal-adapter
vim conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 192.168.254.131:11111
#  zookeeperHosts: slave1:2181
#  mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.254.131:3306/mytest?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es
        hosts: 192.168.254.131:9300
        properties:
          cluster.name: my-application
vim conf/es/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: mytest_user
  _type: _doc
  _id: _id
  upsert: true
#  pk: id
  sql: "select a.id as _id, a.name, a.role_id, a.c_time from user a"
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>='{0}'"
  commitBatch: 3000
Create the database and table in MySQL:
create database mytest;
use mytest;
create table user (
  `id` int(10) NOT NULL,
  `name` varchar(100) DEFAULT NULL,
  `role_id` int(10) NOT NULL,
  `c_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `c_utime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
);
Create the index and its mapping in Elasticsearch (for example from the Kibana console):
PUT /mytest_user
{
  "mappings": {
    "_doc": {
      "properties": {
        "name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "role_id": {
          "type": "long"
        },
        "c_time": {
          "type": "date"
        }
      }
    }
  }
}
cd /usr/local/canal-adapter
./bin/startup.sh
View the log:
cat logs/adapter/adapter.log
Before any data changes:
GET /mytest_user/_search
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}
insert into user(id, name, role_id) values(7, 'test', 7);
GET /mytest_user/_doc/7
{
  "_index": "mytest_user",
  "_type": "_doc",
  "_id": "7",
  "_version": 1,
  "found": true,
  "_source": {
    "name": "test",
    "role_id": 7,
    "c_time": "2019-05-04T06:11:31-05:00"
  }
}
update user set name = 'zhengguo' where id = 7;
GET /mytest_user/_doc/7
{
  "_index": "mytest_user",
  "_type": "_doc",
  "_id": "7",
  "_version": 2,
  "found": true,
  "_source": {
    "name": "zhengguo",
    "role_id": 7,
    "c_time": "2019-05-04T06:11:31-05:00"
  }
}
delete from user where id = 7;
GET /mytest_user/_doc/7
{
  "_index": "mytest_user",
  "_type": "_doc",
  "_id": "7",
  "found": false
}
As you can see, the insert, update, and delete were all synchronized successfully.
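The same checks can be run from a shell instead of the Kibana console. The sketch below is only an illustration: doc_found is a hypothetical name, and it uses a plain text match on the "found" field of the response rather than a real JSON parser, which is enough for the responses shown above.

```shell
#!/bin/sh
# doc_found
# Reads an Elasticsearch GET-document response on stdin and succeeds
# when it contains "found": true.
doc_found() {
    grep -Eq '"found"[[:space:]]*:[[:space:]]*true'
}

# Example, after inserting the row with id 7 in MySQL:
#   curl -s http://192.168.254.131:9200/mytest_user/_doc/7 | doc_found \
#       && echo "row 7 is indexed" || echo "row 7 is not in the index"
```

After the delete statement the same pipeline should take the second branch, matching the "found": false response above.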
Canal may resolve the following limitation in a later release.
Currently, if you use adapter 1.1.3 for incremental sync and Elasticsearch is version 7.X.X, the sync fails with an error saying that ES cannot be reached:
ESSyncService - sync error, es index: mytest_user, DML : Dml{destination='example', database='mytest', table='user', type='INSERT', es=1556597413000, ts=1556597414139, sql='', data=[{id=4, name=junge, role_id=4, c_time=2019-04-30 00:10:13.0, c_utime=2019-04-30 00:10:13.0}], old=null}
ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterWorker - NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{lTIHs6ZsTe-PqHs9CToQYQ}{192.168.254.131}{192.168.254.131:9300}]]
In other words, incremental sync to version 7 is not supported yet. Switching to 6.X.X fixes the problem.
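To catch this mismatch before starting the adapter, the running Elasticsearch version can be read from its root endpoint and compared with the major version this article verified. The function names below are hypothetical, and the check only encodes what was observed here (6.x works with adapter 1.1.3, 7.x does not); it says nothing about other versions.

```shell
#!/bin/sh
# es_version_from_json
# Reads the Elasticsearch root response on stdin and prints the value of
# "number" (for example 6.4.0). Uses a simple text match, not a JSON parser.
es_version_from_json() {
    sed -n 's/.*"number"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# es_major VERSION
# Prints the major component of a dotted version, e.g. 6 for 6.4.0.
es_major() {
    echo "${1%%.*}"
}

# is_verified_es_version VERSION
# Succeeds only for the 6.x line, the one this article tested with adapter 1.1.3.
is_verified_es_version() {
    [ "$(es_major "$1")" = "6" ]
}

# Example:
#   v=$(curl -s http://192.168.254.131:9200 | es_version_from_json)
#   is_verified_es_version "$v" || echo "Elasticsearch $v was not verified with adapter 1.1.3"
```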