Redis 發佈/訂閱 隊列使用

項目中使用到 Redis 的發佈/訂閱功能。一般情況下,只要客戶端 publish 一個消息,訂閱端就能立刻收到該消息。可是在分佈式集羣中,每臺服務器都開啓了一個訂閱端來訂閱同一頻道,於是當一個消息發佈時,每臺服務器都會各收到一次,造成數據被重複處理;而有些需求要求所有服務器合計只能取到一次數據。我目前的解決方法是使用隊列:每當發佈消息時,同時向隊列中插入一條數據;各訂閱端收到消息後再從隊列中獲取數據,由於隊列的彈出操作是原子的,一條數據只會被其中一臺服務器取到。 注意:(發佈/訂閱)以字符串形式傳輸序列化對象時,一定要指定字符編碼(例如 ISO-8859-1),否則會出現亂碼,如下例:

發佈端發佈消息
// Publish userDo on the "testMessage" channel as the ISO-8859-1 string produced by serialize2.
jedis.publish("testMessage" , SerializeUtils.serialize2(userDo));


訂閱端讀取
// Convert the received text back to bytes with the same ISO-8859-1 charset serialize2 used, then deserialize.
UserDo userDo = (UserDo) SerializeUtils.unSerialize(msg.getBytes("ISO-8859-1"));

/**
 * Serializes an object with Java native serialization and returns the result as a string.
 *
 * <p>The serialized bytes are decoded as ISO-8859-1, which maps every byte to exactly one
 * character, so the receiver can recover the original bytes with
 * {@code getBytes("ISO-8859-1")}.
 *
 * @param object the object to serialize
 * @return the serialized form as an ISO-8859-1 string, or {@code null} if serialization fails
 */
public static String serialize2(Object object) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try {
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(object);
        // close() flushes whatever the object stream still buffers into baos before it is read.
        oos.close();
        return baos.toString("ISO-8859-1");
    } catch (Exception e) {
        // A failed serialization is an error condition; log it at the level used elsewhere for failures.
        logger.error("context", e);
    }
    return null;
}

/**
 * Deserializes an object from bytes produced by Java native serialization.
 *
 * <p>NOTE(review): native deserialization should only be applied to data from trusted
 * producers; confirm that nothing untrusted can publish to the channels/keys read here.
 *
 * @param bytes the serialized bytes; may be {@code null} or empty (e.g. a missing Redis key)
 * @return the deserialized object, or {@code null} if there is no input or it cannot be read
 */
public static Object unSerialize(byte[] bytes) {
    // "Nothing stored" is an expected case, not an error: answer null without logging a stack trace.
    if (bytes == null || bytes.length == 0) {
        return null;
    }
    ObjectInputStream ois = null;
    try {
        ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        return ois.readObject();
    } catch (Exception e) {
        logger.error("context", e);
    } finally {
        if (ois != null) {
            try {
                ois.close();
            } catch (Exception ignored) {
                // Closing an in-memory stream cannot fail in a way worth reporting.
            }
        }
    }
    return null;
}


以下是 Redis 發佈/訂閱中訂閱端的主要代碼實現

我目前的做法是在系統啓動時,立刻開啓一個線程訂閱所需的頻道。(當然還有不少方法可以開啓訂閱,可以自己想想更好的做法)
package cn.sparkant.main;

import cn.sparkant.common.tools.redis.Subscriber;
import cn.sparkant.utils.redis.RedisUtils;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.web.context.ServletContextAware;
import redis.clients.jedis.Jedis;

import javax.servlet.ServletContext;

/**
 * 系統啓動時 初始化相關方法
 * Created by hjs on 16/4/17.
 */
public class SystemInitBean  implements InitializingBean , ServletContextAware{

    private static final Logger logger = Logger.getLogger(SystemInitBean.class);

    public void afterPropertiesSet() throws Exception {
        logger.info("................系統啓動時 初始化相關方法....");
        startSubscriber();
    }

    public void setServletContext(ServletContext servletContext) {

    }

    public void startSubscriber() {
        final Subscriber subscriber = new Subscriber();
        new Thread(new Runnable() {
            public void run() {
                    Jedis jedis = RedisUtils.getJedis();
                try {
                    logger.info("................進入redis訂閱監聽........");
                    jedis.subscribe(subscriber, new String[]{"channel1", "channel2","testMessage"});
                } catch (Exception e) {
                    logger.info("content", e);
                } finally {
                    RedisUtils.releaseJedis(jedis);
                }
            }
        }).start();
    }

}


spring-context.xml配置文件中加入
<!-- Startup initialization: registers SystemInitBean, whose afterPropertiesSet() starts the Redis subscriber -->
<bean class="cn.sparkant.main.SystemInitBean"></bean>

