1、RabbitMQ概述html
RabbitMQ是一種消息隊列,是一個公共的消息中間件,用於不一樣進程之間的通信。python
除了RabbitMQ之外,還有ZeroMQ、ActiveMQ等等。正則表達式
前面學習了兩種隊列:redis
RabbitMQ是用erlang語言開發的,依賴於erlang。在Windows上安裝RabbitMQ須要安裝erlang環境。數據庫
RabbitMQ支持多種語言接口,Python主要使用pika來對其進行操做,pika是一個純Python的AMQP客戶端。windows
安裝RabbitMQ步驟:瀏覽器
2、RabbitMQ架構緩存
RabbitMQ因爲須要同時爲多個應用服務,其中維護的隊列不止一個。服務器
上圖是一個典型的生產者消費者模型,中間是RabbitMQ消息隊列,他由Exchange和Queue組成。消息隊列服務實體又叫broker(即中間方塊部分)。微信
圖中,紅色部分就是多個隊列。綠色部分是Exchange。
RabbitMQ默認端口:
Client端通信口:5672
後臺管理口:15672 http://localhost:15672
server間內部通信口:25672
新安裝好的RabbitMQ沒法登陸後臺管理界面:
3、最簡單的使用
生產者:
import pika # 創建一個socket連接 conn = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) # 聲明一個管道 channel = conn.channel() # 定義一個queue channel.queue_declare(queue='hello') # 經過管道發送數據到隊列hello中 channel.basic_publish(exchange='', routing_key='hello', # queue的名字 body='Hello World!!') # 內容 print('[X] Sent "Hello World!!"') conn.close() # 關閉連接
消費者:
import pika conn = pika.BlockingConnection( # 若是消費者在其餘機器上,這裏的localhost須要修改成server的IP地址 pika.ConnectionParameters('localhost') ) channel = conn.channel() # 爲何還要在這裏聲明queue,由於咱們不肯定在這以前是否已經有produce聲明瞭queue # 若是能肯定已經存在queue,那就不用聲明瞭 channel.queue_declare(queue='hello') # 參數ch是管道對象內存地址,method是消息要發給誰的信息 def callback(ch, method, properties, body): print('[x] Received %r' % body) # 從hello隊列中中獲取消息 channel.basic_consume( callback, # 若是收到消息就調用這個函數來處理消息 queue='hello', # 指定隊列名稱 no_ack=True # 是否向服務器確認消息處理完畢,見後述詳解no_ack ) print('[*] Waiting for messages.To exit press CTRL+C') # 開始接收,一直接收,沒有就卡住 channel.start_consuming()
在這種模式下,若是一個生產者對應多個消費者(即啓動多個消費者),此時隊列中的消息是以輪訓的方式發送給每一個消費者的(相似負載均衡)。
no_ack:
消費者受到消息後,須要必定的時間來處理消息,使用no_ack=True表示無論處理完仍是沒有處理完,都不會給服務端發確認。這種狀況下,服務端不會管消費者是否處理完消息就直接將消息從隊列中刪除了。
若是要讓消費者在處理完後給服務端發送確認消息後,再從隊列中刪除(若是未處理完就斷掉了,那麼就將消息給另一個活動的消費者處理),那麼就不要使用no_ack=True這個參數,而且在callback方法中添加下面代碼:
ch.basic_ack(delivery_tag=method.delivery_tag)
以下代碼:
# 參數ch是管道對象內存地址,method是消息要發給誰的信息 def callback(ch, method, properties, body): print('[x] Received %r' % body) # 處理完畢後迴應服務端 ch.basic_ack(delivery_tag=method.delivery_tag) # 從hello隊列中中獲取消息 channel.basic_consume( callback, # 若是收到消息就調用這個函數來處理消息 queue='hello1', # 指定隊列名稱 )
4、持久化
在RabbitMQ服務重啓後,隊列和裏面的消息都會丟失。
如何持久化隊列:
# 定義一個queue,並持久化 channel.queue_declare(queue='hello', durable=True)
使用durable=Ture來持久化queue(客戶端和服務器端都得寫)。可是queue中的數據仍是會丟失!
如何持久化隊列中的消息:
# 經過管道發送數據到隊列hello中,並持久化 channel.basic_publish(exchange='', routing_key='hello1', # queue的名字 body='Hello World!!', # 內容 # 持久化隊列中的消息 properties=pika.BasicProperties(delivery_mode=2) )
持久化隊列中消息,如上述代碼中,添加properties=pika.BasicProperties(delivery_mode=2)。
注意:消息持久化的delivery_mode必須搭配隊列持久化durable使用,不然隊列都不持久化,裏面的消息會丟失。
Windows命令行查看queue:
5、Exchange轉發器
Exchange在定義時要指定類型,以決定哪些queue符合條件,能夠接受消息。能夠存在多個exchange。
四種類型:
fanout:全部bind到此exchange的queue均可以接受到消息。
direct:經過routingKey和exchange決定的那個惟一的queue能夠接受消息。(徹底匹配)
topic:全部符合routingKey(能夠是一個表達式)所bind的queue能夠接受消息。(模糊匹配)
header:經過headers來決定把消息發給哪些queue。
Fanout:廣播
生產者:
import pika conn = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = conn.channel() # 只需定義exchange,無需定義queue了 channel.exchange_declare( exchange='logs1', # 定義exchange名稱 exchange_type='fanout', # exchange類型,這裏爲廣播 durable=True # 持久化exchange ) message = "Hello World!!" channel.basic_publish( exchange='logs', # exchange的名稱 routing_key='', # 之前這裏寫的是queue的名字,如今指定爲空 body=message # 發送的消息 ) print('[X] Sent %r' % message) conn.close()
消費者:
import pika conn = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = conn.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True # 和生產者一致,須要持久化 ) res = channel.queue_declare(exclusive=True) # 自動生成一個隊列名稱(惟一的名稱),並在此queue的消費者斷開後,自動銷燬queue queue_name = res.method.queue # 獲取自動生成的隊列名例如amq.gen-55TO5iqNph0I5MmG5GlKuA # 將定義好的queue綁定到exchange上 channel.queue_bind(exchange='logs', queue=queue_name ) print('[X] Waiting for logs. To exit press CTRL+C') # 回調函數 def callback(ch, methos, properties, body): print('[X] %r' % body) # 接收消息 channel.basic_consume(callback, queue=queue_name, # 指定隊列名 no_ack=True # 無響應,也可使用主動通知的形式 ) channel.start_consuming() # 開始監聽消息
注意:channel.queue_declare(exclusive=True)中的exclusive=True會自動生成一個惟一的隊列名,並在該隊列的消費者斷開後,自動銷燬該queue。這裏爲何要用這種形式,是由於咱們這裏的隊列只用於接收這個fanout廣播消息,接收完不須要這個queue了就能夠刪除了,若是還須要接收其餘exchange的消息,那可能就須要本身去指定名稱了。
注意注意:fanout模式,在exchange接收到生產者的消息後,只會發給當前已綁定的queues,分發給全部的queue後,消息就刪除了。因此在生產者發送消息後再綁定到exchange的消費者隊列是沒法接受到這條消息的。
direct:
生產者:
import pika conn = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = conn.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct' # 轉發類型爲direct(徹底匹配) ) severity = 'error' # 發送級別爲error,用於(徹底)匹配 message = 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message ) print('[X] Sent %r:%r' % (severity, message)) conn.close()
消費者:
import pika conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = conn.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct' ) res = channel.queue_declare(exclusive=True) # 隨機生成一個queue_name(自動銷燬) queue_name = res.method.queue # 獲取隨機名 severities = ['info', 'warning', 'error'] # 接收三個級別的匹配字符 # 利用for循環,爲同一個queue綁定3個級別的消息(即3個匹配字符) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity ) print('[*] Waiting for logs.To exit press CTRL+C') def callback(ch, method, properties, body): print('[X] %r:%r' % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True ) channel.start_consuming()
direct模式,就是使用routing_key來區分將消息發給哪一個隊列,須要接收的隊列必須使用routing_key來綁定到exchange上。發消息必須制定routing_key,從而exchange才能根據這個值來進行轉發。direct是徹底匹配的模式,一個字符都不能差。
topic:
topic與direct的使用方法很類似,在生產者中沒什麼不一樣,而在消費者中,topic是將一個匹配的表達式(相似正則表達式)和queue綁定到一個exchange中,這樣,只要生產者產生的消息可以匹配這個表達式,exchange就會把這個消息發送給匹配上的queue。
生產者:
import pika conn = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = conn.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic' # 轉發類型爲direct(徹底匹配) ) severity = 'Leo.warning' # 發送一個應用+級別的消息 message = 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=severity, body=message ) print('[X] Sent %r:%r' % (severity, message)) conn.close()
消費者:
import pika conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = conn.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic' ) res = channel.queue_declare(exclusive=True) # 隨機生成一個queue_name(自動銷燬) queue_name = res.method.queue # 獲取隨機名 severity = '*.warning' # 匹配一個級別的任何應用發來的消息,是一個相似於正則的表達式 channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=severity ) print('[*] Waiting for logs.To exit press CTRL+C') def callback(ch, method, properties, body): print('[X] %r:%r' % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True ) channel.start_consuming()
topic的匹配表達式:
"#":匹配0-N個單詞(單詞由字母和數字組成)。
"*":匹配0-1個單詞。
中間只能用「.」來間隔。例如 「 *.info.* 」, " #.warning.* "
注意:在消費者的代碼中queue_bind函數的參數中routing_key中"*"和"."和"#"都表明着匹配字符。而在生產者的basic_publish函數的參數中routing_key中字符串若是出現"*"、"."和"#",都表明一個普通字符。
6、RabbitMQ實現RPC(簡單版,無exchange部分)
RPC:remote procedure call 遠程過程調用,即客戶端發送一條命令,服務端執行方法調用,並返回結果。
RabbitMQ中實現RPC思路:
生產者--->rpc_Queue--->消費者
消費者<---res_Queue<---生產者
經過兩個隊列,來互相傳遞命令和執行結果。
客戶端代碼:(發送調用命令)
import pika import time import uuid class FibonacciRpcClient(object): def __init__(self): # 連接RabbitMQ self.connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) self.channel = self.connection.channel() # 隨機生成一個隊列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 獲取隊列名稱 # 定義一個消費者,回調函數是on_response(),消息隊列名稱是callback_queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) # 消費者回調函數 def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body # 發送消息(命令),並接受響應 def call(self, n): # 定義self.response爲空 self.response = None # 生成一個uuid,對應一個命令 self.corr_id = str(uuid.uuid4()) # 發送消息,隊列名爲rpc_queue,將響應隊列和uuid一併發送給服務端,隊列名爲 # callback_queue,uuid爲corr_id,body爲消息體 self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id ), body=str(n) ) # 消息發送完畢後,循環調用非阻塞模式的start_consuming() # 即connection.process_data_events() # 當沒收到相應時,返回並繼續循環。 # 當有響應時會調用on_response,在on_response中將self.response設置爲body,跳出循環 while self.response is None: self.connection.process_data_events() # 每隔一秒檢查一下是否有響應到達 time.sleep(1) # 返回響應結果 return int(self.response) fibonacci_rpc = FibonacciRpcClient() response = fibonacci_rpc.call(3) print('[.] Got %r' % response)
服務端代碼:(處理命令,返回調用結果)
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fibonacci(n): if n == 0: return 0 elif n == 1: return 1 else: return fibonacci(n-1) + fibonacci(n-2) # 定義一個回調函數,用來處理命令 def on_request(ch, method, props, body): n = int(body) print('[.] fib(%s)' % n) # 執行調用的函數,或去響應內容 response = fibonacci(n) # 將響應內容發送回去,隊列爲客戶端發過來的props.reply_to ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(response) ) # 通知客戶端,這個命令已經執行完畢 ch.basic_ack(delivery_tag=method.delivery_tag) # 從rpc_queue中接收消息 channel.basic_consume(on_request, queue='rpc_queue') print('[x] Awaiting RPC requests') # 開始接收消息 channel.start_consuming()
上述代碼主要完成了RPC調用的基本功能,其中函數一個客戶端(發起RPC調用)和一個服務端(處理RPC調用)。
在客戶端代碼中,關鍵點在於在發送消息的同時,將響應返回須要使用的隊列也一併定義並將名稱發送給了服務端(還包含區分消息和相應批次的uuid)。在發送完消息後,進入循環獲取響應的階段。
在服務端代碼中,關鍵點在於接收到消息後,在回調函數中調用處理函數fibonacci(),並將運行結果發送到客戶端告知的響應隊列中。
7、Redis概述
Redis是一個緩存系統,即第三方實現數據共享,例如QQ和微信要共享一個數據name。QQ和微信都分別使用socket連接到redis,而後實現訪問該name數據。
Redis是單線程的,底層應該是使用epoll實現異步處理(不肯定)。
Redis安裝:
在Linux下采用yum等工具安裝,或者源碼安裝。
1.安裝redis前須要安裝epel源,yum install epel-release -y
2.安裝redis,yum install redis -y
3.啓動redis,systemctl start redis.service
4.查看redis是否啓動,systemctl status redis.service 或者 ps -ef | grep redis
Redis命令行使用:
使用redis-cli運行redis命令行:
存數據:
讀數據:
在Python中調用redis:
首先須要在python中安裝redis庫,可使用pycharm安裝。或者使用pip install redis,easy_install redis等方法安裝。
redis默認使用端口號:6379。
import redis # 連接遠程redis服務 r = redis.Redis(host='192.168.1.66', port=6379) r.set('age', '25') # 存入數據 print(r.get('age')) # 讀出數據
使用上述代碼進行redis連接和操做,若是報錯說redis拒絕連接,解決方案以下:
1.修改redis配置文件/etc/redis.conf,將其中的bind 127.0.0.1註釋掉。
2.將protected-mode yes修改成protected-mode no。
3.再次連接,便可成功。
Redis鏈接池:
import redis pool = redis.ConnectionPool(host='192.168.1.66', port=6379) r = redis.Redis(connection_pool=pool) r.set('sex', 'male') print(r.get('sex'))
redis-py使用connection pool來管理一個redis server的全部連接,避免每次創建、釋放連接的資源開銷。默認,每一個redis實例都會維護一個本身的鏈接池。能夠直接創建一個鏈接池,而後做爲參數Redis,這樣就能夠實現多個Redis實例共享一個連接池。
使用Redis幫助:
在redis-cli中使用命令 help的形式能夠查看命令幫助。
8、Redis的String操做
主要有如下一些經常使用方法:
set(name, value, ex=None, px=None, nx=False, xx=False)
在Redis的set函數中,默認不存在則建立,存在則修改。
參數:
ex,過時時間(秒)
px,過時時間(毫秒)
nx,若是設置爲True,則只有name不存在時,才執行set操做
xx,若是設置爲True,則只有name存在時,才執行set操做
setnx(key, value) = set()+nx 通常不使用,直接用set
setex(key, value, time) = set()+ex 通常不使用,直接用set
psetex(key, value, time) = set()+px 通常不使用,直接用set
mset(k1='v1', k2='v2') 批量設置鍵值 或者 mset({'k1':'v1', 'k2':'v2' })使用字段傳入鍵值對。
mget(k1, k2) 批量獲取值 或者 mget([k1, k2]) 使用列表傳入keys
getset(key, value) 獲取舊值,設置新值
getrange(key, start, end) 獲取某個值的其中幾個字符,例如值爲‘jack’,getrange(key, 0, 2)返回'jac'。
setrange(key, offset, value) 將值從某個位置開始覆蓋字符爲value,例如值爲‘jack’,setrange(key, 1, 'AL'),返回'jALk'。
setbit(key, offset, value) 將某個值的某個二進制位進行修改。例如n1對應的值爲foo,foo對應的二進制爲01100110 01101111 01101111第7位設置爲1,setbit('n1', 7, 1),返回的值變爲01100111 01101111 01101111,foo變爲goo。
setbit一個使用的場景:例如,新浪微博有10億用戶,咱們想統計有哪些用戶在線,使用數據庫修改狀態的方式效率過低。咱們可使用每一個用戶對應一個編號index,編號對應的二進制位0表明不在線,1表明在線。用戶上線就使用setbit(key, index, 1)設置key對應值中的index位爲1,下線就setbit(key,index,0)。而後使用bitcount(key)就能夠統計值裏有多少位爲1,就能夠知道有多少用戶在線。 例如,設置第1億個用戶爲在線,setbit(user,100000000,1)。
getbit(key, offset) 獲取某個位置的二進制值,例如1爲用戶在線,0爲不在線。
strlen(key) 返回key對應value的字節長度(一個漢字三個字節)。
incr(key, amount=1) 自增,運行一次,key對應的value+1,能夠用來處理用戶上線統計(incr login_user)
decr(key, amount=1) 自減,運行一次,key對應的value-1,能夠用來處理用戶下線統計(decr login_user),注意會減爲負數。
incrbyfloat(key, amount=1.2) 浮點數自加,相似incr。
decrbyfloat(key, amount=1.2) 浮點數自減,相似decr。
append(key, value) 在key對應的值後面添加value。例如name='alex',append(name,'leo'),返回alexleo。
9、Redis的hash操做
hset(name, key, value) 至關於name中存在多個鍵值對,例如hset('n2', 'k9', 'v99'),hset('n2', 'kx', 'vx')。
hgetall(name) 返回全部的鍵值對。
hget(name, key) 獲取info中某個key的值,例如 hget('info', 'age')。
hkeys(name) 獲取全部name中的key。
hvals(name) 獲取全部name中的value。
hmset(name,mapping) 批量設置,例如hmset('info', {'k1':'v1', 'k2':'v2'})
hmget(name, key ...) 批量獲取,例如hmget('info', 'k1', 'k2')
hlen(name) 獲取name中有幾個key-value。例如hlen('info'),返回(integer)2
hexist(name, key) 判斷name中是否存在某個鍵值。
hincrby(name, key, amount=1) 使name中key對應的value自增。
hdecrby(name, key, amount=1) 使name中key對應的value自減。
hincebyfloat(name, key, amount=1.2) 浮點自增。
hscan(name, cursor=0, match=None, count=None) 因爲hset能夠保存200多億個key-value,因此使用hkeys來查詢很耗性能,因此可使用hscan進行過濾。例如hscan('info',0,'*e*'),過濾key中帶'e'的key-value。
hscan_iter(name, match=None, count=None) 返回一個迭代器,而後使用循環獲取key-value。
例如:
for item in r.hscan_iter('info'): print item
10、Redis的list操做
lpush(names, val1, val2...) 往列表中插入數據,若是不存在則建立。lpush是從數據的最前面插入(最左邊)。
lrange(names, start, end) 去列表中的一個範圍的數據,例如lrange(names, 0, -1)即取names中的全部數據。
rpush(names, val1, val2..) 與lpush相反,rpush是從最後面插入(最右邊)。
lpushx(names, value) 只有當names存在時才插入,不然無效。
rpushx(names, value) 相似lpushx。
llen(names) 返回names對應list元素個數。
linsert(names, where, refvalue, value) 在names列表中,refvalue值的前面仍是後面插入value。例如linsert(names, BEFORE, 'alex', 'leo'),在names列表的'alex'前插入'leo'。where填寫BEFORE和AFTER。
lset(names, index, value) 在names列表中某個index處從新設置值。
lrem(names, value, num) 刪除names列表中指定的值,num表示刪除的數量(正數爲從前到後,負數爲從後到前)。例如lrem(names, 'leo', 2)即從前到後刪除2個'leo'。
lpop(names) 從names的最前面刪除一個元素,並返回。
rpop(names) 從names的最後面刪除一個元素,並返回。
lindex(names, index) 從names中根據index獲取相應位置的元素。
ltrim(name, start, end) 移除names中start-end下標範圍的元素。
rpoplpush(src, dst) 從一個列表的最右邊取元素,添加到另外一個列表的最左邊。
11、Redis的set(集合)操做
集合就是不容許重複的列表。
sadd(names, val1, val2...) 向names中添加元素,可是不能重複,若是與集合中某個元素重複,則不會插入該元素。
smembers(names) 獲取names集合中的全部元素,這些元素都是不重複的。
sdiff(names1, names2...) 獲取差集,names1-names2
sdiffstore(res, names1, names2...) 獲取names1-names2的結果,並存放到res集合中。
sinter(names1, names2...) 獲取多個集合之間的交集。
sinterstore(res, names1, names2...) 獲取集合之間的交集,並存放到res集合中。
sismember(names, value) 判斷value是不是names集合中的成員。
smove(src, dst, value) 將某個成員從一個集合移動到另一個集合中。
spop(names) 從集合的右側移除一個成員,並將其返回。
srandmember(names, num) 從names集合中隨機獲取num個元素。
srem(names, value...) 在names集合中刪除某些值。
sunion(names1, names2...) 多個集合的並集。
sunionstore(res, names1, names2...) 多個集合的並集,並存放res集合中。
sscan(names, cursor=0, match=None, count=None) 和hscan很像,即過濾。
sscan_iter(names, match=None, count=None) 和hscan_iter很像,即返回一個迭代器,而後使用循環獲取值。
有序集合:
在集合的基礎上,爲每一個元素排序。元素的排序須要根據另外一個值來進行比較,因此,對於有序集合,每個元素有兩個值,即:值和分數,分數專門用來作排序。
zadd(names, s1, val1, s2, val2...) 向有序集合中插入數據,每一個數據都有一個分數,用於排序。例如zadd(names,10,'leo',20,'alex')。
zrange(names, start, end, desc=False, withscores=False, score_cast_func=float) 查看有序集合的部分元素,例如zrange(names, 0, -1, withscores=True)查看所有元素(帶分數)。
zcard(names) 獲取names對應有序集合元素的數量。
zcount(names, min, max) 獲取有序集合中分數在[min, max]之間元素的個數。
zincrby(names, value, amount) 自增names有序集合中對應value值元素的分數。
zrank(names, value) 獲取有序集合中value對應元素的排行(從0開始,從小到大)。
zrerang(name, value) 與zrank相反,從大到小。
zrem(names, value) 刪除names中值爲value的元素。
zremrangebyrank(names, min, max) 根據排行範圍(至關於index)刪除。
zremrangebyscore(names, min, max) 根據分數範圍刪除。
zscore(names, value) 獲取names有序集合中value對應元素的分數。
zinterstore(res, keys, aggregate=None) 獲取兩個有序集合的交集,若是其中有值相同分數不一樣的元素,則對其分數進行SUM,MIN,MAX三種操做。
zunionstore(res, keys, aggregate=None) 獲取兩個有序集合的並集,若是其中有值相同的元素,則對其分數進行SUM,MIN,MAX三種操做。
zscan(name, cursor=0, match=None, count=None,score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)
同字符串類似,相較於字符串新增score_cast_func,用來對分數進行操做。
12、其餘經常使用操做
delete(key) 任意刪除一個key對應的元素。
exist(key) 查看是否存在key對應的元素。
keys(pattern='*') 根據pattern匹配獲取redis中全部的key。
expire(key, time) 爲某個元素設置超時時間(秒)。到時間自動刪除。
rename(src, dst) 重命名。
move(name, db) 將某個值移動到指定的db下。redis中默認有16個db(至關於表),編號爲0-15。每一個db中數據都是新的。使用select db_id來切換。
type(key) 獲取key對應值的類型。
scan(cursor=0, match=None, count=None) 同字符串操做
scan_iter(match=None, count=None) 同字符串操做
十3、Redis管道(pipline)
使用pipe能夠將多個操做黏在一塊兒,變成一個原子操做。
import redis pool = redis.ConnectionPool(host='192.168.1.66', port=6379) r = redis.Redis(host='192.168.1.66', port=6379) pipe = r.pipeline(transaction=True) # 定義一個pipe pipe.set('haha1', 'good') # 加入動做 pipe.set('haha2', 'UP') # 加入動做 pipe.execute() # 開始執行pipe,其中的兩個動做是黏在一塊兒的原子操做
十4、發佈訂閱
使用Redis也能夠作到和RabbitMQ相似的廣播效果。
首先封裝一個RedisHelper:
import redis class RedisHelper(object): def __init__(self): self.__conn = redis.Redis(host='192.168.1.66', port=6379) self.channel_pub = 'fm104.5' # 發佈的頻道 self.channel_sub = 'fm104.5' # 接收的頻道 def publishing(self, msg): self.__conn.publish(self.channel_pub, msg) return True def subscribe(self): sub = self.__conn.pubsub() # 至關於打開收音機 sub.subscribe(self.channel_sub) # 選擇頻道 sub.parse_response() # 準備接收(再次調用纔是正式接收) return sub
建立一個訂閱者:
from redishelper import RedisHelper rh = RedisHelper() rh_sub = rh.subscribe() print('打開收音機,調頻104.5,開始接收消息:') while True: msg = rh_sub.parse_response() print(msg)
建立一個發佈者:
from redishelper import RedisHelper rh = RedisHelper() rh.publishing('Hello World!')
也能夠經過redis-cli來發布,使用PUBLISH fm104.5 'Hello World'便可。