python走起之第十一話

Redis

redis是一個key-value存儲系統。和Memcached相似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操做,並且這些操做都是原子性的。在此基礎上,redis支持各類不一樣方式的排序。與memcached同樣,爲了保證效率,數據都是緩存在內存中。區別的是redis會週期性的把更新的數據寫入磁盤或者把修改操做寫入追加的記錄文件,而且在此基礎上實現了master-slave(主從)同步。javascript

1、Redis安裝和基本使用html

1
2
3
4
wget http: / / download.redis.io / releases / redis - 3.0 . 6.tar .gz
tar xzf redis - 3.0 . 6.tar .gz
cd redis - 3.0 . 6
make

啓動服務端java

1
src / redis - server

啓動客戶端python

1
2
3
4
5
src / redis - cli
redis>  set  foo bar
OK
redis> get foo
"bar"

2、Python操做Redisgit

1
2
3
4
5
6
7
sudo pip install redis
or
sudo easy_install redis
or
源碼安裝
 
詳見:https: / / github.com / WoLpH / redis - py

API使用github

redis-py 的API的使用能夠分類爲:redis

  • 鏈接方式
  • 鏈接池
  • 操做
    • String 操做
    • Hash 操做
    • List 操做
    • Set 操做
    • Sort Set 操做
  • 管道
  • 發佈訂閱

 

一、操做模式數據庫

redis-py提供兩個類Redis和StrictRedis用於實現Redis的命令,StrictRedis用於實現大部分官方的命令,並使用官方的語法和命令,Redis是StrictRedis的子類,用於向後兼容舊版本的redis-py。緩存

1
2
3
4
5
6
7
8
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import  redis
 
=  redis.Redis(host = '10.211.55.4' , port = 6379 )
r. set ( 'foo' 'Bar' )
print  r.get( 'foo' )

二、鏈接池服務器

redis-py使用connection pool來管理對一個redis server的全部鏈接,避免每次創建、釋放鏈接的開銷。默認,每一個Redis實例都會維護一個本身的鏈接池。能夠直接創建一個鏈接池,而後做爲參數Redis,這樣就能夠實現多個Redis實例共享一個鏈接池。

1
2
3
4
5
6
7
8
9
10
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import  redis
 
pool  =  redis.ConnectionPool(host = '10.211.55.4' , port = 6379 )
 
=  redis.Redis(connection_pool = pool)
r. set ( 'foo' 'Bar' )
print  r.get( 'foo' )

三、操做

String操做,redis中的String在在內存中按照一個name對應一個value來存儲。如圖:

set(name, value, ex=None, px=None, nx=False, xx=False)

1
2
3
4
5
6
在Redis中設置值,默認,不存在則建立,存在則修改
參數:
      ex,過時時間(秒)
      px,過時時間(毫秒)
      nx,若是設置爲True,則只有name不存在時,當前set操做才執行
      xx,若是設置爲True,則只有name存在時,崗前set操做才執行

setnx(name, value)

1
設置值,只有name不存在時,執行設置操做(添加)

setex(name, value, time)

1
2
3
# 設置值
# 參數:
     # time,過時時間(數字秒 或 timedelta對象)

psetex(name, time_ms, value)

1
2
3
# 設置值
# 參數:
     # time_ms,過時時間(數字毫秒 或 timedelta對象)

mset(*args, **kwargs)

1
2
3
4
5
批量設置值
如:
     mset(k1= 'v1' , k2= 'v2' )
    
     mget({ 'k1' 'v1' 'k2' 'v2' })

get(name)

1
獲取值

mget(keys, *args)

1
2
3
4
5
批量獲取
如:
     mget( 'ylr' 'wupeiqi' )
    
     r.mget([ 'ylr' 'wupeiqi' ])

getset(name, value)

1
設置新值並獲取原來的值

getrange(key, start, end)

1
2
3
4
5
6
# 獲取子序列(根據字節獲取,非字符)
# 參數:
     # name,Redis 的 name
     # start,起始位置(字節)
     # end,結束位置(字節)
# 如: "武沛齊" ,0-3表示 "武"

setrange(name, offset, value)