訂閱實現類
package cn.sparkant.common.tools.redis;

import cn.sparkant.common.entity.bean.UserDo;
import cn.sparkant.utils.redis.RedisUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import redis.clients.jedis.JedisPubSub;

/**
 * redis訂閱者
 * Created by hjs on 16/4/14.
 */
public class Subscriber extends JedisPubSub{

    private static Logger logger = Logger.getLogger(Subscriber.class);

    public void onMessage(String channel, String msg) {
        System.out.println("收到頻道 : 【" + channel + " 】的消息 :" + msg);
            if (channel.equals("testMessage") && StringUtils.isNotEmpty(msg)) {
                try {
                    //UserDo userDo = (UserDo) SerializeUtils.unSerialize(msg.getBytes("ISO-8859-1"));
                    //System.out.println("......." + userDo.getEmail());
                    UserDo userDo = (UserDo) RedisUtils.rpop("test");
                    logger.info("--------" + userDo.getEmail());
                } catch (Exception e) {
                    logger.error("error" ,e);
                }
            }
    }
}

發佈測試類
package cn.sparkant.test.redis;

import cn.sparkant.common.entity.bean.UserDo;
import cn.sparkant.utils.SerializeUtils;
import cn.sparkant.utils.redis.RedisUtils;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

import java.io.IOException;

/**
 * Manual publish-side test for the Redis publish/subscribe + queue setup.
 * Created by hjs on 16/4/14.
 */

public class TestRedis {

    /**
     * Enqueues a UserDo on the "test" queue and then publishes one message on each of the
     * three channels the subscriber listens on.
     *
     * @throws IOException kept from the original signature; not thrown by this body
     */
    @Test
    public void publish_test() throws IOException{
        UserDo userDo = new UserDo();
        userDo.setEmail("17tengfei@163.com");

        // The subscriber pops the "test" queue and deserializes the item as a UserDo, so the
        // queue must hold the serialized object (not a plain string), and it must be there
        // before the notification is published.
        RedisUtils.lpush("test", userDo);

        Jedis jedis = RedisUtils.getJedis();
        try {
            jedis.publish("channel1", "channel1的朋友們,大家好嗎?親");
            jedis.publish("channel2", "你好呀,親");
            jedis.publish("testMessage" , SerializeUtils.serialize2(userDo));
        } finally {
            // Return the connection to the pool even if a publish fails.
            RedisUtils.releaseJedis(jedis);
        }
    }

}

序列化 反序列化工具
package cn.sparkant.utils;

import org.apache.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;


/**
 * Serialization / deserialization helpers based on Java native serialization.
 * Created by hjs on 16/4/10.
 */
public class SerializeUtils {

    private static final Logger logger = Logger.getLogger(SerializeUtils.class);

    /** Charset mapping each byte to exactly one char, so binary data survives a String round trip. */
    private static final String BINARY_SAFE_CHARSET = "ISO-8859-1";

    /**
     * Serializes an object to a byte array.
     *
     * @param object the object to serialize
     * @return the serialized bytes, or {@code null} if serialization fails
     */
    public static byte[] serialize(Object object) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(object);
            // close() flushes whatever the object stream still buffers into baos before it is read.
            oos.close();
            return baos.toByteArray();
        } catch (Exception e) {
            // A failed serialization is an error condition, so it is logged at error level.
            logger.error("context", e);
        }
        return null;
    }

    /**
     * Serializes an object and returns the bytes as an ISO-8859-1 string.
     *
     * <p>The receiver recovers the bytes with {@code getBytes("ISO-8859-1")} before calling
     * {@link #unSerialize(byte[])}.
     *
     * @param object the object to serialize
     * @return the serialized form as a string, or {@code null} if serialization fails
     */
    public static String serialize2(Object object) {
        byte[] bytes = serialize(object);
        if (bytes == null) {
            return null;
        }
        try {
            return new String(bytes, BINARY_SAFE_CHARSET);
        } catch (Exception e) {
            logger.error("context", e);
        }
        return null;
    }

    /**
     * Deserializes an object from bytes produced by {@link #serialize(Object)}.
     *
     * <p>NOTE(review): native deserialization should only be applied to data from trusted
     * producers; confirm that nothing untrusted can write the data read here.
     *
     * @param bytes the serialized bytes; may be {@code null} or empty (e.g. a missing Redis key)
     * @return the deserialized object, or {@code null} if there is no input or it cannot be read
     */
    public static Object unSerialize(byte[] bytes) {
        // "Nothing stored" is an expected case, not an error: answer null without logging a stack trace.
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        ObjectInputStream ois = null;
        try {
            ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            return ois.readObject();
        } catch (Exception e) {
            logger.error("context", e);
        } finally {
            if (ois != null) {
                try {
                    ois.close();
                } catch (Exception ignored) {
                    // Closing an in-memory stream cannot fail in a way worth reporting.
                }
            }
        }
        return null;
    }


}


