Kafka鏈接器深度解讀之轉換器和序列化

Kafka鏈接器是Apache Kafka®的一部分,提供數據存儲與Kafka之間的流式集成。對於數據工程師來講,只須要使用JSON格式配置文件便可。目前已經有不少數據存儲的鏈接器,僅舉幾例來講,包括JDBCElasticsearchIBM MQS3BigQueryhtml

對於開發者,Kafka鏈接器有豐富的API,若有必要,能夠開發本身的鏈接器。此外它還具備用於配置和管理鏈接器的REST APIjava

Kafka鏈接器自己是模塊化的,提供了很是強大的知足集成需求的方法,部分關鍵組件包括:git

  • 鏈接器:定義了一組如何與數據存儲集成的JAR文件;
  • 轉換器:處理數據的序列化和反序列化;
  • 變換:傳輸過程當中的消息處理(可選)。

圍繞Kafka鏈接器,常見的錯誤或者誤解之一是數據的序列化,這是Kafka鏈接器經過轉換器進行處理的,下面會介紹它們的工做機制,並說明一些常見問題如何處理。github

Kafka消息只是字節

Kafka消息是按照主題進行組織的。每條消息都是一個鍵/值對,不過Kafka就須要這些。當數據在Kafka中存儲時都只是字節,這使得Kafka能夠適用於各類場景,但這也意味着開發者有責任決定如何對數據進行序列化。sql

在配置Kafka鏈接器時,標準步驟的關鍵之一是序列化格式,要確保主題的讀取方和寫入方使用相同的序列化格式,不然會出現混亂和錯誤!docker

常見的格式有不少,包括:數據庫

  • JSON;
  • Avro;
  • Protobuf;
  • 字符串分割(如CSV)。

每種格式都有優勢和缺點。apache

序列化格式的選擇

選擇序列化格式的一些原則包括:json

  • 模式:不少時候數據都會有一個模式。可能不喜歡這個事實,但做爲開發人員有責任保留和傳播此模式,由於模式提供了服務之間的契約。某些消息格式(例如Avro和Protobuf)具備強大的模式支持,而其它消息格式支持較少(JSON)或根本沒有(分隔字符串);
  • 生態系統兼容性:Avro是Confluent平臺的一等公民,獲得了Confluent模式註冊表、Kafka鏈接器、KSQL等的原生支持。而Protobuf則依賴於部分功能支持的社區貢獻;
  • 消息大小:JSON是純文本格式,消息大小依賴於Kafka自己的壓縮配置,而Avro和Protobuf都是二進制格式,所以消息較小;
  • 語言支持:Java體系對Avro有強大的支持,但若是應用不是基於Java的,那麼可能會發現它不太容易處理。

若是使用JSON格式寫入目標端,須要在主題中使用JSON格式麼?

不須要,無論是從源端讀取數據的格式,仍是將數據寫入外部存儲,都不會影響Kafka中消息序列化的格式。bootstrap

Kafka中的鏈接器負責從源端(例如數據庫)讀取數據,並將其做爲數據的內部表示傳遞給轉換器,而後,Kafka中的轉換器會將此源數據對象序列化到主題上。

當使用Kafka鏈接器做爲接收端時,正好相反,即轉換器未來自主題的數據反序列化爲內部表示,其會傳遞給鏈接器,而後使用指定方法寫入目標端。

這意味着能夠在主題中好比以Avro格式保存數據,而後好比將其寫入HDFS時,再指定接收端鏈接器使用的格式

配置轉換器

Kafka鏈接器在工做節點級別使用默認的轉換器配置,也能夠在每一個鏈接器上覆蓋它。因爲在整個流水線中使用相同的序列化格式一般是一個好的作法,因此一般只需在工做節點上配置轉換器,而無需在鏈接器中指定。可是若是從其它主題中提取數據而它們使用不一樣的序列化格式時,就要在鏈接器配置中指定它,即便在鏈接器的配置中覆蓋它,執行任務的仍是那個轉換器。

正確的鏈接器永遠不會序列化/反序列化存儲在Kafka中的消息,而是讓配置的轉換器完成這項工做。

注意Kafka消息只是鍵/值字節對,所以須要使用key.convertervalue.converter配置項爲鍵和值指定轉換器,某些狀況下,能夠爲鍵和值指定不一樣的轉換器。

下面是使用String轉換器的示例,因爲它只是一個字符串,數據沒有模式,所以用於value並非那麼有用:

"key.converter": "org.apache.kafka.connect.storage.StringConverter",

某些轉換器有其它配置項,對於Avro,須要指定模式註冊表,對於JSON,須要指定是否但願Kafka鏈接器將模式嵌入JSON自己。爲轉換器指定配置項時,要使用key.converter.value.converter.前綴。例如要將Avro用於消息的內容,須要指定如下配置項:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

