redis經過pipeline提高吞吐量

案例目標

簡單介紹 redis pipeline 的機制,結合一段實例說明pipeline 在提高吞吐量方面發生的效用。redis

案例背景

應用系統在數據推送或事件處理過程當中,每每出現數據流通過多個網元;
然而在某些服務中,數據操做對redis 是強依賴的,在最近的一次分析中發現:
一次數據推送會對 redis 產生近30次讀寫操做!服務器

在數據推送業務中的性能壓測中,以數據上報 -> 下發應答爲一次事務;
而對於這樣的讀寫模型,redis 的操做過於頻繁,很快便致使系統延時太高,吞吐量低下,沒法知足目標;併發

優化過程 主要針對業務代碼作的優化,其中redis 操做通過大量合併,最終下降到原來的1/5,而系統吞吐量也提高明顯。
其中,redis pipeline(管道機制) 的應用是一個關鍵手段。dom

pipeline的解釋

Pipeline指的是管道技術,指的是客戶端容許將多個請求依次發給服務器,過程當中而不須要等待請求的回覆,在最後再一併讀取結果便可。
管道技術使用普遍,例如許多POP3協議已經實現支持這個功能,大大加快了從服務器下載新郵件的過程。
Redis很早就支持管道(pipeline)技術。(所以不管你運行的是什麼版本,你均可以使用管道(pipelining)操做Redis)性能

普通請求模型

[圖-pipeline1]優化

Pipeline請求模型

[圖-pipeline2]線程

從兩個圖的對比中可看出,普通的請求模型是同步的,每次請求對應一次IO操做等待;
而Pipeline 化以後全部的請求合併爲一次IO,除了時延能夠下降以外,還能大幅度提高系統吞吐量。code

代碼實例

說明
本地開啓50個線程,每一個線程完成1000個key的寫入,對比pipeline開啓及不開啓兩種場景下的性能表現。blog

相關常量事件

// 併發任務
    private static final int taskCount = 50;
    // pipeline大小
    private static final int batchSize = 10;
    // 每一個任務處理命令數
    private static final int cmdCount = 1000;

    private static final boolean usePipeline = true;

初始化鏈接

JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxActive(200);
        poolConfig.setMaxIdle(100);
        poolConfig.setMaxWait(2000);
        poolConfig.setTestOnBorrow(false);
        poolConfig.setTestOnReturn(false);

        jedisPool = new JedisPool(poolConfig, host, port);

併發啓動任務,統計執行時間

public static void main(String[] args) throws InterruptedException {
        init();

        flushDB();

        long t1 = System.currentTimeMillis();
        ExecutorService executor = Executors.newCachedThreadPool();

        CountDownLatch latch = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.submit(new DemoTask(i, latch));
        }

        latch.await();
        executor.shutdownNow();

        long t2 = System.currentTimeMillis();

        System.out.println("execution finish time(s):" + (t2 - t1) / 1000.0);

    }

DemoTask 封裝了執行key寫入的細節,區分不一樣場景

public void run() {
            logger.info("Task[{}] start.", id);
            try {
                if (usePipeline) {
                    runWithPipeline();
                } else {
                    runWithNonPipeline();
                }
            } finally {
                latch.countDown();
            }

            logger.info("Task[{}] end.", id);
     }

不使用Pipeline的場景比較簡單,循環執行set操做

for (int i = 0; i < cmdCount; i++) {
                Jedis jedis = get();
                try {
                    jedis.set(key(i), UUID.randomUUID().toString());
                } finally {
                    if (jedis != null) {
                        jedisPool.returnResource(jedis);
                    }
                }
                if (i % batchSize == 0) {
                    logger.info("Task[{}] process -- {}", id, i);
                }
            }

使用Pipeline,須要處理分段,如10個做爲一批命令執行

for (int i = 0; i < cmdCount;) {
                Jedis jedis = get();

                try {
                    Pipeline pipeline = jedis.pipelined();
                    int j;
                    for (j = 0; j < batchSize; j++) {
                        if (i + j < cmdCount) {
                            pipeline.set(key(i + j), UUID.randomUUID().toString());
                        } else {
                            break;
                        }
                    }
                    pipeline.sync();
                    logger.info("Task[{}] pipeline -- {}", id, i + j);

                    i += j;

                } finally {
                    if (jedis != null) {
                        jedisPool.returnResource(jedis);
                    }
                }

            }

運行結果

不使用Pipeline,總體執行26s;而使用Pipeline優化後的代碼,執行時間僅須要3s!

NoPipeline-stat

[圖-nopipeline]

Pipeline-stat

[圖-pipeline]

注意事項

  • pipeline機制能夠優化吞吐量,但沒法提供原子性/事務保障,而這個能夠經過Redis-Multi等命令實現。
    參考這裏
  • 部分讀寫操做存在相關依賴,沒法使用pipeline實現,可利用Script機制,但須要在可維護性方面作好取捨。

擴展閱讀

官方文檔-Redis-Pipelining
官方文檔-Redis-Transaction

相關文章
相關標籤/搜索