Redis管道

Redis之管道的使用java

原文地址: https://blog.piaoruiqing.com/blog/2019/06/24/redis管道redis

關鍵詞

Redis Pipelining: 客戶端能夠向服務器發送多個請求而無需等待回覆, 最後只需一步便可讀取回復.數據庫

RTT(Round Trip Time): 往返時間.bash

爲何要用管道

Redis是使用client-server模型和Request/Response協議的TCP服務器. 這意味着一般經過如下步驟完成請求:服務器

  • 客戶端向服務器發送查詢, 並一般以阻塞方式從套接字讀取服務器響應.
  • 服務器處理該命令並將響應發送回客戶端.

應用程序與Redis經過網絡進行鏈接, 可能很是快(本地迴環), 也可能很慢. 但不管網絡延遲是多少, 數據包都須要時間從客戶端傳輸到服務器, 而後從服務器返回到客戶端以進行回覆(此時間稱爲RTT). 當客戶端須要連續執行許多請求時(例如, 將多個元素添加到同一列表或使用多個鍵填充數據庫), 很容易發現這種頻繁操做很影響性能. 使用管道將屢次操做經過一次IO發送給Redis服務器, 而後一次性獲取每一條指令的結果, 以減小網絡上的開銷.網絡

頻繁操做但未使用管道的情形以下圖:async

使用管道後以下圖: 分佈式

如何使用

Jedis

/** jedis pool */
private final Logger LOGGER = LoggerFactory.getLogger(getClass());
private static final JedisPool POOL =
    new JedisPool(new JedisPoolConfig(), "test-redis-server", 6379);
/** * test pipelining with Jedis */
@Test
public void testPipelining() {

    try (Jedis jedis = POOL.getResource()) {

        Pipeline pipelined = jedis.pipelined();	// (一)
        Response<String> response1 = pipelined.set("mykey1", "myvalue1");
        Response<String> response2 = pipelined.set("mykey2", "myvalue2");
        Response<String> response3 = pipelined.set("mykey3", "myvalue3");

        pipelined.sync();	// (二)

        LOGGER.info("cmd: SET mykey1 myvalue1, result: {}", response1.get());	// (三)
        LOGGER.info("cmd: SET mykey2 myvalue2, result: {}", response2.get());
        LOGGER.info("cmd: SET mykey3 myvalue3, result: {}", response3.get());
    }
}
複製代碼
  • (一): jedis.pipelined(): 獲取一個Pipeline用以批量執行指令.
  • (二): pipelined.sync(): 同步執行, 經過讀取所有Response來同步管道, 這個操做會關閉管道.
  • (三): response1.get(): 獲取執行結果. 注意: 在執行pipelined.sync()以前, get是沒法獲取到結果的.

Lettuce

private final Logger LOGGER = LoggerFactory.getLogger(getClass());

/** redis client */
private static final RedisClient CLIENT
        = RedisClient.create("redis://@test-redis-server:6379/0");
/** * test pipelining with Lettuce */
@Test
public void testPipelining() throws ExecutionException, InterruptedException {

    try (StatefulRedisConnection<String, String> connection = CLIENT.connect()) {

        RedisAsyncCommands<String, String> async = connection.async();
        async.setAutoFlushCommands(false);
        RedisFuture<String> future1 = async.set("mykey1", "myvalue1");
        RedisFuture<String> future2 = async.set("mykey2", "myvalue2");
        RedisFuture<String> future3 = async.set("mykey3", "myvalue3");

        async.flushCommands();

        LOGGER.info("cmd: SET mykey1 myvalue1, result: {}", future1.get());
        LOGGER.info("cmd: SET mykey2 myvalue2, result: {}", future1.get());
        LOGGER.info("cmd: SET mykey3 myvalue3, result: {}", future1.get());
    }
}
複製代碼

RedisTemplate

private final Logger LOGGER = LoggerFactory.getLogger(getClass());

@Resource
private StringRedisTemplate stringRedisTemplate;

/** * test pipelining with RedisTemplate */
@Test
public void testPipelining() {

    List<Object> objects 
        = stringRedisTemplate.executePipelined((RedisCallback<Object>)connection -> {

        connection.set("mykey1".getBytes(), "myvalue1".getBytes());
        connection.set("mykey2".getBytes(), "myvalue2".getBytes());
        connection.set("mykey3".getBytes(), "myvalue3".getBytes());
        return null;	// (一)
    });

    LOGGER.info("cmd: SET mykey myvalue, result: {}", objects);
}
複製代碼
  • (一): 此處必須返回null

簡單對比測試

redis服務器運行在同一個路由器下的樹莓派上.post

/** * pipeline vs direct */
@Test
public void compared() {

    try (Jedis jedis = POOL.getResource()) {   // warm up
        jedis.set("mykey", "myvalue");
    }

    try (Jedis jedis = POOL.getResource()) {
        long start = System.nanoTime();
        Pipeline pipelined = jedis.pipelined();
        for (int index = 0; index < 500; index++) {
            pipelined.set("mykey" + index, "myvalue" + index);
        }
        pipelined.sync();
        long end = System.nanoTime();
        LOGGER.info("pipeline cost: {} ns", end - start);
    }

    try (Jedis jedis = POOL.getResource()) {
        long start = System.nanoTime();
        for (int index = 0; index < 500; index++) {
            jedis.set("mykey" + index, "myvalue" + index);
        }
        long end = System.nanoTime();
        LOGGER.info("direct cost: {} ns", end - start);
    }
}
複製代碼

使用Jedis執行500條set, 執行結果以下:性能

22:16:00.523 [main] INFO - pipeline cost:   73681257 ns		// 管道
22:16:03.040 [main] INFO - direct cost  : 2511915103 ns		// 直接執行
複製代碼

500次set執行時間總和已經和管道執行一次的所消耗的時間不在一個量級上了.

擴展

摘自redis官方文檔

使用管道不單單是爲了下降RTT以減小延遲成本, 實際上使用管道也能大大提升Redis服務器中每秒可執行的總操做量. 這是由於, 在不使用管道的狀況下, 儘管操做單個命令開起來十分簡單, 但實際上這種頻繁的I/O操做形成的消耗是巨大的, 這涉及到系統讀寫的調用, 這意味着從用戶域到內核域.上下文切換會對速度產生極大的損耗.

使用管道操做時, 一般使用單個read() 系統調用讀取許多命令,並經過單個write()系統調用傳遞多個回覆. 所以, 每秒執行的總查詢數最初會隨着較長的管道線性增長, 並最終達到不使用管道技術獲的10倍, 以下圖所示:

系列文章

  1. Redis入門
  2. Redis使用進階
  3. Redis分佈式鎖

參考文獻

[版權聲明]
本文發佈於 樸瑞卿的博客, 容許非商業用途轉載, 但轉載必須保留原做者 樸瑞卿 及連接: blog.piaoruiqing.com. 若有受權方面的協商或合做, 請聯繫郵箱: piaoruiqing@gmail.com.
相關文章
相關標籤/搜索