.Net(c#)使用 Kafka 小結

.Net(c#)使用 Kafka 小結

1.開篇

因爲項目中必須使用 kafka 來做爲消息組件,因此使用 kafka 有一段時間了。不得不感嘆 kafka 是一個至關優秀的消息系統。下面直接對使用過程作一總結,但願對你們有用。git

1.1.kafka 部署

kafka 的簡單搭建咱們使用 docker 進行,方便快捷單節點。生產環境不推薦這樣的單節點 kafka 部署。github

1.1.1.確保安裝了 docker 和 docker-compose

網上不少教程,安裝也簡單,不做爲重點贅述。docker

1.1.2.編寫 docker-compose.yml

將如下內容直接複製到新建空文件docker-compose.yml中。c#

version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_CREATE_TOPICS: "test"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

1.1.3.容器構建提交

docker-compose.yml文件的目錄下執行如下命令:服務器

docker-compose build # 打包
docker-compose up # 啓動, 添加 -d 能夠後臺啓動。

看到日誌輸出:異步

Creating network "desktop_default" with the default driver
Creating desktop_zookeeper_1 ... done
Creating desktop_kafka_1     ... done
Attaching to desktop_zookeeper_1, desktop_kafka_1
zookeeper_1  | ZooKeeper JMX enabled by default
zookeeper_1  | Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
zookeeper_1  | 2020-05-17 03:34:31,794 [myid:] - INFO  [main:QuorumPeerConfig@136] - Reading configuration from: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
...
zookeeper_1  | 2020-05-17 03:34:31,872 [myid:] - INFO  [main:ZooKeeperServer@836] - tickTime set to 2000
...
kafka_1      | Excluding KAFKA_VERSION from broker config

沒有錯誤輸出說明部署成功。async

2.kafka 客戶端選擇

在 github 上可以找到好幾個 c#可使用的 kafka 客戶端。你們能夠去搜一下,本文就只說明rdkafka-dotnetconfluent-kafka-dotnettcp

2.1.rdkafka-dotnet

咱們生產環境中就使用的該客戶端。在該項目 github 首頁中能夠看到:函數

var config = new Config() { GroupId = "example-csharp-consumer" };
using (var consumer = new EventConsumer(config, "127.0.0.1:9092"))
{
    consumer.OnMessage += (obj, msg) =>
    {
        //...
    };
}

沒錯,使用它的緣由就是它提供了EventConsumer,能夠直接異步訂閱消息。總體上來講該客戶端很是的穩定,性能優良。使用過程當中比較難纏的就是它的配置,比較不直觀。它基於librdkafka(C/C++)實現,配置 Config 類中顯式配置比較少,大多數是經過字典配置的,好比:性能

var config = new Config();
config["auto.offset.reset"] = "earliest";//配置首次消息偏移位置爲最先

這對於新手來講並非很友好,很難想到去這樣配置。固然若是有 librdkafka 的使用經驗會好不少。大多數配置在 librdkafka 項目的CONFIGURATION

還有一個須要注意的是 Broker 的版本支持Broker version support: >=0.8,也在 librdkafka 項目中能夠找到。

2.2 confluent-kafka-dotnet

confluent-kafka-dotnet 是 rdkafka-dotnet(好幾年沒有維護了)的官方後續版本。推薦使用 confluent-kafka-dotnet,由於配置相對友好,更加全面。好比:

var conf = new ConsumerConfig
{
    AutoOffsetReset = AutoOffsetReset.Earliest//顯式強類型賦值配置
};

對於 EventConsumer 怎麼辦呢?在項目變動記錄中已經明確提出移除了 OnMessage 多播委託,而 EventConsumer,也就不存在了。但這不難,咱們能夠參照基項目寫一個:

public class EventConsumer<TKey, TValue> : IDisposable
{
    private Task _consumerTask;
    private CancellationTokenSource _consumerCts;
    public IConsumer<TKey, TValue> Consumer { get; }
    public ConsumerBuilder<TKey, TValue> Builder { get; set; }
    public EventConsumer(IEnumerable<KeyValuePair<string, string>> config)
    {
        Builder = new ConsumerBuilder<TKey, TValue>(config);
        Consumer = Builder.Build();
    }
    public event EventHandler<ConsumeResult<TKey, TValue>> OnConsumeResult;
    public event EventHandler<ConsumeException> OnConsumeException;
    public void Start()
    {
        if (Consumer.Subscription?.Any() != true)
        {
            throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");
        }
        if (_consumerTask != null)
        {
            return;
        }
        _consumerCts = new CancellationTokenSource();
        var ct = _consumerCts.Token;
        _consumerTask = Task.Factory.StartNew(() =>
        {
            while (!ct.IsCancellationRequested)
            {
                try
                {
                    var cr = Consumer.Consume(TimeSpan.FromSeconds(1));
                    if (cr == null) continue;
                    OnConsumeResult?.Invoke(this, cr);
                }
                catch (ConsumeException e)
                {
                    OnConsumeException?.Invoke(this, e);
                }
            }
        }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }
    public async Task Stop()
    {
        if (_consumerCts == null || _consumerTask == null) return;
        _consumerCts.Cancel();
        try
        {
            await _consumerTask;
        }
        finally
        {
            _consumerTask = null;
            _consumerCts = null;
        }
    }
    public void Dispose()
    {
        if (_consumerTask != null)
        {
            Stop().Wait();
        }
        Consumer?.Dispose();
    }
}

使用測試:

static async Task Main(string[] args)
{
    Console.WriteLine("Hello World!");
    var conf = new ConsumerConfig
    {
        GroupId = "test-consumer-group",
        BootstrapServers = "localhost:9092",
        AutoOffsetReset = AutoOffsetReset.Earliest,
    };
    var eventConsumer = new EventConsumer<Ignore, string>(conf);
    eventConsumer.Consumer.Subscribe(new[] {"test"});
    eventConsumer.OnConsumeResult += (sen, cr) =>
    {
        Console.WriteLine($"Receive '{cr.Message.Value}' from '{cr.TopicPartitionOffset}'");
    };
    do
    {
        var line = Console.ReadLine();
        switch (line)
        {
            case "stop":
                eventConsumer.Stop();
                break;
            case "start":
                eventConsumer.Start();
                break;
        }
    } while (true);
}

3.功能擴展

!!!如下討論都是對confluent-kafka-dotnet。

因爲用戶終端也使用了 kafka 客戶端訂閱消息。若是終端長時間沒有上線,而且消息過時時間也較長,服務端會存有大量消息。終端一上線就會讀取到大量的堆積消息,很容易就把內存耗盡了。考慮到客戶端不是長期在線的場景,無需不間斷的處理全部消息,服務端才適合這個角色(:。因此客戶端只需每次從登陸時的最新點開始讀取就能夠了,歷史性統計就交給服務器去作。

最便捷的方法是每次客戶端鏈接都使用新的groupid,用時間或者guid撒鹽。但這樣會使服務端記錄大量的group信息(若是終端不少m個,而且終端斷開鏈接重連的次數也會不少隨機n次,那麼也是m*n個group信息),勢必對服務端性能形成影響。

另外一種方法是在保持groupid不變的狀況下,修改消費偏移。那如何去設置位置偏移爲最新點呢?

3.1 錯誤思路 AutoOffsetReset

在配置中存在一個讓新手容易產生誤解的配置項AutoOffsetReset.Latest自動偏移到最新位置。當你興沖沖的準備大幹一番時發現只有首次建立GroupId時會起做用,當 groupid 已經存在 kafka 記錄中時它就無論用了。

3.2 提交偏移 Commit

咱們可以在IConsumer<TKey, TValue>中找到該 commit 方法,它有三個重載:

1. 無參函數。就是提交當前客戶端`IConsumer<TKey, TValue>.Assignment`記錄的偏移。
2. 參數ConsumeResult<TKey, TValue>。一次僅提交一個偏移。固然配置中默認設置爲自動提交(`conf.EnableAutoCommit = true;`),無需手動提交。
3. 參數IEnumerable<TopicPartitionOffset> offsets。直接提交到某一個位置。TopicPartitionOffset有三個決定性屬性:話題topic、分區:partition、偏移offset。

第三個函數就是咱們想要的,咱們只需獲得對應參數TopicPartitionOffset的值就能夠。

3.2.1.TopicPartition的獲取

topic 是咱們惟一能夠肯定的。在IConsumer<TKey, TValue>.Assignment中能夠獲得 topic 和 partition。但遺憾的是它只有不會當即有值。咱們只能主動去服務端獲取,在IAdminClient中找到了可獲取該信息的方法,因此咱們作一擴展:

public static IEnumerable<TopicPartition> GetTopicPartitions(ConsumerConfig config, string topic, TimeSpan timeout)
{
    using var adv = new AdminClientBuilder(config).Build();
    var topPns = adv.GetTopicPartition(topic, timeout);
    return topPns;
}

public static IEnumerable<TopicPartition> GetTopicPartition(this AdminClient client, string topic, TimeSpan timeout)
{
    var mta = client.GetMetadata(timeout);
    var topicPartitions = mta.Topics
        .Where(t => topic == t.Topic)
        .SelectMany(t => t.Partitions.Select(tt => new TopicPartition(t.Topic, tt.PartitionId)))
        .ToList();
    return topicPartitions;
}

3.2.2. TopicPartitionOffset獲取

咱們還差 offset 的值,經過IConsumer<TKey, TValue>.QueryWatermarkOffsets方法能夠查到當前水位,而其中 High 水位就是最新偏移。

如今咱們能夠完成咱們的任務了嗎?問題再次出現,雖然客戶端表現得從最新點消費了,可是在此以前的卡頓和相似與內存溢出讓人不得心安。Commit 仍是消費了全部消息:(,只不過暗搓搓的進行。在全部消息消費期間讀取全部未消費,而後拼命提交。客戶端哪有這麼大的內存和性能呢。最終,找到一個和第三個 commit 方法同樣接受參數的方法Assign,一試果真靈驗。

public static void AssignOffsetToHighWatermark<TKey, TValue>(this IConsumer<TKey, TValue> consumer, TopicPartition partition, TimeSpan timeout)
{
    var water = consumer.QueryWatermarkOffsets(partition, timeout);
    if (water == null || water.High == 0) return;
    var offset = new TopicPartitionOffset(partition.Topic, partition.Partition, water.High);
    consumer.Assign(offset);
}

3.2.3.實際使用

最終的使用示例:

//...
var topicPartitions = ConsumerEx.GetTopicPartitions(conf, "test", TimeSpan.FromSeconds(5));
topicPartitions?.ToList().ForEach(t =>
{
    eventConsumer.Consumer.AssignOffsetToHighWatermark(t, TimeSpan.FromSeconds(5));
});
eventConsumer.Start();//在消費事件開始以前就能夠進行偏移設置
//...

請注意,若是您關閉了自動提交功能,而且不主動提交任何偏移信息,那麼服務端對該 group 的偏移記錄將一直不變,Assign 函數並不會改變任何服務的偏移記錄。

4.總結

這一圈下來整個 kafka 的基本消費流程也就搞清楚了。kafka 消費者須要對消費的消息進行提交。事實上,每一個消息體裏都有偏移信息。不提交對於服務端來講就是客戶端沒有處理過該消息,將不會更改已消費偏移。以此來保證消息消費的可靠性。這和 tcp 中三次握手有殊途同歸之妙。

服務端保存着每個 groupid 對應的已經提交偏移Committed Offset。固然客戶端不提交它是不會變動的(不考慮直接操做服務端的形式)。

客戶端保存本身的當前偏移Current Offset,能夠經過AssignCommit進行更改,兩者區別是Commit將連同提交到服務端對應的偏移中進行更改,而Assign僅改變客戶端偏移,這一更改記錄在IConsumer<TKey, TValue>.Assignment中,首次啓動時客戶端異步向服務端請求Committed Offset來對其賦值。這就是在 3.2 節中咱們沒有當即獲得該值的的緣由,該值將在可能在幾秒中後被賦值,因此寫了一個主動獲取的方法GetTopicPartition。客戶端下一次消費將根據IConsumer<TKey, TValue>.Assignment進行。

使用AdminClientBuilder.GetMetadata函數能夠獲得對應話題的元數據,包括:topic、partition、Brokers 等。

使用IConsumer<TKey, TValue>.QueryWatermarkOffsets函數能夠獲得當前服務端的水位,low 爲最先的偏移(可能不是 0,考慮消息過時被刪除的狀況),high 爲最新的偏移。

相關文章
相關標籤/搜索