1
2
3
4
# 修改字符串內容,從指定字符串索引開始向後替換(新值太長時,則向後添加)
# 參數:
     # offset,字符串的索引,字節(一個漢字三個字節)
     # value,要設置的值

setbit(name, offset, value)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 對name對應值的二進制表示的位進行操做
 
# 參數:
     # name,redis的name
     # offset,位的索引(將值變換成二進制後再進行索引)
     # value,值只能是 1 或 0
 
# 注:若是在Redis中有一個對應: n1 = "foo",
         那麼字符串foo的二進制表示爲: 01100110  01101111  01101111
     因此,若是執行 setbit( 'n1' 7 1 ),則就會將第 7 位設置爲 1
         那麼最終二進制則變成  01100111  01101111  01101111 ,即: "goo"
 
# 擴展,轉換二進制表示:
 
     # source = "武沛齊"
     source  =  "foo"
 
     for  in  source:
         num  =  ord (i)
         print  bin (num).replace( 'b' ,'')
 
     特別的,若是source是漢字  "武沛齊" 怎麼辦?
     答:對於utf - 8 ,每個漢字佔  3  個字節,那麼  "武沛齊"  則有  9 個字節
        對於漢字, for 循環時候會按照 字節 迭代,那麼在迭代時,將每個字節轉換 十進制數,而後再將十進制數轉換成二進制
         11100110  10101101  10100110  11100110  10110010  10011011  11101001  10111101  10010000
         - - - - - - - - - - - - - - - - - - - - - - - - - -  - - - - - - - - - - - - - - - - - - - - - - - - - - - - -  - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                     武                         沛                           齊

getbit(name, offset)

1
# 獲取name對應的值的二進制表示中的某位的值 (0或1)

bitcount(key, start=None, end=None)

1
2
3
4
5
# 獲取name對應的值的二進制表示中 1 的個數
# 參數:
     # key,Redis的name
     # start,位起始位置
     # end,位結束位置

bitop(operation, dest, *keys)

1
2
3
4
5
6
7
8
9
10
# 獲取多個值,並將值作位運算,將最後的結果保存至新的name對應的值
 
# 參數:
     # operation,AND(並) 、 OR(或) 、 NOT(非) 、 XOR(異或)
     # dest, 新的Redis的name
     # *keys,要查找的Redis的name
 
# 如:
     bitop( "AND" 'new_name' 'n1' 'n2' 'n3' )
     # 獲取Redis中n1,n2,n3對應的值,而後講全部的值作位運算(求並集),而後將結果保存 new_name 對應的值中

strlen(name)

1
# 返回name對應值的字節長度(一個漢字3個字節)

incr(self, name, amount=1)

1
2
3
4
5
6
7
# 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。
 
# 參數:
     # name,Redis的name
     # amount,自增數(必須是整數)
 
# 注:同incrby

incrbyfloat(self, name, amount=1.0)

1
2
3
4
5
# 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。
 
# 參數:
     # name,Redis的name
     # amount,自增數(浮點型)

decr(self, name, amount=1)

1
2
3
4
5
# 自減 name對應的值,當name不存在時,則建立name=amount,不然,則自減。
 
# 參數:
     # name,Redis的name
     # amount,自減數(整數)

append(key, value)

1
2
3
4
5
# 在redis name對應的值後面追加內容
 
# 參數:
     key, redis的name
     value, 要追加的字符串

  

Hash操做,redis中Hash在內存中的存儲格式以下圖:

hset(name, key, value)

1
2
3
4
5
6
7
8
9
# name對應的hash中設置一個鍵值對(不存在,則建立;不然,修改)
 
# 參數:
     # name,redis的name
     # key,name對應的hash中的key
     # value,name對應的hash中的value
 
# 注:
     # hsetnx(name, key, value),當name對應的hash中不存在當前key時則建立(至關於添加)

hmset(name, mapping)

1
2
3
4
5
6
7
8
# 在name對應的hash中批量設置鍵值對
 
# 參數:
     # name,redis的name
     # mapping,字典,如:{'k1':'v1', 'k2': 'v2'}
 
# 如:
     # r.hmset('xx', {'k1':'v1', 'k2': 'v2'})

hget(name,key)

1
# 在name對應的hash中獲取根據key獲取value

