Kafka鏈接器之在2.3版本中的改進

在Kafka的2.3版本中,對Kafka鏈接器作了很大的改進。首先就是在添加和刪除鏈接器時,修改了Kafka鏈接器處理任務的方式。以前這個動做形成了整個系統的停頓,這是一直被開發和運維人員詬病的地方,除此以外,社區中頻繁提到的其餘一些問題,也獲得瞭解決。html

Kafka鏈接器中的增量協做再平衡

Kafka鏈接器集羣由一個或多個工做節點進程組成,集羣以任務的形式分發鏈接器的負載。在添加或刪除鏈接器或工做節點時,Kafka鏈接器會嘗試再平衡這些任務。在Kafka的2.3版本以前,集羣會中止全部任務,從新計算全部任務的執行位置,而後重啓全部任務。每次再平衡都會暫停全部數據進出的工做,一般時間很短,但有時也會持續一段時間。git

如今經過KIP-415,Kafka 2.3用增量協做再平衡作了替代,之後將僅對須要啓動、中止或移動的任務進行再平衡。具體的詳細信息請參見這裏docker

下面用一些鏈接器作了一個簡單的測試,這裏只使用了一個分佈式Kafka鏈接器工做節點,而源端使用了kafka-connect-datagen,它以固定的時間間隔根據給定的模式生成隨機數據。以固定的時間間隔就能夠粗略地計算因爲再平衡而中止任務的時間,由於生成的消息做爲Kafka消息的一部分,包含了時間戳。這些消息以後會被流式注入Elasticsearch,之因此用它,不只由於它是一個易於使用的接收端,也由於能夠經過觀察源端消息的時間戳來查看生產中的任何停頓。apache

經過以下的方式,能夠建立源端:json

curl -s -X PUT -H  "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-01/config \
    -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "orders",
    "quickstart":"orders",
    "max.interval":200,
    "iterations":10000000,
    "tasks.max": "1"
  }'

經過以下方式建立接收端:api

curl -s -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/sink-elastic-orders-00/config \
    -d '{
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "orders",
        "connection.url": "http://elasticsearch:9200",
        "type.name": "type.name=kafkaconnect",
        "key.ignore": "true",
        "schema.ignore": "false",
        "transforms": "addTS",
        "transforms.addTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addTS.timestamp.field": "op_ts"
        }'

這裏使用了單消息轉換,將Kafka消息的時間戳提高到消息自己的字段中,以即可以在Elasticsearch中進行公開。以後會使用Kibana進行繪製,這樣產生的消息數量降低就能夠顯示出來,與再平衡發生的位置一致:安全

在Kafka鏈接器的工做節點日誌中,能夠查看活動和時間,並對Kafka的2.2版本和2.3版本的行爲進行比較:bash

**注意:**爲了清楚地說明問題,日誌作了精簡處理。app

對日誌的改進

在再平衡問題(如前述)已大大改善以後,Kafka鏈接器的第二大困擾多是難以在Kafka鏈接器工做節點日誌中肯定某個消息屬於哪一個鏈接器。運維

以前能夠直接從鏈接器的任務中獲取日誌中的消息,例如:

INFO Using multi thread/connection supporting pooling connection manager (io.searchbox.client.JestClientFactory)
INFO Using default GSON instance (io.searchbox.client.JestClientFactory)
INFO Node Discovery disabled... (io.searchbox.client.JestClientFactory)
INFO Idle connection reaping disabled... (io.searchbox.client.JestClientFactory)

他們屬於哪一個任務?不清楚。也許會認爲JestClientElasticsearch有關,也許它們來自Elasticsearch鏈接器,可是如今有5個不一樣的Elasticsearch鏈接器在運行,那麼它們來自哪一個實例?更不用說鏈接器能夠有多個任務了。

在Apache Kafka 2.3中,可使用映射診斷上下文(MDC)日誌,在日誌中提供了更多的上下文信息:

INFO [sink-elastic-orders-00|task-0] Using multi thread/connection supporting pooling connection manager (io.searchbox.client.JestClientFactory:223)
INFO [sink-elastic-orders-00|task-0] Using default GSON instance (io.searchbox.client.JestClientFactory:69)
INFO [sink-elastic-orders-00|task-0] Node Discovery disabled... (io.searchbox.client.JestClientFactory:86)
INFO [sink-elastic-orders-00|task-0] Idle connection reaping disabled... (io.searchbox.client.JestClientFactory:98)

這個日誌格式的更改默認是禁用的,以保持後向兼容性。要啓用此改進,須要編輯etc/kafka/connect-log4j.properties文件,按照以下方式修改log4j.appender.stdout.layout.ConversionPattern

log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n

經過環境變量CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERNKafka鏈接器的Docker鏡像也支持了這個特性。

具體細節請參見KIP-449

REST改進

