elasticsearch-jdbc實現MySQL同步到ElasticSearch深刻詳解

1.如何實現mysql與elasticsearch的數據同步?java

逐條轉換爲json顯然不合適,須要藉助第三方工具或者本身實現。核心功能點:同步增、刪、改、查同步。node

二、mysql與elasticsearch同步的方法有哪些?優缺點對比?mysql

目前該領域比較牛的插件有:git

1)、elasticsearch-jdbc,嚴格意義上它已經不是第三方插件。已經成爲獨立的第三方工具。https://github.com/jprante/elasticsearch-jdbc 2)、elasticsearch-river-mysql插件 https://github.com/scharron/elasticsearch-river-mysql 3)、go-mysql-elasticsearch(國內做者siddontang) https://github.com/siddontang/go-mysql-elasticsearchgithub

1-3同步工具/插件對比:sql

go-mysql-elasticsearch仍處理開發不穩定階段。 爲何選擇elasticsearch-jdbc而不是elasticsearch-river-mysql插件的緣由?(參考:http://stackoverflow.com/questions/23658534/using-elasticsearch-river-mysql-to-stream-data-from-mysql-database-to-elasticsea) 1)通用性角度:elasticsearch-jdbc更通用, 2)版本更新角度:elasticsearch-jdbc GitHub活躍度很高,最新的版本2.3.3.02016年5月28日兼容Elasticsearch2.3.3版本。 而elasticsearch-river-mysql 2012年12月13往後便再也不更新。 綜上,選擇elasticsearch-jdbc做爲mysql同步Elasticsearch的工具理所固然。數據庫

elasticsearch-jdbc的缺點與不足(他山之石):json

1)、go-mysql-elasticsearch做者siddontang在博客提到的: elasticsearch-river-jdbc的功能是很強大,但並無很好的支持增量數據更新的問題,它須要對應的表只增不減,而這個幾乎在項目中是不可能辦到的。 http://www.jianshu.com/p/05cff717563c 2)、 博主leotse90在博文中提到elasticsearch-jdbc的缺點:那就是刪除操做不能同步(物理刪除)! http://leotse90.com/2015/11/11/ElasticSearch與MySQL數據同步以及修改表結構/app

我截止2016年6月16日沒有測試到,不妄加評論。curl

這裏寫圖片描述

三、elasticsearch-jdbc如何使用?要不要安裝?

3.1 和早期版本不一樣點

elasticsearch-jdbcV2.3.2.0版本不須要安裝。如下筆者使用的elasticsearch也是2.3.2測試。 操做系統:CentOS release 6.6 (Final) 看到這裏,你可能會問早期的版本有什麼不一樣呢?很大不一樣。從我搜集資料來看,不一樣點以下: 1)早期1.x版本,做爲插件,須要安裝。 2)配置也會有不一樣。

3.2 elasticsearch-jdbc使用(同步方法一)

前提: 1)elasticsearch 2.3.2 安裝成功,測試ok。 2)mysql安裝成功,能實現增、刪、改、查。 可供測試的數據庫爲test,表爲cc,具體信息以下:

mysql> select * from cc; +----+------------+ | id | name | +----+------------+ | 1 | laoyang | | 2 | dluzhang | | 3 | dlulaoyang | +----+------------+ 3 rows in set (0.00 sec)

第一步:下載工具。 址:http://xbib.org/repository/org/xbib/elasticsearch/importer/elasticsearch-jdbc/2.3.2.0/elasticsearch-jdbc-2.3.2.0-dist.zip 第二步:導入Centos。路徑本身定,筆者放到根目錄下,解壓。unzip elasticsearch-jdbc-2.3.2.0-dist.zip 第三步:設置環境變量。

[root@5b9dbaaa148a /]# vi /etc/profile export JDBC_IMPORTER_HOME=/elasticsearch-jdbc-2.3.2.0

使環境變量生效: [root@5b9dbaaa148a /]# source /etc/profile 第四步:配置使用。詳細參考:https://github.com/jprante/elasticsearch-jdbc 1)、根目錄下新建文件夾odbc_es 以下:

[root@5b9dbaaa148a /]# ll /odbc_es/ drwxr-xr-x 2 root root 4096 Jun 16 03:11 logs -rwxrwxrwx 1 root root 542 Jun 16 04:03 mysql_import_es.sh

2)、新建腳本mysql_import_es.sh,內容以下;

