python學習day10

目錄python

Twistedreact

Redisgit

RabbitMQ程序員

 


Twistedgithub

 

事件驅動redis

事件驅動分爲兩個部分:第一,註冊事件;第二,觸發事件。數據庫

自定義事件啓動框架,命名爲:「弒君者」:緩存

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
# event_drive.py
 
event_list = []
 
 
def run():
    for event in event_list:
        obj = event()
        obj.execute()
 
 
class BaseHandler(object):
    """
    用戶必須繼承該類,從而規範全部類的方法(相似於接口的功能)
    """
    def execute(self):
        raise Exception('you must overwrite execute')
 
最牛逼的事件驅動框架

程序員使用「弒君者框架」:服務器

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from source import event_drive
 
 
class MyHandler(event_drive.BaseHandler):
 
    def execute(self):
        print 'event-drive execute MyHandler'
 
 
event_drive.event_list.append(MyHandler)
event_drive.run()

 

Protocols

Protocols描述瞭如何以異步的方式處理網絡中的事件。HTTP、DNS以及IMAP是應用層協議中的例子。Protocols實現了IProtocol接口,它包含以下的方法:網絡

makeConnection               在transport對象和服務器之間創建一條鏈接
connectionMade               鏈接創建起來後調用
dataReceived                 接收數據時調用
connectionLost               關閉鏈接時調用
Transports

Transports表明網絡中兩個通訊結點之間的鏈接。Transports負責描述鏈接的細節,好比鏈接是面向流式的仍是面向數據報的,流控以及可靠性。TCP、UDP和Unix套接字可做爲transports的例子。它們被設計爲「知足最小功能單元,同時具備最大程度的可複用性」,並且從協議實現中分離出來,這讓許多協議能夠採用相同類型的傳輸。Transports實現了ITransports接口,它包含以下的方法:

write                   以非阻塞的方式按順序依次將數據寫到物理鏈接上
writeSequence           將一個字符串列表寫到物理鏈接上
loseConnection          將全部掛起的數據寫入,而後關閉鏈接
getPeer                 取得鏈接中對端的地址信息
getHost                 取得鏈接中本端的地址信息

將transports從協議中分離出來也使得對這兩個層次的測試變得更加簡單。能夠經過簡單地寫入一個字符串來模擬傳輸,用這種方式來檢查。

 

EchoServer

new 4.txt

代碼:

 1 #!/usr/bin/env python
 2 # encoding: utf-8
 3 
 4 from twisted.internet import protocol
 5 from twisted.internet import reactor
 6 
 7 class Echo(protocol.Protocol):
 8     def dataReceived(self, data):   #只要twisted一收到數據,就會調用此方法
 9         self.transport.write(data)  #將接受到的數據發返回給客戶端
10 
11 def main():
12     factory = protocol.ServerFactory()  #定義基礎工廠類(定義一些公共的東西)
13     factory.protocol = Echo     #至關於socketserver中的handle
14 
15     reactor.listenTCP(8888,factory) #不容許寫IP地址
16     reactor.run()
17 
18 if __name__ == '__main__':
19     main()
EchoServer

 

EchoClien

new 5.txt

代碼:

 1 #!/usr/bin/env python
 2 # encoding: utf-8
 3 
 4 from twisted.internet import reactor, protocol
 5 
 6 # a client protocol
 7 
 8 class EchoClient(protocol.Protocol):
 9     """Once connected, send a message, then print the result."""
10 
11     def connectionMade(self):   #連接一創建成功,就會自動調用此方法
12         self.transport.write("hello boy!")  #向服務端發數據
13 
14     def dataReceived(self, data):   #收到數據調用從方法
15         "As soon as any data is received, write it back."
16         print("Server said:", data)
17         self.transport.loseConnection() #將全部掛起的數據寫入,而後關閉鏈接
18 
19     def connectionLost(self, reason):#關閉鏈接時調用
20         print("connection lost")
21 
22 class EchoFactory(protocol.ClientFactory):
23     protocol = EchoClient   #至關於handle
24 
25     def clientConnectionFailed(self, connector, reason):    #若是連接失敗,自動調用此方法
26         print("Connection failed - goodbye!")
27         reactor.stop()
28 
29     def clientConnectionLost(self, connector, reason):  #連接過程當中斷開,自動調用次方法
30         print("Connection lost - goodbye!")
31         reactor.stop()
32 
33 
34 # this connects the protocol to a server running on port 8000
35 def main():
36     f = EchoFactory()
37     reactor.connectTCP("127.0.0.1", 8888, f)
38     reactor.run()
39 
40 # this only runs if the module was *not* imported
41 if __name__ == '__main__':
42     main()
EchoClient

 


