安裝模塊:python
pip3 install redisredis
鏈接方式:ide
r = redis.Redis(host='localhost',port=6379)ui
鏈接池:爲了節約資源,減小屢次鏈接帶來的消耗。code
pool=redis.ConnectionPool(host='localhost',port=6379,decode_responses=True)對象
常規操做:事務
import redis r = redis.Redis(host='localhost',port=6379) r.set('foo','bar') print(r.get('foo'))
鏈接池:ip
import redis pool = redis.ConnectionPool(host='localhost',port=6379,decode_responses=True) # 默認設置的值和取得的值都是bytes類型,若是想改成str類型,能夠添加decode_responses=True r1 = redis.Redis(connection_pool=pool) r2 = redis.Redis(connection_pool=pool) r1.set('name','jack') print(r1.get('name')) r2.set('age',18) print(r2.get('age')) print(r1.client_list()) print(r2.client_list())
管道:ci
import redis,time r = redis.Redis(host='localhost',port=6379,decode_responses=True) pipe = r.pipeline(transaction=True) pipe.set('name1','Alin') pipe.set('name2','Lnda') pipe.set('name3','Tony') time.sleep(5) pipe.execute() print(r.mget('name1','name2','name3'))
事務:python可使用管道來代替事務資源
import redis,time import redis.exceptions r = redis.Redis(host='localhost',port=6379,decode_responses=True) pipe = r.pipeline() print(r.get('name1')) try: pipe.multi() pipe.set('age1',22) pipe.set('age2',23) pipe.set('age3',24) time.sleep(5) pipe.execute() print(r.mget('age1','age2','age3')) except redis.exceptions.WatchError as e: print('Error')
訂閱和發佈:
發佈方:
import redis r = redis.Redis(host='localhost',port=6379,decode_responses=True) while True: msg = input('echo>>:') if len(msg) == 0: continue elif msg == 'quit': r.publish('cctv1',msg) break else: r.publish('cctv1',msg)
訂閱方:
import redis r = redis.Redis(host='localhost',port=6379,decode_responses=True) chan = r.pubsub() #返回一個發佈訂閱對象 msg_reciver = chan.subscribe('cctv1') #訂閱 msg = chan.parse_response() # 返回一個確認 print(msg) print('訂閱成功,開始接收...') while True: msg = chan.parse_response() #接收消息 if msg[2] == 'quit': #格式:類型,頻道,消息 break else: print('>>:', msg[2])