Redis:解決分佈式高併發修改同一個Key的問題

本篇文章是經過watch(監控)+mutil(事務)實現應用於在分佈式高併發處理等相關場景。下邊先經過redis-cli.exe來測試多個線程修改時,遇到問題及解決問題。html

高併發下修改同一個key遇到的問題:

1)定義一個hash類型的key,key爲:lock_test,元素locker的值初始化爲0。java

2)實現高併發下對locker元素的值遞增:定義64個多線程,併發的對lock_test元素locker的值進行修改。redis

package com.dx.es;

import java.util.concurrent.CountDownLatch;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class Test_UnLock {
    public static void main(String[] args) {
        final JedisPool pool = RedisUtil.getPool();

        // 得到jedis對象
        Jedis jedis = pool.getResource();
        jedis.hset("lock_test", "locker", "0");
        String val = jedis.hget("lock_test", "locker");
        System.out.println("lock_test.locker的初始值為:" + val);
        jedis.close();

        int threahSize = 64;
        final CountDownLatch threadsCountDownLatch = new CountDownLatch(threahSize);

        Runnable handler = new Runnable() {
            public void run() {
                Jedis jedis = pool.getResource();

                Integer integer = Integer.valueOf(jedis.hget("lock_test", "locker"));
                jedis.hset("lock_test", "locker", String.valueOf(integer + 1));

                jedis.close();
                threadsCountDownLatch.countDown();
            }
        };

        for (int i = 0; i < threahSize; i++) {
            new Thread(handler).start();
        }

        // 等待全部並行子線程任務完成。
        try {
            threadsCountDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("complete");

        val = jedis.hget("lock_test", "locker");
        System.out.println(val);
    }
}

RedisUtil.java數組

package com.dx.es;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {
    public static JedisPool getPool() {
        // 簡單建立 Jedis的方法:
        // final Jedis jedis = new Jedis("127.0.0.1",6379);

        // 下邊使用線程池的方案:jedis對象是線程不安全的,所以在併發狀況下要使用JedisPool,默認狀況下jedisPool只支持8個鏈接,所以在聲明JedisPool時要先修改JedisPool的最大鏈接數
        JedisPoolConfig config = new JedisPoolConfig();
        // 修改最大鏈接數
        config.setMaxTotal(32);
        
        // 聲明一個線程池
        JedisPool pool = new JedisPool(config, "127.0.0.1", 6379);

        return pool;
    }
}

此時,會出現如下問題:緩存

  1. A線程獲取key的值爲0,而B線程也獲取jkey的值0,則A把key值遞增爲1,B線程也實現把key值遞增爲1。兩個線程都執行了key值修改:0到1。
  2. 在1)中最終key修改成了1,可是c線程獲取key的值爲0(由於c線程讀取key值時,a、b線程還未觸發修改,所以c線程讀取到的值爲0),此時d線程讀取到的值爲1(由於d線程讀取key值時,a、b線程已觸發修改,一次d線程取到的值爲1)。
  3. 此時假設d線程優先觸發遞增,則在c線程未觸發提交以前d線程已經把值修改了2,可是c此時並不知道在它獲取到值到修改以前這段時間發生了什麼,直接把值修改1。

此時執行打印結果爲:安全

lock_test.locker的初始值為:0
complete
24 #備註:也多是其餘值,多是正確值64的可能性比較小。

經過watch+mutil解決併發修改的問題:

須要掌握Redis 事務命令:

下表列出了 redis 事務的相關命令:多線程

序號 命令 描述 可用版本
 1  DISCARD

Redis Discard 命令用於取消事務,放棄執行事務塊內的全部命令。併發

語法 redis Discard 命令基本語法以下:dom

redis 127.0.0.1:6379> DISCARD
 >= 2.0.0
 2  EXEC

Redis Exec 命令用於執行全部事務塊內的命令。分佈式

語法 redis Exec 命令基本語法以下:

redis 127.0.0.1:6379>Exec
 >= 1.2.0
 3  MULTI

Redis Multi 命令用於標記一個事務塊的開始。

事務塊內的多條命令會按照前後順序被放進一個隊列當中,最後由 EXEC 命令原子性(atomic)地執行。

