redis keys導入導出

 

#!/usr/bin/python2
import sys
import os
import redis
import time
import datetime
import threading

# Key names found in the source instance, bucketed by Redis data type.
# read_type_keys() fills these lists; the import_* workers consume them.
string_keys = []
list_keys = []
hash_keys = []
set_keys = []
zset_keys = []

# key name -> TTL value reported by the source instance.
ttl_dict = {}

# One lock per key type; each guards the matching work list (or, for
# strings, the shared cursor below) when several worker threads run.
string_mutex = threading.Lock()
list_mutex = threading.Lock()
hash_mutex = threading.Lock()
set_mutex = threading.Lock()
zset_mutex = threading.Lock()

# Position of the next unclaimed entry in string_keys, shared by every
# thread running import_string().
index = 0

def import_string(source, dest):
    """Copy every string key listed in ``string_keys`` from source to dest.

    Several threads may run this function at once.  Each one repeatedly
    claims the next slice of at most ``pipe_size`` keys by advancing the
    module-level ``index`` cursor under ``string_mutex``, reads the values
    with one pipelined batch of GETs and writes them with one pipelined
    batch of SETs (plus EXPIRE where a TTL was recorded in ``ttl_dict``).

    Args:
        source: client for the instance to read from; must provide
            ``pipeline(transaction=False)``.
        dest: client for the instance to write to; same requirement.
    """
    global index
    pipe_size = 1000
    keys_count = len(string_keys)
    pipeSrc = source.pipeline(transaction=False)
    pipeDst = dest.pipeline(transaction=False)

    while True:
        # Claim [begin, end) while holding the lock; the actual copying is
        # done outside of it so the workers can overlap their network I/O.
        with string_mutex:
            if index >= keys_count:
                break
            begin = index
            index = min(index + pipe_size, keys_count)
            end = index

        batch = string_keys[begin:end]
        for key in batch:
            pipeSrc.get(key)
        values = pipeSrc.execute()

        for key, value in zip(batch, values):
            if value is None:
                # The key disappeared from the source after it was listed;
                # there is nothing to copy.
                continue
            pipeDst.set(key, value)
            ttl = ttl_dict.get(key)
            # Negative TTLs are "no expiry"/"no such key" sentinels; handing
            # them to EXPIRE would delete the key that was just written.
            if ttl is not None and ttl >= 0:
                pipeDst.expire(key, ttl)
        pipeDst.execute()

def import_hash(source, dest):
    """Copy every hash listed in ``hash_keys`` from source to dest.

    Worker threads share the ``hash_keys`` list: each one removes a key
    from it under ``hash_mutex`` and migrates that hash on its own.  The
    field names are fetched with HKEYS, then values are read (HGET) and
    written (HSET) in pipelined batches of ``pipe_size`` fields.  If a TTL
    was recorded for the key in ``ttl_dict`` it is re-applied afterwards.

    Args:
        source: client for the instance to read from; must provide
            ``hkeys`` and ``pipeline(transaction=False)``.
        dest: client for the instance to write to; must provide
            ``expire`` and ``pipeline(transaction=False)``.
    """
    pipe_size = 1000
    pipeSrc = source.pipeline(transaction=False)
    pipeDst = dest.pipeline(transaction=False)

    while True:
        with hash_mutex:
            if not hash_keys:
                break
            # Take from the tail: O(1), and the order in which hashes are
            # migrated does not matter.
            key = hash_keys.pop()

        fields = source.hkeys(key)
        for start in range(0, len(fields), pipe_size):
            batch = fields[start:start + pipe_size]
            for field in batch:
                pipeSrc.hget(key, field)
            values = pipeSrc.execute()

            for field, value in zip(batch, values):
                if value is None:
                    # Field was removed after HKEYS ran; nothing to copy.
                    continue
                pipeDst.hset(key, field, value)
            pipeDst.execute()

        ttl = ttl_dict.get(key)
        # Negative TTLs are "no expiry"/"no such key" sentinels; handing
        # them to EXPIRE would delete the hash that was just written.
        if ttl is not None and ttl >= 0:
            dest.expire(key, ttl)
