因爲項目中必須使用 kafka 來做爲消息組件,因此使用 kafka 有一段時間了。不得不感嘆 kafka 是一個至關優秀的消息系統。下面直接對使用過程作一總結,但願對你們有用。git
kafka 的簡單搭建咱們使用 docker 進行,方便快捷單節點。生產環境不推薦這樣的單節點 kafka 部署。github
網上不少教程,安裝也簡單,不做爲重點贅述。docker
將如下內容直接複製到新建空文件docker-compose.yml
中。c#
version: "3" services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka depends_on: [zookeeper] ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost KAFKA_CREATE_TOPICS: "test" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock
在docker-compose.yml
文件的目錄下執行如下命令:服務器
docker-compose build # 打包 docker-compose up # 啓動, 添加 -d 能夠後臺啓動。
看到日誌輸出:異步
Creating network "desktop_default" with the default driver Creating desktop_zookeeper_1 ... done Creating desktop_kafka_1 ... done Attaching to desktop_zookeeper_1, desktop_kafka_1 zookeeper_1 | ZooKeeper JMX enabled by default zookeeper_1 | Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg zookeeper_1 | 2020-05-17 03:34:31,794 [myid:] - INFO [main:QuorumPeerConfig@136] - Reading configuration from: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg ... zookeeper_1 | 2020-05-17 03:34:31,872 [myid:] - INFO [main:ZooKeeperServer@836] - tickTime set to 2000 ... kafka_1 | Excluding KAFKA_VERSION from broker config
沒有錯誤輸出說明部署成功。async
在 github 上可以找到好幾個 c#可使用的 kafka 客戶端。你們能夠去搜一下,本文就只說明rdkafka-dotnet和confluent-kafka-dotnet。tcp
咱們生產環境中就使用的該客戶端。在該項目 github 首頁中能夠看到:函數
var config = new Config() { GroupId = "example-csharp-consumer" }; using (var consumer = new EventConsumer(config, "127.0.0.1:9092")) { consumer.OnMessage += (obj, msg) => { //... }; }
沒錯,使用它的緣由就是它提供了EventConsumer,能夠直接異步訂閱消息。總體上來講該客戶端很是的穩定,性能優良。使用過程當中比較難纏的就是它的配置,比較不直觀。它基於librdkafka(C/C++)實現,配置 Config 類中顯式配置比較少,大多數是經過字典配置的,好比:性能
var config = new Config(); config["auto.offset.reset"] = "earliest";//配置首次消息偏移位置爲最先
這對於新手來講並非很友好,很難想到去這樣配置。固然若是有 librdkafka 的使用經驗會好不少。大多數配置在 librdkafka 項目的CONFIGURATION。
還有一個須要注意的是 Broker 的版本支持Broker version support: >=0.8,也在 librdkafka 項目中能夠找到。
confluent-kafka-dotnet 是 rdkafka-dotnet(好幾年沒有維護了)的官方後續版本。推薦使用 confluent-kafka-dotnet,由於配置相對友好,更加全面。好比:
var conf = new ConsumerConfig { AutoOffsetReset = AutoOffsetReset.Earliest//顯式強類型賦值配置 };
對於 EventConsumer 怎麼辦呢?在項目變動記錄中已經明確提出移除了 OnMessage 多播委託,而 EventConsumer,也就不存在了。但這不難,咱們能夠參照基項目寫一個:
public class EventConsumer<TKey, TValue> : IDisposable { private Task _consumerTask; private CancellationTokenSource _consumerCts; public IConsumer<TKey, TValue> Consumer { get; } public ConsumerBuilder<TKey, TValue> Builder { get; set; } public EventConsumer(IEnumerable<KeyValuePair<string, string>> config) { Builder = new ConsumerBuilder<TKey, TValue>(config); Consumer = Builder.Build(); } public event EventHandler<ConsumeResult<TKey, TValue>> OnConsumeResult; public event EventHandler<ConsumeException> OnConsumeException; public void Start() { if (Consumer.Subscription?.Any() != true) { throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function"); } if (_consumerTask != null) { return; } _consumerCts = new CancellationTokenSource(); var ct = _consumerCts.Token; _consumerTask = Task.Factory.StartNew(() => { while (!ct.IsCancellationRequested) { try { var cr = Consumer.Consume(TimeSpan.FromSeconds(1)); if (cr == null) continue; OnConsumeResult?.Invoke(this, cr); } catch (ConsumeException e) { OnConsumeException?.Invoke(this, e); } } }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default); } public async Task Stop() { if (_consumerCts == null || _consumerTask == null) return; _consumerCts.Cancel(); try { await _consumerTask; } finally { _consumerTask = null; _consumerCts = null; } } public void Dispose() { if (_consumerTask != null) { Stop().Wait(); } Consumer?.Dispose(); } }
使用測試:
static async Task Main(string[] args) { Console.WriteLine("Hello World!"); var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "localhost:9092", AutoOffsetReset = AutoOffsetReset.Earliest, }; var eventConsumer = new EventConsumer<Ignore, string>(conf); eventConsumer.Consumer.Subscribe(new[] {"test"}); eventConsumer.OnConsumeResult += (sen, cr) => { Console.WriteLine($"Receive '{cr.Message.Value}' from '{cr.TopicPartitionOffset}'"); }; do { var line = Console.ReadLine(); switch (line) { case "stop": eventConsumer.Stop(); break; case "start": eventConsumer.Start(); break; } } while (true); }
!!!如下討論都是對confluent-kafka-dotnet。
因爲用戶終端也使用了 kafka 客戶端訂閱消息。若是終端長時間沒有上線,而且消息過時時間也較長,服務端會存有大量消息。終端一上線就會讀取到大量的堆積消息,很容易就把內存耗盡了。考慮到客戶端不是長期在線的場景,無需不間斷的處理全部消息,服務端才適合這個角色(:。因此客戶端只需每次從登陸時的最新點開始讀取就能夠了,歷史性統計就交給服務器去作。
最便捷的方法是每次客戶端鏈接都使用新的groupid,用時間或者guid撒鹽。但這樣會使服務端記錄大量的group信息(若是終端不少m個,而且終端斷開鏈接重連的次數也會不少隨機n次,那麼也是m*n個group信息),勢必對服務端性能形成影響。
另外一種方法是在保持groupid不變的狀況下,修改消費偏移。那如何去設置位置偏移爲最新點呢?
在配置中存在一個讓新手容易產生誤解的配置項AutoOffsetReset.Latest自動偏移到最新位置。當你興沖沖的準備大幹一番時發現只有首次建立GroupId時會起做用,當 groupid 已經存在 kafka 記錄中時它就無論用了。
咱們可以在IConsumer<TKey, TValue>
中找到該 commit 方法,它有三個重載:
1. 無參函數。就是提交當前客戶端`IConsumer<TKey, TValue>.Assignment`記錄的偏移。 2. 參數ConsumeResult<TKey, TValue>。一次僅提交一個偏移。固然配置中默認設置爲自動提交(`conf.EnableAutoCommit = true;`),無需手動提交。 3. 參數IEnumerable<TopicPartitionOffset> offsets。直接提交到某一個位置。TopicPartitionOffset有三個決定性屬性:話題topic、分區:partition、偏移offset。
第三個函數就是咱們想要的,咱們只需獲得對應參數TopicPartitionOffset的值就能夠。
topic 是咱們惟一能夠肯定的。在IConsumer<TKey, TValue>.Assignment
中能夠獲得 topic 和 partition。但遺憾的是它只有不會當即有值。咱們只能主動去服務端獲取,在IAdminClient
中找到了可獲取該信息的方法,因此咱們作一擴展:
public static IEnumerable<TopicPartition> GetTopicPartitions(ConsumerConfig config, string topic, TimeSpan timeout) { using var adv = new AdminClientBuilder(config).Build(); var topPns = adv.GetTopicPartition(topic, timeout); return topPns; } public static IEnumerable<TopicPartition> GetTopicPartition(this AdminClient client, string topic, TimeSpan timeout) { var mta = client.GetMetadata(timeout); var topicPartitions = mta.Topics .Where(t => topic == t.Topic) .SelectMany(t => t.Partitions.Select(tt => new TopicPartition(t.Topic, tt.PartitionId))) .ToList(); return topicPartitions; }
咱們還差 offset 的值,經過IConsumer<TKey, TValue>.QueryWatermarkOffsets
方法能夠查到當前水位,而其中 High 水位就是最新偏移。
如今咱們能夠完成咱們的任務了嗎?問題再次出現,雖然客戶端表現得從最新點消費了,可是在此以前的卡頓和相似與內存溢出讓人不得心安。Commit 仍是消費了全部消息:(,只不過暗搓搓的進行。在全部消息消費期間讀取全部未消費,而後拼命提交。客戶端哪有這麼大的內存和性能呢。最終,找到一個和第三個 commit 方法同樣接受參數的方法Assign
,一試果真靈驗。
public static void AssignOffsetToHighWatermark<TKey, TValue>(this IConsumer<TKey, TValue> consumer, TopicPartition partition, TimeSpan timeout) { var water = consumer.QueryWatermarkOffsets(partition, timeout); if (water == null || water.High == 0) return; var offset = new TopicPartitionOffset(partition.Topic, partition.Partition, water.High); consumer.Assign(offset); }
最終的使用示例:
//... var topicPartitions = ConsumerEx.GetTopicPartitions(conf, "test", TimeSpan.FromSeconds(5)); topicPartitions?.ToList().ForEach(t => { eventConsumer.Consumer.AssignOffsetToHighWatermark(t, TimeSpan.FromSeconds(5)); }); eventConsumer.Start();//在消費事件開始以前就能夠進行偏移設置 //...
請注意,若是您關閉了自動提交功能,而且不主動提交任何偏移信息,那麼服務端對該 group 的偏移記錄將一直不變,Assign 函數並不會改變任何服務的偏移記錄。
這一圈下來整個 kafka 的基本消費流程也就搞清楚了。kafka 消費者須要對消費的消息進行提交。事實上,每一個消息體裏都有偏移信息。不提交對於服務端來講就是客戶端沒有處理過該消息,將不會更改已消費偏移。以此來保證消息消費的可靠性。這和 tcp 中三次握手有殊途同歸之妙。
服務端保存着每個 groupid 對應的已經提交偏移Committed Offset
。固然客戶端不提交它是不會變動的(不考慮直接操做服務端的形式)。
客戶端保存本身的當前偏移Current Offset
,能夠經過Assign
和Commit
進行更改,兩者區別是Commit
將連同提交到服務端對應的偏移中進行更改,而Assign
僅改變客戶端偏移,這一更改記錄在IConsumer<TKey, TValue>.Assignment
中,首次啓動時客戶端異步向服務端請求Committed Offset
來對其賦值。這就是在 3.2 節中咱們沒有當即獲得該值的的緣由,該值將在可能在幾秒中後被賦值,因此寫了一個主動獲取的方法GetTopicPartition
。客戶端下一次消費將根據IConsumer<TKey, TValue>.Assignment
進行。
使用AdminClientBuilder.GetMetadata
函數能夠獲得對應話題的元數據,包括:topic、partition、Brokers 等。
使用IConsumer<TKey, TValue>.QueryWatermarkOffsets
函數能夠獲得當前服務端的水位,low 爲最先的偏移(可能不是 0,考慮消息過時被刪除的狀況),high 爲最新的偏移。