[root@5b9dbaaa148a odbc_es]# cat mysql_import_es.sh ’#!/bin/sh bin=$JDBC_IMPORTER_HOME/bin lib=$JDBC_IMPORTER_HOME/lib echo '{ "type" : "jdbc", "jdbc": { "elasticsearch.autodiscover":true, "elasticsearch.cluster":"my-application", #簇名,詳見:/usr/local/elasticsearch/config/elasticsearch.yml "url":"jdbc:mysql://10.8.5.101:3306/test", #mysql數據庫地址 "user":"root", #mysql用戶名 "password":"123456", #mysql密碼 "sql":"select * from cc", "elasticsearch" : { "host" : "10.8.5.101", "port" : 9300 }, "index" : "myindex", #新的index "type" : "mytype" #新的type } }'| java
-cp "${lib}/*"
-Dlog4j.configurationFile=${bin}/log4j2.xml
org.xbib.tools.Runner
org.xbib.tools.JDBCImporter

3)、爲 mysql_import_es.sh 添加可執行權限。 [root@5b9dbaaa148a odbc_es]# chmod a+x mysql_import_es.sh 4)執行腳本mysql_import_es.sh [root@5b9dbaaa148a odbc_es]# ./mysql_import_es.sh

第五步:測試數據同步是否成功。 使用elasticsearch檢索查詢:

[root@5b9dbaaa148a odbc_es]# curl -XGET 'http://10.8.5.101:9200/myindex/mytype/_search?pretty'

{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
  "total" : 8,
  "successful" : 8,
  "failed" : 0
  },
  "hits" : {
  "total" : 3,
  "max_score" : 1.0,
  "hits" : [ {
  "_index" : "myindex",
  "_type" : "mytype",
  "_id" : "AVVXKgeEun6ksbtikOWH",
  "_score" : 1.0,
  "_source" : {
  "id" : 1,
  "name" : "laoyang"
  }
  }, {
  "_index" : "myindex",
  "_type" : "mytype",
  "_id" : "AVVXKgeEun6ksbtikOWI",
  "_score" : 1.0,
  "_source" : {
  "id" : 2,
  "name" : "dluzhang"
  }
  }, {
  "_index" : "myindex",
  "_type" : "mytype",
  "_id" : "AVVXKgeEun6ksbtikOWJ",
  "_score" : 1.0,
  "_source" : {
  "id" : 3,
  "name" : "dlulaoyang"
  }
  } ]
  }
}

出現以上包含mysql數據字段的信息則爲同步成功。

四、 elasticsearch-jdbc 同步方法二

[root@5b9dbaaa148a odbc_es]# cat mysql_import_es_simple.sh #!/bin/sh bin=$JDBC_IMPORTER_HOME/bin lib=$JDBC_IMPORTER_HOME/lib java
-cp "${lib}/*"
-Dlog4j.configurationFile=${bin}/log4j2.xml
org.xbib.tools.Runner
org.xbib.tools.JDBCImporter statefile.json

[root@5b9dbaaa148a odbc_es]# cat statefile.json

{
"type" : "jdbc",
"jdbc": {
"elasticsearch.autodiscover":true,
"elasticsearch.cluster":"my-application",
"url":"jdbc:mysql://10.8.5.101:3306/test",
"user":"root",
"password":"123456",
"sql":"select * from cc",
"elasticsearch" : {
  "host" : "10.8.5.101",
  "port" : 9300
},
"index" : "myindex_2",
"type" : "mytype_2"
}
}

腳本和json文件分開,腳本執行前先加載json文件。 執行方式:直接運行腳本 ./mysql_import_es_simple.sh 便可。

五、Mysql與elasticsearch等價查詢

目標:實現從表cc中查詢id=3的name信息。 1)MySQL中sql語句查詢:

mysql> select * from cc where id=3; +----+------------+ | id | name | +----+------------+ | 3 | dlulaoyang | +----+------------+ 1 row in set (0.00 sec)

2)elasticsearch檢索:

[root@5b9dbaaa148a odbc_es]# curl http://10.8.5.101:9200/myindex/mytype/_search?pretty -d '

{
"filter" : { "term" : { "id" : "3" } }
}'
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
  "total" : 8,
  "successful" : 8,
  "failed" : 0
  },
  "hits" : {
  "total" : 1,
  "max_score" : 1.0,
  "hits" : [ {
  "_index" : "myindex",
  "_type" : "mytype",
  "_id" : "AVVXKgeEun6ksbtikOWJ",
  "_score" : 1.0,
  "_source" : {
  "id" : 3,
  "name" : "dlulaoyang"
  }
  } ]
  }
}

