本工具的核心思想就是:賭。只有兩個基礎組件同時死亡,纔會受到
嚴重影響
。哦,斷電除外。
mq是個好東西,咱們都在用。這也決定了mq應該是高高高可用的。某團就由於這個組件,出了好幾回生產事故,呵呵。git
大部分業務系統,要求的消息語義都是at least once
,即都會有重複消息,但保證不會丟。即便這樣,依然有不少問題:github
1、mq可用性沒法保證。 mq的意外死亡,形成生產端發送失敗。不少消息要經過扒取日誌進行回放,成本高耗時長。redis
2、mq阻塞業務正常進行。 mq卡頓或者網絡問題,會形成業務線程卡在mq的發送方法上,正常業務進行不下去,形成災難性的後果。api
3、消息延遲。 mq死了就用不着說了,消息還沒投胎就已死亡。消息延遲主要是客戶端消費能力不強,或者是消費通道單一形成的。微信
使用組合存儲來保證消息的可靠投遞,就是okmq
。網絡
注意:okmq注重的是可靠性。對於順序性、事務等其餘要素,不予考慮。固然,速度是必須的。
我即便用兩套redis來模擬一些mq操做,都會比現有的一些解決方案要強。但這確定不是咱們須要的,由於redis的堆積能力太有限,內存佔用率直線上升的感受並不太好。多線程
但咱們能夠用redis來做爲額外的發送確認機制。這個想法,在《使用多線程增長kafka消費能力》一文中曾經提到過,如今到了實現的時候了。socket
OkmqKafkaProducer producer = new ProducerBuilder() .defaultSerializer() .eanbleHa("redis") .any("okmq.redis.mode", "single") .any("okmq.redis.endpoint", "127.0.0.1:6379") .any("okmq.redis.poolConfig.maxTotal", 100) .servers("localhost:9092") .clientID("okMQProducerTest") .build(); Packet packet = new Packet(); packet.setTopic("okmq-test-topic"); packet.setContent("i will send you a msg"); producer.sendAsync(packet, null); producer.shutdown();
咱們按照數字標號來介紹:分佈式
一、 在消息發送到kafka以前,首先入庫redis。因爲後續回調須要用到一個惟一表示,咱們在packet包裏添加了一個uuid。ide
二、 調用底層的api,進行真正的消息投遞。
三、 經過監聽kafka的回調,刪除redis中對應的key。在這裏能夠獲得某條消息確切的的ack時間。那麼長時間沒有刪除的,就算是投遞失敗的消息。
四、 後臺會有一個線程進行這些失敗消息的遍歷和從新投遞。咱們叫作recovery。最複雜的也就是這一部分。對於redis來講,會首先爭搶一個持續5min的鎖,而後遍歷相關hashkey。
因此,對於以上代碼,redis發出如下命令:
1559206423.395597 [0 127.0.0.1:62858] "HEXISTS" "okmq:indexhash" "okmq:5197354" 1559206423.396670 [0 127.0.0.1:62858] "HSET" "okmq:indexhash" "okmq:5197354" "" 1559206423.397300 [0 127.0.0.1:62858] "HSET" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e" "{\"content\":\"i will send you a msg104736623015238\",\"topic\":\"okmq-test-topic\",\"identify\":\"2b9b33fd-95fd-4cd6-8815-4c572f13f76e\",\"timestamp\":1559206423318}" 1559206423.676212 [0 127.0.0.1:62858] "HDEL" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e" 1559206428.327788 [0 127.0.0.1:62861] "SET" "okmq:recovery:lock" "01fb85a9-0670-40c3-8386-b2b7178d4faf" "px" "300000" 1559206428.337930 [0 127.0.0.1:62858] "HGETALL" "okmq:indexhash" 1559206428.341365 [0 127.0.0.1:62858] "HSCAN" "okmq:5197354" "0" 1559206428.342446 [0 127.0.0.1:62858] "HDEL" "okmq:indexhash" "okmq:5197354" 1559206428.342788 [0 127.0.0.1:62861] "GET" "okmq:recovery:lock" 1559206428.343119 [0 127.0.0.1:62861] "DEL" "okmq:recovery:lock"
1、mq可用性沒法保證。
爲何要要經過過後進行恢復呢?我把recovery機制帶着不是更好麼?經過對未收到ack的消息進行遍歷,能夠把這個過程作成自動化。
2、mq阻塞業務正常進行。
經過設置kafka的MAX_BLOCK_MS_CONFIG
參數,實際上是能夠不阻塞業務的,但會丟失消息。我可使用其餘存儲來保證這些丟失的消息從新發送。
3、消息延遲。
mq死掉了,依然有其餘備用通道進行正常服務。也有的團隊採用雙寫mq雙消費的方式來保證這個過程,也是被逼急了:)。若是kafka死掉了,業務會切換到備用通道進行消費。
若是你不想用redis,好比你先要用hbase,那也是很簡單的。
但須要實現一個HA接口。
public interface HA { void close(); void configure(Properties properties); void preSend(Packet packet) throws HaException; void postSend(Packet packet) throws HaException; void doRecovery(AbstractProducer producer) throws HaException; }
使用以前,還須要註冊一下你的插件。
AbstractProducer.register("log", "com.sayhiai.arch.okmq.api.producer.ha.Ha2SimpleLog");
okmq.ha.recoveryPeriod 恢復線程檢測週期,默認5秒 okmq.redis.mode redis的集羣模式,可選:single、sentinel、cluster okmq.redis.endpoint 地址,多個地址以,分隔 okmq.redis.connectionTimeout 鏈接超時 okmq.redis.soTimeout socket超時 okmq.redis.lockPx 分佈式鎖的持有時間,可默認,5min okmq.redis.splitMillis 間隔時間,redis換一個key進行運算,默認5min okmq.redis.poolConfig.* 兼容jedis的全部參數
一、進行了生產端的高可用抽象,實現了kafka的樣例。
二、增長了SimpleLog的ping、pong日誌實現。
三、增長了Redis的生產端備用通道。包含single、cluster、sentinel三種模式。
四、能夠自定義其餘備用通道。
五、兼容kakfa的全部參數設置。
一、實現ActiveMQ的集成。
二、實現消費者的備用通道集成。
三、增長嵌入式kv存儲的生產者集成。
四、更精細的控制系統的行爲。
五、加入開關和預熱,避免新啓動mq即被壓垮。
六、redis分片機制,大型系統專用。
一、監控功能添加。
二、rest接口添加。
當你把參數ha設置爲true,代表你已經收到如下的使用限制。反之,系統反應於原生無異。
使用限制:
本工具僅適用於非順序性、非事務性的普通消息投遞,且客戶端已經作了冪等。一些訂單系統、消息通知等業務,很是適合。若是你須要其餘特性,請跳出此頁面。
kafka死亡,或者redis單獨死亡,消息最終都會被髮出,僅當kafka與redis同時死亡,消息纔會發送失敗,並記錄在日誌文件裏。
正常狀況下,redis的使用容量極少極少。異常狀況下,redis的容量有限,會迅速佔滿。redis的剩餘時間就是你的StopWatch
,你必須在這個時間內恢復你的消息系統,必定要頂住哇。
系統目前處於1.0.0版本,正在線上小規模試用。工具小衆,但適用於大部分應用場景。若是你正在尋求這樣的解決方案,歡迎一塊完善代碼。
github地址:
https://github.com/sayhiai/okmq
也歡迎關注《小姐姐味道》微信公衆號,進行交流。