def import_set(source, dest):
    """Copy every set listed in ``set_keys`` from source to dest.

    Worker threads share the ``set_keys`` list: each one removes a key
    from it under ``set_mutex``, reads the whole set with SMEMBERS and
    writes the members to dest with pipelined SADDs, flushing the pipeline
    every ``pipe_size`` members.  If a TTL was recorded for the key in
    ``ttl_dict`` it is re-applied afterwards.

    Args:
        source: client for the instance to read from; must provide
            ``smembers``.
        dest: client for the instance to write to; must provide
            ``expire`` and ``pipeline(transaction=False)``.
    """
    pipe_size = 1000
    pipeDst = dest.pipeline(transaction=False)

    while True:
        with set_mutex:
            if not set_keys:
                break
            # Take from the tail: O(1), and the order in which sets are
            # migrated does not matter.
            key = set_keys.pop()

        pending = 0
        for member in source.smembers(key):
            pipeDst.sadd(key, member)
            pending += 1
            if pending == pipe_size:
                pipeDst.execute()
                pending = 0
        if pending:
            # Flush the final, partially filled batch.
            pipeDst.execute()

        ttl = ttl_dict.get(key)
        # Negative TTLs are "no expiry"/"no such key" sentinels; handing
        # them to EXPIRE would delete the set that was just written.
        if ttl is not None and ttl >= 0:
            dest.expire(key, ttl)

def import_zset(source, dest):
    """Copy every sorted set listed in ``zset_keys`` from source to dest.

    Worker threads share the ``zset_keys`` list: each one removes a key
    from it under ``zset_mutex`` and pages through that sorted set with
    ZRANGE, ``pipe_size`` members at a time, writing each page to dest
    with pipelined ZADDs.  If a TTL was recorded for the key in
    ``ttl_dict`` it is re-applied afterwards.

    Args:
        source: client for the instance to read from; must provide
            ``zcard`` and ``zrange(..., withscores=True)``.
        dest: client for the instance to write to; must provide
            ``expire`` and ``pipeline(transaction=False)``.
    """
    pipe_size = 1000
    pipeDst = dest.pipeline(transaction=False)

    while True:
        with zset_mutex:
            if not zset_keys:
                break
            # Take from the tail: O(1), and the order in which sorted sets
            # are migrated does not matter.
            key = zset_keys.pop()

        zset_size = source.zcard(key)
        offset = 0
        while offset < zset_size:
            # ZRANGE's stop rank is inclusive, hence the "- 1".  Fetching
            # the scores together with the members avoids a second round
            # trip and the window in which a member could vanish before
            # its score is read.
            pairs = source.zrange(key, offset, offset + pipe_size - 1,
                                  withscores=True)
            if not pairs:
                # The sorted set shrank (or was deleted) after ZCARD ran;
                # without this guard the loop would never advance.
                break
            offset += len(pairs)
            for member, score in pairs:
                # NOTE(review): (name, member, score) is the argument order
                # this script has always used; newer redis-py releases
                # expect a {member: score} mapping -- confirm against the
                # installed client version.
                pipeDst.zadd(key, member, score)
            pipeDst.execute()

        ttl = ttl_dict.get(key)
        # Negative TTLs are "no expiry"/"no such key" sentinels; handing
        # them to EXPIRE would delete the sorted set that was just written.
        if ttl is not None and ttl >= 0:
            dest.expire(key, ttl)

def import_list(source, dest):
    """Copy every list listed in ``list_keys`` from source to dest.

    Worker threads share the ``list_keys`` list: each one removes a key
    from it under ``list_mutex`` and pages through that Redis list with
    LRANGE, ``pipe_size`` elements at a time, appending each page to dest
    with pipelined RPUSHes so element order is kept.  If a TTL was
    recorded for the key in ``ttl_dict`` it is re-applied afterwards.

    Args:
        source: client for the instance to read from; must provide
            ``llen`` and ``lrange``.
        dest: client for the instance to write to; must provide
            ``expire`` and ``pipeline(transaction=False)``.
    """
    pipe_size = 1000
    pipeDst = dest.pipeline(transaction=False)

    while True:
        with list_mutex:
            if not list_keys:
                break
            # Take from the tail: O(1), and the order in which lists are
            # migrated does not matter.
            key = list_keys.pop()

        list_size = source.llen(key)
        offset = 0
        while offset < list_size:
            # LRANGE's stop index is inclusive, hence the "- 1".
            values = source.lrange(key, offset, offset + pipe_size - 1)
            if not values:
                # The list shrank (or was deleted) after LLEN ran; without
                # this guard the loop would never advance.
                break
            offset += len(values)
            for value in values:
                pipeDst.rpush(key, value)
            pipeDst.execute()

        ttl = ttl_dict.get(key)
        # Negative TTLs are "no expiry"/"no such key" sentinels; handing
        # them to EXPIRE would delete the list that was just written.
        if ttl is not None and ttl >= 0:
            dest.expire(key, ttl)