Redis

redis是一個key-value存儲系統。和Memcached相似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操做,並且這些操做都是原子性的。在此基礎上,redis支持各類不一樣方式的排序。與memcached同樣,爲了保證效率,數據都是緩存在內存中。區別的是redis會週期性的把更新的數據寫入磁盤或者把修改操做寫入追加的記錄文件,而且在此基礎上實現了master-slave(主從)同步。

Redis 是一個高性能的key-value數據庫。 redis的出現,很大程度補償了memcached這類key/value存儲的不足,在部 分場合能夠對關係數據庫起到很好的補充做用。它提供了Java,C/C++,C#,PHP,JavaScript,Perl,Object-C,Python,Ruby,Erlang等客戶端,使用很方便。[1]

Redis支持主從同步。數據能夠從主服務器向任意數量的從服務器上同步,從服務器能夠是關聯其餘從服務器的主服務器。這使得Redis可執行單層樹複製。存盤能夠有意無心的對數據進行寫操做。因爲徹底實現了發佈/訂閱機制,使得從數據庫在任何地方同步樹時,可訂閱一個頻道並接收主服務器完整的消息發佈記錄。同步對讀取操做的可擴展性和數據冗餘頗有幫助。

redis的官網地址,很是好記,是redis.io。(特地查了一下,域名後綴io屬於國家域名,是british Indian Ocean territory,即英屬印度洋領地)

目前,Vmware在資助着redis項目的開發和維護。

 

Redis安裝和基本使用

wget http://download.redis.io/releases/redis-3.0.6.tar.gz
tar xzf redis-3.0.6.tar.gz
cd redis-3.0.6
make

啓動服務端

在redis-3.0.6目錄下
src/redis-server

啓動客戶端

在redis-3.0.6目錄下
src/redis-cli
redis> set foo bar
OK
redis> get foo
"bar"

Python操做Redis

sudo pip install redis
or
sudo easy_install redis
or
源碼安裝
 
詳見:https://github.com/WoLpH/redis-py

 

API使用

redis-py 的API的使用能夠分類爲:

  • 鏈接方式
  • 鏈接池
  • 操做
    • String 操做
    • Hash 操做
    • List 操做
    • Set 操做
    • Sort Set 操做
  • 管道
  • 發佈訂閱

一、操做模式

redis-py提供兩個類Redis和StrictRedis用於實現Redis的命令,StrictRedis用於實現大部分官方的命令,並使用官方的語法和命令,Redis是StrictRedis的子類,用於向後兼容舊版本的redis-py。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import redis
 
r = redis.Redis(host=redis服務器IP地址, port=6379)
r.set('foo', 'Bar')
print (r.get('foo'))

二、鏈接池

redis-py使用connection pool來管理對一個redis server的全部鏈接,避免每次創建、釋放鏈接的開銷。默認,每一個Redis實例都會維護一個本身的鏈接池。能夠直接創建一個鏈接池,而後做爲參數Redis,這樣就能夠實現多個Redis實例共享一個鏈接池。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import redis
 
pool = redis.ConnectionPool(host=redis服務器IP, port=6379)
 
r = redis.Redis(connection_pool=pool)
r.set('foo', 'Bar')
print (r.get('foo'))

三、操做

String操做,redis中的String在在內存中按照一個name對應一個value來存儲。如圖:

 

set(name, value, ex=None, px=None, nx=False, xx=False)

在Redis中設置值,默認,不存在則建立,存在則修改
參數:
     ex,過時時間(秒)
     px,過時時間(毫秒)
     nx,若是設置爲True,則只有name不存在時,當前set操做才執行
     xx,若是設置爲True,則只有name存在時,當前前set操做才執行

setnx(name, value):設置值,只有name不存在時,執行設置操做(添加)

 

setex(name, value, time):設置值

參數:

time,過時時間(數字秒 或 timedelta對象)

 

psetex(name,  value,time_ms):設置值