後續還需進一步實現動態添加訂閱頻道與取消訂閱頻道的功能


Redis 工具類封裝代碼如下
package cn.sparkant.utils.redis;

import cn.sparkant.common.entity.vo.UserVo;
import cn.sparkant.utils.SerializeUtils;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;


/**
 * Static helper methods for Redis, backed by one shared {@link JedisPool}.
 *
 * <p>Every helper borrows exactly one connection and returns it to the pool in a
 * {@code finally} block.
 * Created by hjs on 16/4/9.
 */
public class RedisUtils {

    private static final Logger logger = Logger.getLogger(RedisUtils.class);

    /** Charset for binary keys; UTF-8 matches what Jedis uses for its String-key methods. */
    private static final java.nio.charset.Charset KEY_CHARSET = java.nio.charset.Charset.forName("UTF-8");

    // Redis server IP
    private static final String ADDRESS = "10.211.55.8";
    // Redis port
    private static final int PORT = 6379;
    // Access password
    // NOTE(review): the credential is hard-coded in source; consider loading it from external configuration.
    private static final String AUTH = "sparkant@123";
    // Maximum number of connections the pool may hand out; the pool default is 8.
    // A negative value means no limit; once this many connections are borrowed the pool is exhausted.
    private static final int MAX_ACTIVE = 1024;
    // Maximum number of idle connections kept in the pool; the pool default is also 8.
    private static final int MAX_IDLE = 200;
    // Maximum time to wait for a free connection, in milliseconds; the default -1 means wait forever.
    // When the wait is exceeded, borrowing fails with a JedisConnectionException.
    private static final int MAX_WAIT = 30000;
    // Timeout handed to the JedisPool constructor (presumably milliseconds; confirm for the Jedis version in use).
    private static final int TIME_OUT = 30000;
    // Whether a connection is validated when borrowed; if true, borrowed connections are known to be usable.
    private static final boolean TEST_ON_BORROW = true;

    private static  JedisPool jedisPool = null;

    /**
     * Initializes the Redis connection pool. If this fails, the pool stays {@code null} and
     * {@link #getJedis()} returns {@code null}.
     */
    static {
        try {
            JedisPoolConfig config = new JedisPoolConfig();
            // setMaxActive was renamed setMaxTotal in commons-pool2; without this call
            // MAX_ACTIVE was never applied and the pool kept its default size.
            config.setMaxTotal(MAX_ACTIVE);
            config.setMaxIdle(MAX_IDLE);
            config.setMaxWaitMillis(MAX_WAIT);
            config.setTestOnBorrow(TEST_ON_BORROW);
            jedisPool = new JedisPool(config, ADDRESS, PORT, TIME_OUT, AUTH);
        } catch (Exception e) {
            logger.error("context",e);
        }
    }

    /**
     * Borrows a connection from the pool. The caller must hand it back with
     * {@link #releaseJedis(Jedis)}.
     *
     * <p>Not synchronized: the pool itself is thread-safe, and a class-wide lock would make
     * every caller queue behind one that is blocked waiting for a free connection.
     *
     * @return a Jedis connection, or {@code null} if the pool is unavailable or borrowing fails
     */
    public static Jedis getJedis() {
        Jedis resource = null;
        try {
            if (jedisPool != null) {
                resource = jedisPool.getResource();
            }
        } catch (Exception e) {
            logger.error("context",e);
        }
        return resource;
    }

    /**
     * Returns a connection to the pool.
     *
     * @param jedis the connection to release; {@code null} is ignored
     */
    public static void releaseJedis(final Jedis jedis) {
        if (jedis != null) {
            jedis.close();
        }
    }

    /**
     * Stores a string value. Failures are logged, not thrown.
     *
     * @param key   the key
     * @param value the value
     */
    public static void setString(String key, String value) {
        Jedis jedis = getJedis();
        try {
            jedis.set(key,value);
        } catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Stores a string value with an expiry. Failures are logged, not thrown.
     *
     * @param key     the key
     * @param seconds time to live in MINUTES (despite the parameter name); multiplied by 60 before use
     * @param value   the value
     */
    public static void setString(String key, int seconds, String value) {
        Jedis jedis = getJedis();
        try {
            jedis.setex(key, seconds * 60, value);
        } catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }

    }

