核心原理:利用Redis的List列表實現,發佈事件對應rpush,訂閱事件對應lpopredis
問題一:Redis不是自帶Pub/Sub嗎?緩存
redis自帶的pub/sub有兩個問題:app
1.若是發佈消息期間訂閱方沒有連到redis,那麼這條消息就收不到了,即便從新鏈接上來也收不到ide
2.redis內部是用一個線程給全部訂閱鏈接推數據的,V生產> V消費 的狀況下還會主動斷開鏈接,有性能隱患。感興趣的能夠多瞭解一下它的原理。工具
問題二:要實現怎樣一個工具,或者說想要什麼樣的效果?性能
效果就是獲得一個service對象,這個對象有如下兩個重要功能:優化
1.有個publish方法能夠調用,用來靈活地發佈消息。想發佈什麼就發佈什麼,想給哪一個topic發送就給哪一個topic發送。this
2.能夠預約義一些訂閱者,定義好當收到某個topic的消息後,該作什麼處理。編碼
(一)接口定義spa
第一步要作的就是定義接口,一個是發佈接口,咱們須要這樣一個接口來發布消息,消息內容能夠是任何形式的對象
public interface MessagePublisher { /** * 發佈消息 * @param topic 主題 * @param msg 消息內容 */ void publish(String topic, Object msg); }
第二個是訂閱接口,咱們須要依此實現觀察者模式
public interface MessageConsumer { /** * 獲取此消費者訂閱的topic * @return 訂閱topic */ String getTopic(); /** * 回調方法,收到消息後,此方法被觸發 * @param topic topic * @param msg 消息內容 */ void onMessage(String topic, Object msg); }
第三個就是轉換接口,已知Redis不能直接存儲Java對象,因此必須進行轉換,這裏咱們選擇用String形式進行存儲。因此咱們須要一個類型轉換工具
public interface Translator { /** * 將對象序列化爲字符串 * @param obj 對象 * @return 字符串 */ String serialize(Object obj); /** * 將字符串反序列化爲對象 * @param str 字符串 * @return 對象 */ Object deserialize(String str); }
(二)轉換器實現——JsonTranslator
問題一:取出數據後如何轉換成正確的對象?
在寫入redis的時候同時也寫入該對象的類型信息,而後取出的時候利用該類型信息進行轉換便可。
public class JsonTranslator implements Translator { private static ObjectMapper MAPPER = new ObjectMapper(); /** * 緩存類信息,優化速度 */ private Map<String, Class> classCache = new HashMap<>(); @Override public String serialize(Object obj) { Message message = new Message(); message.setClazz(obj.getClass().getName()); message.setData(encode(obj)); return encode(message); } @Override public Object deserialize(String str) { Message message = decode(str, Message.class); String className = message.getClazz(); Class clazz = classCache.get(className); if(clazz != null) return decode(message.getData(), clazz); try { clazz = Class.forName(className); classCache.put(className, clazz); return decode(message.getData(), clazz); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } } private String encode(Object obj) { try { return MAPPER.writeValueAsString(obj); } catch (JsonProcessingException e) { throw new RuntimeException(e); } } private <T> T decode(String str, Class<T> clazz) { try { return (T) MAPPER.readValue(str, Message.class); } catch (IOException e) { throw new RuntimeException(e); } } @Data class Message { //保存類信息,是爲了反序列化可以獲得正確類型的對象 /** * 類名(含路徑) */ private String clazz; /** * 序列化後的對象 */ private String data; } }
(三)核心實現——RedisPubSub
問題一:Redis配置如何處理?
咱們將Redis的配置與這個MQ解耦,讓用戶配置鏈接池後再注入進來便可。
問題二:如何知道要監聽哪些topic?
咱們把容器中的Consumer實現類都注入進來,就能夠經過getTopic方法獲得總共須要監聽哪些topic。
問題三:如何進行監聽?
每一個須要監聽的topic開一個線程進行監聽,監聽方法就是循環調用blpop。
問題四:監聽到消息後如何進行通知?
當獲得topic的消息的時候,就回調訂閱此topic的consumer的onMessage方法。
問題五:如何啓動和關閉監聽?
咱們給MQ類提供兩個方法start和stop。在注入容器的時候指明這兩個分別是init和destroy方法,這樣它就能隨着容器啓動和中止了。
public class RedisPubSub implements MessagePublisher{ //外部注入信息 private JedisPool jedisPool; private List<MessageConsumer> consumerList; /** * 對象和字符串的轉換器,默認使用JsonTranslator */ private Translator translator = new JsonTranslator(); //內部信息 /** * key:topic * value:此topic的訂閱者 */ private Map<String, List<MessageConsumer>> subcribeInfo; private List<MessageListener> listeners; public void setJedisPool(JedisPool jedisPool) { this.jedisPool = jedisPool; } public void setConsumerList(List<MessageConsumer> consumerList) { this.consumerList = consumerList; subcribeInfo = new HashMap<>(); String topic; List<MessageConsumer> topicConsumers; //注入消費者後,整理好訂閱狀況 for(MessageConsumer consumer : consumerList) { topic = consumer.getTopic(); topicConsumers = subcribeInfo.get(topic); if(topicConsumers == null) { topicConsumers = new ArrayList<>(); subcribeInfo.put(topic, topicConsumers); } topicConsumers.add(consumer); } } public void setTranslator(Translator translator) { this.translator = translator; } public void publish(String topic, Object msg) { Jedis jedis = jedisPool.getResource(); jedis.rpush(topic,translator.serialize(msg)); jedis.close(); } public void start() { MessageListener listener; //每一個topic開一個監聽線程進行監聽 for(String topic : subcribeInfo.keySet()) { listener = new MessageListener(topic, subcribeInfo.get(topic)); listener.start(); listeners.add(listener); } } public void stop() { //關閉全部監聽器 for(MessageListener listener: listeners) { listener.stop(); } } public class MessageListener implements Runnable { /** * 此監聽器監聽的topic */ private String topic; /** * 此topic的消費者 */ private List<MessageConsumer> consumers; /** * 綁定線程 */ private Thread t; public MessageListener(String topic, List<MessageConsumer> consumers) { this.topic = topic; this.consumers = consumers; } /** * 將數據反序列化 * @param msg 字符串消息 * @return 消息對象 */ public Object deserialize(String msg) { return translator.deserialize(msg); } public void run() { String msg; Object obj; //從池中抓取一個鏈接用來監聽redis隊列 Jedis jedis = jedisPool.getResource(); while(!Thread.interrupted()) { msg = jedis.blpop(1, topic).get(1); obj = deserialize(msg); //收到消息後告知全部消費者 for(MessageConsumer consumer:consumers) { consumer.onMessage(topic, obj); } } jedis.close(); //訂閱結束後釋放資源 } public void start() { t = new Thread(this); t.start(); } public void stop() { //利用中斷打斷線程的運行 t.interrupt(); } } }
(一)定義好Consumer,注入爲容器bean
@Component public class TestConsumer implements MessageConsumer { @Override public void onMessage(String topic, Object message) { System.out.println((SomeObject)message); } @Override public String getTopic() { return "test"; } }
因爲Ttranslator會將對象轉換好,因此只要將Object強制轉換成指定類型便可使用。
(二)全局配置
@Configuration public class TestConfig { @Bean public JedisPool jedisPool() { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(20); jedisPoolConfig.setMaxIdle(5); jedisPoolConfig.setMinIdle(1); return new JedisPool(jedisPoolConfig, "127.0.0.1", 6379, 2000, "123456"); } @Bean(value = "rediMQ", initMethod = "start", destroyMethod = "stop") @Autowired public RedisPubSub redisPubSub(List<MessageConsumer> consumers, JedisPool jedisPool) { RedisPubSub redisPubSub = new RedisPubSub(); redisPubSub.setJedisPool(jedisPool); redisPubSub.setConsumerList(consumers); return redisPubSub; } }
@Autowired 配合方法參數的List<MessageConsumer> 就能夠獲得容器中全部的Consumer。
(三)引入使用
@Service public class SomeService { @Autowired private MessagePublisher publisher; public void someOperation() { publisher.publish("test", new SomeObject()); } }
只須要以MessagePublisher接口的身份引入就能夠了。