語法 redis Multi 命令基本語法以下:

redis 127.0.0.1:6379>Multi
 >= 1.2.0
 4  UNWATCH

Redis Unwatch 命令用於取消 WATCH 命令對全部 key 的監視。

語法 redis Unwatch 命令基本語法以下:

redis 127.0.0.1:6379> UNWATCH 
 >= 2.2.0
 5  WATCH

Redis Watch 命令用於監視一個(或多個) key ,若是在事務執行以前這個(或這些) key 被其餘命令所改動,那麼事務將被打斷

語法 redis Watch 命令基本語法以下:

WATCH key [key ...]
 >= 2.2.0

 Redis 事務能夠一次執行多個命令, 而且帶有如下兩個重要的保證:

  • 批量操做在發送 EXEC 命令前被放入隊列緩存。
  • 收到 EXEC 命令後進入事務執行,事務中任意命令執行失敗,其他的命令依然被執行。
  • 在事務執行過程,其餘客戶端提交的命令請求不會插入到事務執行命令序列中。

一個事務從開始到執行會經歷如下三個階段:

  • 開始事務。
  • 命令入隊。
  • 執行事務。

備註:概念性摘自《http://www.runoob.com/redis/redis-transactions.html》

redis-cli.exe下的事務操做:

# 事務被成功執行
redis 127.0.0.1:6379> MULTI OK redis 127.0.0.1:6379> INCR user_id QUEUED redis 127.0.0.1:6379> INCR user_id QUEUED redis 127.0.0.1:6379> INCR user_id QUEUED redis 127.0.0.1:6379> PING QUEUED redis 127.0.0.1:6379> EXEC 1) (integer) 1 2) (integer) 2 3) (integer) 3 4) PONG

併發狀況下使用watch+mutil操做:

事務塊內全部命令的返回值,按命令執行的前後順序排列。 當操做被打斷時,返回空值 nil 。

A線程:

# 監視 key ,且事務成功執行
redis 127.0.0.1:6379> WATCH lock lock_times OK redis 127.0.0.1:6379> MULTI OK redis 127.0.0.1:6379> SET lock "huangz" QUEUED redis 127.0.0.1:6379> INCR lock_times QUEUED redis 127.0.0.1:6379> EXEC 1) OK 2) (integer) 1

B線程:

# 監視 key ,且事務被打斷
redis 127.0.0.1:6379> WATCH lock lock_times OK redis 127.0.0.1:6379> MULTI OK redis 127.0.0.1:6379> SET lock "joe" # 就在這時,另外一個客戶端修改了 lock_times 的值 QUEUED redis 127.0.0.1:6379> INCR lock_times QUEUED redis 127.0.0.1:6379> EXEC # 由於 lock_times 被修改, joe 的事務執行失敗 (nil)

上邊演示了A、B線程併發下的watch+mutil操做狀況。

解決高併發下修改同一個key遇到的問題:

package com.dx.es;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;