參數:

time_ms,過時時間(數字毫秒 或 timedelta對象)

 

mset(*args, **kwargs):批量設置值

如:

mset(k1='v1', k2='v2')

mget({'k1': 'v1', 'k2': 'v2'})

 

get(name):獲取值

 

mget(keys, *args):批量獲取

如:

mget('ylr', 'wupeiqi')

r.mget(['ylr', 'wupeiqi'])

 

getset(name, value):設置新值並獲取原來的值

 

getrange(key, start, end):獲取子序列(根據字節獲取,非字符)

參數:

name,Redis 的 name

start,起始位置(字節)

end,結束位置(字節)

如: "李小明" ,0-3表示 "李"

 

setrange(name, offset, value):修改字符串內容,從指定字符串索引開始向後替換(新值太長時,則向後添加)

參數:

offset,字符串的索引,字節(一個漢字三個字節)

value,要設置的值

 

setbit(name, offset, value):對name對應值的二進制表示的位進行操做

參數:
  name,redis的name
  offset,位的索引(將值變換成二進制後再進行索引)
  value,值只能是 1 或 0

 

getbit(name, offset):獲取name對應的值的二進制表示中的某位的值 (0或1)

 

bitcount(key, start=None, end=None):獲取name對應的值的二進制表示中 1 的個數

參數:

key,Redis的name

start,位起始位置

end,位結束位置

 

bitop(operation, dest, *keys):獲取多個值,並將值作位運算,將最後的結果保存至新的name對應的值

參數:

operation,AND(並) 、 OR(或) 、 NOT(非) 、 XOR(異或)

dest, 新的Redis的name

*keys,要查找的Redis的name

 

strlen(name):返回name對應值的字節長度(一個漢字3個字節)

 

incr(self, name, amount=1):自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。

參數:

name,Redis的name

amount,自增數(必須是整數)

注:同incrby

 

incrbyfloat(self, name, amount=1.0):自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。

參數:

name,Redis的name

amount,自增數(浮點型)

 

decr(self, name, amount=1):自減 name對應的值,當name不存在時,則建立name=amount,不然,則自減。

參數:

name,Redis的name

amount,自減數(整數)

 

append(key, value):在redis name對應的值後面追加內容

參數:

key, redis的name

value, 要追加的字符串

 

Hash操做,redis中Hash在內存中的存儲格式以下圖:

 

hset(name, key, value):name對應的hash中設置一個鍵值對(不存在,則建立;不然,修改)

參數:

name,redis的name

key,name對應的hash中的key

value,name對應的hash中的value

注:

hsetnx(name, key, value),當name對應的hash中不存在當前key時則建立(至關於添加)

 

hmset(name, mapping):在name對應的hash中批量設置鍵值對

參數:

name,redis的name

mapping,字典,如:{'k1':'v1', 'k2': 'v2'}

 

hget(name,key):在name對應的hash中獲取根據key獲取value

 

hmget(name, keys, *args):在name對應的hash中獲取多個key的值

參數:

name,reids對應的name

keys,要獲取key集合,如:['k1', 'k2', 'k3']

*args,要獲取的key,如:k1,k2,k3

 

hgetall(name):獲取name對應hash的全部鍵值

 

hlen(name):獲取name對應的hash中鍵值對的個數

 

hkeys(name):獲取name對應的hash中全部的key的值

 

hvals(name):獲取name對應的hash中全部的value的值

 

hexists(name, key):檢查name對應的hash是否存在當前傳入的key

 

hdel(name,*keys):將name對應的hash中指定key的鍵值對刪除

 

hincrby(name, key, amount=1):自增name對應的hash中的指定key的值,不存在則建立key=amount

參數:

name,redis中的name

key, hash對應的key

amount,自增數(整數)

 

hincrbyfloat(name, key, amount=1.0):自增name對應的hash中的指定key的值,不存在則建立key=amount

參數:

name,redis中的name

key, hash對應的key

amount,自增數(浮點數)

自增name對應的hash中的指定key的值,不存在則建立key=amount

 

hscan(name, cursor=0, match=None, count=None):增量式迭代獲取,對於數據大的數據很是有用,hscan能夠實現分片的獲取數據,並不是一次性將數據所有獲取完,從而放置內存被撐爆

參數:

name,redis的name

