The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue.
1. Set the AutoComplete option explicitly when dequeuing messages:
public static async Task MessageDequeueAsync()
{
    // Configure the MessageHandlerOptions: exception handling, number of concurrent messages to deliver, etc.
    var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
    {
        // Maximum number of concurrent calls to the message callback, set to 1 for simplicity.
        // Set it according to how many messages the application wants to process in parallel.
        MaxConcurrentCalls = 1,

        // Indicates whether the message pump should automatically complete messages after the user callback returns.
        // False means the user callback below completes the message itself.
        AutoComplete = false
    };

    // Register the queue message handler and receive messages in a loop
    queueClient.RegisterMessageHandler(
        async (message, token) =>
        {
            // Process the message
            await PostMessageToBot(message);
            loggerCore.Information($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");

            // Complete the message so that it is not received again.
            // This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode.
            await queueClient.CompleteAsync(message.SystemProperties.LockToken);
        },
        //async (exceptionEvent) =>
        //{
        //    // Process the exception
        //    loggerCore.Error($"WeChat message dequeue exception:{exceptionEvent.Exception.Message}");
        //}
        messageHandlerOptions);
}
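2. On Azure, change the duplicate detection time setting (the field accepts 1~59 s) to 55 s, so that it is longer than the lock duration, i.e. the time a consumer has to process a message.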
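To see why the error above shows up, it helps to model what a peek-lock token is. The sketch below is a hypothetical in-memory model (the class `PeekLockQueueModel` and its members are made up for illustration and are not part of the Service Bus SDK). It assumes a message stays locked for a fixed duration after it is received, and that completing it requires a lock token that is still valid.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of peek-lock semantics; NOT the Service Bus SDK.
public sealed class PeekLockQueueModel
{
    private readonly TimeSpan lockDuration;
    private readonly Queue<string> pending = new Queue<string>();

    // Lock token -> (message body, time at which the lock expires).
    private readonly Dictionary<Guid, (string Body, DateTime LockedUntil)> locked =
        new Dictionary<Guid, (string Body, DateTime LockedUntil)>();

    public PeekLockQueueModel(TimeSpan lockDuration)
    {
        this.lockDuration = lockDuration;
    }

    // Number of messages currently available for delivery.
    public int PendingCount => pending.Count;

    public void Send(string body) => pending.Enqueue(body);

    // Hands out the next message under a lock. The message is not removed
    // for good until Complete is called with the returned token.
    // Throws InvalidOperationException if no message is available.
    public Guid Receive(DateTime now, out string body)
    {
        ReleaseExpiredLocks(now);
        body = pending.Dequeue();
        var token = Guid.NewGuid();
        locked[token] = (body, now + lockDuration);
        return token;
    }

    // Removes the message permanently. Fails when the token is unknown
    // (already completed) or its lock has expired.
    public void Complete(Guid token, DateTime now)
    {
        ReleaseExpiredLocks(now);
        if (!locked.Remove(token))
        {
            throw new InvalidOperationException("The lock supplied is invalid.");
        }
    }

    // Messages whose lock ran out become available for delivery again.
    private void ReleaseExpiredLocks(DateTime now)
    {
        var expired = new List<Guid>();
        foreach (var entry in locked)
        {
            if (entry.Value.LockedUntil <= now) expired.Add(entry.Key);
        }
        foreach (var token in expired)
        {
            pending.Enqueue(locked[token].Body);
            locked.Remove(token);
        }
    }
}
```

In this model, calling `Complete` twice with the same token fails the second time, which is roughly what happens when the callback completes a message and the message pump then tries to auto-complete it. Completing after the lock duration has passed fails too, and the message is delivered again, which would explain seeing the same message dequeued more than once.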
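This was my first time using a message queue, and I ran into a problem: the same message was dequeued multiple times. Was the message enqueued only once, or several times? Or was it a dequeue problem that caused it to be delivered more than once?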
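Azure Service Bus queue
Create a Service Bus on Azure, then create a queue in that Service Bus. When creating the queue, make sure the "enable duplicate detection" option is checked, so that the same message cannot be enqueued repeatedly.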
Find the value of the SAS Policy RootManageSharedAccessKey, copy it, and use it as the connection string.
Azure Service Bus sample on GitHub
namespace:Microsoft.Azure.ServiceBus;
Initialize the queueClient:
private static readonly Serilog.ILogger loggerCore = LoggerCoreFactory.GetLoggerCore();
const string ServiceBusConnectionString = "Endpoint=sb://{YOUR-NAMESPACE-ON-Azure}.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR-KEYS}";
const string QueueName = "{YOUR-QUEUE-NAME}";
static IQueueClient queueClient;

static MessageQueueHandler()
{
    queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
}
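A mistyped connection string is an easy thing to get wrong at this step, so a quick sanity check before creating the client can help. The following is a minimal sketch using only the base class library. The helper `ConnectionStringCheck` is hypothetical, and it assumes the three-part format shown in the code above (Endpoint, SharedAccessKeyName, SharedAccessKey).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of Microsoft.Azure.ServiceBus.
public static class ConnectionStringCheck
{
    // Splits "Key=Value;Key=Value" pairs and checks that the keys used in
    // this post are present. Throws FormatException on a malformed string.
    public static Dictionary<string, string> Parse(string connectionString)
    {
        var parts = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        var segments = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);

        foreach (var segment in segments)
        {
            // Split on the FIRST '=' only: a base64 key may itself end with '='.
            int eq = segment.IndexOf('=');
            if (eq <= 0) throw new FormatException("Malformed connection string segment.");
            parts[segment.Substring(0, eq).Trim()] = segment.Substring(eq + 1).Trim();
        }

        foreach (var key in new[] { "Endpoint", "SharedAccessKeyName", "SharedAccessKey" })
        {
            if (!parts.ContainsKey(key)) throw new FormatException($"Missing key: {key}");
        }

        return parts;
    }
}
```

Calling `ConnectionStringCheck.Parse(ServiceBusConnectionString)` once at startup would surface a missing or misspelled part early, instead of failing later on the first send or receive.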
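Here the MessageId is set as the key for enqueueing, which prevents the same message from being enqueued more than once. Also, do not close the connection after every send, because doing so hurts performance.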
public static async Task MessageEnqueueAsync(string message)
{
    // Send messages.
    await SendMessagesAsync(message);

    // don't close frequently
    //await queueClient.CloseAsync();
}

static async Task SendMessagesAsync(string messageXML)
{
    try
    {
        // Create a new message to send to the queue.
        var messageStr = ParseMessageType.Parse(messageXML);
        var msgId = messageStr.Body.MsgId.Value;
        var message = new Microsoft.Azure.ServiceBus.Message
        {
            MessageId = msgId, // avoid enqueueing the same message more than once
            Body = Encoding.UTF8.GetBytes(messageXML),
        };
        loggerCore.Information($"message enqueue:{messageXML}");

        // Send the message to the queue.
        await queueClient.SendAsync(message);
    }
    catch (Exception exception)
    {
        loggerCore.Error($"message enqueue error:{exception.ToString()}");
    }
}
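The idea behind using MessageId for duplicate detection can be sketched with a small in-memory model. `DuplicateDetectionModel` is a hypothetical class for illustration only. It assumes the queue remembers each MessageId for a fixed time window and drops any later message that reuses an id inside that window.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of MessageId-based duplicate detection;
// NOT the Service Bus implementation.
public sealed class DuplicateDetectionModel
{
    private readonly TimeSpan window;

    // MessageId -> time at which the id was last accepted.
    private readonly Dictionary<string, DateTime> seen = new Dictionary<string, DateTime>();

    public DuplicateDetectionModel(TimeSpan window)
    {
        this.window = window;
    }

    // Returns true if the message is accepted, false if a message with the
    // same id was already accepted within the detection window.
    public bool TryAccept(string messageId, DateTime now)
    {
        if (seen.TryGetValue(messageId, out var acceptedAt) && now - acceptedAt < window)
        {
            return false;
        }

        seen[messageId] = now;
        return true;
    }
}
```

In this model a resend of the same id inside the window is rejected, while the same id is accepted again once the window has passed. This is why the id has to come from the message content (here the MsgId parsed from the XML) and not be generated fresh on every send.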
public static async Task MessageDequeueAsync()
{
    // Choose PeekLock mode
    queueClient = new QueueClient(ServiceBusConnectionString, QueueName, ReceiveMode.PeekLock);

    // Register the queue message handler and receive messages in a loop
    queueClient.RegisterMessageHandler(
        async (message, token) =>
        {
            // Process the message
            await PostMessageToBot(message);
            loggerCore.Information($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");

            // Complete the message so that it is not received again.
            // This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode.
            await queueClient.CompleteAsync(message.SystemProperties.LockToken);
        },
        async (exceptionEvent) =>
        {
            // Process the exception
            loggerCore.Error($"WeChat message dequeue exception:{exceptionEvent.ToString()}");
        });
}
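This way, messages are dequeued one at a time, in the order in which they were enqueued.
A few helpful links:
1. Read best practice: how to make the best use of Azure queues and avoid unnecessary overhead.
2. Enable queue duplicate detection: turn on duplicate detection to prevent the same message from being enqueued more than once.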