上個章節咱們講了kafka的環境安裝(這裏),如今主要來了解下Kafka使用,基於.net實現kafka的消息隊列應用,本文用的是Confluent.Kafka,版本0.11.6。
在NuGet程序包中搜索「Confluent.Kafka」下載安裝便可。
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace KafKa
{
    /// <summary>
    /// Kafka message producer (Confluent.Kafka 0.11.x API).
    /// </summary>
    public sealed class KafkaProducer
    {
        /// <summary>
        /// Produces a message and sends it to the given topic/partition,
        /// waiting for the broker's delivery report.
        /// </summary>
        /// <param name="broker">Kafka bootstrap server address, e.g. "localhost:9092".</param>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="partion">Target partition index.</param>
        /// <param name="message">Message payload; must not be null or whitespace.</param>
        /// <returns>true when the delivery report carries ErrorCode.NoError.</returns>
        /// <exception cref="ArgumentNullException">When any string argument is null/empty/whitespace.</exception>
        public bool Produce(string broker, string topic, int partion, string message)
        {
            // IsNullOrWhiteSpace already covers null, "" and whitespace-only,
            // so the original triple check per argument was redundant.
            // The two-argument ctor is used so the Chinese text becomes the
            // exception *message*; the single-string ctor treats it as paramName.
            if (string.IsNullOrWhiteSpace(broker))
            {
                throw new ArgumentNullException(nameof(broker), "Kafka消息服務器地址不能爲空!");
            }

            if (string.IsNullOrWhiteSpace(topic))
            {
                throw new ArgumentNullException(nameof(topic), "消息所屬的主題不能爲空!");
            }

            if (string.IsNullOrWhiteSpace(message))
            {
                throw new ArgumentNullException(nameof(message), "消息內容不能爲空!");
            }

            var config = new Dictionary<string, object>
            {
                { "bootstrap.servers", broker }
            };

            // NOTE(review): a producer is created per message, which is expensive;
            // callers that send frequently should cache one KafkaProducer/Producer.
            using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
            {
                // Wait on the delivery report directly. The original set a local flag
                // inside task.ContinueWith(...): that continuation is not guaranteed
                // to have executed before Flush returns, so a successfully delivered
                // message could still be reported as false (race condition).
                var deliveryReport = producer.ProduceAsync(topic, null, message, partion).Result;
                return deliveryReport.Error.Code == ErrorCode.NoError;
            }
        }
    }
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace KafKa
{
    /// <summary>
    /// Kafka message consumer (Confluent.Kafka 0.11.x API) supporting a blocking
    /// consume loop (Consume) and a ThreadPool-based loop (ConsumeAsync).
    /// </summary>
    public sealed class KafkaConsumer
    {
        #region Private fields

        // volatile: the flag is written on the caller's thread and read inside a
        // polling loop that may run on a ThreadPool thread (ConsumeAsync). Without
        // volatile the read can be cached and the loop might never observe the stop.
        private volatile bool isCancelled;

        #endregion

        #region Constructors

        /// <summary>
        /// Initializes the consumer with IsCancelled = false (keep consuming).
        /// </summary>
        public KafkaConsumer()
        {
            isCancelled = false;
        }

        #endregion

        #region Properties

        /// <summary>
        /// Whether the consume loop should stop. Defaults to false (keep consuming).
        /// Set to true to let a running Consume/ConsumeAsync loop exit.
        /// </summary>
        public bool IsCancelled
        {
            get { return isCancelled; }
            set { isCancelled = value; }
        }

        #endregion

        #region Validation / configuration helpers

        /// <summary>
        /// Validates the broker/topic/group arguments shared by all entry points.
        /// </summary>
        private static void ValidateArguments(string broker, string topic, string groupID)
        {
            // IsNullOrWhiteSpace subsumes the null / empty / length checks the
            // original repeated three times per argument. Two-arg ctor so the
            // Chinese text is the message, not the paramName.
            if (string.IsNullOrWhiteSpace(broker))
            {
                throw new ArgumentNullException(nameof(broker), "Kafka消息服務器的地址不能爲空!");
            }

            if (string.IsNullOrWhiteSpace(topic))
            {
                throw new ArgumentNullException(nameof(topic), "消息所屬的主題不能爲空!");
            }

            if (string.IsNullOrWhiteSpace(groupID))
            {
                throw new ArgumentNullException(nameof(groupID), "用戶分組ID不能爲空!");
            }
        }

        /// <summary>
        /// Builds the librdkafka configuration shared by all consume paths.
        /// The original duplicated this dictionary in three places.
        /// </summary>
        private static Dictionary<string, object> BuildConfig(string broker, string groupID, bool enableAutoCommit)
        {
            return new Dictionary<string, object>
            {
                { "bootstrap.servers", broker },
                { "group.id", groupID },
                { "enable.auto.commit", enableAutoCommit },
                { "auto.commit.interval.ms", 5000 },
                { "statistics.interval.ms", 60000 },
                { "session.timeout.ms", 6000 },
                { "auto.offset.reset", "smallest" }
            };
        }

        #endregion

        #region Synchronous version

        /// <summary>
        /// Starts consuming messages of the given topic as a member of the given
        /// consumer group. Blocks the calling thread until IsCancelled is set.
        /// </summary>
        /// <param name="broker">Kafka bootstrap server address.</param>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="groupID">Consumer group id.</param>
        /// <param name="action">Optional callback invoked for every consumed message.</param>
        public void Consume(string broker, string topic, string groupID, Action<ConsumerResult> action = null)
        {
            ValidateArguments(broker, topic, groupID);

            var config = BuildConfig(broker, groupID, enableAutoCommit: true);

            using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                if (action != null)
                {
                    consumer.OnMessage += (_, message) =>
                    {
                        ConsumerResult messageResult = new ConsumerResult();
                        messageResult.Broker = broker;
                        messageResult.Topic = message.Topic;
                        messageResult.Partition = message.Partition;
                        messageResult.Offset = message.Offset.Value;
                        messageResult.Message = message.Value;

                        // Hand the consumed message to the caller-supplied handler.
                        action(messageResult);
                    };
                }

                consumer.OnPartitionEOF += (_, end) => Console.WriteLine("Reached end of topic " + end.Topic + " partition " + end.Partition + ", next message will be at offset " + end.Offset);

                consumer.OnError += (_, error) => Console.WriteLine("Error:" + error);

                // Raised on deserialization errors or when a consumed message has an error != NoError.
                consumer.OnConsumeError += (_, message) => Console.WriteLine("Error consuming from topic/partition/offset " + message.Topic + "/" + message.Partition + "/" + message.Offset + ": " + message.Error);

                consumer.OnOffsetsCommitted += (_, commit) => Console.WriteLine(commit.Error ? "Failed to commit offsets:" + commit.Error : "Successfully committed offsets:" + commit.Offsets);

                // Raised when the consumer is assigned a new set of partitions.
                consumer.OnPartitionsAssigned += (_, partitions) =>
                {
                    Console.WriteLine("Assigned Partitions:" + partitions + ", Member ID:" + consumer.MemberId);
                    // If no handler is attached to OnPartitionsAssigned, this .Assign
                    // call happens automatically. Since we attach one, we must call
                    // .Assign explicitly for consumption to start.
                    consumer.Assign(partitions);
                };

                // Raised when the consumer's current assignment set has been revoked.
                consumer.OnPartitionsRevoked += (_, partitions) =>
                {
                    Console.WriteLine("Revoked Partitions:" + partitions);
                    // Mirror of the Assign case: with a handler attached, .Unassign
                    // must be called explicitly to stop consuming from the
                    // previously assigned partitions.
                    consumer.Unassign();
                };

                //consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: " + json);

                consumer.Subscribe(topic);

                //Console.WriteLine("Subscribed to:" + consumer.Subscription);

                while (!IsCancelled)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }

        #endregion

        #region Asynchronous version

        /// <summary>
        /// Starts consuming on a ThreadPool thread and returns immediately.
        /// Set IsCancelled = true to stop the background loop.
        /// </summary>
        /// <param name="broker">Kafka bootstrap server address.</param>
        /// <param name="topic">Topic to subscribe to.</param>
        /// <param name="groupID">Consumer group id.</param>
        /// <param name="action">Optional callback invoked for every consumed message.</param>
        public void ConsumeAsync(string broker, string topic, string groupID, Action<ConsumerResult> action = null)
        {
            ValidateArguments(broker, topic, groupID);

            ThreadPool.QueueUserWorkItem(KafkaAutoCommittedOffsets, new ConsumerSetting() { Broker = broker, Topic = topic, GroupID = groupID, Action = action });
        }

        #endregion

        #region Offset-commit strategies

        /// <summary>
        /// Consume loop with broker-side automatic offset commits.
        /// </summary>
        /// <param name="state">A ConsumerSetting describing broker/topic/group/callback.</param>
        private void KafkaAutoCommittedOffsets(object state)
        {
            ConsumerSetting setting = state as ConsumerSetting;

            var config = BuildConfig(setting.Broker, setting.GroupID, enableAutoCommit: true);

            using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                if (setting.Action != null)
                {
                    consumer.OnMessage += (_, message) =>
                    {
                        ConsumerResult messageResult = new ConsumerResult();
                        messageResult.Broker = setting.Broker;
                        messageResult.Topic = message.Topic;
                        messageResult.Partition = message.Partition;
                        messageResult.Offset = message.Offset.Value;
                        messageResult.Message = message.Value;

                        // Hand the consumed message to the caller-supplied handler.
                        setting.Action(messageResult);
                    };
                }

                //consumer.OnStatistics += (_, json)=> Console.WriteLine("Statistics: {json}");

                // Hook OnError / OnConsumeError here for logging if needed.
                //consumer.OnError += (_, error)=> Console.WriteLine("Error:"+error);
                //consumer.OnConsumeError += (_, msg) => Console.WriteLine("Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");

                consumer.Subscribe(setting.Topic);

                while (!IsCancelled)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }

        /// <summary>
        /// Consume loop with manual offset commits (every fifth offset).
        /// </summary>
        /// <param name="state">A ConsumerSetting describing broker/topic/group/callback.</param>
        private void KafkaManuallyCommittedOffsets(object state)
        {
            ConsumerSetting setting = state as ConsumerSetting;

            // enable.auto.commit = false: offsets are committed explicitly below.
            var config = BuildConfig(setting.Broker, setting.GroupID, enableAutoCommit: false);

            using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                // Hook OnError / OnConsumeError here for logging if needed.
                //consumer.OnError += (_, error) => Console.WriteLine("Error:"+error);
                // Raised on deserialization errors or when a consumed message has an error != NoError.
                //consumer.OnConsumeError += (_, error)=> Console.WriteLine("Consume error:"+error);

                consumer.Subscribe(setting.Topic);

                Message<Ignore, string> message = null;

                while (!IsCancelled)
                {
                    // Consume returns false when no message arrived within the timeout.
                    if (!consumer.Consume(out message, TimeSpan.FromMilliseconds(100)))
                    {
                        continue;
                    }

                    if (setting.Action != null)
                    {
                        ConsumerResult messageResult = new ConsumerResult();
                        messageResult.Broker = setting.Broker;
                        messageResult.Topic = message.Topic;
                        messageResult.Partition = message.Partition;
                        messageResult.Offset = message.Offset.Value;
                        messageResult.Message = message.Value;

                        // Hand the consumed message to the caller-supplied handler.
                        setting.Action(messageResult);
                    }

                    // Commit every fifth offset to reduce commit traffic; blocking
                    // on the commit is intentional in this synchronous worker loop.
                    if (message.Offset % 5 == 0)
                    {
                        consumer.CommitAsync(message).Wait();
                        //Console.WriteLine("Committed offset:" + ...);
                    }
                }
            }
        }

        #endregion
    }
}
using System;
using KafKa;

