這裏我使用Redis的發佈、訂閱功能實現簡單的消息隊列,基本的命令有publish、subscribe等。
在Jedis中,有對應的java方法,但是只能發佈字符串消息。爲了傳輸對象,需要將對象進行序列化,並封裝成字符串進行處理。
封裝一個消息對象
/**
 * Serializable payload carried over the message channel: a title plus a
 * free-form info text.
 *
 * <p>The "titile" spelling is part of the public accessor names and is kept
 * so existing callers continue to compile.
 */
public class Message implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Title of the message. */
    private String titile;

    /** Body text of the message. */
    private String info;

    /**
     * Creates a message with the given title and body.
     *
     * @param titile the message title
     * @param info   the message body
     */
    public Message(String titile, String info) {
        this.titile = titile;
        this.info = info;
    }

    /** @return the message title */
    public String getTitile() {
        return this.titile;
    }

    /** @param titile the new message title */
    public void setTitile(String titile) {
        this.titile = titile;
    }

    /** @return the message body */
    public String getInfo() {
        return this.info;
    }

    /** @param info the new message body */
    public void setInfo(String info) {
        this.info = info;
    }
}
爲這個消息對象提供序列化方法
/**
 * Helpers for turning serializable objects into text that can travel over a
 * string-only transport, and for turning that text back into objects.
 */
public class MessageUtil {

    /** First byte of every Java serialization stream (high byte of STREAM_MAGIC 0xACED). */
    private static final byte STREAM_MAGIC_HI = (byte) 0xAC;

    /** Second byte of every Java serialization stream (low byte of STREAM_MAGIC 0xACED). */
    private static final byte STREAM_MAGIC_LO = (byte) 0xED;

    /**
     * Serializes {@code obj} with Java serialization and returns the bytes as
     * Base64 text.
     *
     * <p>Serialized bytes are binary; decoding them directly with a text
     * charset such as UTF-8 replaces invalid sequences and makes the result
     * impossible to deserialize. Base64 output is plain ASCII and therefore
     * survives any ASCII-compatible charset unchanged.
     *
     * @param obj     the object to serialize; must be serializable
     * @param charset retained for source compatibility; the Base64 result does
     *                not depend on it
     * @return Base64 text of the serialized object
     * @throws IOException if the object cannot be serialized
     */
    public static String convertToString(Object obj, String charset) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the object stream flushes everything into the buffer
        // before the bytes are read back out.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        }
        return java.util.Base64.getEncoder().encodeToString(buffer.toByteArray());
    }

    /**
     * Rebuilds an object from its serialized form.
     *
     * <p>Accepts either the bytes of the Base64 text produced by
     * {@link #convertToString(Object, String)} or raw Java serialization
     * bytes; the two are told apart by the serialization stream magic.
     *
     * <p>SECURITY: Java deserialization can execute code chosen by whoever
     * produced the bytes. Only call this on data from trusted publishers.
     *
     * @param bytes Base64 text bytes or raw serialized bytes; must not be null
     * @return the deserialized object
     * @throws Exception if the input is not valid Base64 / serialized data or
     *                   a required class cannot be loaded
     */
    public static Object convertToMessage(byte[] bytes) throws Exception {
        if (bytes == null) {
            throw new NullPointerException("bytes");
        }
        byte[] serialized = isRawSerializedStream(bytes)
                ? bytes
                : java.util.Base64.getDecoder().decode(bytes);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
            return in.readObject();
        }
    }

    /** Returns true when the data starts with the Java serialization magic number. */
    private static boolean isRawSerializedStream(byte[] data) {
        return data.length >= 2
                && data[0] == STREAM_MAGIC_HI
                && data[1] == STREAM_MAGIC_LO;
    }
}
從Jedis連接池中獲取連接
public class RedisUtil { /** * Jedis connection pool * @Title: config */ public static JedisPool getJedisPool(){ ResourceBundle bundle=ResourceBundle.getBundle("redis"); String host=bundle.getString("host"); int port=Integer.valueOf(bundle.getString("port")); int timeout=Integer.valueOf(bundle.getString("timeout")); // String password=bundle.getString("password"); JedisPoolConfig config=new JedisPoolConfig(); config.setMaxActive(Integer.valueOf(bundle.getString("maxActive"))); config.setMaxWait(Integer.valueOf(bundle.getString("maxWait"))); config.setTestOnBorrow(Boolean.valueOf(bundle.getString("testOnBorrow"))); config.setTestOnReturn(Boolean.valueOf(bundle.getString("testOnReturn"))); JedisPool pool=new JedisPool(config, host, port, timeout); return pool; } }
public class Producer { private Jedis jedis; private JedisPool pool; public Producer(){ pool=RedisUtil.getJedisPool(); jedis = pool.getResource(); } public void provide(String channel,Message message) throws IOException{ String str1=MessageUtil.convertToString(channel,"UTF-8"); String str2=MessageUtil.convertToString(message,"UTF-8"); jedis.publish(str1, str2); } //close the connection public void close() throws IOException { //將Jedis對象歸還給鏈接池,關閉鏈接 pool.returnResource(jedis); } }
public class Consumer { private Jedis jedis; private JedisPool pool; public Consumer(){ pool=RedisUtil.getJedisPool(); jedis = pool.getResource(); } public void consum(String channel) throws IOException{ JedisPubSub jedisPubSub = new JedisPubSub() { // 取得訂閱的消息後的處理 public void onMessage(String channel, String message) { System.out.println("Channel:"+channel); System.out.println("Message:"+message.toString()); } // 初始化訂閱時候的處理 public void onSubscribe(String channel, int subscribedChannels) { System.out.println("onSubscribe:"+channel); } // 取消訂閱時候的處理 public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println("onUnsubscribe:"+channel); } // 初始化按表達式的方式訂閱時候的處理 public void onPSubscribe(String pattern, int subscribedChannels) { // System.out.println(pattern + "=" + subscribedChannels); } // 取消按表達式的方式訂閱時候的處理 public void onPUnsubscribe(String pattern, int subscribedChannels) { // System.out.println(pattern + "=" + subscribedChannels); } // 取得按表達式的方式訂閱的消息後的處理 public void onPMessage(String pattern, String channel, String message) { System.out.println(pattern + "=" + channel + "=" + message); } }; jedis.subscribe(jedisPubSub, channel); } //close the connection public void close() throws IOException { //將Jedis對象歸還給鏈接池 pool.returnResource(jedis); } }
/**
 * Demo entry point: subscribes to channel "chn1" and then publishes one
 * message to it.
 *
 * <p>Redis pub/sub does not retain messages, so a message published before
 * any subscriber is registered is simply dropped. The subscription is
 * therefore started first, on its own thread because subscribing occupies the
 * thread for as long as the subscription lasts, and the message is published
 * afterwards. The subscriber thread keeps the program running until it is
 * terminated.
 */
public static void main(String[] args) {
    final String channel = "chn1";
    final Message msg = new Message("hello!", "this is the first message!");
    final Consumer consumer = new Consumer();

    Thread subscriber = new Thread(new Runnable() {
        public void run() {
            try {
                consumer.consum(channel);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }, "redis-subscriber");
    subscriber.start();

    Producer producer = new Producer();
    try {
        // Simple demo-level wait so the SUBSCRIBE reaches the server before
        // the PUBLISH does; there is no subscription-ready signal to wait on.
        Thread.sleep(500);
        producer.provide(channel, msg);
    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        // Preserve the interrupt status for whoever owns this thread.
        Thread.currentThread().interrupt();
    } finally {
        try {
            producer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}