【臨實戰】使用 Python 從 Redis 中刪除 4000W 個 KEY

本文主要涉及 Redis 的如下兩個操做和其 Python 實現,目錄:
  • SCAN 命令
  • DEL 命令
  • 使用 Python SCAN
  • 使用 Python DEL
  • 成果展現

SCAN 命令

SCAN 命令及相關的 SSCAN、HSCAN 和 ZSCAN 命令都用於增量迭代(incrementally iterate)一個集合的元素(a collection of elements):
  • SCAN 用於迭代當前數據庫中的數據庫鍵
  • SSCAN 用於迭代集合鍵中的元素
  • HSCAN 用於迭代哈希鍵中的鍵值對
  • ZSCAN 用於迭代有序集合中的元素(包括元素分值和元素分值)
以上四列命令都支持增量迭代,每次執行都會返回少許元素,因此他們均可以用於生產環境,而不會出現像 KEYS、SMEMBERS 命令同樣 -- 可能會阻塞服務器

不過,增量式迭代命令也不是沒有缺點的:
舉個例子,使用 SMEMBERS 命令能夠返回集合鍵當前包含的全部元素,可是對於 SCAN 這類增量迭代命令來講,由於在堆鍵進行增量迭代的過程當中,鍵可能會被改變,因此增量式迭代命令只能對被返回的元素提供有限的保證(offer limited guarantees about the returned elements)。

由於 SCAN、SSCAN、HSCAN 和 ZSCAN 命令的工做方式都很是類似,可是要記住:
  • SSCAN、HSCAN 和 ZSCAN 命令的第一個參數老是一個數據庫鍵;
  • SCAN 命令則不須要在第一個參數提供任何數據庫鍵 -- 由於它迭代的是當前數據庫中的全部數據庫鍵。

SCAN 命令的基本用法
SCAN 命令是一個基於遊標的迭代器(cursor based iterator):
SCAN 命令每次被調用後,都會向用戶返回一個新的遊標,用戶在下次迭代時須要使用這個新遊標做爲 SCAN 命令的遊標參數,以此來延續以前的迭代過程。
當 SCAN 命令的遊標參數被設置爲 0 時,服務器開始一次新的迭代,而當服務器向用戶返回值爲 0 的遊標時,表示迭代結束。
示例:
redis 127.0.0.1:6379> scan 0
1) "17"
2)  1) "key:12"
    2) "key:8"
    3) "key:4"
    4) "key:14"
    5) "key:16"
    6) "key:17"
    7) "key:15"
    8) "key:10"
    9) "key:3"
    10) "key:7"
    11) "key:1"

redis 127.0.0.1:6379> scan 17
1) "0"
2) 1) "key:5"
   2) "key:18"
   3) "key:0"
   4) "key:2"
   5) "key:19"
   6) "key:13"
   7) "key:6"
   8) "key:9"
   9) "key:11"複製代碼
上面的例子中,第一次迭代用 0 做爲遊標,表示開始第一次迭代。
第二次迭代使用第一次迭代時返回的遊標,即:17。
從示例能夠看出,SCAN 命令的返回是一個兩個元素的數組,第一個元素是新遊標,第二個元素也是一個數組,包含有所被包含的元素。
第二次調用 SCAN 命令時,返回遊標 0,這表示迭代已經結束了,整個數據集(collection)已經被完整遍歷過一遍了。
這個過程被稱爲一次完整遍歷(full iteration)。

精簡一下內容,補充三點:
  1. 由於 SCAN 命令僅僅使用遊標來記錄迭代狀態,因此在迭代過程當中,若是這個數據集的元素有增減,若是是減,不保證元素不返回;若是是增,也不保證必定返回;並且在某種狀況下同一個元素還可能被返回屢次。因此對迭代返回的元素所執行的操做最好能夠重複執行屢次(冪等)。
  2. 增量迭代命令不保證每次迭代所返回的元素數量(沒掃到嘛),可是咱們可使用 COUNT 選項對命令的行爲進行必定程度的調整。COUNT 參數的默認值爲 10,在迭代一個足夠大的、由哈希表實現的數據庫、集合鍵、哈希鍵或者有序集合鍵時,若是用戶沒有使用 MATCH 選項,那麼命令返回的數量一般和 COUNT 選項指定的同樣,或者多一些(😓),在迭代編碼爲整數集合(intset:一個由整數值構成的小集合)或編碼爲壓縮列表(ziplist:由不一樣值構成的一個小哈希或者一個小有序集合)時,會無視 COUNT 選項指定的值,在第一次迭代就將數據集的全部元素都返回給用戶。
  3. MATCH 選項,直接看示例吧,以下
