#!/usr/bin/python2 import sys import os import redis import time import datetime import threading string_keys=[] hash_keys=[] list_keys=[] set_keys=[] zset_keys=[] ttl_dict={} hash_mutex = threading.Lock() list_mutex = threading.Lock() set_mutex = threading.Lock() zset_mutex = threading.Lock() string_mutex = threading.Lock() index=0 def import_string(source, dest): keys_count = len(string_keys) pipeSrc = source.pipeline(transaction=False) pipeDst = dest.pipeline(transaction=False) global index global string_mutex pipe_size=1000 while 1 : string_mutex.acquire() if index >= keys_count : string_mutex.release() break old_index = index if index + pipe_size < keys_count : index += pipe_size else : index = keys_count max_index = index string_mutex.release() cur_index = old_index while (cur_index < max_index): pipeSrc.get(string_keys[cur_index]) cur_index +=1 results=pipeSrc.execute() for value in results: pipeDst.set(string_keys[old_index], value) if string_keys[old_index] in ttl_dict.keys() : pipeDst.expire(string_keys[old_index],ttl_dict[string_keys[old_index]]); old_index +=1 pipeDst.execute() def import_hash(source, dest): keys_count = len(hash_keys) pipeSrc = source.pipeline(transaction=False) pipeDst = dest.pipeline(transaction=False) while 1: hash_mutex.acquire() if len(hash_keys) == 0 : hash_mutex.release() break; key=hash_keys.pop(0) hash_mutex.release() hkeys=source.hkeys(key) keys_count = len(hkeys) index=0 pipe_size=1000 while index < keys_count: old_index=index num=0 while (index < keys_count) and (num < pipe_size): pipeSrc.hget(key, hkeys[index]) index +=1 num +=1 results=pipeSrc.execute() for value in results: pipeDst.hset(key, hkeys[old_index], value) old_index +=1 pipeDst.execute() if key in ttl_dict.keys() : dest.expire(key,ttl_dict[key]); def import_set(source, dest): keys_count = len(set_keys) pipeDst = dest.pipeline(transaction=False) while 1: set_mutex.acquire() if len(set_keys) == 0 : set_mutex.release() break; key=set_keys.pop(0) set_mutex.release() sValues=source.smembers(key) value_count = len(sValues) index=0 pipe_size=1000 while index < value_count: old_index=index num=0 while (index < value_count) and (num < pipe_size): pipeDst.sadd(key, sValues.pop()) index +=1 num +=1 pipeDst.execute() if key in ttl_dict.keys() : dest.expire(key,ttl_dict[key]); def import_zset(source, dest): keys_count = len(zset_keys) pipeSrc = source.pipeline(transaction=False) pipeDst = dest.pipeline(transaction=False) while 1: zset_mutex.acquire() if len(zset_keys) == 0 : zset_mutex.release() break; key=zset_keys.pop(0) zset_mutex.release() zset_size = source.zcard(key) index=0 pipe_size=1000 while index < zset_size: members = source.zrange(key, index, index+pipe_size) index += len(members) for member in members: pipeSrc.zscore(key, member) scores = pipeSrc.execute() i=0 for member in members: pipeDst.zadd(key, member, scores[i]) i+=1 pipeDst.execute() if key in ttl_dict.keys() : dest.expire(key,ttl_dict[key]); def import_list(source, dest): keys_count = len(list_keys) pipeDst = dest.pipeline(transaction=False) while 1: list_mutex.acquire() if len(list_keys) == 0 : list_mutex.release() break; key=list_keys.pop(0) list_mutex.release() list_size = source.llen(key) index=0 pipe_size=1000 while index < list_size: results = source.lrange(key, index, index+pipe_size) index += len(results) for value in results: pipeDst.rpush(key, value) pipeDst.execute() if key in ttl_dict.keys() : dest.expire(key,ttl_dict[key]); def read_type_keys(source): keys=source.keys() keys_count = len(keys) pipe = source.pipeline(transaction=False) #for key in keys: index=0 pipe_size=5000 while index < keys_count: old_index=index num=0 while (index < keys_count) and (num < pipe_size): pipe.type(keys[index]) index +=1 num +=1 results=pipe.execute() for type in results: if type == "string": string_keys.append(keys[old_index]) elif type == "list": list_keys.append(keys[old_index]) elif type == "hash": hash_keys.append(keys[old_index]) elif type == "set": set_keys.append(keys[old_index]) elif type == "zset": zset_keys.append(keys[old_index]) else : print keys[old_index]," is not find when TYPE" old_index +=1 index=0 while index < keys_count: old_index=index num=0 while (index < keys_count) and (num < pipe_size): pipe.ttl(keys[index]) index +=1 num +=1 results=pipe.execute() for ttl_val in results: if ttl_val != None : ttl_dict[keys[old_index]]=ttl_val old_index +=1 def import_data(source): dest=redis.Redis(host=DstIP,port=DstPort) import_string(source, dest) import_hash(source, dest) import_list(source, dest) import_set(source, dest) import_zset(source, dest) if __name__=='__main__': argc = len(sys.argv) if argc != 6: print "usage: %s sourceIP sourcePort destIP destPort connectionNum" % (sys.argv[0]) exit(1) SrcIP = sys.argv[1] SrcPort = int(sys.argv[2]) DstIP = sys.argv[3] DstPort = int(sys.argv[4]) ConnNum = int(sys.argv[5]) source=redis.Redis(host=SrcIP,port=SrcPort) print "Begin Read Keys" read_type_keys(source) print "String Key Count is:",len(string_keys) print "Set Key Count is:",len(set_keys) print "ZSet Key Count is:",len(zset_keys) print "List Key Count is:",len(list_keys) print "Hash Key Count is:",len(hash_keys) start=datetime.datetime.now() threads = [] for i in xrange(ConnNum): t = threading.Thread(target=import_data,args=(source,)) threads.append(t) for t in threads: t.setDaemon(True) t.start() for t in threads: t.join() stop=datetime.datetime.now() diff=stop-start print "Finish, token time:",str(diff)
使用方法以下
./import_data.py sourceIP sourcePort destIP destPort connectionNum
sourceIP:導出庫ip(數據源)
sourcePort:導出庫port(數據源)
destIP:導入庫ip
destPort:導入庫Port
connectionNum: 數據導入時使用的鏈接數python