基於Redis實現分佈式消息隊列
原文地址:http://blog.csdn.net/stationxp/article/details/45731497)
一、爲何須要消息隊列?
當系統中出現「生產「和「消費「的速度或穩定性等因素不一致的時候,就須要消息隊列,做爲抽象層,彌合雙方的差別。
舉個例子:業務系統觸發短信發送申請,但短信發送模塊速度跟不上,須要未來不及處理的消息暫存一下,緩衝壓力。
再舉個例子:調遠程系統下訂單成本較高,且由於網絡等因素,不穩定,攢一批一塊兒發送。
再舉個栗子,交互模塊5:00到24:00和電商系統聯通,和內部ERP斷開。1:00到4:00和ERP聯通,和電商系統斷開。
再舉個例子,服務員點菜快,廚師作菜慢。
再舉個例子,到銀行辦事的人多,提供服務的窗口少。
乖乖排隊吧。
二、使用消息隊列有什麼好處?
2.一、提升系統響應速度
使用了消息隊列,生產者一方,把消息往隊列裏一扔,就能夠立馬返回,響應用戶了。無需等待處理結果。
處理結果可讓用戶稍後本身來取,如醫院取化驗單。也可讓生產者訂閱(如:留下手機號碼或讓生產者實現listener接口、加入監聽隊列),有結果了通知。得到約定將結果放在某處,無需通知。
2.二、提升系統穩定性
考慮電商系統下訂單,發送數據給生產系統的狀況。
電商系統和生產系統之間的網絡有可能掉線,生產系統可能會因維護等緣由暫停服務。
若是不使用消息隊列,電商系統數據發佈出去,顧客沒法下單,影響業務開展。
兩個系統間不該該如此緊密耦合。應該經過消息隊列解耦。同時讓系統更健壯、穩定。
三、爲何須要分佈式?
3.一、多系統協做須要分佈式
消息隊列中的數據須要在多個系統間共享數據才能發揮價值。
因此必須提供分佈式通訊機制、協同機制。
3.二、單系統內部署環境須要分佈式
單系統內部,爲了更好的性能、爲了不單點故障,多爲集羣環境。
集羣環境中,應用運行在多臺服務器的多個JVM中;數據也保存在各類類型的數據庫或非數據庫的多個節點上。
爲了知足多節點協做須要,須要提供分佈式的解決方案。
四、分佈式環境下須要解決哪些問題
4.一、併發問題
需進行良好的併發控制。確保「線程安全「。
不要出現一個訂單被出貨兩次。不要出現顧客A下的單,發貨發給了顧客B等狀況。
4.二、簡單的、統一的操做機制
需定義簡單的,語義明確的,業務無關的,恰當穩妥的統一的訪問方式。
4.三、容錯
控制好單點故障,確保數據安全。
4.四、可橫向擴展
可便捷擴容。
五、如何實現?
成熟的消息隊列中間件產品太多了,族繁不及備載。
成熟產品通過驗證,接口規範,可擴展性強。
結合事業環境因素、組織過程遺產、實施運維考慮、技術路線考慮、開發人員狀況等緣由綜合考慮,基於Redis本身作一個是最可行的選擇。
一、消息隊列需提供哪些功能?
在功能設計上,我崇尚奧卡姆剃刀法則。
對於消息隊列,只須要兩個方法: 生產 和 消費。
具體的業務場景是任務隊列,代碼設計以下:
public abstract class TaskQueue{
private final String name ;
public String getName(){return this.name;}html
public abstract void addTask(Serializable taskId); public abstract Serializable popTask();
}
同時支持多個隊列,每一個隊列都應該有個名字。final確保TaskQueue是線程安全的。TaskQueue的實現類也應該確保線程安全。
addTask向隊列中添加一個任務。隊列中僅保存任務的id,不存儲任務的業務數據。
popTask從隊列中取出一個任務來執行。
這種設計不是特別友好,由於她須要調用者自行保證任務執行成功,若是執行失敗,自行確保從新把任務放回隊列。 不管如何,這種機制是能夠工做的。想一想奧卡姆剃刀法則,咱們先按照這個設計實現出來看看。
若是調用者把業務數據存在數據庫中,業務數據中包含「狀態「列,標識任務是否被執行,調用者須要自行管理這個狀態,並控制事務。
popTask採用阻塞方式,仍是非阻塞方式呢?
若是採用阻塞方式,隊列中沒任務的時候,客戶端不會斷開鏈接,只是等。
通常狀況下,客戶端會有多個worker搶着幹活兒,幾條狼一塊兒等一個肉包子,畫面太美。鏈接是重要資源,若是一直沒活兒幹,先放回池裏,也不錯。
先採用非阻塞的方式吧,若是隊列是空的,popTask返回null,當即返回。
二、後續可能提供的功能
2.一、引入Task生命週期概念
應用場景不一樣,需求也不一樣。
在嚴格的應用場景中,須要確保每一個Task執行「成功「了。
對於上面提到的popTask後無論的「模式「,這是另一種「運行模式「,兩種模式能夠並行存在。
在這種新模式下,Task狀態有3種:新建立(new,剛調用addTask加到隊列中)、正在執行(in-process,調用popTask後,調用finish前)、完成(done,執行OK了,調用finishTask後)。
調整後的代碼以下:
public abstract class TaskQueue{redis
private final String name ; public String getName(){return this.name;} public abstract int getMode(); public abstract void addTask(Serializable taskId); public abstract Serializable popTask(); public abstract void finishTask(Serializable taskId);
}
2.二、增長批量取出任務的功能
popTask()一次取出一個任務,太磨嘰了。
比如咱們要買5瓶水,開車去超市買,每去一次買1瓶,有點兒啥。
咱們須要一個一次取多個任務的方法。
public abstract class TaskQueue{
... ...
public abstract Serializable[] popTasks(long cnt);
}
2.三、增長阻塞等待機制
想象一種場景:
小明同窗,取出一個任務,發現幹不了,放回隊列,再去取,取出來發現仍是幹不了,又放回去。反反覆覆。
小明童鞋腫麼了?多是他幹活須要網絡,網絡斷了。多是他作任務須要寫磁盤,磁盤滿了。
若是小明像鄰居家的孩子同樣優秀,當他發現哪裏不對的時候,他應該冷靜下來,歇會兒。
但他萬一不是呢?只有咱們能幫他了。
假如隊列中有10000個待辦任務。
這時候小明來了。他失敗100次後,咱們應該攔他嗎?不該該,除非他主動要求(在系統參數中配置)。5000次後呢?也不該該,除非他主動要求。咱們的原則是:咱們作的全部事情,對於調用者,都是能夠預期的。
咱們能夠在系統參數中要求調用者設置一個閥值N,若是不設置,默認爲100。連續失敗N次後,讓調用者睡一下子,睡多長時間,讓調用者配置。
假如咱們的底層實現中包含待辦子隊列、重作子隊列和完成子隊列(這種設計好複雜!pop的時候先pop重作,仍是先pop待辦,複雜死了!希望不須要這樣)。
待辦子隊列中有10000個任務。
在小明失敗10000次後,全部的任務都在重作子隊列了。這時候咱們應該攔他嗎?
重作子隊列要不要設置大小,超過以後,讓下一個訪問者等。
等的話就會涉及超時,超時後,任務也不能丟棄。
太複雜 了!設置一個連續失敗次數的限制就夠了!
2.四、考慮增長Task類
不保存任務的相關數據是基本原則,絕對不動搖。
增長Task類能夠管理下生命週期,更有用的是,能夠把Task自己設計成Listener,代碼大概時這樣的:
public abstract class Task{數據庫
public Serializable getId(); public int getState(); pubic void doTask(); public void whenAdded(final TaskQueue tq); public void whenPoped(final TaskQueue tq); // public void whenFaild(final TaskQueue tq); public void whenFinished(final TaskQueue tq);
}
經過Task接口,咱們能夠對調用過程進行更強勢的管理(如進行事務控制),對調用者施加更強的控制,用戶也能夠得到更多的交互機會,同TaskQueue有更好的交互(如在whenFinished中作持久化工做)。
但這些真的有必要嗎?是否是太侵入了?註解的方式會好些嗎?
再考慮吧。
2.五、增長系統參數
貌似須要個Config類了,不爽!
原本想作一個很小很精緻的小東西的,若是必須再加吧。
若是作的話,須要支持properties、註解設置、api方式設置、Spring注入式設置,煩。
次回預告:Redis自己機制和TaskQueue的契合。
基於Redis實現分佈式消息隊列(3)
一、Redis是什麼鬼?
Redis是一個簡單的,高效的,分佈式的,基於內存的緩存工具。
假設好服務器後,經過網絡鏈接(相似數據庫),提供Key-Value式緩存服務。
簡單,是Redis突出的特點。
簡單能夠保證核心功能的穩定和優異。
二、性能
性能方面:Redis是足夠高效的。
和Memecached對比,在數據量較小大狀況下,Redis性能更優秀。
數據量大到必定程度的時候,Memecached性能稍好。
簡單結論:但整體上講Redis性能已經足夠好。
// Ref: Redis性能測試 http://www.cnblogs.com/lulu/archive/2013/06/10/3130878.html
原則:Value大小不要超過1390Byte。
經實驗得知:
List操做和字符串操做性能至關,略差,幾乎能夠忽略。
使用Jedis自帶pool,「每次從pool中取用完放回「 和 「重用單個鏈接「 相比,平均用時是3倍。這部分須要繼續研究底層機制,採用更合理的實驗方法進一步得到數據。
使用Jedis自帶pool,性能上是知足當前訪問量須要的,等有時間了再進一步深刻。
三、數據類型
Redis支持5種數據類型:字符串、Map、List、Set、Sorted Set。
List特別適合用於實現隊列。提供的操做包括:
從左側(或右側)放入一個元素,從右側(或左側)取出一個元素,讀取某個範圍的元素,刪除某個範圍的元素。
Sorted Set中元素是惟一的,能夠經過名字找。
Map能夠高效地經過key找。
假如咱們須要實現finishTash(taskId),須要經過名字在隊列中找元素,上面兩個可能會用到。
四、原子操做
實現分佈式隊列首要問題是:不能出現併發問題。
Redis是底層是單線程的,命令執行是原子操做,支持事務,契合了咱們的需求。
Redis直接提供的命令都是原子操做,包括lpush、rpop、blpush、brpop等。
Redis支持事務。經過相似 begin…[cancel]…commit的語法,提供begin…commit之間的命令爲原子操做的功能,之間命令對數據的改變對其餘操做是不可見的。相似關係型數據庫中的存儲過程,同時提供了最高級別的事務隔離級別。
Redis支持腳本,每一個腳本的執行是原子性的。
作了一下併發測試:
寫了個小程序,隨機對List作push或pop操做,push的比pop的稍多。
記錄每次處理的詳細信息到數據庫。
最後把List中數據都pop出來,詳細記錄每次pop詳細信息。
統計push和pop是否相等,統計針對每條數據是否都有push和pop。
500併發,沒有出現併發問題。
五、集羣
實現分佈式隊列另外一個重要問題是:不能出現單點故障。
Redis支持Master-Slave數據複製,從服務器設置 slave-of master-ip:port 便可。
集羣功能能夠由客戶端提供。
客戶端使用哨兵,可自動切換主服務器。
因爲隊列操做都是寫操做,從服務器主要目的是備份數據,保證數據安全。
若是想基於 sharding 作多master集羣,能夠結合 zookeeper 本身作。
Redis 3.0支持集羣了,還沒細看,應該是個好消息,等你們都用起來,沒什麼問題的話,能夠考慮試試看。
若是 master 宕掉,怎麼辦?
「哨兵」會選出一個新的master來。產生過程當中,消息隊列暫停服務。
最極端的狀況,全部Redis都停了,當消息隊列發現Redis中止響應時,對業務系統的請求應拋出異常,中止隊列服務。
這樣會影響業務,業務系統下訂單、審批等操做會失敗。若是能夠接受,這是一種方案。
Redis整個集羣宕掉,這種狀況不多發生,若是真發生了,業務系統中止服務也是能夠理解的。
若是想要在Redis整個集羣宕掉的狀況下,消息隊列仍繼續提供服務。
方法是這樣的:
啓用備用存儲機制,能夠是zookeeper、能夠是關係型數據庫、能夠是另外可用的Memecached等。
本地內存存儲是不可取的,首先,同步多個客戶端虛擬機內存數據太複雜,至關於本身實現了一個Redis,其次,保證內存數據存儲安全太複雜。
備用存儲機制至關於實現了另一個版本的消息隊列,邏輯一致,底層存儲不一樣。這個實現能夠性能低一些,保證最基本的原則便可。
想要保證不出現併發問題,因爲消息隊列程序同時運行在多個虛擬機中,對象鎖、方法鎖無效。須要有一個獨立於虛擬機的鎖機制,zookeeper是個好選擇。
將關係型數據庫設置爲最高級別的事務隔離級別,太傻了。除了zk有其餘好辦法嗎?
Redis集羣整個宕掉的同時Zookeeper也全軍覆沒怎麼辦?
這個問題是沒有盡頭的,提供了第二備用存儲、第三備用存儲、第四備用存儲、…,理論上也會同時宕掉,那時候怎麼辦?
有錢任性的土豪能夠繼續,預算有限的狀況,能作到哪步就作到哪步。
六、持久化
分佈式隊列的應用場景和緩存的應用場景是不同的。
若是有沒來得及持久化的數據怎麼辦?
從業務系統的角度,已經成功發送給消息隊列了。
消息隊列也覺得Redis妥妥地收好了。
可Redis還沒寫到日記裏,更沒有及時通知小夥伴,掛了。多是斷電了,多是進程被kill了。
後果會怎樣?
已經執行過的任務會再次執行一遍。
已經放到隊列中的任務,消失了。
標記爲已經完成的任務,狀態變爲「進行中」了,而後又被執行了一遍。
後果不可接受。
分佈式隊列不容許丟數據。
從業務角度,哪怕丟1條數據也是沒法接受的。
從運維角度,Redis丟數據後,若是能夠及時發現並補救,也是能夠接受的。
從架構角度,隊列保存在Redis中,業務數據(包括任務狀態)保存在關係型數據庫中。
任務狀態是從業務角度肯定的,消息隊列不該該干涉。若是業務狀態沒有統一的規範和定義,從業務數據比對任務隊列是否全面正確,就只能交給業務開發方來作。
從分工上來看,任務隊列的目的是管理任務執行的狀態,業務系統把這個職責交給了任務隊列,業務系統自身的任務狀態維護未必準確。
結論:任務隊列不能推卸責任,不能丟數據是核心功能,不能打折扣。
採用 Master-Slave 數據複製模式,配置bgsave,追加存儲到aof。
在從服務器上配置bgsave,不影響master性能。
隊列操做都是寫操做,master任務繁重,能讓slave分擔的持久化工做,就不要master作。
rdb和aof兩種方法都用上,多重保險。
appendfsync設爲always。// 單節點測性能,連續100000次算平均時間,和per second比對,性能損失不大。
性能會有些許損失,但任務執行爲異步操做,無需用戶同步等待,爲了保證數據安全,這樣是值得的。
當運維須要重啓Master服務器的時候,採起這樣的順序: 小程序
將以上操做寫成腳本,自動化執行,避免人爲錯誤。
基於Redis實現分佈式消息隊列(4)
一、訪問Redis的工具類
public class RedisManager {api
private static Pool<Jedis> pool;緩存
protected final static Logger logger = Logger.getLogger(RedisManager.class);安全
static{
try {
init();
} catch (Exception e) {
e.printStackTrace();
}
}服務器
public static void init() throws Exception {網絡
Properties props = ConfigManager.getProperties("redis"); logger.debug("初始化Redis鏈接池。"); if(props==null){ throw new RuntimeException("沒有找到redis配置文件"); } // 建立jedis池配置實例 JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); // 設置池配置項值 int poolMaxTotal = Integer.valueOf(props.getProperty("redis.pool.maxTotal").trim()); jedisPoolConfig.setMaxTotal(poolMaxTotal); int poolMaxIdle = Integer.valueOf(props.getProperty("redis.pool.maxIdle").trim()); jedisPoolConfig.setMaxIdle(poolMaxIdle); long poolMaxWaitMillis = Long.valueOf(props.getProperty("redis.pool.maxWaitMillis").trim()); jedisPoolConfig.setMaxWaitMillis(poolMaxWaitMillis); logger.debug(String.format("poolMaxTotal: %s , poolMaxIdle : %s , poolMaxWaitMillis : %s ", poolMaxTotal,poolMaxIdle,poolMaxWaitMillis)); // 根據配置實例化jedis池 String connectMode = props.getProperty("redis.connectMode"); String hostPortStr = props.getProperty("redis.hostPort"); logger.debug(String.format("host : %s ",hostPortStr)); logger.debug(String.format("mode : %s ",connectMode)); if(StringUtils.isEmpty(hostPortStr)){ throw new OptimusException("redis配置文件未配置主機-端口集"); } String[] hostPortSet = hostPortStr.split(","); if("single".equals(connectMode)){ String[] hostPort = hostPortSet[0].split(":"); pool = new JedisPool(jedisPoolConfig, hostPort[0], Integer.valueOf(hostPort[1].trim())); }else if("sentinel".equals(connectMode)){ Set<String> sentinels = new HashSet<String>(); for(String hostPort : hostPortSet){ sentinels.add(hostPort); } pool = new JedisSentinelPool("mymaster", sentinels, jedisPoolConfig); }
}架構
/**
/**
/**
/**
/**
/**
/**
}
用String類型描述任務,也能夠考慮byte[],要求對每一個任務描述的數據儘量短。
三、隊列的Redis實現類
/**
*/public class TaskQueueRedisImpl implements TaskQueue {
private final static int REDIS_DB_IDX = 9;
private final static Logger logger = Logger.getLogger(TaskQueueRedisImpl.class);
private final String name;
/**
/* (non-Javadoc)
/* (non-Javadoc)
}
和具體的隊列過於緊耦合,但簡單好用。
先跑起來再說。
五、向隊列中添加任務的代碼
TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
tq.pushTask(smsMessageId);
六、從隊列中取出任務執行的代碼
public class SmsSendTask{
protected final static Logger logger = Logger.getLogger(SmsSendTask.class);
protected static SmsSendService smsSendService = new SmsSendServiceUnicomImpl();