目錄python
Twistedreact
Redisgit
RabbitMQ程序員
Twistedgithub
事件驅動redis
事件驅動分爲兩個部分:第一,註冊事件;第二,觸發事件。數據庫
自定義事件啓動框架,命名爲:「弒君者」:緩存
#!/usr/bin/env python # -*- coding:utf-8 -*- # event_drive.py event_list = [] def run(): for event in event_list: obj = event() obj.execute() class BaseHandler(object): """ 用戶必須繼承該類,從而規範全部類的方法(相似於接口的功能) """ def execute(self): raise Exception('you must overwrite execute') 最牛逼的事件驅動框架
程序員使用「弒君者框架」:服務器
#!/usr/bin/env python # -*- coding:utf-8 -*- from source import event_drive class MyHandler(event_drive.BaseHandler): def execute(self): print 'event-drive execute MyHandler' event_drive.event_list.append(MyHandler) event_drive.run()
Protocols描述瞭如何以異步的方式處理網絡中的事件。HTTP、DNS以及IMAP是應用層協議中的例子。Protocols實現了IProtocol接口,它包含以下的方法:網絡
makeConnection 在transport對象和服務器之間創建一條鏈接 connectionMade 鏈接創建起來後調用 dataReceived 接收數據時調用 connectionLost 關閉鏈接時調用
Transports表明網絡中兩個通訊結點之間的鏈接。Transports負責描述鏈接的細節,好比鏈接是面向流式的仍是面向數據報的,流控以及可靠性。TCP、UDP和Unix套接字可做爲transports的例子。它們被設計爲「知足最小功能單元,同時具備最大程度的可複用性」,並且從協議實現中分離出來,這讓許多協議能夠採用相同類型的傳輸。Transports實現了ITransports接口,它包含以下的方法:
write 以非阻塞的方式按順序依次將數據寫到物理鏈接上 writeSequence 將一個字符串列表寫到物理鏈接上 loseConnection 將全部掛起的數據寫入,而後關閉鏈接 getPeer 取得鏈接中對端的地址信息 getHost 取得鏈接中本端的地址信息
將transports從協議中分離出來也使得對這兩個層次的測試變得更加簡單。能夠經過簡單地寫入一個字符串來模擬傳輸,用這種方式來檢查。
EchoServer
代碼:
1 #!/usr/bin/env python 2 # encoding: utf-8 3 4 from twisted.internet import protocol 5 from twisted.internet import reactor 6 7 class Echo(protocol.Protocol): 8 def dataReceived(self, data): #只要twisted一收到數據,就會調用此方法 9 self.transport.write(data) #將接受到的數據發返回給客戶端 10 11 def main(): 12 factory = protocol.ServerFactory() #定義基礎工廠類(定義一些公共的東西) 13 factory.protocol = Echo #至關於socketserver中的handle 14 15 reactor.listenTCP(8888,factory) #不容許寫IP地址 16 reactor.run() 17 18 if __name__ == '__main__': 19 main()
EchoClien
代碼:
1 #!/usr/bin/env python 2 # encoding: utf-8 3 4 from twisted.internet import reactor, protocol 5 6 # a client protocol 7 8 class EchoClient(protocol.Protocol): 9 """Once connected, send a message, then print the result.""" 10 11 def connectionMade(self): #連接一創建成功,就會自動調用此方法 12 self.transport.write("hello boy!") #向服務端發數據 13 14 def dataReceived(self, data): #收到數據調用從方法 15 "As soon as any data is received, write it back." 16 print("Server said:", data) 17 self.transport.loseConnection() #將全部掛起的數據寫入,而後關閉鏈接 18 19 def connectionLost(self, reason):#關閉鏈接時調用 20 print("connection lost") 21 22 class EchoFactory(protocol.ClientFactory): 23 protocol = EchoClient #至關於handle 24 25 def clientConnectionFailed(self, connector, reason): #若是連接失敗,自動調用此方法 26 print("Connection failed - goodbye!") 27 reactor.stop() 28 29 def clientConnectionLost(self, connector, reason): #連接過程當中斷開,自動調用次方法 30 print("Connection lost - goodbye!") 31 reactor.stop() 32 33 34 # this connects the protocol to a server running on port 8000 35 def main(): 36 f = EchoFactory() 37 reactor.connectTCP("127.0.0.1", 8888, f) 38 reactor.run() 39 40 # this only runs if the module was *not* imported 41 if __name__ == '__main__': 42 main()
Redis
redis是一個key-value存儲系統。和Memcached相似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操做,並且這些操做都是原子性的。在此基礎上,redis支持各類不一樣方式的排序。與memcached同樣,爲了保證效率,數據都是緩存在內存中。區別的是redis會週期性的把更新的數據寫入磁盤或者把修改操做寫入追加的記錄文件,而且在此基礎上實現了master-slave(主從)同步。
Redis 是一個高性能的key-value數據庫。 redis的出現,很大程度補償了memcached這類key/value存儲的不足,在部 分場合能夠對關係數據庫起到很好的補充做用。它提供了Java,C/C++,C#,PHP,JavaScript,Perl,Object-C,Python,Ruby,Erlang等客戶端,使用很方便。[1]
Redis支持主從同步。數據能夠從主服務器向任意數量的從服務器上同步,從服務器能夠是關聯其餘從服務器的主服務器。這使得Redis可執行單層樹複製。存盤能夠有意無心的對數據進行寫操做。因爲徹底實現了發佈/訂閱機制,使得從數據庫在任何地方同步樹時,可訂閱一個頻道並接收主服務器完整的消息發佈記錄。同步對讀取操做的可擴展性和數據冗餘頗有幫助。
redis的官網地址,很是好記,是redis.io。(特地查了一下,域名後綴io屬於國家域名,是british Indian Ocean territory,即英屬印度洋領地)
目前,Vmware在資助着redis項目的開發和維護。
Redis安裝和基本使用
wget http://download.redis.io/releases/redis-3.0.6.tar.gz tar xzf redis-3.0.6.tar.gz cd redis-3.0.6 make
啓動服務端
在redis-3.0.6目錄下
src/redis-server
啓動客戶端
在redis-3.0.6目錄下 src/redis-cli redis> set foo bar OK redis> get foo "bar"
Python操做Redis
sudo pip install redis or sudo easy_install redis or 源碼安裝 詳見:https://github.com/WoLpH/redis-py
API使用
redis-py 的API的使用能夠分類爲:
一、操做模式
redis-py提供兩個類Redis和StrictRedis用於實現Redis的命令,StrictRedis用於實現大部分官方的命令,並使用官方的語法和命令,Redis是StrictRedis的子類,用於向後兼容舊版本的redis-py。
#!/usr/bin/env python # -*- coding:utf-8 -*- import redis r = redis.Redis(host=redis服務器IP地址, port=6379) r.set('foo', 'Bar') print (r.get('foo'))
二、鏈接池
redis-py使用connection pool來管理對一個redis server的全部鏈接,避免每次創建、釋放鏈接的開銷。默認,每一個Redis實例都會維護一個本身的鏈接池。能夠直接創建一個鏈接池,而後做爲參數Redis,這樣就能夠實現多個Redis實例共享一個鏈接池。
#!/usr/bin/env python # -*- coding:utf-8 -*- import redis pool = redis.ConnectionPool(host=redis服務器IP, port=6379) r = redis.Redis(connection_pool=pool) r.set('foo', 'Bar') print (r.get('foo'))
三、操做
String操做,redis中的String在在內存中按照一個name對應一個value來存儲。如圖:
set(name, value, ex=None, px=None, nx=False, xx=False)
在Redis中設置值,默認,不存在則建立,存在則修改
參數:
ex,過時時間(秒)
px,過時時間(毫秒)
nx,若是設置爲True,則只有name不存在時,當前set操做才執行
xx,若是設置爲True,則只有name存在時,當前前set操做才執行
setnx(name, value):設置值,只有name不存在時,執行設置操做(添加)
setex(name, value, time):設置值
參數:
time,過時時間(數字秒 或 timedelta對象)
psetex(name, value,time_ms):設置值
參數:
time_ms,過時時間(數字毫秒 或 timedelta對象)
mset(*args, **kwargs):批量設置值
如:
mset(k1=
'v1'
, k2=
'v2'
)
或
mget({
'k1'
:
'v1'
,
'k2'
:
'v2'
})
get(name):獲取值
mget(keys, *args):批量獲取
如:
mget(
'ylr'
,
'wupeiqi'
)
或
r.mget([
'ylr'
,
'wupeiqi'
])
getset(name, value):設置新值並獲取原來的值
getrange(key, start, end):獲取子序列(根據字節獲取,非字符)
參數:
name,Redis 的 name
start,起始位置(字節)
end,結束位置(字節)
如: "李小明" ,0-3表示 "李"
setrange(name, offset, value):修改字符串內容,從指定字符串索引開始向後替換(新值太長時,則向後添加)
參數:
offset,字符串的索引,字節(一個漢字三個字節)
value,要設置的值
setbit(name, offset, value):對name對應值的二進制表示的位進行操做
參數:
name,redis的name
offset,位的索引(將值變換成二進制後再進行索引)
value,值只能是 1 或 0
getbit(name, offset):獲取name對應的值的二進制表示中的某位的值 (0或1)
bitcount(key, start=None, end=None):獲取name對應的值的二進制表示中 1 的個數
參數:
key,Redis的name
start,位起始位置
end,位結束位置
bitop(operation, dest, *keys):獲取多個值,並將值作位運算,將最後的結果保存至新的name對應的值
參數:
operation,AND(並) 、 OR(或) 、 NOT(非) 、 XOR(異或)
dest, 新的Redis的name
*keys,要查找的Redis的name
strlen(name):返回name對應值的字節長度(一個漢字3個字節)
incr(self, name, amount=1):自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。
參數:
name,Redis的name
amount,自增數(必須是整數)
注:同incrby
incrbyfloat(self, name, amount=1.0):自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。
參數:
name,Redis的name
amount,自增數(浮點型)
decr(self, name, amount=1):自減 name對應的值,當name不存在時,則建立name=amount,不然,則自減。
參數:
name,Redis的name
amount,自減數(整數)
append(key, value):在redis name對應的值後面追加內容
參數:
key, redis的name
value, 要追加的字符串
Hash操做,redis中Hash在內存中的存儲格式以下圖:
hset(name, key, value):name對應的hash中設置一個鍵值對(不存在,則建立;不然,修改)
參數:
name,redis的name
key,name對應的hash中的key
value,name對應的hash中的value
注:
hsetnx(name, key, value),當name對應的hash中不存在當前key時則建立(至關於添加)
hmset(name, mapping):在name對應的hash中批量設置鍵值對
參數:
name,redis的name
mapping,字典,如:{'k1':'v1', 'k2': 'v2'}
hget(name,key):在name對應的hash中獲取根據key獲取value
hmget(name, keys, *args):在name對應的hash中獲取多個key的值
參數:
name,reids對應的name
keys,要獲取key集合,如:['k1', 'k2', 'k3']
*args,要獲取的key,如:k1,k2,k3
hgetall(name):獲取name對應
hash
的全部鍵值
hlen(name):獲取name對應的hash中鍵值對的個數
hkeys(name):獲取name對應的hash中全部的key的值
hvals(name):獲取name對應的hash中全部的value的值
hexists(name, key):檢查name對應的hash是否存在當前傳入的key
hdel(name,*keys):將name對應的hash中指定key的鍵值對刪除
hincrby(name, key, amount=1):自增name對應的hash中的指定key的值,不存在則建立key=amount
參數:
name,redis中的name
key, hash對應的key
amount,自增數(整數)
hincrbyfloat(name, key, amount=1.0):自增name對應的hash中的指定key的值,不存在則建立key=amount
參數:
name,redis中的name
key, hash對應的key
amount,自增數(浮點數)
自增name對應的hash中的指定key的值,不存在則建立key=amount
hscan(name, cursor=0, match=None, count=None):增量式迭代獲取,對於數據大的數據很是有用,hscan能夠實現分片的獲取數據,並不是一次性將數據所有獲取完,從而放置內存被撐爆
參數:
name,redis的name
cursor,遊標(基於遊標分批取獲取數據)
match,匹配指定key,默認None 表示全部的key
count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
hscan_iter(name, match=None, count=None):利用yield封裝hscan建立生成器,實現分批去redis中獲取數據
參數:
match,匹配指定key,默認None 表示全部的key
count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
List操做,redis中的List在在內存中按照一個name對應一個List來存儲。如圖:
lpush(name,values):在name對應的list中添加元素,每一個新的元素都添加到列表的最左邊
如:
r.lpush('oo', 11,22,33)
保存順序爲: 33,22,11
擴展:
rpush(name, values) 表示從右向左操做
lpushx(name,value):在name對應的list中添加元素,只有name已經存在時,值添加到列表的最左邊
更多:
rpushx(name, value) 表示從右向左操做
llen(name):name對應的list元素的個數
linsert(name, where, refvalue, value)):在name對應的列表的某一個值前或後插入一個新值
參數:
name,redis的name
where,BEFORE或AFTER
refvalue,標杆值,即:在它先後插入數據
value,要插入的數據
r.lset(name, index, value):對name對應的list中的某一個索引位置從新賦值
參數:
name,redis的name
index,list的索引位置
value,要設置的值
r.lrem(name, value, num):在name對應的list中刪除指定的值
參數:
name,redis的name
value,要刪除的值
num, num=0,刪除列表中全部的指定值;
num=2,從前到後,刪除2個;
num=-2,從後向前,刪除2個
lpop(name):在name對應的列表的左側獲取第一個元素並在列表中移除,返回值則是第一個元素
更多:
rpop(name) 表示從右向左操做
lindex(name, index):在name對應的列表中根據索引獲取列表元素
lrange(name, start, end):在name對應的列表分片獲取數據
參數:
name,redis的name
start,索引的起始位置
end,索引結束位置
ltrim(name, start, end):在name對應的列表中移除沒有在start-end索引之間的值
參數:
name,redis的name
start,索引的起始位置
end,索引結束位置
rpoplpush(src, dst):從一個列表取出最右邊的元素,同時將其添加至另外一個列表的最左邊
參數:
src,要取數據的列表的name
dst,要添加數據的列表的name
blpop(keys, timeout):將多個列表排列,按照從左到右去pop對應列表的元素
參數:
keys,redis的name的集合
timeout,超時時間,當元素全部列表的元素獲取完以後,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞
更多:
r.brpop(keys, timeout),從右向左獲取數據
brpoplpush(src, dst, timeout=0):從一個列表的右側移除一個元素並將其添加到另外一個列表的左側
參數:
src,取出並要移除元素的列表對應的name
dst,要插入元素的列表對應的name
timeout,當src對應的列表中沒有數據時,阻塞等待其有數據的超時時間(秒),0 表示永遠阻塞
自定義增量迭代
# 因爲redis類庫中沒有提供對列表元素的增量迭代,若是想要循環name對應的列表的全部元素,那麼就須要: # 一、獲取name對應的全部列表 # 二、循環列表 # 可是,若是列表很是大,那麼就有可能在第一步時就將程序的內容撐爆,全部有必要自定義一個增量迭代的功能: def list_iter(name): """ 自定義redis列表增量迭代 :param name: redis中的name,即:迭代name對應的列表 :return: yield 返回 列表元素 """ list_count = r.llen(name) for index in xrange(list_count): yield r.lindex(name, index) # 使用 for item in list_iter('pp'): print item
Set操做,Set集合就是不容許重複的列表
sadd(name,values):name對應的集合中添加元素
scard(name):獲取name對應的集合中元素個數
sdiff(keys, *args):在第一個name對應的集合中且不在其餘name對應的集合的元素集合
sdiffstore(dest, keys, *args):獲取第一個name對應的集合中且不在其餘name對應的集合,再將其新加入到dest對應的集合中
sinter(keys, *args):# 獲取多一個name對應集合的並集
sinterstore(dest, keys, *args):獲取多一個name對應集合的並集,再講其加入到dest對應的集合中
sismember(name, value):檢查value是不是name對應的集合的成員
smembers(name):獲取name對應的集合的全部成員
smove(src, dst, value):將某個成員從一個集合中移動到另一個集合
spop(name):從集合的右側(尾部)移除一個成員,並將其返回
srandmember(name, numbers):從name對應的集合中隨機獲取 numbers 個元素
srem(name, values):在name對應的集合中刪除某些值
sunion(keys, *args):獲取多一個name對應的集合的並集
sunionstore(dest,keys, *args):獲取多一個name對應的集合的並集,並將結果保存到dest對應的集合中
sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)
同字符串的操做,用於增量迭代分批獲取元素,避免內存消耗太大
有序集合,在集合的基礎上,爲每元素排序;元素的排序須要根據另一個值來進行比較,因此,對於有序集合,每個元素有兩個值,即:值和分數,分數專門用來作排序。
zadd(name, *args, **kwargs):在name對應的有序集合中添加元素
zcard(name):獲取name對應的有序集合元素的數量
zcount(name, min, max):獲取name對應的有序集合中分數 在 [min,max] 之間的個數
zincrby(name, value, amount):自增name對應的有序集合的 name 對應的分數
r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float):按照索引範圍獲取name對應的有序集合的元素
參數:
name,redis的name
start,有序集合索引發始位置(非分數)
end,有序集合索引結束位置(非分數)
desc,排序規則,默認按照分數從小到大排序
withscores,是否獲取元素的分數,默認只獲取元素的值
score_cast_func,對分數進行數據轉換的函數
更多:
從大到小排序
zrevrange(name, start, end, withscores=False, score_cast_func=float)
按照分數範圍獲取name對應的有序集合的元素
zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
從大到小排序
zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)
zrank(name, value):獲取某個值在 name對應的有序集合中的排行(從 0 開始)
更多:
# zrevrank(name, value),從大到小排序
zrangebylex(name, min, max, start=None, num=None):
當有序集合的全部成員都具備相同的分值時,有序集合的元素會根據成員的 值 (lexicographical ordering)來進行排序,而這個命令則能夠返回給定的有序集合鍵 key 中, 元素的值介於 min 和 max 之間的成員
對集合中的每一個成員進行逐個字節的對比(byte-by-byte compare), 並按照從低到高的順序, 返回排序後的集合成員。 若是兩個字符串有一部份內容是相同的話, 那麼命令會認爲較長的字符串比較短的字符串要大
# 參數:
# name,redis的name
# min,左區間(值)。 + 表示正無限; - 表示負無限; ( 表示開區間; [ 則表示閉區間
# min,右區間(值)
# start,對結果進行分片處理,索引位置
# num,對結果進行分片處理,索引後面的num個元素
# 如:
# ZADD myzset 0 aa 0 ba 0 ca 0 da 0 ea 0 fa 0 ga
# r.zrangebylex('myzset', "-", "[ca") 結果爲:['aa', 'ba', 'ca']
# 更多:
# 從大到小排序
# zrevrangebylex(name, max, min, start=None, num=None)
zrem(name, values):刪除name對應的有序集合中值是values的成員
zremrangebyrank(name, min, max):根據排行範圍刪除
zremrangebyscore(name, min, max):根據分數範圍刪除
zremrangebylex(name, min, max):根據值返回刪除
zscore(name, value):獲取name對應有序集合中 value 對應的分數
zinterstore(dest, keys, aggregate=None):獲取兩個有序集合的交集,若是遇到相同值不一樣分數,則按照aggregate進行操做
aggregate的值爲: SUM MIN MAX
zunionstore(dest, keys, aggregate=None):獲取兩個有序集合的並集,若是遇到相同值不一樣分數,則按照aggregate進行操做
aggregate的值爲: SUM MIN MAX
zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)
同字符串類似,相較於字符串新增score_cast_func,用來對分數進行操做
其餘經常使用操做
delete(*names):根據刪除redis中的任意數據類型
exists(name):檢測redis的name是否存在
keys(pattern='*'):根據模型獲取redis的name
更多:
KEYS * 匹配數據庫中全部 key 。
KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
KEYS h*llo 匹配 hllo 和 heeeeello 等。
KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo
expire(name ,time):爲某個redis的某個name設置超時時間
rename(src, dst):對redis的name重命名爲
move(name, db)):將redis的某個值移動到指定的db下
randomkey():隨機獲取一個redis的name(不刪除)
type(name):獲取name對應值的類型
scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)
同字符串操做,用於增量迭代獲取key
管道
redis-py默認在執行每次請求都會建立(鏈接池申請鏈接)和斷開(歸還鏈接池)一次鏈接操做,若是想要在一次請求中指定多個命令,則可使用pipline實現一次請求指定多個命令,而且默認狀況下一次pipline 是原子性操做。
#!/usr/bin/env python # -*- coding:utf-8 -*- import redis pool = redis.ConnectionPool(host='10.211.55.4', port=6379) r = redis.Redis(connection_pool=pool) # pipe = r.pipeline(transaction=False) pipe = r.pipeline(transaction=True) r.set('name', 'alex') r.set('role', 'sb') pipe.execute()
發佈訂閱
發佈者:服務器
訂閱者:Dashboad和數據處理
Demo以下:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 import redis 5 6 7 class RedisHelper: 8 9 def __init__(self): 10 self.__conn = redis.Redis(host='10.211.55.4') 11 self.chan_sub = 'fm104.5' 12 self.chan_pub = 'fm104.5' 13 14 def public(self, msg): 15 self.__conn.publish(self.chan_pub, msg) 16 return True 17 18 def subscribe(self): 19 pub = self.__conn.pubsub() 20 pub.subscribe(self.chan_sub) 21 pub.parse_response() 22 return pub
訂閱者:
#!/usr/bin/env python # -*- coding:utf-8 -*- from monitor.RedisHelper import RedisHelper obj = RedisHelper() redis_sub = obj.subscribe() while True: msg= redis_sub.parse_response() print msg
發佈者:
#!/usr/bin/env python # -*- coding:utf-8 -*- from monitor.RedisHelper import RedisHelper obj = RedisHelper() obj.public('hello')
更多參見:https://github.com/andymccurdy/redis-py/
RabbitMQ
實現最簡單的隊列通訊
send端
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() #聲明queue channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
receive端
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差很少
消息提供者代碼
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() #聲明queue channel.queue_declare(queue='task_queue') import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2,
)) print(" [x] Sent %r" % message) connection.close()
消息收取者代碼
import pika,time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='task_queue', ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
#聲明Queue隊列時,可添加durable參數(生產者,消費者,均添加) channel.queue_declare(queue='hello', durable=True)
若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。
channel.basic_qos(prefetch_count=1)
帶消息持久化+公平分發的完整代碼
生產者端
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, )) print(" [x] Sent %r" % message) connection.close()
消費者端
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了,Exchange在定義的時候是有類型的,以決定究竟是哪些Queue符合條件,能夠接收消息
fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
表達式符號說明:#表明一個或多個字符,*表明任何字符
例:#.a會匹配a.a,aa.a,aaa.a等
*.a會匹配a.a,b.a,c.a等
注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout
headers: 經過headers 來決定把消息發給哪些queue
消息publisher
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
消息subscriber
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
publisher
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
subscriber
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
RPC server
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
RPC client
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)