示例:
redis 127.0.0.1:6379> sadd myset 1 2 3 foo foobar feelsgood
(integer) 6

redis 127.0.0.1:6379> sscan myset 0 match f*
1) "0"
2) 1) "foo"
   2) "feelsgood"
   3) "foobar"複製代碼
注意:對元素的模式匹配工做是在命令從數據集中取出元素以後,向客戶端返回元素以前進行的,因此有可能返回空
示例:
redis 127.0.0.1:6379> scan 0 MATCH *11*
1) "288"
2) 1) "key:911"

redis 127.0.0.1:6379> scan 288 MATCH *11*
1) "224"
2) (empty list or set)

redis 127.0.0.1:6379> scan 224 MATCH *11*
1) "80"
2) (empty list or set)

redis 127.0.0.1:6379> scan 80 MATCH *11*
1) "176"
2) (empty list or set)

redis 127.0.0.1:6379> scan 176 MATCH *11* COUNT 1000
1) "0"
2)  1) "key:611"
    2) "key:711"
    3) "key:118"
    4) "key:117"
    5) "key:311"
    6) "key:112"
    7) "key:111"
    8) "key:110"
    9) "key:113"
   10) "key:211"
   11) "key:411"
   12) "key:115"
   13) "key:116"
   14) "key:114"
   15) "key:119"
   16) "key:811"
   17) "key:511"
   18) "key:11"複製代碼
注意:最後一次迭代,經過 COUNT 選項指定爲 1000 強制命令爲本次迭代掃描更多元素,從而使返回的元素也變多了。

DEL 命令

這個比較簡單,刪除給定的一個或者多個 key
redis> SET name "redis"
OK
redis> SET type "key-value store"
OK
redis> SET website "redis.com"
OK
redis> DEL name type website
(integer) 3複製代碼

使用 Python SCAN

安裝 redis 包
pip install redis複製代碼
完整代碼示例:
import redis

pool=redis.ConnectionPool(host='redis_hostname', port=6379, max_connections=100)
r = redis.StrictRedis(connection_pool=pool)

cursor_number, keys = r.execute_command('scan', 0, "count", 200000)

while True:
    if cursor_number == 0:
        # 結束一次完整的比遍歷
        break
    cursor_number, keys = r.execute_command('scan', cursor_number, "count", 200000)
    # do something with keys

複製代碼
我將須要刪除的 key 存在一個文件裏,有 2.2G,大概 4000W 個,下一步就是刪除了

使用 Python DEL

由於文件很大,咱們用到一個小技巧,分塊讀取
with open("/data/rediskeys") as kf:
    lines = kf.readlines(1024*1024)複製代碼
調用 delete 方法時,用到一個小技巧就是『*』星號
r.delete(*taskkey_list)複製代碼
咱們看一下定義就清楚了:
delete method
放上完整代碼:
import redis
import time

pool=redis.ConnectionPool(host='redis_hostname', port=6379, max_connections=100)
r = redis.StrictRedis(connection_pool=pool)

start_time = time.time()
SUCCESS_DELETED = 0

with open("/data/rediskeys") as kf:
    while True:
        lines = kf.readlines(1024*1024)
        if not lines:
            break
        else:
            taskkey_list = [i.strip() for i in lines if i.startswith("UCS:TASKKEY")]
            SUCCESS_DELETED += r.delete(*taskkey_list)

        print SUCCESS_DELETED

end_time = time.time()
print end_time - start_time, SUCCESS_DELETED複製代碼

成果展現

結束,下篇再見
相關文章
相關標籤/搜索