    /**
     * Reads the string stored under a key.
     *
     * @param key the key
     * @return the stored value, or {@code null} if it is absent or the read fails
     */
    public static String getString(String key) {
        Jedis jedis = getJedis();
        String result = null;
        try {
            // Use the borrowed connection; borrowing a second one here would never be returned.
            result = jedis.get(key);
        } catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }
        return result;
    }

    /**
     * Deletes a key. Failures are logged, not thrown.
     *
     * @param key the key to delete
     */
    public static void deleteKey(String key) {
        Jedis jedis = getJedis();
        try {
            jedis.del(key);
        } catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Appends text to the string stored under a key. Failures are logged, not thrown.
     *
     * @param key   the key
     * @param value the text to append
     */
    public static void appendString(String key, String value) {
        Jedis jedis = getJedis();
        try {
            jedis.append(key, value);
        } catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Stores an object in serialized form. Failures are logged, not thrown.
     *
     * @param key   the key
     * @param value the object to store
     */
    public static void setObject(String key,Object value) {
        Jedis jedis = getJedis();
        try {
            jedis.set(keyBytes(key), SerializeUtils.serialize(value));
        } catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Stores an object in serialized form with an expiry. Failures are logged, not thrown.
     *
     * @param key     the key
     * @param seconds time to live in MINUTES (despite the parameter name); multiplied by 60 before use
     * @param value   the object to store
     */
    public static void setObject(String key,int seconds, Object value) {
        Jedis jedis = getJedis();
        try {
            jedis.setex(keyBytes(key), seconds * 60, SerializeUtils.serialize(value));
        } catch (Exception e) {
            logger.error("error", e);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Reads and deserializes the object stored under a key.
     *
     * @param key the key
     * @return the deserialized object, or {@code null} if it is absent or cannot be deserialized
     */
    public static Object getObject(String key) {
        Jedis jedis = getJedis();
        try {
            return SerializeUtils.unSerialize(jedis.get(keyBytes(key)));
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Pushes a serialized object onto the head (left end) of a Redis list.
     * Failures are logged, not thrown.
     *
     * @param key   the list key
     * @param value the object to push
     */
    public static void lpush(String key, Object value) {
        Jedis jedis = getJedis();
        try {
            jedis.lpush(keyBytes(key), SerializeUtils.serialize(value));
        } catch (Exception e) {
            logger.error("error" , e);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Removes and returns the element at the tail (right end) of a Redis list. Combined with
     * {@link #lpush(String, Object)} this yields first-in, first-out order.
     *
     * @param key the list key
     * @return the deserialized element, or {@code null} if the list is empty or the element cannot be deserialized
     */
    public static Object rpop(String key) {
        Jedis jedis = getJedis();
        try {
            return SerializeUtils.unSerialize(jedis.rpop(keyBytes(key)));
        } finally {
            releaseJedis(jedis);
        }
    }


    /**
     * Pushes a serialized object onto the tail (right end) of a Redis list.
     * Failures are logged, not thrown.
     *
     * @param key   the list key
     * @param value the object to push
     */
    public static void rpush(String key, Object value) {
        Jedis jedis = getJedis();
        try {
            // Previously this called lpush, making rpush indistinguishable from lpush.
            jedis.rpush(keyBytes(key), SerializeUtils.serialize(value));
        } catch (Exception e) {
            logger.error("error" , e);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Removes and returns the element at the head (left end) of a Redis list.
     *
     * @param key the list key
     * @return the deserialized element, or {@code null} if the list is empty or the element cannot be deserialized
     */
    public static Object lpop(String key) {
        Jedis jedis = getJedis();
        try {
            return SerializeUtils.unSerialize(jedis.lpop(keyBytes(key)));
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Checks whether a key exists.
     *
     * @param key the key
     * @return {@code true} if the key exists
     */
    public static boolean existsKey(String key) {
        Jedis jedis = getJedis();
        try {
            return jedis.exists(key);
        } finally {
            releaseJedis(jedis);
        }
    }

    /**
     * Manual smoke test: stores a UserVo, reads it back, then deletes it, printing whether
     * the key exists before and after the delete.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        UserVo vo = new UserVo();
        vo.setEmail("cheng");
        vo.setUserId("18");
        RedisUtils.setObject("user",6, vo);
        vo = (UserVo)RedisUtils.getObject("user");
        System.out.println("userid:" + vo.getUserId());
        // existsKey returns its connection to the pool; getJedis().exists(...) would not.
        System.out.println(existsKey("user"));
        RedisUtils.deleteKey("user");
        System.out.println(existsKey("user"));
    }

    /** Encodes a key with a fixed charset so the result does not depend on the platform default. */
    private static byte[] keyBytes(String key) {
        return key.getBytes(KEY_CHARSET);
    }


}
相關文章
相關標籤/搜索