常見錯誤:

錯誤日誌位置:/odbc_es/logs 日誌內容: [root@5b9dbaaa148a logs]# tail -f jdbc.log [04:03:39,570][INFO ][org.xbib.elasticsearch.helper.client.BaseTransportClient][pool-3-thread-1] after auto-discovery connected to [{5b9dbaaa148a}{aksn2ErNRlWjUECnp_8JmA}{10.8.5.101}{10.8.5.101:9300}{master=true}]

Bug一、[02:46:23,894][ERROR][importer.jdbc ][pool-3-thread-1] error while processing request: cluster state is RED and not YELLOW, from here on, everything will fail! 緣由: you created an index with replicas but you had only one node in the cluster. One way to solve this problem is by allocating them on a second node. Another way is by turning replicas off. 你建立了帶副本 replicas 的索引,可是在你的簇中只有一個節點。

解決方案: 方案一:容許分配‘它們’到第二個節點。 方案二:關閉副本replicas(很是可行)。以下:

curl -XPUT 'localhost:9200/_settings' -d '
{
  "index" : {
  "number_of_replicas" : 0
  }
}

Bug二、[13:00:37,137][ERROR][importer.jdbc ][pool-3-thread-1] error while processing request: no cluster nodes available, check settings {autodiscover=false, client.transport.ignore_cluster_name=false, client.transport.nodes_sampler_interval=5s, client.transport.ping_timeout=5s, cluster.name=elasticsearch, org.elasticsearch.client.transport.NoNodeAvailableException: no cluster nodes available, check 解決方案: 見上腳本中新增: 「elasticsearch.cluster」:」my-application」, #簇名,和/usr/local/elasticsearch/config/elasticsearch.yml 簇名保持一致。

參考: http://stackoverflow.com/questions/11944915/getting-an-elasticsearch-cluster-to-green-cluster-setup-on-os-x

下載地址 http://xbib.org/repository/org/xbib/elasticsearch/importer/elasticsearch-jdbc/2.3.3.0/elasticsearch-jdbc-2.3.3.0-dist.zip 解壓, 設置環境變量 修改bin中腳本 運行。

注意:包下載下來沒有包含statefile.json 文件,第一次運行sh文件生成該配置,後面使用都用該文件配置

./mysql-goodstaxi.sh & touch jdbc.log

#!/bin/sh

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=/httx/run/elasticsearch-jdbc-2.3.3.0/bin
lib=/httx/run/elasticsearch-jdbc-2.3.3.0/lib

echo '
{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://10.7.*.*:8066/tete?useUnicode=true&characterEncoding=utf-8",
        "statefile" : "statefile.json",
        "schedule" : "0 0-59 0-23 ? * *",
        "user" : "54645",
        "password" : "456456",
        "sql" :  [
            {
                "statement" : "select *,TradeId as _id from Trade where stampDate > ?",
                "parameter" : [ "$metrics.lastexecutionstart" ]
            }
        ],
		 "index_settings" : {
            "analysis" : {
            "analyzer" : {
                "ik" : {
                    "tokenizer" : "ik"
                }
            }
        }
        },
        "elasticsearch" : {
			 "cluster" : "565",
			 "host" : "10.7.*.*",
			 "port" : 9300
		},
        "index" : "goods",
        "type" : "goods",
        "index_settings" : {
			"index" : {
				"number_of_shards" : 1
			}
		}
    }
}
' | java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile=${bin}/log4j2.xml \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporte

16546

#!/bin/sh

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=/httx/6/elasticsearch-jdbc-2.3.3.0/bin
lib=/httx/6/elasticsearch-jdbc-2.3.3.0/lib

echo '
{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://10.7.*.*:8066/good?useUnicode=true&characterEncoding=utf-8",
        "statefile" : "statefile.json",
        "schedule" : "0 0-59 0-23 ? * *",
        "user" : "admin",
        "password" : "45456",
        "sql" : "select *,6TradeId as _id from 6Trade",
        "elasticsearch" : {
			 "cluster" : "6",
			 "host" : "10.7.*.*",
			 "port" : 9300
		},
        "index" : "good",
        "type" : "goods",
        "index_settings" : {
			"index" : {
				"number_of_shards" : 1
			}
		}
    }
}
' | java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile=${bin}/log4j2.xml \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter
相關文章
相關標籤/搜索