常見的轉換器包括:

io.confluent.connect.avro.AvroConverter
  • String:Apache Kafka的一部分
org.apache.kafka.connect.storage.StringConverter
  • JSON:Apache Kafka的一部分
org.apache.kafka.connect.json.JsonConverter
  • ByteArray:Apache Kafka的一部分
org.apache.kafka.connect.converters.ByteArrayConverter
com.blueapron.connect.protobuf.ProtobufConverter

JSON和模式

雖然JSON默認不支持攜帶模式,但Kafka鏈接器確實支持嵌入模式的特定JSON格式。因爲模式也包含在每一個消息中,所以生成的數據大小可能會變大。

若是正在配置Kafka源鏈接器而且但願Kafka鏈接器在寫入Kafka的消息中包含模式,須要作以下的配置:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

最終向Kafka寫入的消息大體以下,schema以及payload爲JSON中的頂級元素:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "registertime"
      },
      {
        "type": "string",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "regionid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "gender"
      }
    ],
    "optional": false,
    "name": "ksql.users"
  },
  "payload": {
    "registertime": 1493819497170,
    "userid": "User_1",
    "regionid": "Region_5",
    "gender": "MALE"
  }
}

請注意消息的大小,以及由內容與模式組成的消息的大小。考慮到在每條消息中都重複這一點,就會看到爲何像Avro這樣的格式頗有意義,由於模式是單獨存儲的,而消息只包含有效內容(並進行過壓縮)。

若是從一個Kafka主題中使用Kafka接收鏈接器消費JSON格式的數據,則須要瞭解數據中是否包含模式,若是包含,則要與上面的格式相同,而不能是一些任意的格式,那麼配置以下:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

不過,若是使用的JSON數據沒有schema/payload結構,像下面這樣:

{
  "registertime": 1489869013625,
  "userid": "User_1",
  "regionid": "Region_2",
  "gender": "OTHER"
}

則必須經過配置通知Kafka鏈接器不要尋找模式,以下:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

和之前同樣,要記住轉換器配置項(此處schemas.enable)須要合適的前綴key.convertervalue.converter

常見錯誤

若是Kafka鏈接器中轉換器配置不正確,可能遇到如下一些常見錯誤。這些消息會出如今Kafka鏈接器配置的接收端中,由於這裏是對存儲在Kafka中的消息進行反序列化的點。轉換器問題一般不會在源端發生,由於源端已經配置了序列化。其中每一個都會致使鏈接器失敗,開始的錯誤爲:

ERROR WorkerSinkTask{id=sink-file-users-json-noschema-01-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator. execAndHandleError(RetryWithToleranceOperator.java:178)
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute (RetryWithToleranceOperator.java:104)

這個錯誤以後,會看到一個異常堆棧,其中描述了錯誤的緣由。注意對於鏈接器中的任何嚴重錯誤,都會拋出上述錯誤,所以可能會看到與序列化無關的錯誤。要快速定位錯誤是由哪一個錯誤配置致使的,可參考下表:

問題:使用JsonConverter讀取非JSON格式數據

若是源端主題上有非JSON格式的數據,可是使用JsonConverter進行讀取,就會看到:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
…
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7)

這多是由於源端主題以Avro或其它格式序列化引發的。

解決方案:若是數據其實是Avro格式,則須要按照以下方式修改Kafka鏈接器的接收端:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

或者,若是主題由Kafka鏈接器注入,也能夠調整上游源端,讓其輸出JSON格式數據:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

問題:使用AvroConverter讀取非Avro格式數據

這是最多見的錯誤,當嘗試使用AvroConverter從非Avro格式的主題讀取數據時,會發生這種狀況,還包括使用非Confluent模式註冊表的Avro序列化器寫入的數據:

org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
…
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

解決方案:檢查源端主題的序列化格式,調整Kafka鏈接器接收端使用正確的轉換器,或將上游格式修改成Avro(這樣最好)。若是上游主題由Kafka鏈接器注入,也能夠按以下方式配置源端鏈接器的轉換器:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

問題:讀取沒有指望的schema/payload結構的JSON數據

如前所述,Kafka鏈接器支持包含有效內容和模式的特殊JSON格式消息結構,若是讀取的JSON數據不是這樣的結構,會有下面的錯誤:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

要知道,對於schemas.enable=true惟一有效的JSON結構是,schemapayload做爲頂級元素(如上所示)。

正如錯誤消息自己所述,若是隻是簡單的JSON數據,則應將鏈接器的配置更改成:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

若是要在數據中包含模式,要麼切換到使用Avro(推薦),要麼配置上游的Kafka鏈接器以在消息中包含模式:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",

解決問題的提示

查看鏈接器工做節點的日誌

