/** * 測試事務和樂觀鎖 */ @RunWith(SpringRunner.class) @SpringBootTest public class RedisTransactionTest { private RedisTemplate<String, String> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } /** * 測試事務。 * 模擬場景是用戶出售商品到商店。 */ @Test public void test() { Assert.assertEquals(listItem(), true); } boolean listItem() { // 樂觀鎖 redisTemplate.watch("inventory:17"); // 判斷揹包內是否有要出售的物品ItemN,若是沒有就結束流程 if (!redisTemplate.opsForSet().isMember("inventory:17", "ItemN")) { redisTemplate.unwatch(); return false; } // 啓用事務 redisTemplate.setEnableTransactionSupport(true); redisTemplate.multi(); // 添加到商店 redisTemplate.opsForZSet().add("market:", "ItemN", 9); // 從揹包中移除 redisTemplate.opsForSet().remove("inventory:17", "ItemN"); // 執行事務 List<Object> result = redisTemplate.exec(); // 判斷事務是否爲空 if (result == null || result.size() == 0) return false; return true; } }
使用過時時間特性實現訪問次數限制。java
test方法redis
爲每一個IP建立一個Key,計數器爲Value,過時時間爲1分鐘,每次請求先判斷key是否存在,若是存在就自增計數器,當計數器次數大於指定次數則表明到達限制次數。該方法的缺點是在當前分鐘的最後一秒和下一分鐘的第一秒,能夠連續訪問。spring
test1方法ruby
爲每個IP建立一個List,List中存放請求時間,每次請求先判斷List中的長度是否小於指定次數,若是小於,則表明沒有到達限制,將當前請求時間放到List中。ide
若是List中長度大於指定次數,則取出最後一次請求的時間,與當前時間比較,判斷是否在指定時間內,若是在時間內,表明請求指定時間內的請求次數超標,若是不在指定時間內,則把當前時間放到隊列中,將隊列最先的時間彈出。函數
/** * 測試過時時間 */ @RunWith(SpringRunner.class) @SpringBootTest public class RedisExpireTest { private RedisTemplate<String, Object> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) { this.redisTemplate = redisTemplate; } /** * 設置ip訪問頻率,每分鐘只能訪問2次 */ @Test public void test() { String key = "rate.limiting:localhost"; List<String> keys = new ArrayList<>(); keys.add(key); // 判斷key是否存在 if (redisTemplate.countExistingKeys(keys) > 0) { // 若是存在就將這個ip的訪問計數器+1 long times = redisTemplate.opsForValue().increment(key); // 若是次數大於2,則提示操做了訪問頻率 if (times > 2) { System.out.println("超過了訪問頻率。"); } } else { // 若是key不存在,則建立這個ip的計數器 redisTemplate.opsForValue().increment(key); // 把計數器的超時時間設置成60秒 // 這種作法,在當前分鐘的最後一秒和下一分鐘的第一秒,能夠連續訪問 2 + 2 = 4次 redisTemplate.expire(key, 60, TimeUnit.SECONDS); } } @Test public void test1() { String key = "1rate.limiting:localhost"; long len = redisTemplate.opsForList().size(key); // 一分鐘內不可請求超過2次 if (len < 2) { // 保存當前時間 redisTemplate.opsForList().leftPush(key, System.currentTimeMillis()); } else { // 取出上一次的時間 long time = (long) redisTemplate.opsForList().index(key, -1); // 已經請求了2次,但與上次請求的時間的間隔不夠1分鐘 if (System.currentTimeMillis() - time < 60000) { System.out.println("超過了訪問頻率。"); } else { // 保存當前時間 redisTemplate.opsForList().leftPush(key, System.currentTimeMillis()); // 裁剪隊列 redisTemplate.opsForList().trim(key, 0, 1); } } } }
/** * 測試Sort * * @author xuepeng */ @RunWith(SpringRunner.class) @SpringBootTest() public class RedisSortTest { private RedisTemplate<String, String> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } /** * 調用Sort排序 */ @Test public void test() { SortQuery<String> query = SortQueryBuilder.sort("mylist").build(); List<String> result = redisTemplate.sort(query); System.out.println(result); } /** * 調用Sort + By排序 */ @Test public void test1() { SortQuery<String> query = SortQueryBuilder.sort("sortbylist") .by("itemscore:*") .order(SortParameters.Order.DESC) .limit(0, -1).build(); List<String> result = redisTemplate.sort(query); System.out.println(result); } /** * 調用Sort + By + Get排序 */ @Test public void test2() { SortQuery<String> query = SortQueryBuilder.sort("tag:ruby:posts") .by("post:*->time") .order(SortParameters.Order.DESC) .limit(0, -1) .get("post:*->title") .get("post:*->time") .get("#") .build(); List<String> result = redisTemplate.sort(query); System.out.println(result); } }
經過brpop函數消費隊列中的數據,若是超過了指定的阻塞時間尚未消費到數據,則取消消費。post
brpop能夠同時消費多個隊列,順序是從左到右,根據此特性能夠實現優先級隊列。測試
/** * 阻塞隊列 * * @author xuepeng */ @RunWith(SpringRunner.class) @SpringBootTest() public class RedisQueueTest { private static Lock lock = new ReentrantLock(); private RedisTemplate<String, String> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } @Test public void test() throws InterruptedException { lock.lockInterruptibly(); RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory(); RedisConnection connection = connectionFactory.getConnection(); try { while (true) { List<byte[]> results = connection.bRPop(60, "queue".getBytes()); if (!CollectionUtils.isEmpty(results)) { for (byte[] bytes : results) { System.out.println(new String(bytes)); } } } } finally { lock.unlock(); RedisConnectionUtils.releaseConnection(connection, connectionFactory); } } }
加入監聽,監聽message這個key。ui
/** * 配置消費者,參數是實現了MessageListener的對象。 */ @Bean public MessageListenerAdapter adapter(RedisMessage message) { // onMessage 若是RedisMessage 中 沒有實現接口,這個參數必須跟RedisMessage中的讀取信息的方法名稱同樣 return new MessageListenerAdapter(message, "message"); } /** * 配置監聽容器。 */ @Bean public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory, MessageListenerAdapter adapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //監聽對應的channel container.addMessageListener(adapter, new PatternTopic("message")); return container; }
消息提供者this
/** * 測試發佈訂閱 */ @RunWith(SpringRunner.class) @SpringBootTest() public class RedisMQTest { private RedisTemplate<String, String> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } /** * 發送消息到Redis的MQ。 */ @Test public void test() { redisTemplate.convertAndSend("message", "hello world"); } }
消息消費者
/** * 消費Redis的MQ */ @Component public class RedisMessage implements MessageListener { private RedisTemplate<String, String> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } @Override public void onMessage(Message message, byte[] pattern) { RedisSerializer<String> serializer = redisTemplate.getStringSerializer(); String msg = serializer.deserialize(message.getBody()); System.out.println("接收到的消息是:" + msg); } }
若是有批量操做redis的場景,能夠使用pipeline,減小了屢次鏈接redis的開銷。
package cc.xuepeng.ch06; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.StringRedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.SessionCallback; import org.springframework.test.context.junit4.SpringRunner; import java.util.List; /** * 測試管道發佈 */ @RunWith(SpringRunner.class) @SpringBootTest() public class RedisPipelineTest { private RedisTemplate<String, String> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } /** * 發送消息到Redis的MQ。 */ @Test public void test() { List<Object> redisResult = redisTemplate.executePipelined(new RedisCallback<String>() { @Override public String doInRedis(RedisConnection redisConnection) throws DataAccessException { for (int i = 0; i < 1000; i++) { redisConnection.incr("incr:test".getBytes()); } return null; } }); System.out.println(redisResult); } }