Bitmap(即Bitset),是一串連續的2進制數字(0或1),每一位所在的位置爲偏移(offset),bitmap就是經過最小的單位bit來進行0或者1的設置,表示某個元素對應的值或者狀態。html
Redis從2.2.0版本開始新增了setbit
,getbit
,bitcount
等幾個bitmap相關命令。雖然是新命令,可是並無新增新的數據類型,由於setbit
等命令只不過是在set
上的擴展。在bitmap上可執行AND,OR,XOR以及其它位操做。面試
(1)SETBIT key offset valueredis
對 key 所儲存的字符串值,設置或清除指定偏移量上的位(bit)。位的設置或清除取決於 value 參數,能夠是 0 也能夠是 1 。當 key 不存在時,自動生成一個新的字符串值。spring
字符串會進行伸展(grown)以確保它能夠將 value 保存在指定的偏移量上。當字符串值進行伸展時,空白位置以 0 填充。數據庫
offset 參數必須大於或等於 0 ,小於 2^32 (bit 映射被限制在 512 MB 以內)。緩存
對使用大的 offset 的 SETBIT 操做來講,內存分配可能形成 Redis 服務器被阻塞。服務器
redis> SETBIT bit 10086 1
(integer) 0
即將偏移量10086上的位設置爲1.架構
(2)GETBIT key offset併發
對 key 所儲存的字符串值,獲取指定偏移量上的位(bit)。dom
當 offset 比字符串值的長度大,或者 key 不存在時,返回 0 。
redis> SETBIT bit 10086 1 (integer) 0 redis> GETBIT bit 10086 (integer) 1
(3)BITCOUNT key [start] [end]
計算給定字符串中,被設置爲 1 的比特位的數量。
通常狀況下,給定的整個字符串都會被進行計數,經過指定額外的 start 或 end 參數,可讓計數只在特定的位上進行。
start 和 end 參數的設置和 GETRANGE 命令相似,均可以使用負數值:好比 -1 表示最後一個位,而 -2 表示倒數第二個位,以此類推。
不存在的 key 被當成是空字符串來處理,所以對一個不存在的 key 進行 BITCOUNT 操做,結果爲 0 。
redis> SETBIT bits 0 1 # 0001 (integer) 0 redis> BITCOUNT bits (integer) 1
假設這樣一個場景,假如每一個網站有1億的用戶,那麼咱們怎麼來統計這個網站的日登錄數或者說有哪些用戶登陸過這個網站。
最多見的作法就是設計一張用戶登陸表user_login:
user_uid login_date
0 2017-7-1
1 2017-7-1
0 2017-7-2
若是平均一我的一天登陸1次,那麼1億個用戶一個星期就會產生1 * 1 * 7 = 7億條數據,一個月就會產生30億條數據,這對數據庫的壓力是很大的,只是統計一下用戶登陸,不必花費這麼多的資源。
這個時候咱們就能夠用reids 的bitmap來解決。
用戶是否登陸能夠用0/1來表示,0表明用戶不登錄,1表示登陸,那麼1bit 就能夠表示用戶是否登陸。
1億個用戶一天的數據量也就 1 0000 0000bit = 11.92m,也就是說用戶一天的登陸信息也就產生11.92m的數據量。一個月也就357.63m的數據量。
具體實現過程(爲了實驗方便,咱們就假設4個用戶的id分別爲:0,1,2,3,統計兩天的登陸量):
mon: 1010 (用戶0未登陸,用戶1登陸,用戶2未登陸,用戶3登陸)
tue: 1101 (用戶0登陸,用戶1未登陸,用戶2登陸,用戶3登陸)
127.0.0.1:6379> setbit mon 0 0 (integer) 1 127.0.0.1:6379> setbit mon 1 1 (integer) 1 127.0.0.1:6379> setbit mon 2 0 (integer) 0 127.0.0.1:6379> setbit mon 3 1 (integer) 0 127.0.0.1:6379> 127.0.0.1:6379> setbit tue 0 1 (integer) 1 127.0.0.1:6379> setbit tue 1 0 (integer) 1 127.0.0.1:6379> setbit tue 3 1 (integer) 0 127.0.0.1:6379> setbit tue 4 1 (integer) 1 127.0.0.1:6379>
若是要統計這兩天都登錄的用戶,可使用位運算AND:
127.0.0.1:6379> bitop AND result mon tue (integer) 1 127.0.0.1:6379> getbit result 0 (integer) 0 127.0.0.1:6379> getbit result 1 (integer) 0 127.0.0.1:6379> getbit result 2 (integer) 0 127.0.0.1:6379> getbit result 3 (integer) 1 127.0.0.1:6379>
能夠看到mon 和 tue作and運算,獲得結果result 爲 :1000,則表示用戶3連續兩天都登錄,其餘用戶兩天中只有一天登陸。
假設如今咱們但願記錄本身網站上的用戶的上線頻率,好比說,計算用戶 A 上線了多少天,用戶 B 上線了多少天,諸如此類,以此做爲數據,從而決定讓哪些用戶參加 beta 測試等活動 —— 這個模式可使用 SETBIT 和 BITCOUNT 來實現。
好比說,每當用戶在某一天上線的時候,咱們就使用 SETBIT ,以用戶名做爲 key ,將那天所表明的網站的上線日做爲 offset 參數,並將這個 offset 上的爲設置爲 1 。
舉個例子,若是今天是網站上線的第 100 天,而用戶 peter 在今天閱覽過網站,那麼執行命令 SETBIT peter 100 1 ;若是明天 peter 也繼續閱覽網站,那麼執行命令 SETBIT peter 101 1 ,以此類推。
當要計算 peter 總共以來的上線次數時,就使用 BITCOUNT 命令:執行 BITCOUNT peter ,得出的結果就是 peter 上線的總天數。
也能夠實現相似簽到的功能。
優勢佔用內存更小,查詢方便,能夠指定查詢某個用戶,數據可能略有瑕疵,對於非登錄的用戶,可能不一樣的key映射到同一個id,不然須要維護一個非登錄用戶的映射,有額外的開銷。
缺點若是用戶很是的稀疏,那麼佔用的內存可能會很大。
參考:
拼多多面試真題:如何用Redis統計獨立用戶訪問量! https://mp.weixin.qq.com/s?__biz=MzUxOTAxODc2Mg==&mid=2247485077&idx=1&sn=9adbf940d7e821d73bff4247e17dda41&chksm=f98146f0cef6cfe652aba3381b7babcc260e6d4a10dd66b90bae1abd552e7d765486d90dc069&scene=21#wechat_redirect
用redis的bitmap方式統計上億訪問量的周活躍用戶 https://www.jianshu.com/p/62cf39db5c2f
所謂熱key問題就是,熱點 key,指的是在一段時間內,該 key 的訪問量遠遠高於其餘的 redis key, 致使大部分的訪問流量在通過 proxy 分片以後,都集中訪問到某一個 redis 實例上。
忽然有幾十萬的請求去訪問redis上的某個特定key。那麼,這樣會形成流量過於集中,達到物理網卡上限,從而致使這臺redis的服務器宕機。那接下來這個key的請求,就會直接懟到你的數據庫上,致使你的服務不可用。
其實生活中也是有很多這樣的例子。好比XX明星結婚。那麼關於XX明星的Key就會瞬間增大,就會出現熱數據問題。
ps:hot key和big key問題,你們必定要有所瞭解。
方法一:憑藉業務經驗,進行預估哪些是熱key
其實這個方法仍是挺有可行性的。好比某商品在作秒殺,那這個商品的key就能夠判斷出是熱key。缺點很明顯,並不是全部業務都能預估出哪些key是熱key。
方法二:在客戶端進行收集
這個方式就是在操做redis以前,加入一行代碼進行數據統計。那麼這個數據統計的方式有不少種,也能夠是給外部的通信系統發送一個通知信息。缺點就是對客戶端代碼形成入侵。
方法三:在Proxy層作收集
有些集羣架構是下面這樣的,Proxy能夠是Twemproxy,是統一的入口。能夠在Proxy層作收集上報,可是缺點很明顯,並不是全部的redis集羣架構都有proxy。
方法四:用redis自帶命令
(1) monitor命令,該命令能夠實時抓取出redis服務器接收到的命令,而後寫代碼統計出熱key是啥。固然,也有現成的分析工具能夠給你使用,好比redis-faina。可是該命令在高併發的條件下,有內存增暴增的隱患,還會下降redis的性能。
(2) hotkeys參數,redis 4.0.3提供了redis-cli的熱點key發現功能,執行redis-cli時加上–hotkeys選項便可。可是該參數在執行的時候,若是key比較多,執行起來比較慢。
方法五:本身抓包評估
Redis客戶端使用TCP協議與服務端進行交互,通訊協議採用的是RESP。本身寫程序監聽端口,按照RESP協議規則解析數據,進行分析。缺點就是開發成本高,維護困難,有丟包可能性。
目前業內的方案有兩種
(1) 利用二級緩存
好比利用ehcache,或者一個HashMap均可以。在你發現熱key之後,把熱key加載到系統的JVM中。
針對這種熱key請求,會直接從jvm中取,而不會走到redis層。
假設此時有十萬個針對同一個key的請求過來,若是沒有本地緩存,這十萬個請求就直接懟到同一臺redis上了。
如今假設,你的應用層有50臺機器,OK,你也有jvm緩存了。這十萬個請求平均分散開來,每一個機器有2000個請求,會從JVM中取到value值,而後返回數據。避免了十萬個請求懟到同一臺redis上的情形。
(2) 備份熱key
這個方案也很簡單。不要讓key走到同一臺redis上不就好了。咱們把這個key,在多個redis上都存一份不就行了。接下來,有熱key請求進來的時候,咱們就在有備份的redis上隨機選取一臺,進行訪問取值,返回數據。
假設redis的集羣數量爲N,步驟以下圖所示
僞代碼以下:
在項目運行中,自動發現熱key,並使程序自動處理,主要有兩步:
(1)監控熱key (2)通知系統作處理
(1)監控熱key
一、有贊解決方案:對原生jedis包的JedisPool和Jedis類作了改造,在JedisPool初始化過程當中集成TMC「熱點發現」+「本地緩存」功能Hermes-SDK包的初始化邏輯,使Jedis
客戶端與緩存服務端代理層交互時先與Hermes-SDK
交互,從而完成 「熱點探測」+「本地緩存」功能的透明接入。從監控的角度看,該包對於Jedis-Client的每次key值訪問請求,Hermes-SDK 都會經過其通訊模塊將key訪問事件異步上報給Hermes服務端集羣,以便其根據上報數據進行「熱點探測」。
模塊劃分:
Jedis-Client: Java 應用與緩存服務端交互的直接入口,接口定義與原生 Jedis-Client 無異;
Hermes-SDK:自研「熱點發現+本地緩存」功能的SDK封裝, Jedis-Client 經過與它交互來集成相應能力;
Hermes服務端集羣:接收 Hermes-SDK 上報的緩存訪問數據,進行熱點探測,將熱點 key 推送給 Hermes-SDK 作本地緩存;
緩存集羣:由代理層和存儲層組成,爲應用客戶端提供統一的分佈式緩存服務入口;
基礎組件: etcd 集羣、 Apollo 配置中心,爲 TMC 提供「集羣推送」和「統一配置」能力;
基本流程:
1) key 值獲取
1.Java 應用調用 Jedis-Client 接口獲取key的緩存值時,Jedis-Client 會詢問 Hermes-SDK 該 key 當前是不是 熱點key;
2.對於 熱點key ,直接從 Hermes-SDK 的 熱點模塊 獲取熱點 key 在本地緩存的 value 值,不去訪問 緩存集羣 ,從而將訪問請求前置在應用層;
3.對於非 熱點key ,Hermes-SDK 會經過Callable
回調 Jedis-Client 的原生接口,從 緩存集羣 拿到 value 值;
4.對於 Jedis-Client 的每次 key 值訪問請求,Hermes-SDK 都會經過其 通訊模塊 將 key訪問事件 異步上報給 Hermes服務端集羣 ,以便其根據上報數據進行「熱點探測」;
2)key值過時
1.Java 應用調用 Jedis-Client 的set()
del()
expire()
接口時會致使對應 key 值失效,Jedis-Client 會同步調用 Hermes-SDK 的invalid()
方法告知其「 key 值失效」事件;
2.對於 熱點key ,Hermes-SDK 的 熱點模塊 會先將 key 在本地緩存的 value 值失效,以達到本地數據強一致。同時 通訊模塊 會異步將「 key 值失效」事件經過 etcd集羣 推送給 Java 應用集羣中其餘 Hermes-SDK 節點;
3.其餘Hermes-SDK節點的 通訊模塊 收到 「 key 值失效」事件後,會調用 熱點模塊 將 key 在本地緩存的 value 值失效,以達到集羣數據最終一致;
3)熱點發現
1.Hermes服務端集羣 不斷收集 Hermes-SDK上報的 key訪問事件,對不一樣業務應用集羣的緩存訪問數據進行週期性(3s一次)分析計算,以探測業務應用集羣中的熱點key列表;
2.對於探測到的熱點key列表,Hermes服務端集羣 將其經過 etcd集羣 推送給不一樣業務應用集羣的 Hermes-SDK通訊模塊,通知其對熱點key列表進行本地緩存;
4)配置讀取
1.Hermes-SDK 在啓動及運行過程當中,會從 Apollo配置中心 讀取其關心的配置信息(如:啓動關閉配置、黑白名單配置、etcd地址...);
2.Hermes服務端集羣 在啓動及運行過程當中,會從 Apollo配置中心 讀取其關心的配置信息(如:業務應用列表、熱點閾值配置、 etcd 地址...);
二、其餘解決方案:本身抓包評估
先利用flink搭建一套流式計算系統。而後本身寫一個抓包程序抓redis監聽端口的數據,抓到數據後往kafka裏丟。
接下來,流式計算系統消費kafka裏的數據,進行數據統計便可,也能達到監控熱key的目的。
(2)通知系統作處理
一、有贊解決方案:利用二級緩存進行處理。
有贊在監控到熱key後,Hermes服務端集羣會經過各類手段通知各業務系統裏的Hermes-SDK,告訴他們:"老弟,這個key是熱key,記得作本地緩存。"
因而Hermes-SDK就會將該key緩存在本地,對於後面的請求。Hermes-SDK發現這個是一個熱key,直接從本地中拿,而不會去訪問集羣。
二、其餘解決方案:
好比你的流式計算系統監控到熱key了,往zookeeper裏頭的某個節點裏寫。而後你的業務系統監聽該節點,發現節點數據變化了,就表明發現熱key。最後往本地緩存裏寫,也是能夠的。
有贊透明多級緩存解決方案(TMC) https://www.jianshu.com/p/176c8f8b8eb1
談談Redis的熱key問題如何解決 https://mp.weixin.qq.com/s?__biz=MzUxOTAxODc2Mg==&mid=2247485004&idx=1&sn=5b5e3d188959a5055ec69eb6c16fe75f&chksm=f9814629cef6cf3f27fd3824a9f2849a5f6baa008f4b0d585631027e4988ed0215da55844138&scene=21#wechat_redirect
[Redis] 20萬用戶同時訪問一個熱點Key,如何優化緩存架構? https://www.cnblogs.com/aiqiqi/p/10976161.html
在單機的狀況下,若是有多個線程要同時訪問某個共享資源的時候,咱們能夠採用線程間加鎖的機制,即當某個線程獲取到這個資源後,就當即對這個資源進行加鎖,當使用完資源以後,再解鎖,其它線程就能夠接着使用了。例如,在JAVA中,使用synchronize或者Lock等進行加鎖。
可是到了分佈式系統的時代,這種線程之間的鎖機制,就沒做用了,系統可能會有多份而且部署在不一樣的機器上,這些資源已經不是在線程之間共享了,而是屬於進程之間共享的資源。 所以,爲了解決這個問題,咱們就必須引入「分佈式鎖」。 分佈式鎖,是指在分佈式的部署環境下,經過鎖機制來讓多客戶端互斥的對共享資源進行訪問。
爲了確保分佈式鎖可用,咱們至少要確保鎖的實現同時知足如下四個條件:
* 互斥性。在任意時刻,只有一個客戶端能持有鎖。
* 不會發生死鎖。即便有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證後續其餘客戶端能加鎖。
* 具備容錯性。只要大部分的Redis節點正常運行,客戶端就能夠加鎖和解鎖。
* 解鈴還須繫鈴人。加鎖和解鎖必須是同一個客戶端,客戶端本身不能把別人加的鎖給解了。
(1)錯誤方式一
setnx key value; (1) do something ...(2) del key;(3)
以上方式若是在(1)加鎖成功後,可是在(2)拋出異常,則可能致使del指令沒有被調用,這樣就陷入死鎖,鎖永遠不會被釋放。
(2)錯誤方式二
爲了解決(1)中可能出現的問題,能夠在拿到鎖後,給鎖加上一個過時時間,好比:5秒,這樣即便中間出現問題,也會在5秒後自動釋放鎖。
setnx key value; (1) expire key 5;(2) do something ...(3) del key;(4)
這種方式,因爲(1)和(2)不是原子操做,所以也有可能在(1)執行成功後即拋出異常,而(2)沒有執行,因此仍會出現上例中的死鎖問題。
(3)正確加鎖方式
方式一:
在Redis2.8版本,加入了一個原子的指令:
public static boolean tryLock(Jedis jedis, String lockName, String uniqueValue, int expireTime) { /* nxxx,這個參數咱們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,咱們進行set操做;若key已經存在,則不作任何操做; expx,這個參數咱們傳的是PX,意思是咱們要給這個key加一個過時的設置,具體時間由第五個參數決定。 */ String result = jedis.set(lockName, uniqueValue, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); return LOCK_SUCCESS.equals(result); }
方式二:
public Boolean tryLock(String lockKey, String uniqueValue, long seconds) { return redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> { Jedis jedis = (Jedis) redisConnection.getNativeConnection(); String result = jedis.set(lockKey, uniqueValue, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, seconds); if (LOCK_SUCCESS.equals(result)) { return Boolean.TRUE; } return Boolean.FALSE; }); }
不能使用 spring-boot 提供的 redisTemplate.opsForValue().set() 命令是由於 spring-boot 對 jedis 的封裝中沒有返回 set 命令的返回值, 這就致使上層沒有辦法判斷 set 執行的結果,所以須要經過 execute 方法調用 RedisCallback 去拿到底層的 Jedis 對象,來直接調用 set 命令。
分佈式鎖要知足第四個條件解鈴還須繫鈴人,經過給value賦值爲uniqueValue,咱們就知道這把鎖是哪一個請求加的了,在解鎖的時候就能夠有依據。uniqueValue可使用UUID.randomUUID().toString()方法生成,或者使用當前線程的線程ID。而lockName則須要使用一個相同的常量,保證競爭的是同一個鎖。
因爲解鎖即執行delete操做,將lockName的鍵值刪除,可是若是直接使用Redis的del操做,沒法判斷當前的鎖是否爲當前線程加的鎖,因此可使用lua腳本的方式:
方式一:
/** * 釋放分佈式鎖 * @param jedis Redis客戶端 * @param lockName 鎖 * @param resourcePath 請求標識 * @return 是否釋放成功 */ public static boolean release(Jedis jedis, String lockName, String resourcePath) { //lua表達式,Redis執行是原子操做 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockName), Collections.singletonList(resourcePath)); return RELEASE_SUCCESS.equals(result); }
方式二:
/** * 與 tryLock 相對應,用做釋放鎖 * @param lockKey * @param clientId * @return */ private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; public Boolean releaseLock(String lockKey, String clientId) { return redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> { Jedis jedis = (Jedis) redisConnection.getNativeConnection(); Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(lockKey), Collections.singletonList(clientId)); if (RELEASE_SUCCESS.equals(result)) { return Boolean.TRUE; } return Boolean.FALSE; }); }
Redis的分佈式鎖不能解決超時問題。若是在加鎖和釋放鎖之間的業務邏輯執行得太長,以致於超出了鎖的超時時間,這時候第一個線程持有的鎖過時了,可是臨界區的業務邏輯尚未執行完,而同時第二個線程就提早從新持有了這把鎖,致使出現問題。
解決方案:
(1)是否能夠經過合理地設置LockTime(鎖超時時間)來解決這個問題?
但LockTime的設置本來就很不容易。LockTime設置太小,鎖自動超時的機率就會增長,鎖異常失效的機率也就會增長。
而LockTime設置過大,萬一服務出現異常沒法正常釋放鎖,那麼出現這種異常鎖的時間也就越長。咱們只能經過經驗去配置,一個能夠接受的值,基本上是這個服務歷史上的平均耗時再增長必定的buff。
(2)既然(1)的方法走不通,那麼能夠採用以下方法
咱們能夠先給鎖設置一個LockTime,而後啓動一個守護線程,讓守護線程在一段時間後,從新去設置這個鎖的LockTime。
實際操做中,咱們要注意如下幾點:
一、和釋放鎖的狀況一致,咱們須要先判斷鎖的對象是否沒有變。不然會形成不管誰持有鎖,守護線程都會去從新設置鎖的LockTime。不該該續的不能瞎續。
二、守護線程要在合理的時間再去從新設置鎖的LockTime,不然會形成資源的浪費。不能動不動就去續。
三、若是持有鎖的線程已經處理完業務了,那麼守護線程也應該被銷燬。不能主人都掛了,守護者還在那裏繼續浪費資源。
代碼實現:
public class SurvivalClamProcessor implements Runnable { private static final int REDIS_EXPIRE_SUCCESS = 1; SurvivalClamProcessor(String field, String key, String value, int lockTime) { this.field = field; this.key = key; this.value = value; this.lockTime = lockTime; this.signal = Boolean.TRUE; } private String field; private String key; private String value; private int lockTime; //線程關閉的標記 private volatile Boolean signal; void stop() { this.signal = Boolean.FALSE; } @Override public void run() { int waitTime = lockTime * 1000 * 2 / 3; while (signal) { try { Thread.sleep(waitTime); if (cacheUtils.expandLockTime(field, key, value, lockTime) == REDIS_EXPIRE_SUCCESS) { if (logger.isInfoEnabled()) { logger.info("expandLockTime 成功,本次等待{}ms,將重置鎖超時時間重置爲{}s,其中field爲{},key爲{}", waitTime, lockTime, field, key); } } else { if (logger.isInfoEnabled()) { logger.info("expandLockTime 失敗,將致使SurvivalClamConsumer中斷"); } this.stop(); } } catch (InterruptedException e) { if (logger.isInfoEnabled()) { logger.info("SurvivalClamProcessor 處理線程被強制中斷"); } } catch (Exception e) { logger.error("SurvivalClamProcessor run error", e); } } if (logger.isInfoEnabled()) { logger.info("SurvivalClamProcessor 處理線程已中止"); } } }
在以上代碼中,咱們將waitTime設置爲Math.max(1, lockTime * 2 / 3),即守護線程許須要等待waitTime後才能夠去從新設置鎖的超時時間,避免了資源的浪費。
同時在expandLockTime時候也去判斷了當前持有鎖的對象是否一致,避免了胡亂重置鎖超時時間的狀況。
其中expandLockTime是經過Lua腳本實現的。延長鎖超時的腳本語句和釋放鎖的Lua腳本相似。
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1],ARGV[2]) else return '0' end";
而後咱們在得到鎖的代碼以後,添加以下代碼:
SurvivalClamProcessor survivalClamProcessor = new SurvivalClamProcessor(lockField, lockKey, randomValue, lockTime);//建立守護線程 Thread survivalThread = new Thread(survivalClamProcessor); survivalThread.setDaemon(Boolean.TRUE);//後臺線程 survivalThread.start(); Object returnObject = joinPoint.proceed(args);//執行業務代碼 survivalClamProcessor.stop();//業務代碼執行完成後中止守護線程 survivalThread.interrupt();//中斷線程 return returnObject;
這段代碼會先初始化守護線程的內部參數,而後經過start函數啓動線程,最後在業務執行完以後,設置守護線程的關閉標記,最後經過interrupt()去中斷sleep狀態,保證線程及時銷燬。
參考:https://juejin.im/post/5c457f5a6fb9a049d37f6b55