要查看Kafka鏈接器的錯誤日誌,須要定位到Kafka鏈接器工做節點的輸出。這個位置取決於Kafka鏈接器是如何啓動的,安裝Kafka鏈接器有好幾種方法,包括Docker、Confluent CLI、systemd和手動下載的壓縮包,而後工做節點的日誌分別位於:

  • Docker:docker logs container_name
  • Confluent CLI:confluent log connect
  • systemd:日誌文件寫入/var/log/confluent/kafka-connect
  • 其它:默認狀況下,Kafka鏈接器會將其輸出發送到stdout,所以能夠在啓動Kafka鏈接器的終端會話中看到。

查看Kafka鏈接器的配置文件

要更改Kafka鏈接器工做節點的配置項(適用於全部運行的鏈接器),須要相應地作出以下的修改:

  • Docker:配置環境變量,好比在Docker Compose中:
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
  • Confluent CLI:使用配置文件/etc/schema-registry/connect-avro-distributed.properties
  • systemd(deb/rpm):使用配置文件/etc/kafka/connect-distributed.properties
  • 其它:啓動Kafka鏈接器時,能夠指定工做節點的屬性文件,例如:
$ cd confluent-5.0.0
$ ./bin/connect-distributed ./etc/kafka/connect-distributed.properties

檢查Kafka主題

假設遇到了上面提到過的錯誤,而且想要解決爲何Kafka鏈接器的接收端沒法從主題中讀取數據。

這時須要檢查正在讀取的主題的數據,並確認它採用了指望的序列化格式。另外,要記住全部消息都必須採用這種格式,因此不要只是假設由於如今以正確的格式向主題發送消息,因此不會出現問題。Kafka鏈接器和其它消費者也會讀取該主題的已有消息。

下面將使用命令行來描述如何進行故障排除,但還有一些其它工具也能夠作:

  • Confluent控制中心有經過可視化的方式查看主題內容的功能,包括自動肯定序列化格式;
  • KSQL的PRINT命令會將主題的內容打印到控制檯,包括自動肯定序列化格式;
  • Confluent CLI工具備consume命令,其可被用於讀取字符串和Avro格式的數據。

若是認爲是字符串/JSON格式數據

可使用控制檯工具,包括kafkacatkafka-console-consumer,以kafkacat爲例:

$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1
{
  "registertime":1493356576434,"userid":"User_8","regionid":"Region_2","gender":"MALE"}

使用jq命令,還能夠驗證和格式化JSON:

$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1|jq '.'
{
  "registertime": 1493356576434,
  "userid": "User_8",
  "regionid": "Region_2",
  "gender": "MALE"
}

若是你看到了下面這樣的亂碼字符,其極可能是二進制數據,好比經過Avro或Protobuf格式寫入就是這樣的:

$ kafkacat -b localhost:9092 -t users-avro -C -c1
ڝ���VUser_9Region_MALE

若是認爲是Avro格式數據

須要使用專爲讀取和反序列化Avro數據而設計的控制檯工具,這裏會使用kafka-avro-console-consumer。先要確認指定了正確的模式註冊表URL:

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                              --property schema.registry.url=http://localhost:8081 \
                              --topic users-avro \
                              --from-beginning --max-messages 1
{"registertime":1505213905022,"userid":"User_5","regionid":"Region_4","gender":"FEMALE"}

和以前同樣,若是要對其格式化,能夠經過管道輸出結果給jq

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                              --property schema.registry.url=http://localhost:8081 \
                              --topic users-avro \
                              --from-beginning --max-messages 1 | \
                              jq '.'
{
  "registertime": 1505213905022,
  "userid": "User_5",
  "regionid": "Region_4",
  "gender": "FEMALE"
}

內部轉換器

當運行在分佈式模式時,Kafka鏈接器使用Kafka自己來存儲有關其操做的元數據,包括鏈接器配置,偏移量等。

經過internal.key.converter/internal.value.converter配置項,這些Kafka主題自己能夠配置使用不一樣的轉換器。可是這些配置項只供內部使用,實際上從Kafka 2.0版本開始就已被棄用。再也不須要修改這些,若是還要修改這些配置項,從Kafka的2.0版本開始,將會收到警告。

將模式應用於沒有模式的消息

不少時候Kafka鏈接器會從已經存在模式的地方引入數據,這時只要保留該模式而後使用合適的序列化格式(例如Avro),加上好比模式註冊表等提供的兼容性保證,該數據的全部下游用戶就均可以從可用的模式中受益。可是若是沒有明確的模式呢?

可能正使用FileSourceConnector從純文本文件中讀取數據(不建議用於生產,但一般用於PoC),或者可能正在使用REST鏈接器從REST端點提取數據。因爲這二者以及其它的都沒有固有的模式,所以須要進行聲明。

有時可能只是想從源端讀取字節而後將它們寫入一個主題上,但大多數狀況下須要作正確的事情並應用模式以便數據能夠正確地處理。做爲數據提取的一部分處理一次,而不是將問題推送到每一個消費者(多是多個),這是一個更好的作法。

