[奇思異想]使用RabbitMQ實現定時任務

背景

  工做中常常會有定時任務的需求,常見的作法能夠使用Timer、Quartz、Hangfire等組件,此次想嘗試下新的思路,使用RabbitMQ死信隊列的機制來實現定時任務,同時幫助再次瞭解RabbitMQ的死信隊列。html

 

交互流程

  

 

  1. 用戶建立定時任務git

  2. 往死信隊列插入一條消息,並設置過時時間爲首個任務執行時間github

  3. 死信隊列中的消息過時後,消息流向工做隊列mongodb

  4. 任務執行消費者監聽工做隊列,工做隊列向消費者推送消息數據庫

  5. 消費者查詢數據庫,讀取任務信息windows

  6. 消費者確認任務有效(未被撤銷),執行任務spa

  7. 消費者確認有下個任務,再往死信隊列插入一條消息,並設置過時時間爲任務執行時間3d

  8. 重複2-7的步驟,直到全部任務執行完成或任務撤銷日誌

 

環境準備

  請自行完成MongoDB和RabbitMQ的安裝,Windows、Linux、Docker皆可,如下提供Windows的安裝方法:code

  MongoDB:https://docs.mongodb.com/manual/tutorial/install-mongodb-on-windows/

  RabbitMQ:https://www.rabbitmq.com/install-windows.html

 

核心代碼

  1. (WebApi)建立任務,並根據設置建立子任務,把任務數據寫入數據庫

    var task = new Task
    {
        Name = form.Name,
        StartTime = form.StartTime,
        EndTime = form.EndTime,
        Interval = form.Interval,
        SubTasks = new List<SubTask>()
    };
    
    var startTime = task.StartTime;
    var endTime = task.EndTime;
    
    while ((endTime - startTime).TotalMinutes >= 0)
    {
        var sendTime = startTime;
        if (sendTime <= endTime && sendTime > DateTime.UtcNow)
        {
            task.SubTasks.Add(new SubTask { Id = ObjectId.GenerateNewId(), SendTime = sendTime });
        }
    
        startTime = startTime.AddMinutes(task.Interval);
    }
    
    await _mongoDbContext.Collection<Task>().InsertOneAsync(task);

 

  2. (WebApi)往死信隊列中寫入消息

    var timeFlag = task.SubTasks[0].SendTime.ToString("yyyy-MM-dd HH:mm:ssZ");
    var exchange = "Task";
    var queue = "Task";
    
    var index = 0;
    var pendingExchange = "PendingTask";
    var pendingQueue = $"PendingTask|Task:{task.Id}_{index}_{timeFlag}";
    
    using (var channel = _rabbitConnection.CreateModel())
    {
        channel.ExchangeDeclare(exchange, "direct", true);
        channel.QueueDeclare(queue, true, false, false);
        channel.QueueBind(queue, exchange, queue);
    
        var retryDic = new Dictionary<string, object>
        {
            {"x-dead-letter-exchange", exchange},
            {"x-dead-letter-routing-key", queue}
        };
    
        channel.ExchangeDeclare(pendingExchange, "direct", true);
        channel.QueueDeclare(pendingQueue, true, false, false, retryDic);
        channel.QueueBind(pendingQueue, pendingExchange, pendingQueue);
    
        var properties = channel.CreateBasicProperties();
        properties.Headers = new Dictionary<string, object>
        {
            ["index"] = index,
            ["id"] = task.Id.ToString(),
            ["sendtime"] = timeFlag
        };
    
        properties.Expiration = ((int)(task.SubTasks[0].SendTime - DateTime.UtcNow).TotalMilliseconds).ToString(CultureInfo.InvariantCulture);
        channel.BasicPublish(pendingExchange, pendingQueue, properties, Encoding.UTF8.GetBytes(string.Empty));
    }

  其中:

  PendingTask爲死信隊列Exchange,死信隊列的隊列名(Queue Name)會包含Task、index、timeFlag的信息,幫助跟蹤隊列和子任務,同時也起到惟一標識的做用。

  task.id爲任務Id

  index爲子任務下標

  timeFlag爲子任務執行時間

 

  3. (消費者)處理消息

    var exchange = "Task";
    var queue = "Task";
    
    _channel.ExchangeDeclare(exchange, "direct", true);
    _channel.QueueDeclare(queue, true, false, false);
    _channel.QueueBind(queue, exchange, queue);
    
    var consumer = new EventingBasicConsumer(_channel);
   //監聽處理 consumer.Received
+= (model, ea) => {
     //獲取消息頭信息
var index = (int)ea.BasicProperties.Headers["index"]; var id = (ea.BasicProperties.Headers["id"] as byte[]).BytesToString(); var timeFlag = (ea.BasicProperties.Headers["sendtime"] as byte[]).BytesToString();

   //刪除臨時死信隊列 _channel.QueueDelete($
"PendingTask|Task:{id}_{index}_{timeFlag}", false, true); var taskId = new ObjectId(id); var task = _mongoDbContext.Collection<Task>().Find(n => n.Id == taskId).SingleOrDefault();

     //撤銷或已完成的任務不執行
if (task == null || task.Status != TaskStatus.Normal) { _channel.BasicAck(ea.DeliveryTag, false); return; }
     //執行任務 _logger.LogInformation($
"[{DateTime.UtcNow}]執行任務...");
//設置子任務已完成 task.SubTasks[index].IsSent
= true; if (task.SubTasks.Count > index + 1) //還有未完成的子任務,把下個子任務的信息寫入死信隊列 { PublishPendingMsg(_channel, task, index + 1); } else { task.Status = TaskStatus.Finished; //全部子任務執行完畢,設置任務狀態爲完成 } _mongoDbContext.Collection<Task>().ReplaceOne(n => n.Id == taskId, task); //更新任務狀態 _channel.BasicAck(ea.DeliveryTag, false); }; _channel.BasicConsume(queue, false, consumer);

 

  4. (WebApi)撤銷任務,更新任務狀態便可

    var taskId = new ObjectId(id);
    var task = await _mongoDbContext.Collection<Task>().Find(n => n.Id == taskId).SingleOrDefaultAsync();
    if (task == null)
    {
        return NotFound(new { message = "任務不存在!" });
    }
    
    task.Status = TaskStatus.Canceled;
    await _mongoDbContext.Collection<Task>().FindOneAndReplaceAsync(n => n.Id == taskId, task);

 

效果展現

   1. 先使用控制檯把消費者啓動起來。

  

 

  2. 建立任務

  啓動WebApi,建立一個任務,開始時間爲2019-07-16T07:55:00.000Z,結束時間爲2019-07-16T07:59:00.000Z,執行時間間隔1分鐘:

  

 

  任務與相應的子任務也寫入了MongoDB,這裏假設子任務多是郵件發送任務:

  

     

  建立了一個臨時死信隊列,隊列名稱包含任務Id,子任務下標、以及子任務執行時間,並往其寫入一條消息:

  

 

  3. 執行(子)任務

  從日誌內容能夠看出,(子)任務正常執行:

  

  子任務狀態也標註爲已發送

  

 

  同時也往消息隊列寫入了下一個子任務的消息:

  

  

  4. 撤銷任務

   

 

  任務狀態被置爲已撤銷:

  

 

  任務沒再繼續往下執行:

  

   

  消息隊列中的臨時隊列被刪除,消息也被消費完

  

 

源碼地址

  https://github.com/ErikXu/rabbit-scheduler

相關文章
相關標籤/搜索