Kafka鏈接器深度解讀之錯誤處理和死信隊列

Kafka鏈接器是Kafka的一部分,是在Kafka和其它技術之間構建流式管道的一個強有力的框架。它可用於將數據從多個地方(包括數據庫、消息隊列和文本文件)流式注入到Kafka,以及從Kafka將數據流式傳輸到目標端(如文檔存儲、NoSQL、數據庫、對象存儲等)中。javascript

現實世界並不完美,出錯是不免的,所以在出錯時Kafka的管道能儘量優雅地處理是最好的。一個常見的場景是獲取與特定序列化格式不匹配的主題的消息(好比預期爲Avro時實際爲JSON,反之亦然)。自從Kafka 2.0版本發佈以來,Kafka鏈接器包含了錯誤處理選項,即將消息路由到php

死信隊列
的功能,這是構建數據管道的經常使用技術。

在本文中將介紹幾種處理問題的常見模式,並說明如何實現。css

失敗後當即中止

有時可能但願在發生錯誤時當即中止處理,可能遇到質量差的數據是因爲上游的緣由致使的,必須由上游來解決,繼續嘗試處理其它的消息已經沒有意義。java

這是Kafka鏈接器的默認行爲,也可使用下面的配置項顯式地指定:sql

errors.tolerance = none
複製代碼

在本示例中,該鏈接器配置爲從主題中讀取JSON格式數據,而後將其寫入純文本文件。注意這裏爲了演示使用的是FileStreamSinkConnector鏈接器,不建議在生產中使用。數據庫

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "file_sink_01",
        "config": {
                "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
                "topics":"test_topic_json",
                "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable": false,
                "key.converter":"org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable": false,
                "file":"/data/file_sink_01.txt"
                }
        }'
複製代碼

主題中的某些JSON格式消息是無效的,鏈接器會當即終止,進入如下的FAILED狀態:apache

$ curl -s "http://localhost:8083/connectors/file_sink_01/status"| \
    jq -c -M '[.name,.tasks[].state]'
["file_sink_01","FAILED"]
複製代碼

查看Kafka鏈接器工做節點的日誌,能夠看到錯誤已經記錄而且任務已經終止:json

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
…
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
 at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
…
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('b' (code 98)): was expecting double-quote to start field name
 at [Source: (byte[])"{brokenjson-:"bar 1"}"; line: 1, column: 3]
複製代碼

要修復管道,須要解決源主題上的消息問題。除非事先指定,Kafka鏈接器是不會簡單地「跳過」無效消息的。若是是配置錯誤(例如指定了錯誤的序列化轉換器),那最好了,改正以後從新啓動鏈接器便可。不過若是確實是該主題的無效消息,那麼須要找到一種方式,即不要阻止全部其它有效消息的處理。ruby

靜默忽略無效的消息

若是隻是但願處理一直持續下去:bash

errors.tolerance = all
複製代碼

在實際中大概以下:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "file_sink_05",
        "config": {
                "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
                "topics":"test_topic_json",
                "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable": false,
                "key.converter":"org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable": false,
                "file":"/data/file_sink_05.txt",
                "errors.tolerance": "all"
                }
        }'
複製代碼

啓動鏈接器以後(仍是原來的源主題,其中既有有效的,也有無效的消息),就能夠持續地運行:

$ curl -s "http://localhost:8083/connectors/file_sink_05/status"| \
    jq -c -M '[.name,.tasks[].state]'
["file_sink_05","RUNNING"]
複製代碼

這時即便鏈接器讀取的源主題上有無效的消息,也不會有錯誤寫入Kafka鏈接器工做節點的輸出,而有效的消息會按照預期寫入輸出文件:

$ head data/file_sink_05.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
…

複製代碼

是否能夠感知數據的丟失?

配置了errors.tolerance = all以後,Kafka鏈接器就會忽略掉無效的消息,而且默認也不會記錄被丟棄的消息。若是確認配置errors.tolerance = all,那麼就須要仔細考慮是否以及如何知道實際上發生的消息丟失。在實踐中這意味着基於可用指標的監控/報警,和/或失敗消息的記錄。

肯定是否有消息被丟棄的最簡單方法,是將源主題上的消息數與寫入目標端的數量進行對比:

$ kafkacat -b localhost:9092 -t test_topic_json -o beginning -C -e -q -X enable.partition.eof=true | wc -l 150 $ wc -l data/file_sink_05.txt 100 data/file_sink_05.txt 複製代碼