hmget(name, keys, *args)

1
2
3
4
5
6
7
8
9
10
11
# 在name對應的hash中獲取多個key的值
 
# 參數:
     # name,reids對應的name
     # keys,要獲取key集合,如:['k1', 'k2', 'k3']
     # *args,要獲取的key,如:k1,k2,k3
 
# 如:
     # r.mget('xx', ['k1', 'k2'])
     # 或
     # print r.hmget('xx', 'k1', 'k2')

hgetall(name)

1
獲取name對應 hash 的全部鍵值

hlen(name)

1
# 獲取name對應的hash中鍵值對的個數

hkeys(name)

1
# 獲取name對應的hash中全部的key的值

hvals(name)

1
# 獲取name對應的hash中全部的value的值

hexists(name, key)

1
# 檢查name對應的hash是否存在當前傳入的key

hdel(name,*keys)

1
# 將name對應的hash中指定key的鍵值對刪除

hincrby(name, key, amount=1)

1
2
3
4
5
# 自增name對應的hash中的指定key的值,不存在則建立key=amount
# 參數:
     # name,redis中的name
     # key, hash對應的key
     # amount,自增數(整數)

hincrbyfloat(name, key, amount=1.0)

1
2
3
4
5
6
7
8
# 自增name對應的hash中的指定key的值,不存在則建立key=amount
 
# 參數:
     # name,redis中的name
     # key, hash對應的key
     # amount,自增數(浮點數)
 
# 自增name對應的hash中的指定key的值,不存在則建立key=amount

hscan(name, cursor=0, match=None, count=None)

1
2
3
4
5
6
7
8
9
10
11
12
13
# 增量式迭代獲取,對於數據大的數據很是有用,hscan能夠實現分片的獲取數據,並不是一次性將數據所有獲取完,從而放置內存被撐爆
 
# 參數:
     # name,redis的name
     # cursor,遊標(基於遊標分批取獲取數據)
     # match,匹配指定key,默認None 表示全部的key
     # count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
 
# 如:
     # 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
     # 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
     # ...
     # 直到返回值cursor的值爲0時,表示數據已經經過分片獲取完畢

hscan_iter(name, match=None, count=None)

1
2
3
4
5
6
7
8
9
# 利用yield封裝hscan建立生成器,實現分批去redis中獲取數據
 
# 參數:
     # match,匹配指定key,默認None 表示全部的key
     # count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
 
# 如:
     # for item in r.hscan_iter('xx'):
     #     print item

  

List操做,redis中的List在在內存中按照一個name對應一個List來存儲。如圖:

lpush(name,values)

1
2
3
4
5
6
7
8
# 在name對應的list中添加元素,每一個新的元素都添加到列表的最左邊
 
# 如:
     # r.lpush('oo', 11,22,33)
     # 保存順序爲: 33,22,11
 
# 擴展:
     # rpush(name, values) 表示從右向左操做

lpushx(name,value)

1
2
3
4
# 在name對應的list中添加元素,只有name已經存在時,值添加到列表的最左邊
 
# 更多:
     # rpushx(name, value) 表示從右向左操做

llen(name)

1
# name對應的list元素的個數

linsert(name, where, refvalue, value))

1
2
3
4
5
6
7
# 在name對應的列表的某一個值前或後插入一個新值
 
# 參數:
     # name,redis的name
     # where,BEFORE或AFTER
     # refvalue,標杆值,即:在它先後插入數據
     # value,要插入的數據

r.lset(name, index, value)

1
2
3
4
5
6
# 對name對應的list中的某一個索引位置從新賦值
 
# 參數:
     # name,redis的name
     # index,list的索引位置
     # value,要設置的值

r.lrem(name, value, num)

1
2
3
4
5
6
7
8
# 在name對應的list中刪除指定的值
 
# 參數:
     # name,redis的name
     # value,要刪除的值
     # num,  num=0,刪除列表中全部的指定值;
            # num=2,從前到後,刪除2個;
            # num=-2,從後向前,刪除2個

lpop(name)

1
2
3
4
# 在name對應的列表的左側獲取第一個元素並在列表中移除,返回值則是第一個元素
 
# 更多:
     # rpop(name) 表示從右向左操做

