工做中常常會有定時任務的需求,常見的作法能夠使用Timer、Quartz、Hangfire等組件,此次想嘗試下新的思路,使用RabbitMQ死信隊列的機制來實現定時任務,同時幫助再次瞭解RabbitMQ的死信隊列。html
1. 用戶建立定時任務git
2. 往死信隊列插入一條消息,並設置過時時間爲首個任務執行時間github
3. 死信隊列中的消息過時後,消息流向工做隊列mongodb
4. 任務執行消費者監聽工做隊列,工做隊列向消費者推送消息數據庫
5. 消費者查詢數據庫,讀取任務信息windows
6. 消費者確認任務有效(未被撤銷),執行任務spa
7. 消費者確認有下個任務,再往死信隊列插入一條消息,並設置過時時間爲任務執行時間3d
8. 重複2-7的步驟,直到全部任務執行完成或任務撤銷日誌
請自行完成MongoDB和RabbitMQ的安裝,Windows、Linux、Docker皆可,如下提供Windows的安裝方法:code
MongoDB:https://docs.mongodb.com/manual/tutorial/install-mongodb-on-windows/
RabbitMQ:https://www.rabbitmq.com/install-windows.html
1. (WebApi)建立任務,並根據設置建立子任務,把任務數據寫入數據庫
var task = new Task { Name = form.Name, StartTime = form.StartTime, EndTime = form.EndTime, Interval = form.Interval, SubTasks = new List<SubTask>() }; var startTime = task.StartTime; var endTime = task.EndTime; while ((endTime - startTime).TotalMinutes >= 0) { var sendTime = startTime; if (sendTime <= endTime && sendTime > DateTime.UtcNow) { task.SubTasks.Add(new SubTask { Id = ObjectId.GenerateNewId(), SendTime = sendTime }); } startTime = startTime.AddMinutes(task.Interval); } await _mongoDbContext.Collection<Task>().InsertOneAsync(task);
2. (WebApi)往死信隊列中寫入消息
var timeFlag = task.SubTasks[0].SendTime.ToString("yyyy-MM-dd HH:mm:ssZ"); var exchange = "Task"; var queue = "Task"; var index = 0; var pendingExchange = "PendingTask"; var pendingQueue = $"PendingTask|Task:{task.Id}_{index}_{timeFlag}"; using (var channel = _rabbitConnection.CreateModel()) { channel.ExchangeDeclare(exchange, "direct", true); channel.QueueDeclare(queue, true, false, false); channel.QueueBind(queue, exchange, queue); var retryDic = new Dictionary<string, object> { {"x-dead-letter-exchange", exchange}, {"x-dead-letter-routing-key", queue} }; channel.ExchangeDeclare(pendingExchange, "direct", true); channel.QueueDeclare(pendingQueue, true, false, false, retryDic); channel.QueueBind(pendingQueue, pendingExchange, pendingQueue); var properties = channel.CreateBasicProperties(); properties.Headers = new Dictionary<string, object> { ["index"] = index, ["id"] = task.Id.ToString(), ["sendtime"] = timeFlag }; properties.Expiration = ((int)(task.SubTasks[0].SendTime - DateTime.UtcNow).TotalMilliseconds).ToString(CultureInfo.InvariantCulture); channel.BasicPublish(pendingExchange, pendingQueue, properties, Encoding.UTF8.GetBytes(string.Empty)); }
其中:
PendingTask爲死信隊列Exchange,死信隊列的隊列名(Queue Name)會包含Task、index、timeFlag的信息,幫助跟蹤隊列和子任務,同時也起到惟一標識的做用。
task.id爲任務Id
index爲子任務下標
timeFlag爲子任務執行時間
3. (消費者)處理消息
var exchange = "Task"; var queue = "Task"; _channel.ExchangeDeclare(exchange, "direct", true); _channel.QueueDeclare(queue, true, false, false); _channel.QueueBind(queue, exchange, queue); var consumer = new EventingBasicConsumer(_channel);
//監聽處理 consumer.Received += (model, ea) => {
//獲取消息頭信息 var index = (int)ea.BasicProperties.Headers["index"]; var id = (ea.BasicProperties.Headers["id"] as byte[]).BytesToString(); var timeFlag = (ea.BasicProperties.Headers["sendtime"] as byte[]).BytesToString();
//刪除臨時死信隊列 _channel.QueueDelete($"PendingTask|Task:{id}_{index}_{timeFlag}", false, true); var taskId = new ObjectId(id); var task = _mongoDbContext.Collection<Task>().Find(n => n.Id == taskId).SingleOrDefault();
//撤銷或已完成的任務不執行 if (task == null || task.Status != TaskStatus.Normal) { _channel.BasicAck(ea.DeliveryTag, false); return; }
//執行任務 _logger.LogInformation($"[{DateTime.UtcNow}]執行任務...");
//設置子任務已完成 task.SubTasks[index].IsSent = true; if (task.SubTasks.Count > index + 1) //還有未完成的子任務,把下個子任務的信息寫入死信隊列 { PublishPendingMsg(_channel, task, index + 1); } else { task.Status = TaskStatus.Finished; //全部子任務執行完畢,設置任務狀態爲完成 } _mongoDbContext.Collection<Task>().ReplaceOne(n => n.Id == taskId, task); //更新任務狀態 _channel.BasicAck(ea.DeliveryTag, false); }; _channel.BasicConsume(queue, false, consumer);
4. (WebApi)撤銷任務,更新任務狀態便可
var taskId = new ObjectId(id); var task = await _mongoDbContext.Collection<Task>().Find(n => n.Id == taskId).SingleOrDefaultAsync(); if (task == null) { return NotFound(new { message = "任務不存在!" }); } task.Status = TaskStatus.Canceled; await _mongoDbContext.Collection<Task>().FindOneAndReplaceAsync(n => n.Id == taskId, task);
1. 先使用控制檯把消費者啓動起來。
2. 建立任務
啓動WebApi,建立一個任務,開始時間爲2019-07-16T07:55:00.000Z,結束時間爲2019-07-16T07:59:00.000Z,執行時間間隔1分鐘:
任務與相應的子任務也寫入了MongoDB,這裏假設子任務多是郵件發送任務:
建立了一個臨時死信隊列,隊列名稱包含任務Id,子任務下標、以及子任務執行時間,並往其寫入一條消息:
3. 執行(子)任務
從日誌內容能夠看出,(子)任務正常執行:
子任務狀態也標註爲已發送
同時也往消息隊列寫入了下一個子任務的消息:
4. 撤銷任務
任務狀態被置爲已撤銷:
任務沒再繼續往下執行:
消息隊列中的臨時隊列被刪除,消息也被消費完