def read_type_keys(source):
    """List all keys of ``source`` and sort them into the per-type work lists.

    Runs two pipelined passes over the key list, ``pipe_size`` keys per
    round trip:

    1. TYPE -- appends each key to ``string_keys``, ``list_keys``,
       ``hash_keys``, ``set_keys`` or ``zset_keys``; a key whose reported
       type is none of these is printed and skipped.
    2. TTL -- records the remaining lifetime of each key in ``ttl_dict``.
       Keys without a usable TTL (``None`` or a negative sentinel) are
       left out so the importers never try to apply them.

    Args:
        source: client for the instance to read from; must provide
            ``keys()`` and ``pipeline(transaction=False)``.
    """
    pipe_size = 5000
    # NOTE(review): keys() fetches the complete key list in a single call;
    # confirm this is acceptable for the size of the source instance.
    keys = source.keys()
    pipe = source.pipeline(transaction=False)

    buckets = {
        "string": string_keys,
        "list": list_keys,
        "hash": hash_keys,
        "set": set_keys,
        "zset": zset_keys,
    }

    # Pass 1: classify every key by its TYPE.
    for start in range(0, len(keys), pipe_size):
        batch = keys[start:start + pipe_size]
        for key in batch:
            pipe.type(key)
        for key, key_type in zip(batch, pipe.execute()):
            bucket = buckets.get(key_type)
            if bucket is None:
                print("%s %s" % (key, " is not find when TYPE"))
            else:
                bucket.append(key)

    # Pass 2: remember the TTL of every key that has one.
    for start in range(0, len(keys), pipe_size):
        batch = keys[start:start + pipe_size]
        for key in batch:
            pipe.ttl(key)
        for key, ttl_val in zip(batch, pipe.execute()):
            # Negative values are sentinels ("no expiry" / "no such key"),
            # not lifetimes; storing them would later make EXPIRE delete
            # the migrated key.
            if ttl_val is not None and ttl_val >= 0:
                ttl_dict[key] = ttl_val

def import_data(source):
    """Thread entry point: migrate every key type over a fresh connection.

    Opens a destination client of its own from the module-level ``DstIP``
    and ``DstPort`` (assigned by the ``__main__`` section) and then runs
    the five importers one after another against it.

    Args:
        source: client for the instance to read from, handed on to each
            importer unchanged.
    """
    dest = redis.Redis(host=DstIP, port=DstPort)
    importers = (
        import_string,
        import_hash,
        import_list,
        import_set,
        import_zset,
    )
    for run_import in importers:
        run_import(source, dest)

if __name__=='__main__':
    argc = len(sys.argv)
    if argc != 6:
        print "usage: %s sourceIP sourcePort destIP destPort connectionNum" % (sys.argv[0])
        exit(1)
    SrcIP = sys.argv[1]
    SrcPort = int(sys.argv[2])
    DstIP = sys.argv[3]
    DstPort = int(sys.argv[4])
    ConnNum = int(sys.argv[5])

    source=redis.Redis(host=SrcIP,port=SrcPort)

    print "Begin Read Keys"
    read_type_keys(source)
    print "String Key Count is:",len(string_keys)
    print "Set Key Count is:",len(set_keys)
    print "ZSet Key Count is:",len(zset_keys)
    print "List Key Count is:",len(list_keys)
    print "Hash Key Count is:",len(hash_keys)

    start=datetime.datetime.now()
    threads = []
    for i in xrange(ConnNum):
        t = threading.Thread(target=import_data,args=(source,))
        threads.append(t)

    for t in threads:
        t.setDaemon(True)
        t.start()
    for t in threads:
        t.join()
    stop=datetime.datetime.now()
    diff=stop-start
    print "Finish, token time:",str(diff)

 

使用方法如下:
./import_data.py sourceIP sourcePort destIP destPort connectionNum
sourceIP:導出庫ip(數據源)
sourcePort:導出庫port(數據源)
destIP:導入庫ip
destPort:導入庫Port
connectionNum: 數據導入時使用的連接數

相關文章
相關標籤/搜索