public class Test_Lock3 {
    public static void main(String[] args) {
        final JedisPool pool = RedisUtil.getPool();

        // 對測試key賦初始值
        Jedis jedis = pool.getResource();
        jedis.hset("lock_test", "locker", "0");
        String val = jedis.hget("lock_test", "locker");
        System.out.println("lock_test.locker的初始值為:" + val);
        jedis.close();

        int threahSize = 64;
        final CountDownLatch threadsCountDownLatch = new CountDownLatch(threahSize);

        Runnable handler = new Runnable() {
            public void run() {
                Jedis jedis = pool.getResource();

                while (true) {
                    jedis.watch("lock_test");
                    String val = jedis.hget("lock_test", "locker");
                    Integer integer = Integer.valueOf(val);
                    Transaction tx = jedis.multi();

                    tx.hset("lock_test", "locker", String.valueOf(integer + 1));

                    List<Object> exec = tx.exec();

                    if (exec == null || exec.isEmpty()) {
                        System.out.println(Thread.currentThread().getName() + ":" + "Error:(" + val + "=>" + (integer + 1) + ")");
                    } else {
                        String values = "";
                        for (int i = 0; i < exec.size(); i++) {
                            values += exec.get(i).toString();
                        }
                        System.out.println(Thread.currentThread().getName() + ":" + values + ":(" + val + "=>" + (integer + 1) + ")");
                        break;
                    }

                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                jedis.close();
                threadsCountDownLatch.countDown();
            }
        };

        for (int i = 0; i < threahSize; i++) {
            new Thread(handler).start();
        }

        // 等待全部並行子線程任務完成。
        try {
            threadsCountDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("complete");

        val = jedis.hget("lock_test", "locker");
        System.out.println(val);
    }
}

打印結果:

lock_test.locker的初始值為:0
Thread-8:0:(0=>1)
Thread-56:Error:(1=>2)
Thread-53:0:(1=>2)
Thread-25:Error:(0=>1)
Thread-18:Error:(0=>1)
Thread-3:Error:(0=>1)
Thread-30:Error:(2=>3)
Thread-11:Error:(2=>3)
Thread-9:Error:(0=>1)
Thread-63:0:(2=>3)
Thread-7:Error:(0=>1)
Thread-10:0:(3=>4)
Thread-34:0:(4=>5)
Thread-65:Error:(4=>5)
Thread-24:Error:(0=>1)
Thread-17:Error:(0=>1)
Thread-62:0:(5=>6)
Thread-29:Error:(6=>7)
Thread-61:0:(6=>7)
Thread-64:0:(7=>8)
Thread-16:Error:(0=>1)
Thread-19:Error:(8=>9)
Thread-6:Error:(0=>1)
Thread-28:Error:(0=>1)
Thread-21:Error:(0=>1)
Thread-14:Error:(0=>1)
Thread-20:Error:(0=>1)
Thread-5:Error:(0=>1)
Thread-13:Error:(0=>1)
Thread-15:Error:(0=>1)
Thread-22:Error:(0=>1)
Thread-4:Error:(0=>1)
Thread-12:Error:(0=>1)
Thread-23:Error:(0=>1)
Thread-54:Error:(0=>1)
Thread-57:Error:(0=>1)
Thread-26:0:(8=>9)
Thread-27:Error:(8=>9)
Thread-32:Error:(9=>10)
Thread-35:Error:(9=>10)
Thread-56:0:(9=>10)
Thread-33:Error:(10=>11)
Thread-50:0:(10=>11)
Thread-31:0:(11=>12)
Thread-38:0:(12=>13)
Thread-25:Error:(13=>14)
Thread-36:0:(13=>14)
Thread-39:Error:(14=>15)
Thread-14:0:(14=>15)
Thread-19:Error:(14=>15)
Thread-17:Error:(14=>15)
Thread-6:Error:(14=>15)
Thread-9:Error:(14=>15)
Thread-33:Error:(14=>15)
Thread-35:Error:(14=>15)
Thread-23:Error:(14=>15)
Thread-18:Error:(14=>15)
Thread-15:Error:(14=>15)
Thread-11:Error:(14=>15)
Thread-7:Error:(14=>15)
Thread-57:Error:(14=>15)
Thread-27:Error:(14=>15)
Thread-16:Error:(14=>15)
Thread-65:Error:(14=>15)
Thread-24:Error:(14=>15)
Thread-13:Error:(14=>15)
Thread-32:Error:(15=>16)
Thread-28:Error:(14=>15)
Thread-21:Error:(14=>15)
Thread-30:Error:(14=>15)
Thread-54:Error:(14=>15)
Thread-22:Error:(14=>15)
Thread-25:Error:(14=>15)
Thread-3:Error:(14=>15)
Thread-29:Error:(14=>15)
Thread-5:Error:(14=>15)
Thread-12:Error:(14=>15)
Thread-20:Error:(14=>15)
Thread-40:0:(15=>16)
Thread-4:Error:(14=>15)
Thread-41:0:(16=>17)
Thread-44:0:(17=>18)
Thread-45:0:(18=>19)
Thread-47:0:(19=>20)
Thread-43:0:(20=>21)
Thread-48:0:(21=>22)
Thread-37:0:(22=>23)
Thread-49:0:(23=>24)
Thread-55:0:(24=>25)
Thread-60:0:(25=>26)
Thread-42:0:(26=>27)
Thread-52:0:(27=>28)
Thread-46:0:(28=>29)
Thread-58:0:(29=>30)
Thread-51:0:(30=>31)
Thread-66:0:(31=>32)
Thread-59:0:(32=>33)
Thread-17:0:(33=>34)
Thread-19:Error:(33=>34)
Thread-39:Error:(33=>34)
Thread-28:0:(34=>35)
Thread-54:Error:(34=>35)
Thread-65:Error:(34=>35)
Thread-25:Error:(34=>35)
Thread-30:Error:(34=>35)
Thread-5:Error:(35=>36)
Thread-13:Error:(35=>36)
Thread-16:Error:(34=>35)
Thread-6:Error:(34=>35)
Thread-9:Error:(34=>35)
Thread-21:Error:(35=>36)
Thread-29:Error:(35=>36)
Thread-33:Error:(34=>35)
Thread-57:Error:(35=>36)
Thread-24:Error:(34=>35)
Thread-22:Error:(34=>35)
Thread-32:Error:(35=>36)
Thread-23:Error:(34=>35)
Thread-7:Error:(34=>35)
Thread-15:Error:(34=>35)
Thread-4:0:(35=>36)
Thread-20:Error:(35=>36)
Thread-12:Error:(35=>36)
Thread-35:0:(36=>37)
Thread-18:Error:(36=>37)
Thread-11:Error:(36=>37)
Thread-3:Error:(36=>37)
Thread-27:Error:(37=>38)
Thread-39:Error:(37=>38)
Thread-19:0:(37=>38)
Thread-7:Error:(38=>39)
Thread-33:0:(38=>39)
Thread-29:Error:(38=>39)
Thread-16:Error:(38=>39)
Thread-22:Error:(38=>39)
Thread-65:Error:(38=>39)
Thread-54:Error:(38=>39)
Thread-57:Error:(38=>39)
Thread-30:Error:(38=>39)
Thread-21:Error:(38=>39)
Thread-24:Error:(38=>39)
Thread-32:Error:(39=>40)
Thread-5:Error:(39=>40)
Thread-13:Error:(39=>40)
Thread-6:Error:(38=>39)
Thread-25:Error:(38=>39)
Thread-9:Error:(38=>39)
Thread-20:0:(39=>40)
Thread-12:Error:(39=>40)
Thread-15:Error:(38=>39)
Thread-23:Error:(38=>39)
Thread-18:Error:(40=>41)
Thread-3:Error:(40=>41)
Thread-27:0:(40=>41)
Thread-11:Error:(40=>41)
Thread-39:0:(41=>42)
Thread-7:Error:(42=>43)
Thread-54:0:(42=>43)
Thread-22:Error:(42=>43)
Thread-30:Error:(42=>43)
Thread-57:Error:(42=>43)
Thread-65:Error:(42=>43)
Thread-32:Error:(43=>44)
Thread-24:Error:(43=>44)
Thread-5:Error:(42=>43)
Thread-21:Error:(42=>43)
Thread-16:Error:(43=>44)
Thread-29:Error:(43=>44)
Thread-6:0:(43=>44)
Thread-9:Error:(43=>44)
Thread-23:Error:(43=>44)
Thread-25:Error:(43=>44)
Thread-15:Error:(43=>44)
Thread-12:Error:(43=>44)
Thread-13:Error:(44=>45)
Thread-11:0:(44=>45)
Thread-18:Error:(44=>45)
Thread-3:Error:(44=>45)
Thread-57:0:(45=>46)
Thread-7:Error:(45=>46)
Thread-22:Error:(45=>46)
Thread-30:Error:(45=>46)
Thread-9:0:(46=>47)
Thread-65:Error:(46=>47)
Thread-25:Error:(46=>47)
Thread-24:Error:(46=>47)
Thread-21:Error:(47=>48)
Thread-32:Error:(47=>48)
Thread-5:Error:(47=>48)
Thread-15:Error:(47=>48)
Thread-16:0:(47=>48)
Thread-29:Error:(47=>48)
Thread-12:Error:(48=>49)
Thread-23:Error:(47=>48)
Thread-13:0:(48=>49)
Thread-18:Error:(49=>50)
Thread-3:Error:(49=>50)
Thread-7:0:(49=>50)
Thread-30:Error:(49=>50)
Thread-22:Error:(49=>50)
Thread-25:0:(50=>51)
Thread-65:Error:(50=>51)
Thread-12:0:(51=>52)
Thread-21:Error:(51=>52)
Thread-32:Error:(52=>53)
Thread-24:Error:(52=>53)
Thread-29:Error:(52=>53)
Thread-5:Error:(52=>53)
Thread-23:Error:(52=>53)
Thread-15:Error:(52=>53)
Thread-18:0:(52=>53)
Thread-3:Error:(52=>53)
Thread-30:0:(53=>54)
Thread-22:Error:(53=>54)
Thread-65:0:(54=>55)
Thread-24:0:(55=>56)
Thread-21:Error:(55=>56)
Thread-32:Error:(55=>56)
Thread-5:0:(56=>57)
Thread-29:Error:(57=>58)
Thread-15:0:(57=>58)
Thread-23:Error:(57=>58)
Thread-3:0:(58=>59)
Thread-22:0:(59=>60)
Thread-32:0:(60=>61)
Thread-21:Error:(60=>61)
Thread-29:0:(61=>62)
Thread-23:Error:(61=>62)
Thread-21:0:(62=>63)
Thread-23:0:(63=>64)
complete
64

備註:實際應用場景中我這裏不是實現自增,上邊代碼自增目的:測試是否可以在高併發下修改同一個key時實現鎖的功能。

實際應用:高併發使用redis同一個key存儲統計TOPN結果

需求:

1)擁有多個記錄,每條記錄數據格式:日期,統計指標1,統計指標2,統計指標3,統計指標4

2)要求分別統計出4個指標的最大值,並記錄各自指標最大值時對應的記錄。