能夠編寫本身的Kafka流式應用以將模式應用於Kafka主題中的數據,但也可使用KSQL。這篇文章展現瞭如何對從REST端點提取的JSON數據執行此操做。下面會看一下將模式應用於某些CSV數據的簡單示例,顯然是能夠作到的。

假設有一個名爲testdata-csv的Kafka主題,其中有一些CSV數據,大體以下:

$ kafkacat -b localhost:9092 -t testdata-csv -C
1,Rick Astley,Never Gonna Give You Up
2,Johnny Cash,Ring of Fire

經過觀察,能夠猜想它有三個字段,可能爲:

  • ID;
  • 藝術家;
  • 歌曲。

若是將數據保留在這樣的主題中,那麼任何想要使用該數據的應用程序(多是Kafka鏈接器接收端、定製的Kafka應用或者其它),都須要每次猜想這個模式。或者更糟糕的是,每一個消費端應用的開發者都須要不斷向數據提供方確認模式及其任何變動。正如Kafka解耦系統同樣,這種模式依賴性迫使團隊之間存在硬性耦合,這並非一件好事。

所以要作的只是使用KSQL將模式應用於數據,並填充一個新的派生主題,其中保存模式。在KSQL中,能夠像下面這樣查看主題數據:

ksql> PRINT 'testdata-csv' FROM BEGINNING;
Format:STRING
11/6/18 2:41:23 PM UTC , NULL , 1,Rick Astley,Never Gonna Give You Up
11/6/18 2:41:23 PM UTC , NULL , 2,Johnny Cash,Ring of Fire

這裏的前兩個字段(11/6/18 2:41:23 PM UTCNULL)分別是Kafka消息的時間戳和鍵,而其他字段來自CSV文件。下面用KSQL註冊這個主題並聲明模式:

ksql> CREATE STREAM TESTDATA_CSV (ID INT, ARTIST VARCHAR, SONG VARCHAR) \
WITH (KAFKA_TOPIC='testdata-csv', VALUE_FORMAT='DELIMITED');

Message
----------------
Stream created
----------------

經過KSQL能夠查看如今有一個數據流模式:

ksql> DESCRIBE TESTDATA_CSV;

Name                 : TESTDATA_CSV
 Field   | Type
-------------------------------------
 ROWTIME | BIGINT (system)
 ROWKEY  | VARCHAR(STRING) (system)
 ID      | INTEGER
 ARTIST  | VARCHAR(STRING)
 SONG    | VARCHAR(STRING)
-------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

經過查詢KSQL流確認數據是否符合預期。注意對於已有的Kafka主題,此時只是做爲Kafka的消費者,還沒有更改或複製任何數據。

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT ID, ARTIST, SONG FROM TESTDATA_CSV;
1 | Rick Astley | Never Gonna Give You Up
2 | Johnny Cash | Ring of Fire

最後,建立一個新的Kafka主題,由具備模式的從新序列化的數據填充。KSQL查詢是連續的,所以除了將任何已有的數據從源端主題發送到目標端主題以外,KSQL還將向主題發送任何將來的數據。

ksql> CREATE STREAM TESTDATA WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM TESTDATA_CSV;

Message
----------------------------
Stream created and running
----------------------------

這時使用Avro格式的控制檯消費者對數據進行驗證:

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                                --property schema.registry.url=http://localhost:8081 \
                                --topic TESTDATA \
                                --from-beginning | \
                                jq '.'
{
  "ID": {
    "int": 1
},
  "ARTIST": {
    "string": "Rick Astley"
},
  "SONG": {
    "string": "Never Gonna Give You Up"
  }
}
[…]

甚至能夠在模式註冊表中查看已經註冊的模式:

$ curl -s http://localhost:8081/subjects/TESTDATA-value/versions/latest|jq '.schema|fromjson'
{
  "type": "record",
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "fields": [
    {
      "name": "ID",
      "type": [
        "null",
        "int"
      ],
      "default": null
    },
    {
      "name": "ARTIST",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "SONG",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

任何寫入原始主題(testdata-csv)的新消息都由KSQL自動處理,並寫入Avro格式的名爲TESTDATA的新主題。如今,任何想要使用此數據的應用或團隊均可以簡單地處理TESTDATA主題,並利用聲明模式的Avro序列化數據。還可使用此技術更改主題中的分區數,分區鍵和複製因子。

結論

Kafka鏈接器是一個很是簡單但功能強大的工具,可用於將其它系統與Kafka集成,最多見的誤解是Kafka鏈接器提供的轉換器。以前已經介紹過Kafka消息只是鍵/值對,瞭解應該使用哪一個序列化機制而後在Kafka鏈接器中對其進行標準化很是重要。

相關文章
相關標籤/搜索