lindex(name, index)

1
在name對應的列表中根據索引獲取列表元素

lrange(name, start, end)

1
2
3
4
5
# 在name對應的列表分片獲取數據
# 參數:
     # name,redis的name
     # start,索引的起始位置
     # end,索引結束位置

ltrim(name, start, end)

1
2
3
4
5
# 在name對應的列表中移除沒有在start-end索引之間的值
# 參數:
     # name,redis的name
     # start,索引的起始位置
     # end,索引結束位置

rpoplpush(src, dst)

1
2
3
4
# 從一個列表取出最右邊的元素,同時將其添加至另外一個列表的最左邊
# 參數:
     # src,要取數據的列表的name
     # dst,要添加數據的列表的name

blpop(keys, timeout)

1
2
3
4
5
6
7
8
# 將多個列表排列,按照從左到右去pop對應列表的元素
 
# 參數:
     # keys,redis的name的集合
     # timeout,超時時間,當元素全部列表的元素獲取完以後,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞
 
# 更多:
     # r.brpop(keys, timeout),從右向左獲取數據

brpoplpush(src, dst, timeout=0)

1
2
3
4
5
6
# 從一個列表的右側移除一個元素並將其添加到另外一個列表的左側
 
# 參數:
     # src,取出並要移除元素的列表對應的name
     # dst,要插入元素的列表對應的name
     # timeout,當src對應的列表中沒有數據時,阻塞等待其有數據的超時時間(秒),0 表示永遠阻塞

自定義增量迭代

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 因爲redis類庫中沒有提供對列表元素的增量迭代,若是想要循環name對應的列表的全部元素,那麼就須要:
     # 一、獲取name對應的全部列表
     # 二、循環列表
# 可是,若是列表很是大,那麼就有可能在第一步時就將程序的內容撐爆,全部有必要自定義一個增量迭代的功能:
 
def  list_iter(name):
     """
     自定義redis列表增量迭代
     :param name: redis中的name,即:迭代name對應的列表
     :return: yield 返回 列表元素
     """
     list_count  =  r.llen(name)
     for  index  in  xrange (list_count):
         yield  r.lindex(name, index)
 
# 使用
for  item  in  list_iter( 'pp' ):
     print  item

Set操做,Set集合就是不容許重複的列表

sadd(name,values)

1
# name對應的集合中添加元素

scard(name)

1
獲取name對應的集合中元素個數

sdiff(keys, *args)

1
在第一個name對應的集合中且不在其餘name對應的集合的元素集合

sdiffstore(dest, keys, *args)

1
# 獲取第一個name對應的集合中且不在其餘name對應的集合,再將其新加入到dest對應的集合中

sinter(keys, *args)

1
# 獲取多一個name對應集合的並集

sinterstore(dest, keys, *args)

1
# 獲取多一個name對應集合的並集,再講其加入到dest對應的集合中

sismember(name, value)

1
# 檢查value是不是name對應的集合的成員

smembers(name)

1
# 獲取name對應的集合的全部成員

smove(src, dst, value)

1
# 將某個成員從一個集合中移動到另一個集合

spop(name)

1
# 從集合的右側(尾部)移除一個成員,並將其返回

srandmember(name, numbers)

1
# 從name對應的集合中隨機獲取 numbers 個元素

srem(name, values)

1
# 在name對應的集合中刪除某些值

sunion(keys, *args)

1
# 獲取多一個name對應的集合的並集

sunionstore(dest,keys, *args)

1
# 獲取多一個name對應的集合的並集,並將結果保存到dest對應的集合中

sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)

1
# 同字符串的操做,用於增量迭代分批獲取元素,避免內存消耗太大

 

有序集合,在集合的基礎上,爲每元素排序;元素的排序須要根據另一個值來進行比較,因此,對於有序集合,每個元素有兩個值,即:值和分數,分數專門用來作排序。

zadd(name, *args, **kwargs)

1
2
3
4
5
# 在name對應的有序集合中添加元素
# 如:
      # zadd('zz', 'n1', 1, 'n2', 2)
      # 或
      # zadd('zz', n1=11, n2=22)

zcard(name)

1
# 獲取name對應的有序集合元素的數量