3)記錄中各個字段的模擬公式:

// 數據格式:
// 第一列:日期
// 第二列:f(x) = x*0.7+1(x天然數)      最大值 45.1
// 第三列:f(x)= -x*x+20x+1(天然數)    a=-1,b=20,c=1 , 當x=-b/2a時 y 最大值:-10*10+20*10+1=101
// 第四列:f(x)= -x*x+30x+1(天然數)    a=-1,b=30,c=1 , 當x=-b/2a時 y 最大值:-15*15+30*15+1=445-225+1=221
// 第五列:f(x)= 640/(x+1)(x天然數)    最大值 640    

使用mutil+watch實現:

首先,把最終統計結果存儲到redis的key(lock_test)中,lock_test是一個hash類型,其中存儲四個鍵值對,每一個鍵值對分別存儲各自的指標最大值時對應的記錄。

指標配置項存儲到FeildConfig.java

    static class FieldConfig {
        private String key;
        private Integer index;

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }

        public Integer getIndex() {
            return index;
        }

        public void setIndex(Integer index) {
            this.index = index;
        }

        public FieldConfig(String key, Integer index) {
            super();
            this.key = key;
            this.index = index;
        }
    }

在測試類的static構造函數中初始化各自指標在一條記錄的中索引位置,及一共包含的指標類型:

public class Test_Lock3 {
    。。。
    final static List<FieldConfig> keys = new ArrayList<FieldConfig>();

    static {
        keys.add(new FieldConfig("101", 1));
        keys.add(new FieldConfig("102", 2));
        keys.add(new FieldConfig("103", 3));
        keys.add(new FieldConfig("104", 4));
    }
    。。。
}

備註:101表明指標1,對應在被統計記錄中的第1索引位置(備註:這裏索引是對記錄按照「,」分割後的數組索引位置)。

製做測試數據:

        List<String> dataItems=new ArrayList<String>();

        // 數據格式:
        // 第一列:日期
        // 第二列:f(x) = x*0.7+1(x天然數)      最大值 45.1
        // 第三列:f(x)= -x*x+20x+1(天然數)    a=-1,b=20,c=1 , 當x=-b/2a時 y 最大值:-10*10+20*10+1=101
        // 第四列:f(x)= -x*x+30x+1(天然數)    a=-1,b=30,c=1 , 當x=-b/2a時 y 最大值:-15*15+30*15+1=445-225+1=221
        // 第五列:f(x)= 640/(x+1)(x天然數)    最大值 640        
        for(int x=0;x<threahSize;x++){
            String dataItem = "2018-07-29 00:00:00," + (x*0.7d+1) + "," + (-1d*x*x+20*x+1) + "," + (-1d*x*x+30*x+1) + "," + (640d/(x+1));
            dataItems.add(dataItem);
        }