這個作法雖然不是很優雅,可是確實能看出發生了消息的丟失,而且由於日誌中沒有記錄,因此用戶仍然對此一無所知。

一個更加可靠的辦法是,使用JMX指標來主動監控和報警錯誤消息率:

這時能夠看到發生了錯誤,可是並不知道那些消息發生了錯誤,不過這是用戶想要的。其實即便以後這些被丟棄的消息被寫入了/dev/null,實際上也是能夠知道的,這也正是死信隊列概念出現的點。

將消息路由到死信隊列

Kafka鏈接器能夠配置爲將沒法處理的消息(例如上面提到的反序列化錯誤)發送到一個單獨的Kafka主題,即死信隊列。有效消息會正常處理,管道也會繼續運行。而後能夠從死信隊列中檢查無效消息,並根據須要忽略或修復並從新處理。

[圖片上傳失敗...(image-9dbb1e-1554814992353)]

進行以下的配置能夠啓用死信隊列:

errors.tolerance = all
errors.deadletterqueue.topic.name = 
複製代碼

若是運行於單節點Kafka集羣,還須要配置errors.deadletterqueue.topic.replication.factor = 1,其默認值爲3。

具備此配置的鏈接器配置示例大體以下:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "file_sink_02",
        "config": {
                "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
                "topics":"test_topic_json",
                "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable": false,
                "key.converter":"org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable": false,
                "file": "/data/file_sink_02.txt",
                "errors.tolerance": "all",
                "errors.deadletterqueue.topic.name":"dlq_file_sink_02",
                "errors.deadletterqueue.topic.replication.factor": 1
                }
        }'
複製代碼

使用和以前相同的源主題,而後處理混合有有效和無效的JSON數據,會看到新的鏈接器能夠穩定運行:

$ curl -s "http://localhost:8083/connectors/file_sink_02/status"| \
    jq -c -M '[.name,.tasks[].state]'
["file_sink_02","RUNNING"]
複製代碼

源主題中的有效記錄將寫入目標文件:

$ head data/file_sink_02.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
[…]
複製代碼

這樣管道能夠繼續正常運行,而且還有了死信隊列主題中的數據,這能夠從指標數據中看出:

檢查主題自己也能夠看出來:

ksql> LIST TOPICS;

 Kafka Topic            | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------
 dlq_file_sink_02       | false      | 1          | 1                  | 0         | 0
 test_topic_json        | false      | 1          | 1                  | 1         | 1
---------------------------------------------------------------------------------------------------

ksql> PRINT 'dlq_file_sink_02' FROM BEGINNING;
Format:STRING
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 1"}
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 2"}
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 3"}
…
複製代碼

從輸出中能夠看出,消息的時間戳爲(1/24/19 5:16:03 PM UTC),鍵爲(NULL),而後爲值。這時能夠看到值是無效的JSON格式{foo:"bar 1"}foo也應加上引號),所以JsonConverter在處理時會拋出異常,所以最終會輸出到死信主題。

可是隻有看到消息才能知道它是無效的JSON,即使如此,也只能假設消息被拒絕的緣由,要肯定Kafka鏈接器將消息視爲無效的實際緣由,有兩個方法:

  • 死信隊列的消息頭;
  • Kafka鏈接器的工做節點日誌。

下面會分別介紹。

記錄消息的失敗緣由:消息頭

消息頭是使用Kafka消息的鍵、值和時間戳存儲的附加元數據,是在Kafka 0.11版本中引入的。Kafka鏈接器能夠將有關消息拒絕緣由的信息寫入消息自己的消息頭中。這個作法比寫入日誌文件更好,由於它將緣由直接與消息聯繫起來。

配置以下的參數,能夠在死信隊列的消息頭中包含拒絕緣由:

errors.deadletterqueue.context.headers.enable = true
複製代碼

配置示例大體以下:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "file_sink_03",
        "config": {
                "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
                "topics":"test_topic_json",
                "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable": false,
                "key.converter":"org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable": false,
                "file": "/data/file_sink_03.txt",
                "errors.tolerance": "all",
                "errors.deadletterqueue.topic.name":"dlq_file_sink_03",
                "errors.deadletterqueue.topic.replication.factor": 1,
                "errors.deadletterqueue.context.headers.enable":true
                }
        }'
複製代碼

和以前一致,鏈接器能夠正常運行(由於配置了errors.tolerance=all)。

