各項配置能夠直接參考canal官方文檔,因爲1.1.4支持的es版本爲6.x以上,其餘版本須要替換依賴從新編譯client-adapter.elasticsearch模塊,如下爲es5.5.0低版本兼容方案以及我的踩的坑。html
修改client-adapter模塊的pom.xml,將es的依賴修改成es版本適配的5.5.0。java
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>5.5.0</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>5.5.0</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>6.4.3</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.4.3</version> </dependency>
因爲5.5.0版本無rest-client,所以只修改transport相關版本,後續僅測試tcp鏈接es同步,rest不肯定兼容性。mysql
ESConnection.java: transportClient.addTransportAddress(new TransportAddress(InetAddress.getByName(host.substring(0, i)), Integer.parseInt(host.substring(i + 1)))); 修改成 transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host.substring(0, i)), Integer.parseInt(host.substring(i + 1))));
mvn clean install -Dmaven.test.skip -Denv=release
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.7.0:compile (default-compile) on project client-adapter.elasticsearch: Compilation failure: Compilation failure: [ERROR] canal/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java:[502,75] 未報告的異常錯誤java.io.IOException; 必須對其進行捕獲或聲明以便拋出 [ERROR] canal/client-adapter/elasticsearch/src/main/java/org/elasticsearch/client/RestHighLevelClientExt.java:[24,13] 方法引用無效
5.x版本的transportclient不兼容rest-client,註釋掉rest致使的異常。git
RestHighLevelClientExt::getMapping @Deprecated public static GetMappingsResponse getMapping(RestHighLevelClient restHighLevelClient, GetMappingsRequest getMappingsRequest, RequestOptions options) throws IOException,IllegalAccessException { throw new IllegalAccessException("es 5.x unsupport this method, use tcp mode"); } ESConnection::getMapping ... if (mode == ESClientMode.TRANSPORT) { ... } else { try { GetMappingsRequest request = new GetMappingsRequest(); request.indices(index); GetMappingsResponse response; // try { // response = restHighLevelClient // .indices() // .getMapping(request, RequestOptions.DEFAULT); // // 6.4如下版本直接使用該接口會報錯 // } catch (Exception e) { // logger.warn("Low ElasticSearch version for getMapping"); response = RestHighLevelClientExt.getMapping(restHighLevelClient, request, RequestOptions.DEFAULT); // } mappings = response.mappings(); } catch (NullPointerException e) { throw new IllegalArgumentException("Not found the mapping info of index: " + index); } catch (IOException | IllegalAccessException e) {//此處增長一個異常捕獲 logger.error(e.getMessage(), e); return null; } ... }
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.7.0:compile (default-compile) on project client-adapter.elasticsearch: Compilation failure [ERROR] canal/client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java:[502,75] 未報告的異常錯誤java.io.IOException; 必須對其進行捕獲或聲明以便拋出
緣由以下,getSourceAsMap方法在6.4.3拋出runtimeException(ElasticsearchParseException是子類),而5.5.0版本拋出IOException,須要顯示捕獲。github
//6.4.3拋出的異常時runtimeException public Map<String, Object> getSourceAsMap() throws ElasticsearchParseException { return this.sourceAsMap(); } //5.5.0版本 public Map<String, Object> getSourceAsMap() throws IOException { return sourceAsMap(); }
修改ESTemplate的getEsType方法捕獲異常便可sql
ESTemplate::getEsType Map<String, Object> sourceMap = null; try{ sourceMap = mappingMetaData.getSourceAsMap(); }catch (IOException e){ logger.error(e.getMessage(), e); return null; }
編譯後,替換canal.adapter-1.1.4\plugin下的 client-adapter.elasticsearch-1.1.4-jar-with-dependencies.jar 文件。shell
執行deploy和adapter啓動腳本便可。apache
啓動後報錯:app
2020-07-07 14:36:08.223 [main] INFO org.elasticsearch.plugins.PluginsService - loaded plugin [org.elasticsearch.transport.Netty4Plugin] 2020-07-07 14:36:08.473 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es failed java.lang.RuntimeException: java.lang.IllegalArgumentException: unknown setting [mode] please check that any required plugins are installed, or check the breaking changes documentation for removed settings at com.alibaba.otter.canal.client.adapter.es.ESAdapter.init(ESAdapter.java:137) at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:172)
查看canal源碼,未發現拋出異常日誌的代碼,再搜索依賴的包,發現異常是es建立transportClient時拋出的異常,因而猜想是canal-adpapter配置中的某個mode參數被引入建立transportClient的setting中致使建立失敗,因而註釋掉,並重啓。curl
- name: es hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode properties: #mode: transport # transport # or rest //註釋了這行,是1.1.4的坑,代碼中properties下的全部配置都會被傳入transportClient的setting中,rest模式則不會,因此transport模式除了cluster.name外的配置會致使es鏈接建立失敗 # security.auth: test:123456 # only used for rest mode cluster.name: elasticsearch
重啓後,向mysql插入數據後,adapter打印出日誌
[pool-2-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":21,"name":"測試用戶","time":null}],"database":"canal","destination":"example_instance","es":1594347777000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"class","ts":1594347777991,"type":"INSERT"} [pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Destination: example_instance, database:canal, table:class, type:INSERT, affected index count: 1 [pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Prepared to sync index: canal_test, destination: example_instance [pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Single table insert to es index, destination:example_instance, table: class, index: canal_test, id: 21 [pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Sync elapsed time: 1 ms,destination: example_instance, es index: canal_test [pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Sync completed: canal_test, destination: example_instance [pool-2-thread-1] TRACE c.a.otter.canal.client.adapter.es.service.ESSyncService - Sync elapsed time: 2 ms, affected indexes count:1, destination: example_instance [pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":21,"name":"測試用戶","time":null}],"database":"canal","destination":"example_instance","es":1594347777000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"class","ts":1594347777993,"type":"INSERT"} Affected indexes: canal_test
查看es數據
curl 127.0.0.1:9200/canal_test/canal/21 { "_index": "canal_test", "_type": "canal", "_id": "21", "_version": 1, "found": true, "_source": { "name": "測試用戶" } }