zcount(name, min, max)

1
# 獲取name對應的有序集合中分數 在 [min,max] 之間的個數

zincrby(name, value, amount)

1
# 自增name對應的有序集合的 name 對應的分數

r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 按照索引範圍獲取name對應的有序集合的元素
 
# 參數:
     # name,redis的name
     # start,有序集合索引發始位置(非分數)
     # end,有序集合索引結束位置(非分數)
     # desc,排序規則,默認按照分數從小到大排序
     # withscores,是否獲取元素的分數,默認只獲取元素的值
     # score_cast_func,對分數進行數據轉換的函數
 
# 更多:
     # 從大到小排序
     # zrevrange(name, start, end, withscores=False, score_cast_func=float)
 
     # 按照分數範圍獲取name對應的有序集合的元素
     # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
     # 從大到小排序
     # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)

zrank(name, value)

1
2
3
4
# 獲取某個值在 name對應的有序集合中的排行(從 0 開始)
 
# 更多:
     # zrevrank(name, value),從大到小排序

zrangebylex(name, min, max, start=None, num=None)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 當有序集合的全部成員都具備相同的分值時,有序集合的元素會根據成員的 值 (lexicographical ordering)來進行排序,而這個命令則能夠返回給定的有序集合鍵 key 中, 元素的值介於 min 和 max 之間的成員
# 對集合中的每一個成員進行逐個字節的對比(byte-by-byte compare), 並按照從低到高的順序, 返回排序後的集合成員。 若是兩個字符串有一部份內容是相同的話, 那麼命令會認爲較長的字符串比較短的字符串要大
 
# 參數:
     # name,redis的name
     # min,左區間(值)。 + 表示正無限; - 表示負無限; ( 表示開區間; [ 則表示閉區間
     # min,右區間(值)
     # start,對結果進行分片處理,索引位置
     # num,對結果進行分片處理,索引後面的num個元素
 
# 如:
     # ZADD myzset 0 aa 0 ba 0 ca 0 da 0 ea 0 fa 0 ga
     # r.zrangebylex('myzset', "-", "[ca") 結果爲:['aa', 'ba', 'ca']
 
# 更多:
     # 從大到小排序
     # zrevrangebylex(name, max, min, start=None, num=None)

zrem(name, values)

1
2
3
# 刪除name對應的有序集合中值是values的成員
 
# 如:zrem('zz', ['s1', 's2'])

zremrangebyrank(name, min, max)

1
# 根據排行範圍刪除

zremrangebyscore(name, min, max)

1
# 根據分數範圍刪除

zremrangebylex(name, min, max)

1
# 根據值返回刪除

zscore(name, value)

1
# 獲取name對應有序集合中 value 對應的分數

zinterstore(dest, keys, aggregate=None)

1
2
# 獲取兩個有序集合的交集,若是遇到相同值不一樣分數,則按照aggregate進行操做
# aggregate的值爲:  SUM  MIN  MAX

zunionstore(dest, keys, aggregate=None)

1
2
# 獲取兩個有序集合的並集,若是遇到相同值不一樣分數,則按照aggregate進行操做
# aggregate的值爲:  SUM  MIN  MAX

zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)

1
# 同字符串類似,相較於字符串新增score_cast_func,用來對分數進行操做

  

其餘經常使用操做

delete(*names)

1
# 根據刪除redis中的任意數據類型

exists(name)

1
# 檢測redis的name是否存在

keys(pattern='*')

1
2
3
4
5
6
7
# 根據模型獲取redis的name
 
# 更多:
     # KEYS * 匹配數據庫中全部 key 。
     # KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
     # KEYS h*llo 匹配 hllo 和 heeeeello 等。
     # KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo

expire(name ,time)

1
# 爲某個redis的某個name設置超時時間

rename(src, dst)

1
# 對redis的name重命名爲

move(name, db))

1
# 將redis的某個值移動到指定的db下

randomkey()

1
# 隨機獲取一個redis的name(不刪除)

type(name)

1
# 獲取name對應值的類型

scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)

1
# 同字符串操做,用於增量迭代獲取key

 

四、管道