$ curl -s "http://localhost:8083/connectors/file_sink_03/status"| \
    jq -c -M '[.name,.tasks[].state]'
["file_sink_03","RUNNING"]
複製代碼

源主題中的有效消息會正常寫入目標文件:

$ head data/file_sink_03.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
[…]
複製代碼

可使用任何消費者工具來檢查死信隊列上的消息(以前使用了KSQL),不過這裏會使用kafkacat,而後立刻就會看到緣由,最簡單的操做大體以下:

kafkacat -b localhost:9092 -t dlq_file_sink_03
% Auto-selecting Consumer mode (use -P or -C to override)
{foo:"bar 1"}
{foo:"bar 2"}
…
複製代碼

不過kafkacat有更強大的功能,能夠看到比消息自己更多的信息:

kafkacat -b localhost:9092 -t dlq_file_sink_03 -C -o-1 -c1 \
  -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'
複製代碼

這個命令將獲取最後一條消息(-o-1,針對偏移量,使用最後一條消息),只讀取一條消息(-c1),而且經過-f參數對其進行格式化,以更易於理解:

Key (-1 bytes):
  Value (13 bytes): {foo:"bar 5"}
  Timestamp: 1548350164096
  Partition: 0
  Offset: 34
  Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALU
E_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Co
nverting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed
 due to serialization error:
[…]
複製代碼

也能夠只顯示消息頭,並使用一些簡單的技巧將其拆分,這樣能夠更清楚地看到該問題的更多信息:

$ kafkacat -b localhost:9092 -t dlq_file_sink_03 -C -o-1 -c1 -f '%h'|tr ',' '\n'
__connect.errors.topic=test_topic_json
__connect.errors.partition=0
__connect.errors.offset=94
__connect.errors.connector.name=file_sink_03
__connect.errors.task.id=0
__connect.errors.stage=VALUE_CONVERTER
__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter
__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException
__connect.errors.exception.message=Converting byte[] to Kafka Connect data failed due to serialization error:
複製代碼

Kafka鏈接器處理的每條消息都來自源主題和該主題中的特定點(偏移量),消息頭已經準確地說明了這一點。所以可使用它來回到原始主題並在須要時檢查原始消息,因爲死信隊列已經有一個消息的副本,這個檢查更像是一個保險的作法。

根據從上面的消息頭中獲取的詳細信息,能夠再檢查一下源消息:

__connect.errors.topic=test_topic_json
__connect.errors.offset=94
複製代碼

將這些值分別插入到kafkacat的表明主題和偏移的-t-o參數中,能夠獲得:

$ kafkacat -b localhost:9092 -C \
  -t test_topic_json -o94 \
  -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Topic: %t\n'
複製代碼
Key (-1 bytes):
  Value (13 bytes): {foo:"bar 5"}
  Timestamp: 1548350164096
  Partition: 0
  Offset: 94
  Topic: test_topic_json
複製代碼

與死信隊列中的上述消息相比,能夠看到徹底相同,甚至包括時間戳,惟一的區別是主題、偏移量和消息頭。

記錄消息的失敗緣由:日誌

記錄消息的拒絕緣由的第二個選項是將其寫入日誌。根據安裝方式不一樣,Kafka鏈接器會將其寫入標準輸出或日誌文件。不管哪一種方式都會爲每一個失敗的消息生成一堆詳細輸出。進行以下配置可啓用此功能:

errors.log.enable = true
複製代碼

經過配置errors.log.include.messages = true,還能夠在輸出中包含有關消息自己的元數據。此元數據中包括一些和上面提到的消息頭中同樣的項目,包括源消息的主題和偏移量。注意它不包括消息鍵或值自己。

這時的鏈接器配置以下:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "file_sink_04",
        "config": {
                "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
                "topics":"test_topic_json",
                "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable": false,
                "key.converter":"org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable": false,
                "file": "/data/file_sink_04.txt",
                "errors.tolerance": "all",
                "errors.log.enable":true,
                "errors.log.include.messages":true
                }
        }'
複製代碼

鏈接器是能夠成功運行的:

$ curl -s "http://localhost:8083/connectors/file_sink_04/status"| \
    jq -c -M '[.name,.tasks[].state]'
["file_sink_04","RUNNING"]
Valid records from the source topic get written to the target file:
$ head data/file_sink_04.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
[…]
複製代碼