namespace ConsoleProducer
{
    /// <summary>
    /// Console demo: reads lines from stdin and publishes each one to the
    /// "test" topic, partition 0, on localhost:9092.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            // Reuse one producer wrapper instead of constructing a new one per message.
            var producer = new KafkaProducer();

            while (true)
            {
                var message = Console.ReadLine();

                // ReadLine returns null on EOF; KafkaProducer.Produce throws
                // ArgumentNullException on null/empty input, so stop on EOF and
                // skip blank lines instead of crashing.
                if (message == null)
                {
                    break;
                }

                if (message.Length == 0)
                {
                    continue;
                }

                producer.Produce("localhost:9092", "test", 0, message);
            }
            // The unreachable Console.ReadKey() after the infinite loop was removed;
            // the loop now exits cleanly on EOF.
        }
    }
}
using System;
using System.Collections.Generic;
using KafKa;

namespace ConsoleConsumer
{
    /// <summary>
    /// Console demo: consumes the "test" topic on localhost:9092 as group "0"
    /// and prints every received message. Press any key to stop.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            // The original declared an unused List<TimeSpan> local ("dts"); removed.
            var consumer = new KafkaConsumer();

            // ConsumeAsync polls on a ThreadPool thread and returns immediately.
            consumer.ConsumeAsync("localhost:9092", "test", "0", result =>
            {
                Console.WriteLine(result.Message);
            });

            // Keep the process alive until a key is pressed.
            Console.ReadKey();

            // Signal the background polling loop to stop before the process exits;
            // the original left the loop running and relied on process teardown.
            consumer.IsCancelled = true;
        }
    }
}
經過以上步驟運行producer控制檯,發送消息回車,在consumer控制檯就能夠接收到消息了。
那麼咱們如何經過多個consumer來消費消息呢,kafka默認採用的是range分配方法,即平均分配分區。首先注意在建立topic的命令行時建立多個分區(--partitions 5),這裏咱們建立了5個分區,在發送消息時選擇不一樣的分區發送(0-4),打開5個consumer控制檯(注意要同一個分組),咱們會發現5個consumer會分別消費對應分區的消息。