redis-py默認在執行每次請求都會建立(鏈接池申請鏈接)和斷開(歸還鏈接池)一次鏈接操做,若是想要在一次請求中指定多個命令,則可使用pipline實現一次請求指定多個命令,而且默認狀況下一次pipline 是原子性操做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
import  redis
 
pool  =  redis.ConnectionPool(host = '10.211.55.4' , port = 6379 )
 
=  redis.Redis(connection_pool = pool)
 
# pipe = r.pipeline(transaction=False)
pipe  =  r.pipeline(transaction = True )
 
r. set ( 'name' 'alex' )
r. set ( 'role' 'sb' )
 
pipe.execute()

五、發佈訂閱

發佈者:服務器

訂閱者:Dashboad和數據處理

Demo以下:

  RedisHelper

訂閱者:

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from  monitor.RedisHelper  import  RedisHelper
 
obj  =  RedisHelper()
redis_sub  =  obj.subscribe()
 
while  True :
     msg =  redis_sub.parse_response()
     print  msg

發佈者:

RabbitMQ隊列

安裝python rabbitMQ module 

1
2
3
4
5
6
7
pip install pika
or
easy_install pika
or
源碼
  
https: / / pypi.python.org / pypi / pika

實現最簡單的隊列通訊

 

send端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python
import  pika
 
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
                'localhost' ))
channel  =  connection.channel()
 
#聲明queue
channel.queue_declare(queue = 'hello' )
 
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange = '',
                       routing_key = 'hello' ,
                       body = 'Hello World!' )
print ( " [x] Sent 'Hello World!'" )
connection.close()

receive端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#_*_coding:utf-8_*_
__author__  =  'Alex Li'
import  pika
 
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
                'localhost' ))
channel  =  connection.channel()
 
 
#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue = 'hello' )
 
def  callback(ch, method, properties, body):
     print ( " [x] Received %r"  %  body)
 
channel.basic_consume(callback,
                       queue = 'hello' ,
                       no_ack = True )
 
print ( ' [*] Waiting for messages. To exit press CTRL+C' )
channel.start_consuming()

  

Work Queues

在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差很少

消息提供者代碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import  pika
 
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
                'localhost' ))
channel  =  connection.channel()
 
#聲明queue
channel.queue_declare(queue = 'task_queue' )
 
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
import  sys
 
message  =  ' ' .join(sys.argv[ 1 :])  or  "Hello World!"
channel.basic_publish(exchange = '',
                       routing_key = 'task_queue' ,
                       body = message,
                       properties = pika.BasicProperties(
                       delivery_mode  =  2 # make message persistent
                       ))
print ( " [x] Sent %r"  %  message)
connection.close()

消費者代碼

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import  pika,time
 
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
                'localhost' ))
channel  =  connection.channel()
 
 
 
def  callback(ch, method, properties, body):
     print ( " [x] Received %r"  %  body)
     time.sleep(body.count(b '.' ))
     print ( " [x] Done" )
     ch.basic_ack(delivery_tag  =  method.delivery_tag)
 
 
channel.basic_consume(callback,
                       queue = 'task_queue' ,
                       )
 
print ( ' [*] Waiting for messages. To exit press CTRL+C' )
channel.start_consuming()

此時,先啓動消息生產者,而後再分別啓動3個消費者,經過生產者多發送幾條消息,你會發現,這幾條消息會被依次分配到各個消費者身上  

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code once RabbitMQ delivers message to the customer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it.

If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.

Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.

1
2
3
4
5
6
7
8
def  callback(ch, method, properties, body):
     print  " [x] Received %r"  %  (body,)
     time.sleep( body.count( '.' ) )
     print  " [x] Done"
     ch.basic_ack(delivery_tag  =  method.delivery_tag)
 
channel.basic_consume(callback,
                       queue = 'hello' )

  Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered

    

消息持久化  

We have learned how to make sure that even if the consumer dies, the task isn't lost(by default, if wanna disable  use no_ack=True). But our tasks will still be lost if RabbitMQ server stops.

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

1
channel.queue_declare(queue = 'hello' , durable = True )

  

Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with different name, for exampletask_queue:

1
channel.queue_declare(queue = 'task_queue' , durable = True )

  

This queue_declare change needs to be applied to both the producer and consumer code.

At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.

