中間件——canal小記

接到個小需求,將mysql的部分數據增量同步到es,可是不只僅是使用canal而已,總體的流程是mysql>>canal>>flume>>kafka>>es,說難倒也不難,只是作起來碰到的坑實在太多,特別是中間套了那麼多中間件,出了故障找起來真的特別麻煩。html

先來了解一下MySQL的主從備份:mysql

從上層來看,複製分紅三步:
master將改變記錄到二進制日誌(binary log)中(這些記錄叫作二進制日誌事件,binary log events,能夠經過show binlog events進行查看);
slave將master的binary log events拷貝到它的中繼日誌(relay log);
slave重作中繼日誌中的事件,將改變反映它本身的數據。spring

問題一:測試環境一切正常,可是正式環境中,這幾個字段全爲0,不知道爲何

最後發現是溝通問題。。。sql

排查過程:數據庫

  1. 起初,懷疑是es的問題,會不會是string轉爲long中出現了問題,PUT了個,無異常,這種狀況排除。
  2. 再而後覺得是代碼有問題,但是想了下,rowData.getAfterColumnsList().forEach(column -> data.put(column.getName(), column.getValue()))這句不可能有什麼其餘的問題啊,並且測試環境中一切都是好好的。
  3. canal安裝出錯,從新查看了一次canal.properties和instance.properties,並無發現配置錯了啥,若是錯了,那爲何只有那幾個字段出現異常,其餘的都是好好的,鬱悶。並且,用測試環境的canal配置生產中的數據庫,而後本地調試,結果依舊同樣。可能問題出在mysql。

最後發現,竟然是溝通問題。。。。測試環境中是從正式環境導入的,用的insert,但是在正式環境裏,用的確實insert後update字段,以後發現竟然還用delete,,,,暈。。。。以前明確問過了只更新insert的,人與人之間的信任在哪裏。。。。less

問題二:canal.properties中四種模式的差異

簡單的說,canal維護一份增量訂閱和消費關係是依靠解析位點和消費位點的,目前提供了一下四種配置,一開始我也是懵的。分佈式

#canal.instance.global.spring.xml = classpath:spring/local-instance.xml
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

local-instance
我也不知道啥。。ide

memory-instance
全部的組件(parser , sink , store)都選擇了內存版模式,記錄位點的都選擇了memory模式,重啓後又會回到初始位點進行解析
特色:速度最快,依賴最少(不須要zookeeper)
場景:通常應用在quickstart,或者是出現問題後,進行數據分析的場景,不該該將其應用於生產環境。
我的建議是調試的時候使用該模式,即新增數據的時候,客戶端能立刻捕獲到改日誌,可是因爲位點一直都是canal啓動的時候最新的,不適用與生產環境。post

file-instance
全部的組件(parser , sink , store)都選擇了基於file持久化模式,注意,不支持HA機制.
特色:支持單機持久化
場景:生產環境,無HA需求,簡單可用.
採用該模式的時候,若是關閉了canal,會在destination中生成一個meta.dat,用來記錄關鍵信息。若是想要啓動canal以後立刻訂閱最新的位點,須要把該文件刪掉。
{"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"example","filter":".\.."},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.6.71","port":3306}},"postion":{"included":false,"journalName":"binlog.008335","position":221691106,"serverId":88888,"timestamp":1524294834000}}}],"destination":"example"}測試

default-instance
全部的組件(parser , sink , store)都選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證數據集羣共享。
特色:支持HA
場景:生產環境,集羣化部署.
該模式會記錄集羣中全部運行的節點,主要用與HA主備模式,節點中的數據以下,能夠關閉某一個canal服務來查看running的變化信息。

問題三:若是要訂閱的是mysql的從庫改怎麼作?

生產環境中的主庫是不能隨便重啓的,因此訂閱的話必須訂閱mysql主從的從庫,而從庫中是默認下只將主庫的操做寫進中繼日誌,並寫到本身的二進制日誌的,因此須要讓其成爲canal的主庫,必須讓其將日誌也寫到本身的二進制日誌裏面。處理方法:修改/etc/my.cnf,增長一行log_slave_updates=1,重啓數據庫後就能夠了。

問題四:部分字段沒有更新

最終版本是以mysql的id爲es的主鍵,用canal同步到flume,再由flume到kafka,而後再由一箇中間件寫到es裏面去,結果發現,一天之中,會有那麼一段時間得出的結果少一丟丟,甚至是驟降,如圖。不得不從頭開始排查狀況,canal到flume,加了canal的重試,以及發送到flume的重試機制,沒有報錯,全部數據正常發送。flume到kafka不敢懷疑,畢竟公司一直在用,怎麼可能有問題。kafka到es的中間件?組長寫的,並且一直在用,不可能==最後確認的是flume到kafka,kafka的parition處理速度不一樣,

check一下flume的文檔,能夠知道

Property Name Description
defaultPartitionId Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader When set, the producer will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition the event will not be accepted into the channel. If the header value is present then this setting overrides defaultPartitionId.

大概意思是flume若是不自定義partitionIdHeader,那麼消息將會被分佈式kafka的partion處理,kafka自己的設置就是高吞吐量的消息系統,同一partion的消息是能夠按照順序發送的,可是多個partion就不肯定了,若是須要將消息按照順序發送,那麼就必需要指定一個parition,即在flume的配置文件中添加:a1.channels.channel1.partitionIdHeader=1,指定parition便可。所有修改完以後,在kibana查看一下曲線:

用sql在數據庫確認了下,終於一致了,不容易。。。

相關文章
相關標籤/搜索