cursor,遊標(基於遊標分批取獲取數據)

match,匹配指定key,默認None 表示全部的key

count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數

 

hscan_iter(name, match=None, count=None):利用yield封裝hscan建立生成器,實現分批去redis中獲取數據

參數:

match,匹配指定key,默認None 表示全部的key

count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數

 

List操做,redis中的List在在內存中按照一個name對應一個List來存儲。如圖:

 

lpush(name,values):在name對應的list中添加元素,每一個新的元素都添加到列表的最左邊

如:

r.lpush('oo', 11,22,33)

保存順序爲: 33,22,11

擴展:

rpush(name, values) 表示從右向左操做

 

lpushx(name,value):在name對應的list中添加元素,只有name已經存在時,值添加到列表的最左邊

更多:

rpushx(name, value) 表示從右向左操做

 

llen(name):name對應的list元素的個數

 

linsert(name, where, refvalue, value)):在name對應的列表的某一個值前或後插入一個新值

參數:

name,redis的name

where,BEFORE或AFTER

refvalue,標杆值,即:在它先後插入數據

value,要插入的數據

 

r.lset(name, index, value):對name對應的list中的某一個索引位置從新賦值

參數:

name,redis的name

index,list的索引位置

value,要設置的值

 

r.lrem(name, value, num):在name對應的list中刪除指定的值

參數:

name,redis的name

value,要刪除的值

num,  num=0,刪除列表中全部的指定值;

num=2,從前到後,刪除2個;

num=-2,從後向前,刪除2個

 

lpop(name):在name對應的列表的左側獲取第一個元素並在列表中移除,返回值則是第一個元素

更多:

rpop(name) 表示從右向左操做

 

lindex(name, index):在name對應的列表中根據索引獲取列表元素

 

lrange(name, start, end):在name對應的列表分片獲取數據

參數:

name,redis的name

start,索引的起始位置

end,索引結束位置

 

ltrim(name, start, end):在name對應的列表中移除沒有在start-end索引之間的值

參數:

name,redis的name

start,索引的起始位置

end,索引結束位置

 

rpoplpush(src, dst):從一個列表取出最右邊的元素,同時將其添加至另外一個列表的最左邊

參數:

src,要取數據的列表的name

dst,要添加數據的列表的name

 

blpop(keys, timeout):將多個列表排列,按照從左到右去pop對應列表的元素

參數:

keys,redis的name的集合

timeout,超時時間,當元素全部列表的元素獲取完以後,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞

更多:

r.brpop(keys, timeout),從右向左獲取數據

 

brpoplpush(src, dst, timeout=0):從一個列表的右側移除一個元素並將其添加到另外一個列表的左側

參數:

src,取出並要移除元素的列表對應的name

dst,要插入元素的列表對應的name

timeout,當src對應的列表中沒有數據時,阻塞等待其有數據的超時時間(秒),0 表示永遠阻塞

 

自定義增量迭代

# 因爲redis類庫中沒有提供對列表元素的增量迭代,若是想要循環name對應的列表的全部元素,那麼就須要:
    # 一、獲取name對應的全部列表
    # 二、循環列表
# 可是,若是列表很是大,那麼就有可能在第一步時就將程序的內容撐爆,全部有必要自定義一個增量迭代的功能:
 
def list_iter(name):
    """
    自定義redis列表增量迭代
    :param name: redis中的name,即:迭代name對應的列表
    :return: yield 返回 列表元素
    """
    list_count = r.llen(name)
    for index in xrange(list_count):
        yield r.lindex(name, index)
 
# 使用
for item in list_iter('pp'):
    print item

 

Set操做,Set集合就是不容許重複的列表

 

sadd(name,values):name對應的集合中添加元素

 

scard(name):獲取name對應的集合中元素個數

 

sdiff(keys, *args):在第一個name對應的集合中且不在其餘name對應的集合的元素集合

 

sdiffstore(dest, keys, *args):獲取第一個name對應的集合中且不在其餘name對應的集合,再將其新加入到dest對應的集合中

 

sinter(keys, *args):# 獲取多一個name對應集合的並集

 

sinterstore(dest, keys, *args):獲取多一個name對應集合的並集,再講其加入到dest對應的集合中

 

sismember(name, value):檢查value是不是name對應的集合的成員

 