1
2
3
4
5
6
channel.basic_publish(exchange = '',
                       routing_key = "task_queue" ,
                       body = message,
                       properties = pika.BasicProperties(
                          delivery_mode  =  2 # make message persistent
                       ))

消息公平分發

若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

 

1
channel.basic_qos(prefetch_count = 1 )

 

帶消息持久化+公平分發的完整代碼

生產者端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python
import  pika
import  sys
 
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
         host = 'localhost' ))
channel  =  connection.channel()
 
channel.queue_declare(queue = 'task_queue' , durable = True )
 
message  =  ' ' .join(sys.argv[ 1 :])  or  "Hello World!"
channel.basic_publish(exchange = '',
                       routing_key = 'task_queue' ,
                       body = message,
                       properties = pika.BasicProperties(
                          delivery_mode  =  2 # make message persistent
                       ))
print ( " [x] Sent %r"  %  message)
connection.close()

消費者端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#!/usr/bin/env python
import  pika
import  time
 
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
         host = 'localhost' ))
channel  =  connection.channel()
 
channel.queue_declare(queue = 'task_queue' , durable = True )
print ( ' [*] Waiting for messages. To exit press CTRL+C' )
 
def  callback(ch, method, properties, body):
     print ( " [x] Received %r"  %  body)
     time.sleep(body.count(b '.' ))
     print ( " [x] Done" )
     ch.basic_ack(delivery_tag  =  method.delivery_tag)
 
channel.basic_qos(prefetch_count = 1 )
channel.basic_consume(callback,
                       queue = 'task_queue' )
 
channel.start_consuming()

  

Publish\Subscribe(消息發佈\訂閱) 

以前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue裏,但有些時候你想讓你的消息被全部的Queue收到,相似廣播的效果,這時候就要用到exchange了,

An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

Exchange在定義的時候是有類型的,以決定究竟是哪些Queue符合條件,能夠接收消息


fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息

   表達式符號說明:#表明一個或多個字符,*表明任何字符
      例:#.a會匹配a.a,aa.a,aaa.a等
          *.a會匹配a.a,b.a,c.a等
     注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout 

headers: 經過headers 來決定把消息發給哪些queue

消息publisher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import  pika
import  sys
 
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
         host = 'localhost' ))
channel  =  connection.channel()
 
channel.exchange_declare(exchange = 'logs' ,
                          type = 'fanout' )
 
message  =  ' ' .join(sys.argv[ 1 :])  or  "info: Hello World!"
channel.basic_publish(exchange = 'logs' ,
                       routing_key = '',
                       body = message)
print ( " [x] Sent %r"  %  message)
connection.close()

消息subscriber

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#_*_coding:utf-8_*_
__author__  =  'Alex Li'
import  pika
 
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
         host = 'localhost' ))
channel  =  connection.channel()
 
channel.exchange_declare(exchange = 'logs' ,
                          type = 'fanout' )
 
result  =  channel.queue_declare(exclusive = True #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name  =  result.method.queue
 
channel.queue_bind(exchange = 'logs' ,
                    queue = queue_name)
 
print ( ' [*] Waiting for logs. To exit press CTRL+C' )
 
def  callback(ch, method, properties, body):
     print ( " [x] %r"  %  body)
 
channel.basic_consume(callback,
                       queue = queue_name,
                       no_ack = True )
 
channel.start_consuming()

  

有選擇的接收消息(exchange type=direct) 

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

publisher

subscriber 

  

更細緻的消息過濾

Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.

In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).

That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.

publisher

subscriber

To receive all the logs run:

python receive_logs_topic.py "#"

To receive all logs from the facility "kern":

python receive_logs_topic.py "kern.*"

Or if you want to hear only about "critical" logs:

python receive_logs_topic.py "*.critical"

You can create multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical" 

And to emit a log with a routing key "kern.critical" type:

python emit_log_topic.py "kern.critical" "A critical kernel error"

  

Remote procedure call (RPC)

To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:

1
2
3
fibonacci_rpc  =  FibonacciRpcClient()
result  =  fibonacci_rpc.call( 4 )
print ( "fib(4) is %r"  %  result)

RPC server

RPC client

相關文章
相關標籤/搜索