在Kafka的2.3版本中,對Kafka鏈接器作了很大的改進。首先就是在添加和刪除鏈接器時,修改了Kafka鏈接器處理任務的方式。以前這個動做形成了整個系統的停頓,這是一直被開發和運維人員詬病的地方,除此以外,社區中頻繁提到的其餘一些問題,也獲得瞭解決。html
Kafka鏈接器集羣由一個或多個工做節點進程組成,集羣以任務的形式分發鏈接器的負載。在添加或刪除鏈接器或工做節點時,Kafka鏈接器會嘗試再平衡這些任務。在Kafka的2.3版本以前,集羣會中止全部任務,從新計算全部任務的執行位置,而後重啓全部任務。每次再平衡都會暫停全部數據進出的工做,一般時間很短,但有時也會持續一段時間。git
如今經過KIP-415,Kafka 2.3用增量協做再平衡作了替代,之後將僅對須要啓動、中止或移動的任務進行再平衡。具體的詳細信息請參見這裏。docker
下面用一些鏈接器作了一個簡單的測試,這裏只使用了一個分佈式Kafka鏈接器工做節點,而源端使用了kafka-connect-datagen
,它以固定的時間間隔根據給定的模式生成隨機數據。以固定的時間間隔就能夠粗略地計算因爲再平衡而中止任務的時間,由於生成的消息做爲Kafka消息的一部分,包含了時間戳。這些消息以後會被流式注入Elasticsearch,之因此用它,不只由於它是一個易於使用的接收端,也由於能夠經過觀察源端消息的時間戳來查看生產中的任何停頓。apache
經過以下的方式,能夠建立源端:json
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-01/config \ -d '{ "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", "kafka.topic": "orders", "quickstart":"orders", "max.interval":200, "iterations":10000000, "tasks.max": "1" }'
經過以下方式建立接收端:api
curl -s -X PUT -H "Content-Type:application/json" \ http://localhost:8083/connectors/sink-elastic-orders-00/config \ -d '{ "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "topics": "orders", "connection.url": "http://elasticsearch:9200", "type.name": "type.name=kafkaconnect", "key.ignore": "true", "schema.ignore": "false", "transforms": "addTS", "transforms.addTS.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.addTS.timestamp.field": "op_ts" }'
這裏使用了單消息轉換,將Kafka消息的時間戳提高到消息自己的字段中,以即可以在Elasticsearch中進行公開。以後會使用Kibana進行繪製,這樣產生的消息數量降低就能夠顯示出來,與再平衡發生的位置一致:安全
在Kafka鏈接器的工做節點日誌中,能夠查看活動和時間,並對Kafka的2.2版本和2.3版本的行爲進行比較:bash
**注意:**爲了清楚地說明問題,日誌作了精簡處理。app
在再平衡問題(如前述)已大大改善以後,Kafka鏈接器的第二大困擾多是難以在Kafka鏈接器工做節點日誌中肯定某個消息屬於哪一個鏈接器。運維
以前能夠直接從鏈接器的任務中獲取日誌中的消息,例如:
INFO Using multi thread/connection supporting pooling connection manager (io.searchbox.client.JestClientFactory) INFO Using default GSON instance (io.searchbox.client.JestClientFactory) INFO Node Discovery disabled... (io.searchbox.client.JestClientFactory) INFO Idle connection reaping disabled... (io.searchbox.client.JestClientFactory)
他們屬於哪一個任務?不清楚。也許會認爲JestClient
與Elasticsearch
有關,也許它們來自Elasticsearch
鏈接器,可是如今有5個不一樣的Elasticsearch
鏈接器在運行,那麼它們來自哪一個實例?更不用說鏈接器能夠有多個任務了。
在Apache Kafka 2.3中,可使用映射診斷上下文(MDC)日誌,在日誌中提供了更多的上下文信息:
INFO [sink-elastic-orders-00|task-0] Using multi thread/connection supporting pooling connection manager (io.searchbox.client.JestClientFactory:223) INFO [sink-elastic-orders-00|task-0] Using default GSON instance (io.searchbox.client.JestClientFactory:69) INFO [sink-elastic-orders-00|task-0] Node Discovery disabled... (io.searchbox.client.JestClientFactory:86) INFO [sink-elastic-orders-00|task-0] Idle connection reaping disabled... (io.searchbox.client.JestClientFactory:98)
這個日誌格式的更改默認是禁用的,以保持後向兼容性。要啓用此改進,須要編輯etc/kafka/connect-log4j.properties
文件,按照以下方式修改log4j.appender.stdout.layout.ConversionPattern
:
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
經過環境變量CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN
,Kafka鏈接器的Docker鏡像也支持了這個特性。
具體細節請參見KIP-449。
REST改進
KIP-465爲/connectors
REST端點添加了一些方便的功能。經過傳遞其餘參數,能夠獲取有關每一個鏈接器的更多信息,而沒必要迭代結果並進行其餘REST調用。
例如,在Kafka 2.3以前要查詢全部任務的狀態,必須執行如下操做,使用xargs
迭代輸出並重復調用status
端點:
$ curl -s "http://localhost:8083/connectors"| \ jq '.[]'| \ xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| \ jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| \ column -s : -t| sed 's/\"//g'| sort sink-elastic-orders-00 | RUNNING | RUNNING source-datagen-01 | RUNNING | RUNNING
如今使用Kafka 2.3,可使用/connectors?expand=status
加上一些jq
技巧進行單個REST調用,就能夠達到和以前同樣的效果:
$ curl -s "http://localhost:8083/connectors?expand=status" | \ jq 'to_entries[] | [.key, .value.status.connector.state,.value.status.tasks[].state]|join(":|:")' | \ column -s : -t| sed 's/\"//g'| sort sink-elastic-orders-00 | RUNNING | RUNNING source-datagen-01 | RUNNING | RUNNING
還有/connectors?expand=status
,它將返回每一個鏈接器信息,如配置、鏈接器類型等,也能夠把它們結合起來:
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status"|jq 'to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \ column -s : -t| sed 's/\"//g'| sort sink | sink-elastic-orders-00 | RUNNING | RUNNING | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector source | source-datagen-01 | RUNNING | RUNNING | io.confluent.kafka.connect.datagen.DatagenConnector
Kafka鏈接器現已支持client.id
由於KIP-411,Kafka鏈接器如今能夠以更有用的方式爲每項任務配置client.id
。以前,只能看到consumer-25
做爲鏈接器的消費者組的一部分從給定的分區進行消費,如今則能夠將其直接綁定回特定的任務,從而使故障排除和診斷更加容易。
鏈接器級生產者/消費者配置覆寫
長期以來的一個常見需求是可以覆寫分別由Kafka鏈接器接收端和源端使用的消費者設置或生產者設置。到目前爲止,它們都採用了工做節點配置中指定的值,除非生成了更多的工做節點,不然沒法對諸如安全主體之類的內容進行細粒度的控制。
Kafka 2.3中的KIP-458使工做節點可以容許對配置進行覆寫。connector.client.config.override.policy
是一個新的參數,在工做節點級能夠有3個可選項:
值 | 描述 |
---|---|
None | 默認策略,不容許任何配置的覆寫 |
Principal | 容許覆蓋生產者、消費者和admin 前綴的security.protocol 、sasl.jaas.config 和sasl.mechanism |
All | 容許覆蓋生產者、消費者和admin 前綴的全部配置 |
經過在工做節點配置中設置上述參數,如今能夠針對每一個鏈接器對配置進行覆寫。只需提供帶有consumer.override
(接收端)或producer.override
(源端)前綴的必需參數便可,還能夠針對死信隊列使用admin.override
。
在下面的示例中,建立鏈接器時,它將從主題的當前點開始消費數據,而不是讀取主題中的全部可用數據,這是經過將consumer.override.auto.offset.reset
配置爲latest
覆蓋auto.offset.reset configuration
來實現的。
curl -i -X PUT -H "Content-Type:application/json" \ http://localhost:8083/connectors/sink-elastic-orders-01-latest/config \ -d '{ "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "topics": "orders", "consumer.override.auto.offset.reset": "latest", "tasks.max": 1, "connection.url": "http://elasticsearch:9200", "type.name": "type.name=kafkaconnect", "key.ignore": "true", "schema.ignore": "false", "transforms": "renameTopic", "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.renameTopic.regex": "orders", "transforms.renameTopic.replacement": "orders-latest" }'
經過檢查工做節點日誌,能夠看到覆寫已經生效:
[2019-07-17 13:57:27,532] INFO [sink-elastic-orders-01-latest|task-0] ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest […]
能夠看到這個ConsumerConfig
日誌條目與建立的鏈接器直接關聯,證實了上述MDC日誌記錄的有用性。
第二個鏈接器運行於同一主題但沒有consumer.override
,所以繼承了默認值earliest
:
[2019-07-17 13:57:27,487] INFO [sink-elastic-orders-01-earliest|task-0] ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = earliest […]
經過將數據從主題流式傳輸到Elasticsearch能夠檢查配置的差別形成的影響。
$ curl -s "localhost:9200/_cat/indices?h=idx,docsCount" orders-latest 2369 orders-earliest 144932
有兩個索引:一個從同一主題注入了較少的消息,由於orders-latest
索引只注入了鏈接器建立後纔到達主題的消息;而另外一個orders-earliest
索引,由一個單獨的鏈接器注入,它會使用Kafka鏈接器的默認配置,即會注入全部的新消息,再加上主題中原有的全部消息。