smembers(name):獲取name對應的集合的全部成員

 

smove(src, dst, value):將某個成員從一個集合中移動到另一個集合

 

spop(name):從集合的右側(尾部)移除一個成員,並將其返回

 

srandmember(name, numbers):從name對應的集合中隨機獲取 numbers 個元素

 

srem(name, values):在name對應的集合中刪除某些值

 

sunion(keys, *args):獲取多一個name對應的集合的並集

 

sunionstore(dest,keys, *args):獲取多一個name對應的集合的並集,並將結果保存到dest對應的集合中

 

sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)

同字符串的操做,用於增量迭代分批獲取元素,避免內存消耗太大

 

有序集合,在集合的基礎上,爲每元素排序;元素的排序須要根據另一個值來進行比較,因此,對於有序集合,每個元素有兩個值,即:值和分數,分數專門用來作排序。

 

zadd(name, *args, **kwargs):在name對應的有序集合中添加元素

 

zcard(name):獲取name對應的有序集合元素的數量

 

zcount(name, min, max):獲取name對應的有序集合中分數 在 [min,max] 之間的個數

 

zincrby(name, value, amount):自增name對應的有序集合的 name 對應的分數

 

r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float):按照索引範圍獲取name對應的有序集合的元素

參數:

name,redis的name

start,有序集合索引發始位置(非分數)

end,有序集合索引結束位置(非分數)

desc,排序規則,默認按照分數從小到大排序

withscores,是否獲取元素的分數,默認只獲取元素的值

score_cast_func,對分數進行數據轉換的函數

更多:

從大到小排序

zrevrange(name, start, end, withscores=False, score_cast_func=float)

按照分數範圍獲取name對應的有序集合的元素

zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)

從大到小排序

zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)

 

zrank(name, value):獲取某個值在 name對應的有序集合中的排行(從 0 開始)

更多:

# zrevrank(name, value),從大到小排序

 

zrangebylex(name, min, max, start=None, num=None):

當有序集合的全部成員都具備相同的分值時,有序集合的元素會根據成員的 值 (lexicographical ordering)來進行排序,而這個命令則能夠返回給定的有序集合鍵 key 中, 元素的值介於 min 和 max 之間的成員

對集合中的每一個成員進行逐個字節的對比(byte-by-byte compare), 並按照從低到高的順序, 返回排序後的集合成員。 若是兩個字符串有一部份內容是相同的話, 那麼命令會認爲較長的字符串比較短的字符串要大

# 參數:

# name,redis的name

# min,左區間(值)。 + 表示正無限; - 表示負無限; ( 表示開區間; [ 則表示閉區間

# min,右區間(值)

# start,對結果進行分片處理,索引位置

# num,對結果進行分片處理,索引後面的num個元素

# 如:

# ZADD myzset 0 aa 0 ba 0 ca 0 da 0 ea 0 fa 0 ga

# r.zrangebylex('myzset', "-", "[ca") 結果爲:['aa', 'ba', 'ca']

# 更多:

# 從大到小排序

# zrevrangebylex(name, max, min, start=None, num=None)

 

zrem(name, values):刪除name對應的有序集合中值是values的成員

 

zremrangebyrank(name, min, max):根據排行範圍刪除

 

zremrangebyscore(name, min, max):根據分數範圍刪除

 

zremrangebylex(name, min, max):根據值返回刪除

 

zscore(name, value):獲取name對應有序集合中 value 對應的分數

 

zinterstore(dest, keys, aggregate=None):獲取兩個有序集合的交集,若是遇到相同值不一樣分數,則按照aggregate進行操做

aggregate的值爲:  SUM  MIN  MAX

 

zunionstore(dest, keys, aggregate=None):獲取兩個有序集合的並集,若是遇到相同值不一樣分數,則按照aggregate進行操做

aggregate的值爲:  SUM  MIN  MAX

 

zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)

同字符串類似,相較於字符串新增score_cast_func,用來對分數進行操做

 

其餘經常使用操做

delete(*names):根據刪除redis中的任意數據類型

 

exists(name):檢測redis的name是否存在

 

keys(pattern='*'):根據模型獲取redis的name

更多:

KEYS * 匹配數據庫中全部 key 。

KEYS h?llo 匹配 hello , hallo 和 hxllo 等。