如下是總體測試代碼:

package com.dx.es;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;

public class Test_Lock3 {
    final static JedisPool pool = RedisUtil.getPool();
    final static List<FieldConfig> keys = new ArrayList<FieldConfig>();

    static {
        keys.add(new FieldConfig("101", 1));
        keys.add(new FieldConfig("102", 2));
        keys.add(new FieldConfig("103", 3));
        keys.add(new FieldConfig("104", 4));
    }

    public static void main(String[] args) {
        int threahSize = 64;
        final CountDownLatch threadsCountDownLatch = new CountDownLatch(threahSize);
        Jedis jedis = pool.getResource();
        jedis.del("lock_test");
        
        List<String> dataItems=new ArrayList<String>();

        // 數據格式:
        // 第一列:日期
        // 第二列:f(x) = x*0.7+1(x天然數)      最大值 45.1
        // 第三列:f(x)= -x*x+20x+1(天然數)    a=-1,b=20,c=1 , 當x=-b/2a時 y 最大值:-10*10+20*10+1=101
        // 第四列:f(x)= -x*x+30x+1(天然數)    a=-1,b=30,c=1 , 當x=-b/2a時 y 最大值:-15*15+30*15+1=445-225+1=221
        // 第五列:f(x)= 640/(x+1)(x天然數)    最大值 640        
        for(int x=0;x<threahSize;x++){
            String dataItem = "2018-07-29 00:00:00," + (x*0.7d+1) + "," + (-1d*x*x+20*x+1) + "," + (-1d*x*x+30*x+1) + "," + (640d/(x+1));
            dataItems.add(dataItem);
        }
        
        
        for (int i = 0; i < threahSize; i++) {
            Jedis jedisT = pool.getResource();
            
            Runnable handler = new MyRunable(jedisT, threadsCountDownLatch, dataItems.get(i));

            new Thread(handler).start();
        }

        // 等待全部並行子線程任務完成。
        try {
            threadsCountDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("complete");

        Map<String, String> values = jedis.hgetAll("lock_test");
        jedis.close();

        System.out.println(values);
    }

    static class MyRunable implements Runnable {
        private Jedis jedis = null;
        private CountDownLatch threadsCountDownLatch = null;
        private String newLine = null;

        public MyRunable(Jedis jedis, CountDownLatch threadsCountDownLatch, String newLine) {
            this.jedis = jedis;
            this.threadsCountDownLatch = threadsCountDownLatch;
            this.newLine = newLine;
        }

        public void run() {
            while (true) {
                this.jedis.watch("lock_test");

                Map<String, String> newValues = new HashMap<String, String>();
                Map<String, String> oldValues = this.jedis.hgetAll("lock_test");
                
                String newValueStr="";

                for (FieldConfig key : keys) {
                    if (oldValues.containsKey(key.getKey())) {
                        Double oldValue = Double.valueOf(oldValues.get(key.getKey()).split(",")[key.getIndex()]);
                        Double newValue = Double.valueOf(newLine.split(",")[key.getIndex()]);
                        if (newValue > oldValue) {
                            newValues.put(key.getKey(), newLine);
                            newValueStr=newLine;
                        } else {
                            newValues.put(key.getKey(), oldValues.get(key.getKey()));
                            newValueStr=oldValues.get(key.getKey());
                        }
                    } else {
                        newValues.put(key.getKey(), newLine);
                        newValueStr=newLine;
                    }
                }

                Transaction tx = this.jedis.multi();
                tx.hmset("lock_test", newValues);

                List<Object> exec = tx.exec();

                if (exec == null || exec.isEmpty()) {
                    //System.out.println(Thread.currentThread().getName() + ":" + "Error:(" + oldValueStr + "->"+newValueStr+")");
                } else {
                    String values = "";
                    for (int i = 0; i < exec.size(); i++) {
                        values += exec.get(i).toString();
                    }
                    System.out.println(Thread.currentThread().getName() + ":" + values + ":(" + newLine +"->"+newValueStr+")");
                    break;
                }

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            this.jedis.close();
            this.threadsCountDownLatch.countDown();
        }
    }

    static class FieldConfig {
        private String key;
        private Integer index;

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }

        public Integer getIndex() {
            return index;
        }

        public void setIndex(Integer index) {
            this.index = index;
        }

        public FieldConfig(String key, Integer index) {
            super();
            this.key = key;
            this.index = index;
        }
    }
}

測試:

Thread-23:OK:(2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
Thread-29:OK:(2018-07-29 00:00:00,19.2,-155.0,105.0,23.703703703703702->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
Thread-33:OK:(2018-07-29 00:00:00,22.0,-299.0,1.0,20.64516129032258->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
Thread-24:OK:(2018-07-29 00:00:00,15.7,-20.0,190.0,29.09090909090909->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
Thread-36:OK:(2018-07-29 00:00:00,24.099999999999998,-428.0,-98.0,18.823529411764707->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
Thread-32:OK:(2018-07-29 00:00:00,21.299999999999997,-260.0,30.0,21.333333333333332->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
Thread-39:OK:(2018-07-29 00:00:00,26.2,-575.0,-215.0,17.2972972972973->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
Thread-40:OK:(2018-07-29 00:00:00,26.9,-628.0,-258.0,16.842105263157894->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
Thread-25:OK:(2018-07-29 00:00:00,16.4,-43.0,177.0,27.82608695652174->2018-07-29 00:00:00,15.0,1.0,201.0,30.476190476190474)
Thread-3:OK:(2018-07-29 00:00:00,1.0,1.0,1.0,640.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-16:OK:(2018-07-29 00:00:00,10.1,92.0,222.0,45.714285714285715->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-21:OK:(2018-07-29 00:00:00,13.6,37.0,217.0,33.68421052631579->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-45:OK:(2018-07-29 00:00:00,30.4,-923.0,-503.0,14.883720930232558->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-28:OK:(2018-07-29 00:00:00,18.5,-124.0,126.0,24.615384615384617->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-46:OK:(2018-07-29 00:00:00,31.099999999999998,-988.0,-558.0,14.545454545454545->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-12:OK:(2018-07-29 00:00:00,7.3,100.0,190.0,64.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-34:OK:(2018-07-29 00:00:00,22.7,-340.0,-30.0,20.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-49:OK:(2018-07-29 00:00:00,33.199999999999996,-1195.0,-735.0,13.617021276595745->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-50:OK:(2018-07-29 00:00:00,33.9,-1268.0,-798.0,13.333333333333334->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-52:OK:(2018-07-29 00:00:00,35.3,-1420.0,-930.0,12.8->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-53:OK:(2018-07-29 00:00:00,36.0,-1499.0,-999.0,12.549019607843137->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-54:OK:(2018-07-29 00:00:00,36.699999999999996,-1580.0,-1070.0,12.307692307692308->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-55:OK:(2018-07-29 00:00:00,37.4,-1663.0,-1143.0,12.075471698113208->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-56:OK:(2018-07-29 00:00:00,38.099999999999994,-1748.0,-1218.0,11.851851851851851->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-57:OK:(2018-07-29 00:00:00,38.8,-1835.0,-1295.0,11.636363636363637->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-58:OK:(2018-07-29 00:00:00,39.5,-1924.0,-1374.0,11.428571428571429->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-59:OK:(2018-07-29 00:00:00,40.199999999999996,-2015.0,-1455.0,11.228070175438596->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-60:OK:(2018-07-29 00:00:00,40.9,-2108.0,-1538.0,11.03448275862069->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-61:OK:(2018-07-29 00:00:00,41.599999999999994,-2203.0,-1623.0,10.847457627118644->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-62:OK:(2018-07-29 00:00:00,42.3,-2300.0,-1710.0,10.666666666666666->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-63:OK:(2018-07-29 00:00:00,43.0,-2399.0,-1799.0,10.491803278688524->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-64:OK:(2018-07-29 00:00:00,43.699999999999996,-2500.0,-1890.0,10.32258064516129->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-65:OK:(2018-07-29 00:00:00,44.4,-2603.0,-1983.0,10.158730158730158->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-11:OK:(2018-07-29 00:00:00,6.6,97.0,177.0,71.11111111111111->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-41:OK:(2018-07-29 00:00:00,27.599999999999998,-683.0,-303.0,16.41025641025641->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-47:OK:(2018-07-29 00:00:00,31.799999999999997,-1055.0,-615.0,14.222222222222221->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-37:OK:(2018-07-29 00:00:00,24.799999999999997,-475.0,-135.0,18.285714285714285->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-48:OK:(2018-07-29 00:00:00,32.5,-1124.0,-674.0,13.91304347826087->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-5:OK:(2018-07-29 00:00:00,2.4,37.0,57.0,213.33333333333334->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-42:OK:(2018-07-29 00:00:00,28.299999999999997,-740.0,-350.0,16.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-8:OK:(2018-07-29 00:00:00,4.5,76.0,126.0,106.66666666666667->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-9:OK:(2018-07-29 00:00:00,5.199999999999999,85.0,145.0,91.42857142857143->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-43:OK:(2018-07-29 00:00:00,29.0,-799.0,-399.0,15.609756097560975->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-7:OK:(2018-07-29 00:00:00,3.8,65.0,105.0,128.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-44:OK:(2018-07-29 00:00:00,29.7,-860.0,-450.0,15.238095238095237->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-18:OK:(2018-07-29 00:00:00,11.5,76.0,226.0,40.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-20:OK:(2018-07-29 00:00:00,12.899999999999999,52.0,222.0,35.55555555555556->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-15:OK:(2018-07-29 00:00:00,9.399999999999999,97.0,217.0,49.23076923076923->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-27:OK:(2018-07-29 00:00:00,17.799999999999997,-95.0,145.0,25.6->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-17:OK:(2018-07-29 00:00:00,10.799999999999999,85.0,225.0,42.666666666666664->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-10:OK:(2018-07-29 00:00:00,5.8999999999999995,92.0,162.0,80.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-66:OK:(2018-07-29 00:00:00,45.099999999999994,-2708.0,-2078.0,10.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-19:OK:(2018-07-29 00:00:00,12.2,65.0,225.0,37.64705882352941->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-26:OK:(2018-07-29 00:00:00,17.099999999999998,-68.0,162.0,26.666666666666668->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-4:OK:(2018-07-29 00:00:00,1.7,20.0,30.0,320.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-35:OK:(2018-07-29 00:00:00,23.4,-383.0,-63.0,19.393939393939394->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-22:OK:(2018-07-29 00:00:00,14.299999999999999,20.0,210.0,32.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-31:OK:(2018-07-29 00:00:00,20.599999999999998,-223.0,57.0,22.06896551724138->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-51:OK:(2018-07-29 00:00:00,34.599999999999994,-1343.0,-863.0,13.061224489795919->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-30:OK:(2018-07-29 00:00:00,19.9,-188.0,82.0,22.857142857142858->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-6:OK:(2018-07-29 00:00:00,3.0999999999999996,52.0,82.0,160.0->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-14:OK:(2018-07-29 00:00:00,8.7,100.0,210.0,53.333333333333336->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-13:OK:(2018-07-29 00:00:00,8.0,101.0,201.0,58.18181818181818->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
Thread-38:OK:(2018-07-29 00:00:00,25.5,-524.0,-174.0,17.77777777777778->2018-07-29 00:00:00,1.0,1.0,1.0,640.0)
complete
{
101=2018-07-29 00:00:00,45.099999999999994,-2708.0,-2078.0,10.0,
102
=2018-07-29 00:00:00,8.0,101.0,201.0,58.18181818181818,
103=2018-07-29 00:00:00,11.5,76.0,226.0,40.0,
104=2018-07-29 00:00:00,1.0,1.0,1.0,640.0

}

參考:《經過Jedis的setnx、multi事務及watch實現三種分佈式跨JVM鎖的方法代碼示例》注意:本章文章中介紹了三種鎖的方案,可是這三種方案並不能解決topn的問題,僅供參考。

相關文章
相關標籤/搜索