Redis的高級特性使用

樂觀鎖和事務

  • 使用watch監控某一個key;
  • 開啓事務進行操做;
  • 操做完成後提交事務;
  • 若在提交前,watch的key發生了變化,提交的事務將不起做用;
/**
 * 測試事務和樂觀鎖
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisTransactionTest {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 測試事務。
     * 模擬場景是用戶出售商品到商店。
     */
    @Test
    public void test() {
        Assert.assertEquals(listItem(), true);
    }

    boolean listItem() {
        // 樂觀鎖
        redisTemplate.watch("inventory:17");
        // 判斷揹包內是否有要出售的物品ItemN,若是沒有就結束流程
        if (!redisTemplate.opsForSet().isMember("inventory:17", "ItemN")) {
            redisTemplate.unwatch();
            return false;
        }
        // 啓用事務
        redisTemplate.setEnableTransactionSupport(true);
        redisTemplate.multi();
        // 添加到商店
        redisTemplate.opsForZSet().add("market:", "ItemN", 9);
        // 從揹包中移除
        redisTemplate.opsForSet().remove("inventory:17", "ItemN");
        // 執行事務
        List<Object> result = redisTemplate.exec();
        // 判斷事務是否爲空
        if (result == null || result.size() == 0) return false;
        return true;
    }

}

過時時間

使用過時時間特性實現訪問次數限制。java

test方法redis

爲每一個IP建立一個Key,計數器爲Value,過時時間爲1分鐘,每次請求先判斷key是否存在,若是存在就自增計數器,當計數器次數大於指定次數則表明到達限制次數。該方法的缺點是在當前分鐘的最後一秒和下一分鐘的第一秒,能夠連續訪問。spring

test1方法ruby

爲每個IP建立一個List,List中存放請求時間,每次請求先判斷List中的長度是否小於指定次數,若是小於,則表明沒有到達限制,將當前請求時間放到List中。ide

若是List中長度大於指定次數,則取出最後一次請求的時間,與當前時間比較,判斷是否在指定時間內,若是在時間內,表明請求指定時間內的請求次數超標,若是不在指定時間內,則把當前時間放到隊列中,將隊列最先的時間彈出。函數

/**
 * 測試過時時間
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisExpireTest {

    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 設置ip訪問頻率,每分鐘只能訪問2次
     */
    @Test
    public void test() {
        String key = "rate.limiting:localhost";
        List<String> keys = new ArrayList<>();
        keys.add(key);
        // 判斷key是否存在
        if (redisTemplate.countExistingKeys(keys) > 0) {
            // 若是存在就將這個ip的訪問計數器+1
            long times = redisTemplate.opsForValue().increment(key);
            // 若是次數大於2,則提示操做了訪問頻率
            if (times > 2) {
                System.out.println("超過了訪問頻率。");
            }
        } else {
            // 若是key不存在,則建立這個ip的計數器
            redisTemplate.opsForValue().increment(key);
            // 把計數器的超時時間設置成60秒
            // 這種作法,在當前分鐘的最後一秒和下一分鐘的第一秒,能夠連續訪問 2 + 2 = 4次
            redisTemplate.expire(key, 60, TimeUnit.SECONDS);
        }
    }

    @Test
    public void test1() {
        String key = "1rate.limiting:localhost";
        long len = redisTemplate.opsForList().size(key);
        // 一分鐘內不可請求超過2次
        if (len < 2) {
            // 保存當前時間
            redisTemplate.opsForList().leftPush(key, System.currentTimeMillis());
        } else {
            // 取出上一次的時間
            long time = (long) redisTemplate.opsForList().index(key, -1);
            // 已經請求了2次,但與上次請求的時間的間隔不夠1分鐘
            if (System.currentTimeMillis() - time < 60000) {
                System.out.println("超過了訪問頻率。");
            } else {
                // 保存當前時間
                redisTemplate.opsForList().leftPush(key, System.currentTimeMillis());
                // 裁剪隊列
                redisTemplate.opsForList().trim(key, 0, 1);
            }
        }
    }
}

Sort排序

  • sort:test方法中實現,能夠對List類型,Set類型,ZSet類型進行排序;
  • by:test1方法中實現,能夠經過by鏈接Hash或String類型作排序字段;
  • get:test2方法中實現,做用是返回get參數指定的值,能夠是用於by的對象的值,若是想返回主key的值則能夠使用#號;
