目前咱們的數據是經過OGG->Kafka->Spark Streaming->HBase。因爲以前咱們發現HBase的列表put沒法保證順序,所以改了程序,若是是在同一個SparkStreaming的批次裏面對同一條數據進行操做,則寫入HBase的數據時間戳就很是相近,只會差幾毫秒,若是是不一樣批次則會差好幾秒。此爲背景。java
如今有一條數據,理應先刪除再插入,可是結果變成了先插入再刪除,結果以下apache
hbase(main):002:0> get 'XDGL_ACCT_PAYMENT_SCHEDULE','e5ad-***', {COLUMN=>'cf1:SQLTYPE',VERSIONS=>10}COLUMN CELL cf1:SQLTYPE timestamp=1498445308420, value=D cf1:SQLTYPE timestamp=1498445301336, value=I
其中,兩條記錄的時間戳換算過來正好相差了7秒
2017-06-26 10:48:21 I
2017-06-26 10:48:28 D oracle
很明顯這兩條數據並無在同一個批次獲得處理,很明顯Spark獲取到數據的前後順序出了點問題。dom
首先SparkStream接收到數據後根據數據的pos排序,而後再根據主鍵排序。從現象看,是SparkStreaming分了兩個批次纔拿到,而SparkStreaming從Kafka拿數據也是順序拿的。那麼出現問題的可能性就只有兩個:
一、OGG發給Kafka的數據順序是錯誤的。
二、OGG發給Kafka的數據順序是正確的,可是發到了不一樣的Kafka Partition。函數
爲了驗證上面的兩個猜測,我把kafka的數據再次獲取出來進行分析。重點分析數據的partition、key、value。
獲得的結果以下:this
能夠看到數據的同一個表數據寫到了不一樣的分區,能夠看到OGG的同一分區下的數據順序是正確的。
正好說明2.1裏面的第二個猜測。看來是OGG寫入的時候並無按照數據的表名寫入不一樣的分區。spa
在OGG 文檔
http://docs.oracle.com/goldengate/bd1221/gg-bd/GADBD/GUID-2561CA12-9BAC-454B-A2E3-2D36C5C60EE5.htm#GADBD449
中的 5.1.4 Kafka Handler Configuration 的屬性 gg.handler.kafkahandler.ProducerRecordClass 裏面提到了,默認使用的是oracle.goldengate.handler.kafka.DefaultProducerRecord這個類對錶名進行分區的。若是要自定義的話須要實現CreateProducerRecord這個接口3d
原話是 The unit of data in Kafka - a
ProducerRecord
holds the key field with the value representing the payload. This key is used for partitioning a Kafka Producer record that holds change capture data. By default, the fully qualified table name is used to partition the records. In order to change this key or behavior, theCreateProducerRecord
Kafka Handler Interface needs to be implemented and this property needs to be set to point to the fully qualified name of the customProducerRecord
class.code
然而寫入kafka的結果卻不是這樣子的。這點讓人費解。看來咱們須要查看OGG的源代碼。htm
在OGG的安裝包裏面有一個名叫ggjava/resources/lib/ggkafka-****.jar
的文件,咱們將其導入一個工程以後就能夠直接看到它的源代碼了。
咱們直接查看oracle.goldengate.handler.kafka.DefaultProducerRecord
這個類
public class DefaultProducerRecord implements CreateProducerRecord { public DefaultProducerRecord() { } public ProducerRecord createProducerRecord(String topicName, Tx transaction, Op operation, byte[] data, TxOpMode handlerMode) { ProducerRecord pr; if(handlerMode.isOperationMode()) { pr = new ProducerRecord(topicName, operation.getTableName().getOriginalName().getBytes(), data); } else { pr = new ProducerRecord(topicName, (Object)null, data); } return pr; }}
這個類只返回一個ProducerRecord,這個是用於發送給Kafka的一條消息。咱們先無論這個,繼續看他是如何寫給kafka的
首先是OGG與Kafka相關的配置類 oracle.goldengate.handler.kafka.impl.KafkaProperties
。這個類裏面定義了一堆參數,咱們只須要關心partitioner.class
這個參數,該參數用於定義寫入Kafka的時候獲取分區的類。很遺憾,這個類沒有該參數配置。
這裏有一個抽象類oracle.goldengate.handler.kafka.impl.AbstractKafkaProducer
,他有兩個子類,分別叫BlockingKafkaProducer
和NonBlockingKafkaProducer
(默認是NonBlockingKafkaProducer)
這兩個類都是直接將經過producer對象將record發送給了kafka。所以想要指導Kafka的分區信息還須要看Kafka是怎麼獲取分區的。
進入kafka的producer發送record的函數
public Future<RecordMetadata> send(ProducerRecord<K, V> record) { return send(record, null); }public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
發送的方法在doSend裏面,裏面內容不少,請看我勾出來的這兩段
因爲寫入的時候都沒有對Record指定分區,所以這段代碼的partition都爲空。因此代碼總會執行到 this.partitioner.partition(record.topic(), record.key(), serializedKey,record.value(), serializedValue,cluster)
該函數是kafka的Partitioner這個抽象類裏面的
因爲2.3.2 Kafka配置類中沒有指定分區的class,所以只會使用Kafka默認的分區類org.apache.kafka.clients.producer.internals.DefaultPartitioner
private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
這裏先是獲取了一個隨機值,而後再獲取了Kafka中對應topic的可用分區列表,而後根據分區數和隨機值進行取餘獲得分區數的值。
流程走到這裏,咱們基本能夠獲得一個結論。
事情到了這裏,咱們能夠判定,寫入分區錯亂的問題是由於gg.handler.kafkahandler.Mode
是事務模式,致使多條消息一次發送了,沒法使用表名做爲key,OGG就用了null做爲key發送給了Kafka,最終Kafka拿到空值以後只能隨機發送給某個partition,因此纔會出現這樣的問題。
最終,修改了ogg的操做模式以後能夠看到,寫入的分區正常了。