這時去看Kafka鏈接器的工做節點日誌,會發現每一個失敗的消息都有錯誤記錄:

ERROR Error encountered in task file_sink_04-0. Executing stage 'VALUE_CONVERTER' with class 'org.apache.kafka.connect.json.JsonConverter', where consumed record is {topic='test_topic_json', partition=0, offset=94, timestamp=1548350164096, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
 at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
[…]
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): was expecting double-quote to start field name
 at [Source: (byte[])"{foo:"bar 5"}"; line: 1, column: 3]
複製代碼

能夠看到錯誤自己,還有就是和錯誤有關的信息:

{topic='test_topic_json', partition=0, offset=94, timestamp=1548350164096, timestampType=CreateTime}
複製代碼

如上所示,能夠在kafkacat等工具中使用該主題和偏移量來檢查源主題上的消息。根據拋出的異常也可能會看到記錄的源消息:

Caused by: org.apache.kafka.common.errors.SerializationException:
…
at [Source: (byte[])"{foo:"bar 5"}"; line: 1, column: 3]
複製代碼

處理死信隊列的消息

雖然設置了一個死信隊列,可是如何處理那些「死信」呢?由於它只是一個Kafka主題,因此能夠像使用任何其它主題同樣使用標準的Kafka工具。上面已經看到了,好比可使用kafkacat來檢查消息頭,而且對於消息的內容及其元數據的通常檢查kafkacat也能夠作。固然根據被拒絕的緣由,也能夠選擇對消息進行重播。

一個場景是鏈接器正在使用Avro轉換器,可是主題上的倒是JSON格式消息(所以被寫入死信隊列)。可能因爲遺留緣由JSON和Avro格式的生產者都在寫入源主題,這個問題得解決,可是目前只須要將管道流中的數據寫入接收器便可。

首先,從初始的接收器讀取源主題開始,使用Avro反序列化並路由到死信隊列:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "file_sink_06__01-avro",
        "config": {
                "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
                "topics":"test_topic_avro",
                "file":"/data/file_sink_06.txt",
                "key.converter": "io.confluent.connect.avro.AvroConverter",
                "key.converter.schema.registry.url": "http://schema-registry:8081",
                "value.converter": "io.confluent.connect.avro.AvroConverter",
                "value.converter.schema.registry.url": "http://schema-registry:8081",
                "errors.tolerance":"all",
                "errors.deadletterqueue.topic.name":"dlq_file_sink_06__01",
                "errors.deadletterqueue.topic.replication.factor":1,
                "errors.deadletterqueue.context.headers.enable":true,
                "errors.retry.delay.max.ms": 60000,
                "errors.retry.timeout": 300000
                }
        }'
複製代碼

另外再建立第二個接收器,將第一個接收器的死信隊列做爲源主題,並嘗試將記錄反序列化爲JSON,在這裏要更改的是value.converterkey.converter、源主題名和死信隊列名(若是此鏈接器須要將任何消息路由到死信隊列,要避免遞歸)。

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "file_sink_06__02-json",
        "config": {
                "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
                "topics":"dlq_file_sink_06__01",
                "file":"/data/file_sink_06.txt",
                "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                "value.converter.schemas.enable": false,
                "key.converter":"org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable": false,
                "errors.tolerance":"all",
                "errors.deadletterqueue.topic.name":"dlq_file_sink_06__02",
                "errors.deadletterqueue.topic.replication.factor":1,
                "errors.deadletterqueue.context.headers.enable":true,
                "errors.retry.delay.max.ms": 60000,
                "errors.retry.timeout": 300000
                }
        }'
複製代碼

如今能夠驗證一下。

首先,源主題收到20條Avro消息,以後能夠看到20條消息被讀取並被原始Avro接收器接收:

而後發送8條JSON消息,這時8條消息被髮送到死信隊列,而後被JSON接收器接收:

如今再發送5條格式錯誤的JSON消息,以後能夠看到二者都有失敗的消息,有2點能夠確認:

  1. 從Avro接收器發送到死信隊列的消息數與成功發送的JSON消息數之間有差別;
  2. 消息被髮送到JSON接收器的死信隊列。

經過KSQL監控死信隊列

除了使用JMX監控死信隊列以外,還能夠利用KSQL的聚合能力編寫一個簡單的流應用來監控消息寫入隊列的速率:

-- 爲每一個死信隊列主題註冊流。
CREATE STREAM dlq_file_sink_06__01(MSG VARCHAR)WITH(KAFKA_TOPIC ='dlq_file_sink_06__01',VALUE_FORMAT ='DELIMITED');
CREATE STREAM dlq_file_sink_06__02(MSG VARCHAR)WITH(KAFKA_TOPIC ='dlq_file_sink_06__02',VALUE_FORMAT ='DELIMITED');

-- 從主題的開頭消費數據
SET 'auto.offset.reset' = 'earliest';

-- 使用其它列建立監控流,可用於後續聚合查詢
CREATE STREAM DLQ_MONITOR WITH (VALUE_FORMAT='AVRO') AS \
  SELECT 'dlq_file_sink_06__01' AS SINK_NAME, \
         'Records: ' AS GROUP_COL, \
         MSG \
    FROM dlq_file_sink_06__01;

-- 使用來自第二個死信隊列的消息注入相同的監控流
INSERT INTO DLQ_MONITOR \
  SELECT 'dlq_file_sink_06__02' AS SINK_NAME, \
         'Records: ' AS GROUP_COL, \
         MSG \
    FROM dlq_file_sink_06__02;

-- 在每一個死信隊列每分鐘的時間窗口內,建立消息的聚合視圖
CREATE TABLE DLQ_MESSAGE_COUNT_PER_MIN AS \
  SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS START_TS, \
         SINK_NAME, \
         GROUP_COL, \
         COUNT(*) AS DLQ_MESSAGE_COUNT \
    FROM DLQ_MONITOR \
          WINDOW TUMBLING (SIZE 1 MINUTE) \
 GROUP BY SINK_NAME, \
          GROUP_COL;
複製代碼

這個聚合表能夠以交互式的方式進行查詢,下面顯示了一分鐘內每一個死信隊列中的消息數量:

ksql> SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_MESSAGE_COUNT_PER_MIN;
2019-02-01 02:56:00 | dlq_file_sink_06__01 | 9
2019-02-01 03:10:00 | dlq_file_sink_06__01 | 8
2019-02-01 03:12:00 | dlq_file_sink_06__01 | 5
2019-02-01 02:56:00 | dlq_file_sink_06__02 | 5
2019-02-01 03:12:00 | dlq_file_sink_06__02 | 5
複製代碼

由於這個表的下面是Kafka主題,因此能夠將其路由到指望的任何監控儀表盤,還能夠用於驅動告警。假定有幾條錯誤消息是能夠接受的,可是一分鐘內超過5條消息就是個大問題須要關注:

CREATE TABLE DLQ_BREACH AS \
    SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT \
      FROM DLQ_MESSAGE_COUNT_PER_MIN \
     WHERE DLQ_MESSAGE_COUNT>5;
複製代碼

如今又有了一個報警服務能夠訂閱的DLQ_BREACH主題,當收到任何消息時,能夠觸發適當的操做(例如通知)。

ksql> SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_BREACH;
2019-02-01 02:56:00 | dlq_file_sink_06__01 | 9
2019-02-01 03:10:00 | dlq_file_sink_06__01 | 8
複製代碼

Kafka鏈接器哪裏沒有提供錯誤處理?

Kafka鏈接器的錯誤處理方式,以下表所示:

鏈接器生命週期階段 描述 是否處理錯誤?
開始 首次啓動鏈接器時,其將執行必要的初始化,例如鏈接到數據存儲
拉取(針對源鏈接器) 從源數據存儲讀取消息
格式轉換 從Kafka主題讀寫數據並對JSON/Avro格式進行序列化/反序列化
單消息轉換 應用任何已配置的單消息轉換
接收(針對接收鏈接器) 將消息寫入目標數據存儲

注意源鏈接器沒有死信隊列。

錯誤處理配置流程

關於鏈接器錯誤處理的配置,能夠按照以下的流程一步步進階:

總結

處理錯誤是任何穩定可靠的數據管道的重要組成部分,根據數據的使用方式,能夠有兩個選項。若是管道任何錯誤的消息都不能接受,代表上游存在嚴重的問題,那麼就應該當即中止處理(這是Kafka鏈接器的默認行爲)。

另外一方面,若是隻是想將數據流式傳輸到存儲以進行分析或非關鍵性處理,那麼只要不傳播錯誤,保持管道穩定運行則更爲重要。這時就能夠定義錯誤的處理方式,推薦的方式是使用死信隊列並密切監視來自Kafka鏈接器的可用JMX指標。

相關文章
相關標籤/搜索