KEYS h*llo 匹配 hllo 和 heeeeello 等。

KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo

 

expire(name ,time):爲某個redis的某個name設置超時時間

 

rename(src, dst):對redis的name重命名爲

 

move(name, db)):將redis的某個值移動到指定的db下

 

randomkey():隨機獲取一個redis的name(不刪除)

 

type(name):獲取name對應值的類型

 

scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)

同字符串操做,用於增量迭代獲取key

 

管道

redis-py默認在執行每次請求都會建立(鏈接池申請鏈接)和斷開(歸還鏈接池)一次鏈接操做,若是想要在一次請求中指定多個命令,則可使用pipline實現一次請求指定多個命令,而且默認狀況下一次pipline 是原子性操做。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import redis
 
pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
 
r = redis.Redis(connection_pool=pool)
 
# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)
 
r.set('name', 'alex')
r.set('role', 'sb')
 
pipe.execute()

 

發佈訂閱

 

發佈者:服務器

訂閱者:Dashboad和數據處理

Demo以下:

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 import redis
 5 
 6 
 7 class RedisHelper:
 8 
 9     def __init__(self):
10         self.__conn = redis.Redis(host='10.211.55.4')
11         self.chan_sub = 'fm104.5'
12         self.chan_pub = 'fm104.5'
13 
14     def public(self, msg):
15         self.__conn.publish(self.chan_pub, msg)
16         return True
17 
18     def subscribe(self):
19         pub = self.__conn.pubsub()
20         pub.subscribe(self.chan_sub)
21         pub.parse_response()
22         return pub
RedisHelper

訂閱者:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
redis_sub = obj.subscribe()
 
while True:
    msg= redis_sub.parse_response()
    print msg

發佈者:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
obj.public('hello')

更多參見:https://github.com/andymccurdy/redis-py/

http://doc.redisfans.com/


RabbitMQ

實現最簡單的隊列通訊

send端

#!/usr/bin/env python
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
 
#聲明queue
channel.queue_declare(queue='hello') 

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

receive端

import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
 
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

Work Queues

 

在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差很少

消息提供者代碼

import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
 
#聲明queue
channel.queue_declare(queue='task_queue')
 
import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                      delivery_mode = 2, 
                      ))
print(" [x] Sent %r" % message)
connection.close()

消息收取者代碼

import pika,time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
 
 
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
 
channel.basic_consume(callback,
                      queue='task_queue',
                      )
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

消息持久化
#聲明Queue隊列時,可添加durable參數(生產者,消費者,均添加)
channel.queue_declare(queue='hello', durable=True)

 

消息公平分發

若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

 

channel.basic_qos(prefetch_count=1)

 

帶消息持久化+公平分發的完整代碼

生產者端

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, 
                      ))
print(" [x] Sent %r" % message)
connection.close()

消費者端

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
 
channel.start_consuming()

 

Publish\Subscribe(消息發佈\訂閱)

以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了,Exchange在定義的時候是有類型的,以決定究竟是哪些Queue符合條件,能夠接收消息

fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
表達式符號說明:#表明一個或多個字符,*表明任何字符
      例:#.a會匹配a.a,aa.a,aaa.a等
          *.a會匹配a.a,b.a,c.a等
     注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout

headers: 經過headers 來決定把消息發給哪些queue

消息publisher

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',
                         type='fanout')
 
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消息subscriber

import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',
                         type='fanout')
 
result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = result.method.queue
 
channel.queue_bind(exchange='logs',
                   queue=queue_name)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

 

有選擇的接收消息(exchange type=direct)

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

 

publisher
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
subscriber
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
 
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

 

更細緻的消息過濾

publisher

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

subscriber

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
 
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

 

Remote procedure call (RPC)

RPC server

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
 
channel = connection.channel()
 
channel.queue_declare(queue='rpc_queue')
 
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)
 
def on_request(ch, method, props, body):
    n = int(body)
 
    print(" [.] fib(%s)" % n)
    response = fib(n)
 
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
 
print(" [x] Awaiting RPC requests")
channel.start_consuming()

RPC client

import pika
import uuid
 
class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
 
        self.channel = self.connection.channel()
 
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
 
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)
 
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body
 
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
 
fibonacci_rpc = FibonacciRpcClient()
 
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
相關文章
相關標籤/搜索