Kafka鏈接器是Kafka的一部分,是在Kafka和其它技術之間構建流式管道的一個強有力的框架。它可用於將數據從多個地方(包括數據庫、消息隊列和文本文件)流式注入到Kafka,以及從Kafka將數據流式傳輸到目標端(如文檔存儲、NoSQL、數據庫、對象存儲等)中。javascript
現實世界並不完美,出錯是不免的,所以在出錯時Kafka的管道能儘量優雅地處理是最好的。一個常見的場景是獲取與特定序列化格式不匹配的主題的消息(好比預期爲Avro時實際爲JSON,反之亦然)。自從Kafka 2.0版本發佈以來,Kafka鏈接器包含了錯誤處理選項,即將消息路由到php
在本文中將介紹幾種處理問題的常見模式,並說明如何實現。css
有時可能但願在發生錯誤時當即中止處理,可能遇到質量差的數據是因爲上游的緣由致使的,必須由上游來解決,繼續嘗試處理其它的消息已經沒有意義。java
這是Kafka鏈接器的默認行爲,也可使用下面的配置項顯式地指定:sql
errors.tolerance = none
複製代碼
在本示例中,該鏈接器配置爲從主題中讀取JSON格式數據,而後將其寫入純文本文件。注意這裏爲了演示使用的是FileStreamSinkConnector
鏈接器,不建議在生產中使用。數據庫
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_01",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"test_topic_json",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file":"/data/file_sink_01.txt"
}
}'
複製代碼
主題中的某些JSON格式消息是無效的,鏈接器會當即終止,進入如下的FAILED
狀態:apache
$ curl -s "http://localhost:8083/connectors/file_sink_01/status"| \
jq -c -M '[.name,.tasks[].state]'
["file_sink_01","FAILED"]
複製代碼
查看Kafka鏈接器工做節點的日誌,能夠看到錯誤已經記錄而且任務已經終止:json
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
…
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
…
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('b' (code 98)): was expecting double-quote to start field name
at [Source: (byte[])"{brokenjson-:"bar 1"}"; line: 1, column: 3]
複製代碼
要修復管道,須要解決源主題上的消息問題。除非事先指定,Kafka鏈接器是不會簡單地「跳過」無效消息的。若是是配置錯誤(例如指定了錯誤的序列化轉換器),那最好了,改正以後從新啓動鏈接器便可。不過若是確實是該主題的無效消息,那麼須要找到一種方式,即不要阻止全部其它有效消息的處理。ruby
若是隻是但願處理一直持續下去:bash
errors.tolerance = all
複製代碼
在實際中大概以下:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_05",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"test_topic_json",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file":"/data/file_sink_05.txt",
"errors.tolerance": "all"
}
}'
複製代碼
啓動鏈接器以後(仍是原來的源主題,其中既有有效的,也有無效的消息),就能夠持續地運行:
$ curl -s "http://localhost:8083/connectors/file_sink_05/status"| \
jq -c -M '[.name,.tasks[].state]'
["file_sink_05","RUNNING"]
複製代碼
這時即便鏈接器讀取的源主題上有無效的消息,也不會有錯誤寫入Kafka鏈接器工做節點的輸出,而有效的消息會按照預期寫入輸出文件:
$ head data/file_sink_05.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
…
複製代碼
配置了errors.tolerance = all
以後,Kafka鏈接器就會忽略掉無效的消息,而且默認也不會記錄被丟棄的消息。若是確認配置errors.tolerance = all
,那麼就須要仔細考慮是否以及如何知道實際上發生的消息丟失。在實踐中這意味着基於可用指標的監控/報警,和/或失敗消息的記錄。
肯定是否有消息被丟棄的最簡單方法,是將源主題上的消息數與寫入目標端的數量進行對比:
$ kafkacat -b localhost:9092 -t test_topic_json -o beginning -C -e -q -X enable.partition.eof=true | wc -l 150 $ wc -l data/file_sink_05.txt 100 data/file_sink_05.txt 複製代碼
這個作法雖然不是很優雅,可是確實能看出發生了消息的丟失,而且由於日誌中沒有記錄,因此用戶仍然對此一無所知。
一個更加可靠的辦法是,使用JMX指標來主動監控和報警錯誤消息率:
這時能夠看到發生了錯誤,可是並不知道那些消息發生了錯誤,不過這是用戶想要的。其實即便以後這些被丟棄的消息被寫入了/dev/null
,實際上也是能夠知道的,這也正是死信隊列概念出現的點。
Kafka鏈接器能夠配置爲將沒法處理的消息(例如上面提到的反序列化錯誤)發送到一個單獨的Kafka主題,即死信隊列。有效消息會正常處理,管道也會繼續運行。而後能夠從死信隊列中檢查無效消息,並根據須要忽略或修復並從新處理。
[圖片上傳失敗...(image-9dbb1e-1554814992353)]
進行以下的配置能夠啓用死信隊列:
errors.tolerance = all
errors.deadletterqueue.topic.name =
複製代碼
若是運行於單節點Kafka集羣,還須要配置errors.deadletterqueue.topic.replication.factor = 1
,其默認值爲3。
具備此配置的鏈接器配置示例大體以下:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_02",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"test_topic_json",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file": "/data/file_sink_02.txt",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name":"dlq_file_sink_02",
"errors.deadletterqueue.topic.replication.factor": 1
}
}'
複製代碼
使用和以前相同的源主題,而後處理混合有有效和無效的JSON數據,會看到新的鏈接器能夠穩定運行:
$ curl -s "http://localhost:8083/connectors/file_sink_02/status"| \
jq -c -M '[.name,.tasks[].state]'
["file_sink_02","RUNNING"]
複製代碼
源主題中的有效記錄將寫入目標文件:
$ head data/file_sink_02.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
[…]
複製代碼
這樣管道能夠繼續正常運行,而且還有了死信隊列主題中的數據,這能夠從指標數據中看出:
檢查主題自己也能夠看出來:
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------
dlq_file_sink_02 | false | 1 | 1 | 0 | 0
test_topic_json | false | 1 | 1 | 1 | 1
---------------------------------------------------------------------------------------------------
ksql> PRINT 'dlq_file_sink_02' FROM BEGINNING;
Format:STRING
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 1"}
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 2"}
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 3"}
…
複製代碼
從輸出中能夠看出,消息的時間戳爲(1/24/19 5:16:03 PM UTC
),鍵爲(NULL
),而後爲值。這時能夠看到值是無效的JSON格式{foo:"bar 1"}
(foo
也應加上引號),所以JsonConverter在處理時會拋出異常,所以最終會輸出到死信主題。
可是隻有看到消息才能知道它是無效的JSON,即使如此,也只能假設消息被拒絕的緣由,要肯定Kafka鏈接器將消息視爲無效的實際緣由,有兩個方法:
下面會分別介紹。
消息頭是使用Kafka消息的鍵、值和時間戳存儲的附加元數據,是在Kafka 0.11版本中引入的。Kafka鏈接器能夠將有關消息拒絕緣由的信息寫入消息自己的消息頭中。這個作法比寫入日誌文件更好,由於它將緣由直接與消息聯繫起來。
配置以下的參數,能夠在死信隊列的消息頭中包含拒絕緣由:
errors.deadletterqueue.context.headers.enable = true
複製代碼
配置示例大體以下:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_03",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"test_topic_json",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file": "/data/file_sink_03.txt",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name":"dlq_file_sink_03",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable":true
}
}'
複製代碼
和以前一致,鏈接器能夠正常運行(由於配置了errors.tolerance=all
)。
$ curl -s "http://localhost:8083/connectors/file_sink_03/status"| \
jq -c -M '[.name,.tasks[].state]'
["file_sink_03","RUNNING"]
複製代碼
源主題中的有效消息會正常寫入目標文件:
$ head data/file_sink_03.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
[…]
複製代碼
可使用任何消費者工具來檢查死信隊列上的消息(以前使用了KSQL),不過這裏會使用kafkacat,而後立刻就會看到緣由,最簡單的操做大體以下:
kafkacat -b localhost:9092 -t dlq_file_sink_03
% Auto-selecting Consumer mode (use -P or -C to override)
{foo:"bar 1"}
{foo:"bar 2"}
…
複製代碼
不過kafkacat有更強大的功能,能夠看到比消息自己更多的信息:
kafkacat -b localhost:9092 -t dlq_file_sink_03 -C -o-1 -c1 \
-f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'
複製代碼
這個命令將獲取最後一條消息(-o-1
,針對偏移量,使用最後一條消息),只讀取一條消息(-c1
),而且經過-f
參數對其進行格式化,以更易於理解:
Key (-1 bytes):
Value (13 bytes): {foo:"bar 5"}
Timestamp: 1548350164096
Partition: 0
Offset: 34
Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALU
E_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Co
nverting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed
due to serialization error:
[…]
複製代碼
也能夠只顯示消息頭,並使用一些簡單的技巧將其拆分,這樣能夠更清楚地看到該問題的更多信息:
$ kafkacat -b localhost:9092 -t dlq_file_sink_03 -C -o-1 -c1 -f '%h'|tr ',' '\n'
__connect.errors.topic=test_topic_json
__connect.errors.partition=0
__connect.errors.offset=94
__connect.errors.connector.name=file_sink_03
__connect.errors.task.id=0
__connect.errors.stage=VALUE_CONVERTER
__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter
__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException
__connect.errors.exception.message=Converting byte[] to Kafka Connect data failed due to serialization error:
複製代碼
Kafka鏈接器處理的每條消息都來自源主題和該主題中的特定點(偏移量),消息頭已經準確地說明了這一點。所以可使用它來回到原始主題並在須要時檢查原始消息,因爲死信隊列已經有一個消息的副本,這個檢查更像是一個保險的作法。
根據從上面的消息頭中獲取的詳細信息,能夠再檢查一下源消息:
__connect.errors.topic=test_topic_json
__connect.errors.offset=94
複製代碼
將這些值分別插入到kafkacat的表明主題和偏移的-t
和-o
參數中,能夠獲得:
$ kafkacat -b localhost:9092 -C \
-t test_topic_json -o94 \
-f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Topic: %t\n'
複製代碼
Key (-1 bytes):
Value (13 bytes): {foo:"bar 5"}
Timestamp: 1548350164096
Partition: 0
Offset: 94
Topic: test_topic_json
複製代碼
與死信隊列中的上述消息相比,能夠看到徹底相同,甚至包括時間戳,惟一的區別是主題、偏移量和消息頭。
記錄消息的拒絕緣由的第二個選項是將其寫入日誌。根據安裝方式不一樣,Kafka鏈接器會將其寫入標準輸出或日誌文件。不管哪一種方式都會爲每一個失敗的消息生成一堆詳細輸出。進行以下配置可啓用此功能:
errors.log.enable = true
複製代碼
經過配置errors.log.include.messages = true
,還能夠在輸出中包含有關消息自己的元數據。此元數據中包括一些和上面提到的消息頭中同樣的項目,包括源消息的主題和偏移量。注意它不包括消息鍵或值自己。
這時的鏈接器配置以下:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_04",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"test_topic_json",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file": "/data/file_sink_04.txt",
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true
}
}'
複製代碼
鏈接器是能夠成功運行的:
$ curl -s "http://localhost:8083/connectors/file_sink_04/status"| \
jq -c -M '[.name,.tasks[].state]'
["file_sink_04","RUNNING"]
Valid records from the source topic get written to the target file:
$ head data/file_sink_04.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
[…]
複製代碼
這時去看Kafka鏈接器的工做節點日誌,會發現每一個失敗的消息都有錯誤記錄:
ERROR Error encountered in task file_sink_04-0. Executing stage 'VALUE_CONVERTER' with class 'org.apache.kafka.connect.json.JsonConverter', where consumed record is {topic='test_topic_json', partition=0, offset=94, timestamp=1548350164096, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
[…]
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): was expecting double-quote to start field name
at [Source: (byte[])"{foo:"bar 5"}"; line: 1, column: 3]
複製代碼
能夠看到錯誤自己,還有就是和錯誤有關的信息:
{topic='test_topic_json', partition=0, offset=94, timestamp=1548350164096, timestampType=CreateTime}
複製代碼
如上所示,能夠在kafkacat等工具中使用該主題和偏移量來檢查源主題上的消息。根據拋出的異常也可能會看到記錄的源消息:
Caused by: org.apache.kafka.common.errors.SerializationException:
…
at [Source: (byte[])"{foo:"bar 5"}"; line: 1, column: 3]
複製代碼
雖然設置了一個死信隊列,可是如何處理那些「死信」呢?由於它只是一個Kafka主題,因此能夠像使用任何其它主題同樣使用標準的Kafka工具。上面已經看到了,好比可使用kafkacat來檢查消息頭,而且對於消息的內容及其元數據的通常檢查kafkacat也能夠作。固然根據被拒絕的緣由,也能夠選擇對消息進行重播。
一個場景是鏈接器正在使用Avro轉換器,可是主題上的倒是JSON格式消息(所以被寫入死信隊列)。可能因爲遺留緣由JSON和Avro格式的生產者都在寫入源主題,這個問題得解決,可是目前只須要將管道流中的數據寫入接收器便可。
首先,從初始的接收器讀取源主題開始,使用Avro反序列化並路由到死信隊列:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_06__01-avro",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"test_topic_avro",
"file":"/data/file_sink_06.txt",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"errors.tolerance":"all",
"errors.deadletterqueue.topic.name":"dlq_file_sink_06__01",
"errors.deadletterqueue.topic.replication.factor":1,
"errors.deadletterqueue.context.headers.enable":true,
"errors.retry.delay.max.ms": 60000,
"errors.retry.timeout": 300000
}
}'
複製代碼
另外再建立第二個接收器,將第一個接收器的死信隊列做爲源主題,並嘗試將記錄反序列化爲JSON,在這裏要更改的是value.converter
、key.converter
、源主題名和死信隊列名(若是此鏈接器須要將任何消息路由到死信隊列,要避免遞歸)。
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_06__02-json",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"dlq_file_sink_06__01",
"file":"/data/file_sink_06.txt",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"errors.tolerance":"all",
"errors.deadletterqueue.topic.name":"dlq_file_sink_06__02",
"errors.deadletterqueue.topic.replication.factor":1,
"errors.deadletterqueue.context.headers.enable":true,
"errors.retry.delay.max.ms": 60000,
"errors.retry.timeout": 300000
}
}'
複製代碼
如今能夠驗證一下。
首先,源主題收到20條Avro消息,以後能夠看到20條消息被讀取並被原始Avro接收器接收:
而後發送8條JSON消息,這時8條消息被髮送到死信隊列,而後被JSON接收器接收:
如今再發送5條格式錯誤的JSON消息,以後能夠看到二者都有失敗的消息,有2點能夠確認:
除了使用JMX監控死信隊列以外,還能夠利用KSQL的聚合能力編寫一個簡單的流應用來監控消息寫入隊列的速率:
-- 爲每一個死信隊列主題註冊流。
CREATE STREAM dlq_file_sink_06__01(MSG VARCHAR)WITH(KAFKA_TOPIC ='dlq_file_sink_06__01',VALUE_FORMAT ='DELIMITED');
CREATE STREAM dlq_file_sink_06__02(MSG VARCHAR)WITH(KAFKA_TOPIC ='dlq_file_sink_06__02',VALUE_FORMAT ='DELIMITED');
-- 從主題的開頭消費數據
SET 'auto.offset.reset' = 'earliest';
-- 使用其它列建立監控流,可用於後續聚合查詢
CREATE STREAM DLQ_MONITOR WITH (VALUE_FORMAT='AVRO') AS \
SELECT 'dlq_file_sink_06__01' AS SINK_NAME, \
'Records: ' AS GROUP_COL, \
MSG \
FROM dlq_file_sink_06__01;
-- 使用來自第二個死信隊列的消息注入相同的監控流
INSERT INTO DLQ_MONITOR \
SELECT 'dlq_file_sink_06__02' AS SINK_NAME, \
'Records: ' AS GROUP_COL, \
MSG \
FROM dlq_file_sink_06__02;
-- 在每一個死信隊列每分鐘的時間窗口內,建立消息的聚合視圖
CREATE TABLE DLQ_MESSAGE_COUNT_PER_MIN AS \
SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS START_TS, \
SINK_NAME, \
GROUP_COL, \
COUNT(*) AS DLQ_MESSAGE_COUNT \
FROM DLQ_MONITOR \
WINDOW TUMBLING (SIZE 1 MINUTE) \
GROUP BY SINK_NAME, \
GROUP_COL;
複製代碼
這個聚合表能夠以交互式的方式進行查詢,下面顯示了一分鐘內每一個死信隊列中的消息數量:
ksql> SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_MESSAGE_COUNT_PER_MIN;
2019-02-01 02:56:00 | dlq_file_sink_06__01 | 9
2019-02-01 03:10:00 | dlq_file_sink_06__01 | 8
2019-02-01 03:12:00 | dlq_file_sink_06__01 | 5
2019-02-01 02:56:00 | dlq_file_sink_06__02 | 5
2019-02-01 03:12:00 | dlq_file_sink_06__02 | 5
複製代碼
由於這個表的下面是Kafka主題,因此能夠將其路由到指望的任何監控儀表盤,還能夠用於驅動告警。假定有幾條錯誤消息是能夠接受的,可是一分鐘內超過5條消息就是個大問題須要關注:
CREATE TABLE DLQ_BREACH AS \
SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT \
FROM DLQ_MESSAGE_COUNT_PER_MIN \
WHERE DLQ_MESSAGE_COUNT>5;
複製代碼
如今又有了一個報警服務能夠訂閱的DLQ_BREACH
主題,當收到任何消息時,能夠觸發適當的操做(例如通知)。
ksql> SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_BREACH;
2019-02-01 02:56:00 | dlq_file_sink_06__01 | 9
2019-02-01 03:10:00 | dlq_file_sink_06__01 | 8
複製代碼
Kafka鏈接器的錯誤處理方式,以下表所示:
鏈接器生命週期階段 | 描述 | 是否處理錯誤? |
---|---|---|
開始 | 首次啓動鏈接器時,其將執行必要的初始化,例如鏈接到數據存儲 | 無 |
拉取(針對源鏈接器) | 從源數據存儲讀取消息 | 無 |
格式轉換 | 從Kafka主題讀寫數據並對JSON/Avro格式進行序列化/反序列化 | 有 |
單消息轉換 | 應用任何已配置的單消息轉換 | 有 |
接收(針對接收鏈接器) | 將消息寫入目標數據存儲 | 無 |
注意源鏈接器沒有死信隊列。
關於鏈接器錯誤處理的配置,能夠按照以下的流程一步步進階:
處理錯誤是任何穩定可靠的數據管道的重要組成部分,根據數據的使用方式,能夠有兩個選項。若是管道任何錯誤的消息都不能接受,代表上游存在嚴重的問題,那麼就應該當即中止處理(這是Kafka鏈接器的默認行爲)。
另外一方面,若是隻是想將數據流式傳輸到存儲以進行分析或非關鍵性處理,那麼只要不傳播錯誤,保持管道穩定運行則更爲重要。這時就能夠定義錯誤的處理方式,推薦的方式是使用死信隊列並密切監視來自Kafka鏈接器的可用JMX指標。