KIP-465/connectorsREST端點添加了一些方便的功能。經過傳遞其餘參數,能夠獲取有關每一個鏈接器的更多信息,而沒必要迭代結果並進行其餘REST調用。

例如,在Kafka 2.3以前要查詢全部任務的狀態,必須執行如下操做,使用xargs迭代輸出並重復調用status端點:

$ curl -s "http://localhost:8083/connectors"| \
    jq '.[]'| \
    xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| \
    jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| \
    column -s : -t| sed 's/\"//g'| sort
sink-elastic-orders-00  |  RUNNING  |  RUNNING
source-datagen-01       |  RUNNING  |  RUNNING

如今使用Kafka 2.3,可使用/connectors?expand=status加上一些jq技巧進行單個REST調用,就能夠達到和以前同樣的效果:

$ curl -s "http://localhost:8083/connectors?expand=status" | \
     jq 'to_entries[] | [.key, .value.status.connector.state,.value.status.tasks[].state]|join(":|:")' | \
     column -s : -t| sed 's/\"//g'| sort
sink-elastic-orders-00  |  RUNNING  |  RUNNING
source-datagen-01       |  RUNNING  |  RUNNING

還有/connectors?expand=status,它將返回每一個鏈接器信息,如配置、鏈接器類型等,也能夠把它們結合起來:

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status"|jq 'to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort
sink    |  sink-elastic-orders-00  |  RUNNING  |  RUNNING  |  io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
source  |  source-datagen-01       |  RUNNING  |  RUNNING  |  io.confluent.kafka.connect.datagen.DatagenConnector

Kafka鏈接器現已支持client.id

由於KIP-411,Kafka鏈接器如今能夠以更有用的方式爲每項任務配置client.id。以前,只能看到consumer-25做爲鏈接器的消費者組的一部分從給定的分區進行消費,如今則能夠將其直接綁定回特定的任務,從而使故障排除和診斷更加容易。

鏈接器級生產者/消費者配置覆寫

長期以來的一個常見需求是可以覆寫分別由Kafka鏈接器接收端和源端使用的消費者設置生產者設置。到目前爲止,它們都採用了工做節點配置中指定的值,除非生成了更多的工做節點,不然沒法對諸如安全主體之類的內容進行細粒度的控制。

Kafka 2.3中的KIP-458使工做節點可以容許對配置進行覆寫。connector.client.config.override.policy是一個新的參數,在工做節點級能夠有3個可選項:

描述
None 默認策略,不容許任何配置的覆寫
Principal 容許覆蓋生產者、消費者和admin前綴的security.protocolsasl.jaas.configsasl.mechanism
All 容許覆蓋生產者、消費者和admin前綴的全部配置

經過在工做節點配置中設置上述參數,如今能夠針對每一個鏈接器對配置進行覆寫。只需提供帶有consumer.override(接收端)或producer.override(源端)前綴的必需參數便可,還能夠針對死信隊列使用admin.override

在下面的示例中,建立鏈接器時,它將從主題的當前點開始消費數據,而不是讀取主題中的全部可用數據,這是經過將consumer.override.auto.offset.reset配置爲latest覆蓋auto.offset.reset configuration來實現的。

curl -i -X PUT -H  "Content-Type:application/json" \
      http://localhost:8083/connectors/sink-elastic-orders-01-latest/config \
      -d '{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "topics": "orders",
  "consumer.override.auto.offset.reset": "latest",
  "tasks.max": 1,
  "connection.url": "http://elasticsearch:9200",  "type.name": "type.name=kafkaconnect",
  "key.ignore": "true",   "schema.ignore": "false",
  "transforms": "renameTopic",
  "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.renameTopic.regex": "orders",
  "transforms.renameTopic.replacement": "orders-latest"
}'

經過檢查工做節點日誌,能夠看到覆寫已經生效:

[2019-07-17 13:57:27,532] INFO [sink-elastic-orders-01-latest|task-0] ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
[…]

能夠看到這個ConsumerConfig日誌條目與建立的鏈接器直接關聯,證實了上述MDC日誌記錄的有用性。

第二個鏈接器運行於同一主題但沒有consumer.override,所以繼承了默認值earliest

[2019-07-17 13:57:27,487] INFO [sink-elastic-orders-01-earliest|task-0] ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
[…]

經過將數據從主題流式傳輸到Elasticsearch能夠檢查配置的差別形成的影響。

$ curl -s "localhost:9200/_cat/indices?h=idx,docsCount"
orders-latest     2369
orders-earliest 144932

有兩個索引:一個從同一主題注入了較少的消息,由於orders-latest索引只注入了鏈接器建立後纔到達主題的消息;而另外一個orders-earliest索引,由一個單獨的鏈接器注入,它會使用Kafka鏈接器的默認配置,即會注入全部的新消息,再加上主題中原有的全部消息。

相關文章
相關標籤/搜索