/**
 * 測試Sort
 *
 * @author xuepeng
 */
@RunWith(SpringRunner.class)
@SpringBootTest()
public class RedisSortTest {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 調用Sort排序
     */
    @Test
    public void test() {
        SortQuery<String> query = SortQueryBuilder.sort("mylist").build();
        List<String> result = redisTemplate.sort(query);
        System.out.println(result);
    }

    /**
     * 調用Sort + By排序
     */
    @Test
    public void test1() {
        SortQuery<String> query = SortQueryBuilder.sort("sortbylist")
                .by("itemscore:*")
                .order(SortParameters.Order.DESC)
                .limit(0, -1).build();
        List<String> result = redisTemplate.sort(query);
        System.out.println(result);
    }

    /**
     * 調用Sort + By + Get排序
     */
    @Test
    public void test2() {
        SortQuery<String> query = SortQueryBuilder.sort("tag:ruby:posts")
                .by("post:*->time")
                .order(SortParameters.Order.DESC)
                .limit(0, -1)
                .get("post:*->title")
                .get("post:*->time")
                .get("#")
                .build();
        List<String> result = redisTemplate.sort(query);
        System.out.println(result);
    }


}

阻塞隊列

經過brpop函數消費隊列中的數據,若是超過了指定的阻塞時間尚未消費到數據,則取消消費。post

brpop能夠同時消費多個隊列,順序是從左到右,根據此特性能夠實現優先級隊列。測試

/**
 * 阻塞隊列
 *
 * @author xuepeng
 */
@RunWith(SpringRunner.class)
@SpringBootTest()
public class RedisQueueTest {

    private static Lock lock = new ReentrantLock();

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Test
    public void test() throws InterruptedException {
        lock.lockInterruptibly();
        RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory();
        RedisConnection connection = connectionFactory.getConnection();

        try {
            while (true) {
                List<byte[]> results = connection.bRPop(60, "queue".getBytes());
                if (!CollectionUtils.isEmpty(results)) {
                    for (byte[] bytes : results) {
                        System.out.println(new String(bytes));
                    }
                }
            }
        } finally {
            lock.unlock();
            RedisConnectionUtils.releaseConnection(connection, connectionFactory);
        }

    }

}

發佈/訂閱

加入監聽,監聽message這個key。ui

/**
     * 配置消費者,參數是實現了MessageListener的對象。
     */
    @Bean
    public MessageListenerAdapter adapter(RedisMessage message) {
        // onMessage 若是RedisMessage 中 沒有實現接口,這個參數必須跟RedisMessage中的讀取信息的方法名稱同樣
        return new MessageListenerAdapter(message, "message");
    }

    /**
     * 配置監聽容器。
     */
    @Bean
    public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory,
                                                   MessageListenerAdapter adapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //監聽對應的channel
        container.addMessageListener(adapter, new PatternTopic("message"));
        return container;
    }

消息提供者this

/**
 * 測試發佈訂閱
 */
@RunWith(SpringRunner.class)
@SpringBootTest()
public class RedisMQTest {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 發送消息到Redis的MQ。
     */
    @Test
    public void test() {
        redisTemplate.convertAndSend("message", "hello world");
    }

}

消息消費者

/**
 * 消費Redis的MQ
 */
@Component
public class RedisMessage implements MessageListener {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
        String msg = serializer.deserialize(message.getBody());
        System.out.println("接收到的消息是:" + msg);
    }

}

管道

若是有批量操做redis的場景,能夠使用pipeline,減小了屢次鏈接redis的開銷。

package cc.xuepeng.ch06;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.StringRedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;

/**
 * 測試管道發佈
 */
@RunWith(SpringRunner.class)
@SpringBootTest()
public class RedisPipelineTest {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 發送消息到Redis的MQ。
     */
    @Test
    public void test() {
        List<Object> redisResult = redisTemplate.executePipelined(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
                for (int i = 0; i < 1000; i++) {
                    redisConnection.incr("incr:test".getBytes());
                }
                return null;
            }
        });
        System.out.println(redisResult);
    }

}
相關文章
相關標籤/搜索