基於Redis實現分佈式消息隊列

基於Redis實現分佈式消息隊列
原文地址:http://blog.csdn.net/stationxp/article/details/45731497)
一、爲何須要消息隊列?
當系統中出現「生產「和「消費「的速度或穩定性等因素不一致的時候,就須要消息隊列,做爲抽象層,彌合雙方的差別。
舉個例子:業務系統觸發短信發送申請,但短信發送模塊速度跟不上,須要未來不及處理的消息暫存一下,緩衝壓力。 
再舉個例子:調遠程系統下訂單成本較高,且由於網絡等因素,不穩定,攢一批一塊兒發送。 
再舉個栗子,交互模塊5:00到24:00和電商系統聯通,和內部ERP斷開。1:00到4:00和ERP聯通,和電商系統斷開。 
再舉個例子,服務員點菜快,廚師作菜慢。 
再舉個例子,到銀行辦事的人多,提供服務的窗口少。 
乖乖排隊吧。
二、使用消息隊列有什麼好處?
2.一、提升系統響應速度
使用了消息隊列,生產者一方,把消息往隊列裏一扔,就能夠立馬返回,響應用戶了。無需等待處理結果。
處理結果可讓用戶稍後本身來取,如醫院取化驗單。也可讓生產者訂閱(如:留下手機號碼或讓生產者實現listener接口、加入監聽隊列),有結果了通知。得到約定將結果放在某處,無需通知。
2.二、提升系統穩定性
考慮電商系統下訂單,發送數據給生產系統的狀況。 
電商系統和生產系統之間的網絡有可能掉線,生產系統可能會因維護等緣由暫停服務。
若是不使用消息隊列,電商系統數據發佈出去,顧客沒法下單,影響業務開展。 
兩個系統間不該該如此緊密耦合。應該經過消息隊列解耦。同時讓系統更健壯、穩定。
三、爲何須要分佈式?
3.一、多系統協做須要分佈式
消息隊列中的數據須要在多個系統間共享數據才能發揮價值。 
因此必須提供分佈式通訊機制、協同機制。
3.二、單系統內部署環境須要分佈式
單系統內部,爲了更好的性能、爲了不單點故障,多爲集羣環境。 
集羣環境中,應用運行在多臺服務器的多個JVM中;數據也保存在各類類型的數據庫或非數據庫的多個節點上。 
爲了知足多節點協做須要,須要提供分佈式的解決方案。
四、分佈式環境下須要解決哪些問題
4.一、併發問題
需進行良好的併發控制。確保「線程安全「。
不要出現一個訂單被出貨兩次。不要出現顧客A下的單,發貨發給了顧客B等狀況。
4.二、簡單的、統一的操做機制
需定義簡單的,語義明確的,業務無關的,恰當穩妥的統一的訪問方式。
4.三、容錯
控制好單點故障,確保數據安全。
4.四、可橫向擴展
可便捷擴容。
五、如何實現?
成熟的消息隊列中間件產品太多了,族繁不及備載。
成熟產品通過驗證,接口規範,可擴展性強。
結合事業環境因素、組織過程遺產、實施運維考慮、技術路線考慮、開發人員狀況等緣由綜合考慮,基於Redis本身作一個是最可行的選擇。
一、消息隊列需提供哪些功能?
在功能設計上,我崇尚奧卡姆剃刀法則。 
對於消息隊列,只須要兩個方法: 生產 和 消費。 
具體的業務場景是任務隊列,代碼設計以下:
public abstract class TaskQueue{
private final String name ;
public String getName(){return this.name;}html

public abstract void addTask(Serializable taskId);
public abstract Serializable popTask();

}
同時支持多個隊列,每一個隊列都應該有個名字。final確保TaskQueue是線程安全的。TaskQueue的實現類也應該確保線程安全。
addTask向隊列中添加一個任務。隊列中僅保存任務的id,不存儲任務的業務數據。
popTask從隊列中取出一個任務來執行。 
這種設計不是特別友好,由於她須要調用者自行保證任務執行成功,若是執行失敗,自行確保從新把任務放回隊列。 不管如何,這種機制是能夠工做的。想一想奧卡姆剃刀法則,咱們先按照這個設計實現出來看看。 
若是調用者把業務數據存在數據庫中,業務數據中包含「狀態「列,標識任務是否被執行,調用者須要自行管理這個狀態,並控制事務。
popTask採用阻塞方式,仍是非阻塞方式呢? 
若是採用阻塞方式,隊列中沒任務的時候,客戶端不會斷開鏈接,只是等。 
通常狀況下,客戶端會有多個worker搶着幹活兒,幾條狼一塊兒等一個肉包子,畫面太美。鏈接是重要資源,若是一直沒活兒幹,先放回池裏,也不錯。 
先採用非阻塞的方式吧,若是隊列是空的,popTask返回null,當即返回。
二、後續可能提供的功能
2.一、引入Task生命週期概念
應用場景不一樣,需求也不一樣。 
在嚴格的應用場景中,須要確保每一個Task執行「成功「了。 
對於上面提到的popTask後無論的「模式「,這是另一種「運行模式「,兩種模式能夠並行存在。
在這種新模式下,Task狀態有3種:新建立(new,剛調用addTask加到隊列中)、正在執行(in-process,調用popTask後,調用finish前)、完成(done,執行OK了,調用finishTask後)。 
調整後的代碼以下:
public abstract class TaskQueue{redis

private final String name ;
public String getName(){return this.name;}

public abstract int getMode();

public abstract void addTask(Serializable taskId);
public abstract Serializable popTask();
public abstract void finishTask(Serializable taskId);

}
2.二、增長批量取出任務的功能
popTask()一次取出一個任務,太磨嘰了。 
比如咱們要買5瓶水,開車去超市買,每去一次買1瓶,有點兒啥。 
咱們須要一個一次取多個任務的方法。
public abstract class TaskQueue{
... ...
public abstract Serializable[] popTasks(long cnt);
}
2.三、增長阻塞等待機制
想象一種場景: 
小明同窗,取出一個任務,發現幹不了,放回隊列,再去取,取出來發現仍是幹不了,又放回去。反反覆覆。 
小明童鞋腫麼了?多是他幹活須要網絡,網絡斷了。多是他作任務須要寫磁盤,磁盤滿了。
若是小明像鄰居家的孩子同樣優秀,當他發現哪裏不對的時候,他應該冷靜下來,歇會兒。
但他萬一不是呢?只有咱們能幫他了。
假如隊列中有10000個待辦任務。 
這時候小明來了。他失敗100次後,咱們應該攔他嗎?不該該,除非他主動要求(在系統參數中配置)。5000次後呢?也不該該,除非他主動要求。咱們的原則是:咱們作的全部事情,對於調用者,都是能夠預期的。
咱們能夠在系統參數中要求調用者設置一個閥值N,若是不設置,默認爲100。連續失敗N次後,讓調用者睡一下子,睡多長時間,讓調用者配置。
假如咱們的底層實現中包含待辦子隊列、重作子隊列和完成子隊列(這種設計好複雜!pop的時候先pop重作,仍是先pop待辦,複雜死了!希望不須要這樣)。 
待辦子隊列中有10000個任務。
在小明失敗10000次後,全部的任務都在重作子隊列了。這時候咱們應該攔他嗎? 
重作子隊列要不要設置大小,超過以後,讓下一個訪問者等。 
等的話就會涉及超時,超時後,任務也不能丟棄。 
太複雜 了!設置一個連續失敗次數的限制就夠了!
2.四、考慮增長Task類
不保存任務的相關數據是基本原則,絕對不動搖。 
增長Task類能夠管理下生命週期,更有用的是,能夠把Task自己設計成Listener,代碼大概時這樣的:
public abstract class Task{數據庫

public Serializable getId();
public int getState();

pubic void doTask();

public void whenAdded(final TaskQueue tq);
public void whenPoped(final TaskQueue tq);
// public void whenFaild(final TaskQueue tq);
public void whenFinished(final TaskQueue tq);

}
經過Task接口,咱們能夠對調用過程進行更強勢的管理(如進行事務控制),對調用者施加更強的控制,用戶也能夠得到更多的交互機會,同TaskQueue有更好的交互(如在whenFinished中作持久化工做)。
但這些真的有必要嗎?是否是太侵入了?註解的方式會好些嗎? 
再考慮吧。
2.五、增長系統參數
貌似須要個Config類了,不爽! 
原本想作一個很小很精緻的小東西的,若是必須再加吧。 
若是作的話,須要支持properties、註解設置、api方式設置、Spring注入式設置,煩。
次回預告:Redis自己機制和TaskQueue的契合。
基於Redis實現分佈式消息隊列(3)
一、Redis是什麼鬼?
Redis是一個簡單的,高效的,分佈式的,基於內存的緩存工具。 
假設好服務器後,經過網絡鏈接(相似數據庫),提供Key-Value式緩存服務。
簡單,是Redis突出的特點。 
簡單能夠保證核心功能的穩定和優異。
二、性能
性能方面:Redis是足夠高效的。 
和Memecached對比,在數據量較小大狀況下,Redis性能更優秀。 
數據量大到必定程度的時候,Memecached性能稍好。
簡單結論:但整體上講Redis性能已經足夠好。
// Ref: Redis性能測試 http://www.cnblogs.com/lulu/archive/2013/06/10/3130878.html 
原則:Value大小不要超過1390Byte。
經實驗得知: 
List操做和字符串操做性能至關,略差,幾乎能夠忽略。 
使用Jedis自帶pool,「每次從pool中取用完放回「 和 「重用單個鏈接「 相比,平均用時是3倍。這部分須要繼續研究底層機制,採用更合理的實驗方法進一步得到數據。 
使用Jedis自帶pool,性能上是知足當前訪問量須要的,等有時間了再進一步深刻。
三、數據類型
Redis支持5種數據類型:字符串、Map、List、Set、Sorted Set。 
List特別適合用於實現隊列。提供的操做包括: 
從左側(或右側)放入一個元素,從右側(或左側)取出一個元素,讀取某個範圍的元素,刪除某個範圍的元素。
Sorted Set中元素是惟一的,能夠經過名字找。 
Map能夠高效地經過key找。 
假如咱們須要實現finishTash(taskId),須要經過名字在隊列中找元素,上面兩個可能會用到。
四、原子操做
實現分佈式隊列首要問題是:不能出現併發問題。
Redis是底層是單線程的,命令執行是原子操做,支持事務,契合了咱們的需求。
Redis直接提供的命令都是原子操做,包括lpush、rpop、blpush、brpop等。
Redis支持事務。經過相似 begin…[cancel]…commit的語法,提供begin…commit之間的命令爲原子操做的功能,之間命令對數據的改變對其餘操做是不可見的。相似關係型數據庫中的存儲過程,同時提供了最高級別的事務隔離級別。
Redis支持腳本,每一個腳本的執行是原子性的。
作了一下併發測試: 
寫了個小程序,隨機對List作push或pop操做,push的比pop的稍多。 
記錄每次處理的詳細信息到數據庫。 
最後把List中數據都pop出來,詳細記錄每次pop詳細信息。 
統計push和pop是否相等,統計針對每條數據是否都有push和pop。 
500併發,沒有出現併發問題。
五、集羣
實現分佈式隊列另外一個重要問題是:不能出現單點故障。
Redis支持Master-Slave數據複製,從服務器設置 slave-of master-ip:port 便可。 
集羣功能能夠由客戶端提供。 
客戶端使用哨兵,可自動切換主服務器。
因爲隊列操做都是寫操做,從服務器主要目的是備份數據,保證數據安全。
若是想基於 sharding 作多master集羣,能夠結合 zookeeper 本身作。
Redis 3.0支持集羣了,還沒細看,應該是個好消息,等你們都用起來,沒什麼問題的話,能夠考慮試試看。
若是 master 宕掉,怎麼辦? 
「哨兵」會選出一個新的master來。產生過程當中,消息隊列暫停服務。 
最極端的狀況,全部Redis都停了,當消息隊列發現Redis中止響應時,對業務系統的請求應拋出異常,中止隊列服務。 
這樣會影響業務,業務系統下訂單、審批等操做會失敗。若是能夠接受,這是一種方案。 
Redis整個集羣宕掉,這種狀況不多發生,若是真發生了,業務系統中止服務也是能夠理解的。
若是想要在Redis整個集羣宕掉的狀況下,消息隊列仍繼續提供服務。 
方法是這樣的: 
啓用備用存儲機制,能夠是zookeeper、能夠是關係型數據庫、能夠是另外可用的Memecached等。 
本地內存存儲是不可取的,首先,同步多個客戶端虛擬機內存數據太複雜,至關於本身實現了一個Redis,其次,保證內存數據存儲安全太複雜。 
備用存儲機制至關於實現了另一個版本的消息隊列,邏輯一致,底層存儲不一樣。這個實現能夠性能低一些,保證最基本的原則便可。 
想要保證不出現併發問題,因爲消息隊列程序同時運行在多個虛擬機中,對象鎖、方法鎖無效。須要有一個獨立於虛擬機的鎖機制,zookeeper是個好選擇。 
將關係型數據庫設置爲最高級別的事務隔離級別,太傻了。除了zk有其餘好辦法嗎?
Redis集羣整個宕掉的同時Zookeeper也全軍覆沒怎麼辦? 
這個問題是沒有盡頭的,提供了第二備用存儲、第三備用存儲、第四備用存儲、…,理論上也會同時宕掉,那時候怎麼辦? 
有錢任性的土豪能夠繼續,預算有限的狀況,能作到哪步就作到哪步。
六、持久化
分佈式隊列的應用場景和緩存的應用場景是不同的。
若是有沒來得及持久化的數據怎麼辦? 
從業務系統的角度,已經成功發送給消息隊列了。 
消息隊列也覺得Redis妥妥地收好了。 
可Redis還沒寫到日記裏,更沒有及時通知小夥伴,掛了。多是斷電了,多是進程被kill了。
後果會怎樣? 
已經執行過的任務會再次執行一遍。 
已經放到隊列中的任務,消失了。 
標記爲已經完成的任務,狀態變爲「進行中」了,而後又被執行了一遍。 
後果不可接受。
分佈式隊列不容許丟數據。 
從業務角度,哪怕丟1條數據也是沒法接受的。 
從運維角度,Redis丟數據後,若是能夠及時發現並補救,也是能夠接受的。
從架構角度,隊列保存在Redis中,業務數據(包括任務狀態)保存在關係型數據庫中。 
任務狀態是從業務角度肯定的,消息隊列不該該干涉。若是業務狀態沒有統一的規範和定義,從業務數據比對任務隊列是否全面正確,就只能交給業務開發方來作。 
從分工上來看,任務隊列的目的是管理任務執行的狀態,業務系統把這個職責交給了任務隊列,業務系統自身的任務狀態維護未必準確。 
結論:任務隊列不能推卸責任,不能丟數據是核心功能,不能打折扣。
採用 Master-Slave 數據複製模式,配置bgsave,追加存儲到aof。
在從服務器上配置bgsave,不影響master性能。
隊列操做都是寫操做,master任務繁重,能讓slave分擔的持久化工做,就不要master作。
rdb和aof兩種方法都用上,多重保險。 
appendfsync設爲always。// 單節點測性能,連續100000次算平均時間,和per second比對,性能損失不大。 
性能會有些許損失,但任務執行爲異步操做,無需用戶同步等待,爲了保證數據安全,這樣是值得的。
當運維須要重啓Master服務器的時候,採起這樣的順序: 小程序

  1. 經過 cli shutdown 中止master服務器, master交代完後過後,關掉本身。這時候「哨兵」會找一個新的master出來。 
    萬萬不能夠直接kill或者直接打開防火牆中斷master和slave之間的鏈接。 
    master 對外防火牆,中止對外服務,Master 自動切換到其餘服務器上, 原 Master 繼續持久化 aof,發送到原來各從服務器。 
  2. 在原 master 上進行運維操做。 
  3. 啓動原 master,這時候它已是從服務器了。耐心等待它重新 master 獲取最新數據。觀察 redis 日誌輸出,確認數據安全。 
  4. 對新的 master 重複1-3的操做。 
  5. 將以上操做寫成腳本,自動化執行,避免人爲錯誤。
    基於Redis實現分佈式消息隊列(4)
    一、訪問Redis的工具類
    public class RedisManager {api

    private static Pool<Jedis> pool;緩存

    protected final static Logger logger = Logger.getLogger(RedisManager.class);安全

    static{
    try {
    init();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }服務器

    public static void init() throws Exception {網絡

    Properties props = ConfigManager.getProperties("redis");
    logger.debug("初始化Redis鏈接池。");
    if(props==null){
        throw new RuntimeException("沒有找到redis配置文件");
    }
    // 建立jedis池配置實例
    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
    // 設置池配置項值
    int poolMaxTotal = Integer.valueOf(props.getProperty("redis.pool.maxTotal").trim());
    jedisPoolConfig.setMaxTotal(poolMaxTotal);
    
    int poolMaxIdle = Integer.valueOf(props.getProperty("redis.pool.maxIdle").trim());
    jedisPoolConfig.setMaxIdle(poolMaxIdle);
    
    long poolMaxWaitMillis = Long.valueOf(props.getProperty("redis.pool.maxWaitMillis").trim());
    jedisPoolConfig.setMaxWaitMillis(poolMaxWaitMillis);
    
    logger.debug(String.format("poolMaxTotal: %s , poolMaxIdle : %s , poolMaxWaitMillis : %s ",
            poolMaxTotal,poolMaxIdle,poolMaxWaitMillis));
    
    // 根據配置實例化jedis池
    String connectMode = props.getProperty("redis.connectMode");
    String hostPortStr = props.getProperty("redis.hostPort");
    
    logger.debug(String.format("host : %s ",hostPortStr));
    logger.debug(String.format("mode : %s ",connectMode));
    
    if(StringUtils.isEmpty(hostPortStr)){
        throw new OptimusException("redis配置文件未配置主機-端口集");
    }
    String[] hostPortSet = hostPortStr.split(","); 
    if("single".equals(connectMode)){
        String[] hostPort = hostPortSet[0].split(":");
        pool = new JedisPool(jedisPoolConfig, hostPort[0], Integer.valueOf(hostPort[1].trim()));
    }else if("sentinel".equals(connectMode)){
        Set<String> sentinels = new HashSet<String>();     
        for(String hostPort : hostPortSet){
            sentinels.add(hostPort);
        }
        pool = new JedisSentinelPool("mymaster", sentinels, jedisPoolConfig);
    }

    }架構

    /**

    • 使用完成後,必須調用 returnResource 還回。
    • @return 獲取Jedis對象
      */
      public static Jedis getResource(){
      Jedis jedis = pool.getResource();
      if(logger.isDebugEnabled()){
      logger.debug("得到連接:" + jedis);
      }
      return jedis;
      }

    /**

    • 獲取Jedis對象。
    • 用完後,須要調用returnResource放回鏈接池。
    • @param db 數據庫序號
    • @return
      */
      public static Jedis getResource(int db){
      Jedis jedis = pool.getResource();
      jedis.select(db);
      if(logger.isDebugEnabled()){
      logger.debug("得到連接:" + jedis);
      }
      return jedis;
      }

    /**

    • @param jedis
      */
      public static void returnResource(Jedis jedis){
      if(jedis!=null){
      pool.returnResource(jedis);
      if(logger.isDebugEnabled()){
      logger.debug("放回連接:" + jedis);
      }
      }
      }

    /**

    • 須要經過Spring確認這個方法被調用。
    • @throws Exception
      */
      public static void destroy() throws Exception {
      pool.destroy();
      }
      }
      這個類沒有經過技術手段強制調用returnResource和destroy,須要想一想辦法。
      二、隊列接口
      public interface TaskQueue {

    /**

    • 獲取隊列名
    • @return
      */
      String getName();

    /**

    • 往隊列中添加任務
    • @param task
      */
      void pushTask(String task);

    /**

    • 從隊列中取出一個任務
    • @return
      */
      String popTask();

}
用String類型描述任務,也能夠考慮byte[],要求對每一個任務描述的數據儘量短。
三、隊列的Redis實現類
/**

  • 任務隊列Redis實現。
  • 採用每次獲取Jedis並放回pool的方式。
  • 若是得到Jedis後一直不放手,反覆重用,兩個操做耗時能夠下降1/3。
  • 暫時先忍受這種低性能,不明確Jedis是否線程安全。
  • */public class TaskQueueRedisImpl implements TaskQueue {

    private final static int REDIS_DB_IDX = 9;

    private final static Logger logger = Logger.getLogger(TaskQueueRedisImpl.class);

    private final String name;

    /**

    • 構造函數。
    • @param name
      */
      public TaskQueueRedisImpl(String name) {
      this.name = name;
      }

    /* (non-Javadoc)

    • @see com.gwssi.common.mq.TaskQueue#getName()
      /
      public String getName() {
      return this.name;
      }
      /
      (non-Javadoc)
    • @see com.gwssi.common.mq.TaskQueue#pushTask(String)
      */
      public void pushTask(String task) {
      Jedis jedis = null;
      try{
      jedis = RedisManager.getResource(REDIS_DB_IDX);
      jedis.lpush(this.name, task);
      }catch(Throwable e){ logger.error(e.getMessage(),e);
      }finally{ if(jedis!=null){ RedisManager.returnResource(jedis); }
      }
      }

    /* (non-Javadoc)

    • @see com.gwssi.common.mq.TaskQueue#popTask()
      public String popTask() {
      Jedis jedis = null;
      String task = null;
      try{
      jedis = RedisManager.getResource(REDIS_DB_IDX);
      task = jedis.rpop(this.name);
      }catch(Throwable e){ logger.error(e.getMessage(),e);
      }finally{ if(jedis!=null){ RedisManager.returnResource(jedis); }
      }
      return task;
      }}
      四、獲取隊列實例的工具類
  • <pre>
  • // 得到隊列
  • TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
  • // 添加任務到隊列
  • String task = "task id";
  • tq.pushTask(task);
  • // 從隊列中取出任務執行
  • String taskToDo = tq.popTask();
  • </pre>
  • @author liuhailong
    */public class TaskQueueManager {
    protected final static Logger logger = Logger.getLogger(TaskQueueManager.class);
    private static Map<String, TaskQueueRedisImpl> queneMap = new ConcurrentHashMap<String, TaskQueueRedisImpl>();
    • 短信隊列名。
      public static final String SMS_QUEUE = "SMS_QUEUE";
    • 規則隊列名。
      public static final String RULE_QUEUE = "RULE_QUEUE";
      private static void initQueneMap() {
      logger.debug("初始化任務隊列...");
      queneMap.put(RULE_QUEUE, new TaskQueueRedisImpl(RULE_QUEUE));
      logger.debug("創建隊列:"+RULE_QUEUE);
      queneMap.put(SMS_QUEUE, new TaskQueueRedisImpl(SMS_QUEUE));
      logger.debug("創建隊列:"+SMS_QUEUE);
      }
      static { initQueneMap(); }
      public static TaskQueue get(String name){ return getRedisTaskQueue(name); }
      public static TaskQueue getRedisTaskQueue(String name){ return queneMap.get(name); }

}
和具體的隊列過於緊耦合,但簡單好用。 
先跑起來再說。
五、向隊列中添加任務的代碼
TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
tq.pushTask(smsMessageId);
六、從隊列中取出任務執行的代碼
public class SmsSendTask{
protected final static Logger logger = Logger.getLogger(SmsSendTask.class);
protected static SmsSendService smsSendService = new SmsSendServiceUnicomImpl();

  • 入口方法。
    public void execute() {
    TaskQueue taskQueue = null;
    String task = null;
    try {
    taskQueue = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
    // 非線程安全
    Set<Serializable> executedTaskSet = new HashSet<Serializable>();
    task = taskQueue.popTask();
    while(task!=null){
    // 判斷是否把全部任務都執行一遍了,避免死循環
    if(executedTaskSet.contains(task)){ taskQueue.pushTask(task); break; }
    executeSingleTask(taskQueue,task);
    task = taskQueue.popTask();
    }
    }catch(Throwable e){ logger.error(e.getMessage(),e); e.printStackTrace(); } }
  • 發送單條短信。
  • 取出任務並執行,若是失敗,放回任務列表。
  • @param taskQueue
  • @param task@SuppressWarnings({ "rawtypes", "unchecked" })private void executeSingleTask(TaskQueue taskQueue, String task) {try {// do the jobString smsId = task;Map<String,String> sms = smsSendService.getSmsList(smsId);smsSendService.send(sms);smsSendService.updateSmsStatus(task,SmsSendService.STATUS_SENT);String opType = "2";TaskQueueUtil.taskLog(taskQueue.getName(), opType, task);} catch (Throwable e) {if(task!=null){taskQueue.pushTask(task);smsSendService.updateSmsStatus(task,SmsSendService.STATUS_WAIT);if(logger.isDebugEnabled()){logger.error(String.format("任務%s執行失敗:%s,從新放回隊列", task, e.getMessage()));}}else { e.printStackTrace(); } } }}這部分代碼是固定模式,並且不這樣作存在重大缺陷,會有任務執行失敗,被丟棄,這部分代碼應該寫到隊列實現中。 有空再改。
相關文章
相關標籤/搜索