緩存,隊列(Redis,RabbitMQ)

Redis

Redis是一個key-value存儲系統。和Memcached相似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操做,並且這些操做都是原子性的。在此基礎上,redis支持各類不一樣方式的排序。與memcached同樣,爲了保證效率,數據都是緩存在內存中。區別的是redis會週期性的把更新的數據寫入磁盤或者把修改操做寫入追加的記錄文件,而且在此基礎上實現了master-slave(主從)同步。python

一,安裝及基本使用linux

wget http://download.redis.io/releases/redis-3.0.6.tar.gz
tar xzf redis-3.0.6.tar.gz
cd redis-3.0.6
make

# 啓動服務端
src/redis-server
# 啓動客戶端
src/redis-cli
redis> set foo bar
OK
redis> get foo
"bar"

二,Python操做Redisgit

# 安裝
$ sudo pip install redis
or
$ sudo easy_install redis
or
$ sudo python setup.py install

API的使用github

Redis-py 的API的使用能夠分類爲:正則表達式

  • 鏈接方式
  • 鏈接池
  • 操做
    • String 操做
    • Hash 操做
    • List 操做
    • Set 操做
    • Sort Set 操做
  • 管道
  • 發佈訂閱

1,鏈接方式與鏈接池redis

Redis-py 提供兩個類 Redis 和 StrictRedis 用於實現 Redis 的操做命令,StrictRedis 用於實現大部分官方的命令,並使用官方的語法和命令,Redis 是 StrictRedis 的子類,用於向後兼容舊版本的 Redis-py數據庫

import redis
 
r = redis.Redis(host='192.168.1.5', port=6379)
r.set('foo', 'Bar')
print r.get('foo')

redis-py 使用 connection pool 來管理對一個 redis server 的全部鏈接,避免每次創建、釋放鏈接帶來的額外開銷。默認每一個 Redis 實例都會維護着一個本身的鏈接池。也能夠覆蓋直接創建一個鏈接池,而後做爲參數 Redis,這樣就能夠實現多個 Redis 實例共享一個鏈接池資源。實現客戶端分片或有鏈接如何管理更細的顆粒控制。windows

pool = redis.ConnectionPool(host='192.168.1.5', port=6379)
 
r = redis.Redis(connection_pool=pool)
r.set('foo', 'Bar')
print r.get('foo')

2,操做緩存

String 操做,String 在內存中格式是一個 name 對應一個 value 來存儲服務器

set(name, value, ex=None, px=None, nx=False, xx=False)

# 在Redis中設置值,默認,不存在則建立,存在則修改 # 參數:  ex,過時時間(秒) px,過時時間(毫秒) nx,若是設置爲True,則只有name不存在時,當前set操做才執行 xx,若是設置爲True,則只有name存在時,崗前set操做才執行

setnx(name, value)

# 設置值,只有name不存在時,執行設置操做(添加)

setex(name, value, time)

# 設置值 # 參數: time,過時時間(數字秒 或 timedelta對象)

psetex(name, time_ms, value)

# 設置值 # 參數: time_ms,過時時間(數字毫秒 或 timedelta對象)

mset(*args, **kwargs)

# 批量設置值 # 如: mset(k1='v1', k2='v2') 或 mget({'k1': 'v1', 'k2': 'v2'})

get(name)

# 獲取值

mget(keys, *args)

# 批量獲取 # 如: mget('ylr', 'nick') 或 r.mget(['ylr', 'nick'])

getset(name, value)

# 設置新值並獲取原來的值

getrange(key, start, end)

 # 獲取子序列(根據字節獲取,非字符) # 參數:  name,Redis 的 name start,起始位置(字節) end,結束位置(字節) # 如: "索寧" ,0-3表示 "索"

setrange(name, offset, value)

# 修改字符串內容,從指定字符串索引開始向後替換(新值太長時,則向後添加) # 參數:  offset,字符串的索引,字節(一個漢字三個字節) value,要設置的值

setbit(name, offset, value)

# 對name對應值的二進制表示的位進行操做 # 參數: # name,redis的name # offset,位的索引(將值變換成二進制後再進行索引) # value,值只能是 1 或 0 # 注:若是在Redis中有一個對應: n1 = "foo", 那麼字符串foo的二進制表示爲:01100110 01101111 01101111 因此,若是執行 setbit('n1', 7, 1),則就會將第7位設置爲1, 那麼最終二進制則變成 01100111 01101111 01101111,即:"goo"

getbit(name, offset)

# 獲取name對應的值的二進制表示中的某位的值 (0或1)

bitcount(key, start=None, end=None)

 # 獲取name對應的值的二進制表示中 1 的個數 # 參數:  key,Redis的name start,位起始位置 end,位結束位置

bitop(operation, dest, *keys)

# 獲取多個值,並將值作位運算,將最後的結果保存至新的name對應的值 # 參數:  operation,AND(並) 、 OR(或) 、 NOT(非) 、 XOR(異或) dest, 新的Redis的name *keys,要查找的Redis的name # 如: bitop("AND", 'new_name', 'n1', 'n2', 'n3') 獲取Redis中n1,n2,n3對應的值,而後講全部的值作位運算(求並集),而後將結果保存 new_name 對應的值中

strlen(name)

# 返回name對應值的字節長度(一個漢字3個字節)

incr(self, name, amount=1)

# 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。 # 參數:  name,Redis的name amount,自增數(必須是整數) # 注:同incrb

incrbyfloat(self, name, amount=1.0)

# 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。 # 參數:  name,Redis的name amount,自增數(浮點型)

decr(self, name, amount=1)

# 自減 name對應的值,當name不存在時,則建立name=amount,不然,則自減。 # 參數:  name,Redis的name amount,自減數(整數)

append(key, value)

# 在redis name對應的值後面追加內容 # 參數:  key, redis的name value, 要追加的字符串

Hash 操做,redis 中 Hash 在內存中的存儲格式相似字典

 hset(name, key, value)

# name對應的hash中設置一個鍵值對(不存在,則建立;不然,修改) # 參數:  name,redis的name key,name對應的hash中的key value,name對應的hash中的value # 注: hsetnx(name, key, value),當name對應的hash中不存在當前key時則建立(至關於添加)

hmset(name, mapping)

# 在name對應的hash中批量設置鍵值對 # 參數:  name,redis的name mapping,字典,如:{'k1':'v1', 'k2': 'v2'} # 如: # r.hmset('xx', {'k1':'v1', 'k2': 'v2'})

hget(name,key)

# 在name對應的hash中獲取根據key獲取value

hmget(name, keys, *args)

# 在name對應的hash中獲取多個key的值 # 參數:  name,reids對應的name keys,要獲取key集合,如:['k1', 'k2', 'k3'] *args,要獲取的key,如:k1,k2,k3 # 如: r.mget('xx', ['k1', 'k2']) 或 print r.hmget('xx', 'k1', 'k2')

hgetall(name)

# 獲取name對應hash的全部鍵值

hlen(name)

# 獲取name對應的hash中鍵值對的個數

hkeys(name)

# 獲取name對應的hash中全部的key的值

hvals(name)

# 獲取name對應的hash中全部的value的值

hexists(name, key)

# 檢查name對應的hash是否存在當前傳入的key

hdel(name,*keys)

# 將name對應的hash中指定key的鍵值對刪除

hincrby(name, key, amount=1)

 自增name對應的hash中的指定key的值,不存在則建立key=amount 參數:  name,redis中的name key, hash對應的key  amount,自增數(整數)

hincrbyfloat(name, key, amount=1.0)

# 自增name對應的hash中的指定key的值,不存在則建立key=amount # 參數: # name,redis中的name # key, hash對應的key # amount,自增數(浮點數) # 自增name對應的hash中的指定key的值,不存在則建立key=amount

hscan(name, cursor=0, match=None, count=None)

# 增量式迭代獲取,對於數據大的數據很是有用,hscan能夠實現分片的獲取數據,並不是一次性將數據所有獲取完,從而放置內存被撐爆 # 參數: # name,redis的name # cursor,遊標(基於遊標分批取獲取數據) # match,匹配指定key,默認None 表示全部的key # count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數 # 如: # 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None) # 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None) # ... # 直到返回值cursor的值爲0時,表示數據已經經過分片獲取完畢

hscan_iter(name, match=None, count=None)

# 利用yield封裝hscan建立生成器,實現分批去redis中獲取數據 # 參數: # match,匹配指定key,默認None 表示全部的key # count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數 # 如: # for item in r.hscan_iter('xx'): # print item

List 操做,redis 中的 List 在在內存中按照一個 name 對應一個 List 來存儲,像變量對應一個列表。

lpush(name,values)

# 在name對應的list中添加元素,每一個新的元素都添加到列表的最左邊 # 如: # r.lpush('oo', 11,22,33) # 保存順序爲: 33,22,11 # 擴展: # rpush(name, values) 表示從右向左操做

lpushx(name,value)

# 在name對應的list中添加元素,只有name已經存在時,值添加到列表的最左邊 # 更多: # rpushx(name, value) 表示從右向左操做

llen(name)

# name對應的list元素的個數

linsert(name, where, refvalue, value))

# 在name對應的列表的某一個值前或後插入一個新值 # 參數: # name,redis的name # where,BEFORE或AFTER # refvalue,標杆值,即:在它先後插入數據 # value,要插入的數據

r.lset(name, index, value)

# 對name對應的list中的某一個索引位置從新賦值 # 參數: # name,redis的name # index,list的索引位置 # value,要設置的值
 

r.lrem(name, value, num)

# 在name對應的list中刪除指定的值 # 參數: # name,redis的name # value,要刪除的值 # num, num=0,刪除列表中全部的指定值; # num=2,從前到後,刪除2個; # num=-2,從後向前,刪除2個

lpop(name)

# 在name對應的列表的左側獲取第一個元素並在列表中移除,返回值則是第一個元素 # 更多: # rpop(name) 表示從右向左操做

lindex(name, index)

# 在name對應的列表中根據索引獲取列表元素

lrange(name, start, end)

# 在name對應的列表分片獲取數據 # 參數: # name,redis的name # start,索引的起始位置 # end,索引結束位置

ltrim(name, start, end)

# 在name對應的列表中移除沒有在start-end索引之間的值 # 參數: # name,redis的name # start,索引的起始位置 # end,索引結束位置

rpoplpush(src, dst)

# 從一個列表取出最右邊的元素,同時將其添加至另外一個列表的最左邊 # 參數: # src,要取數據的列表的name # dst,要添加數據的列表的name

blpop(keys, timeout)

# 將多個列表排列,按照從左到右去pop對應列表的元素 # 參數: # keys,redis的name的集合 # timeout,超時時間,當元素全部列表的元素獲取完以後,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞 # 更多: # r.brpop(keys, timeout),從右向左獲取數據

brpoplpush(src, dst, timeout=0)

# 從一個列表的右側移除一個元素並將其添加到另外一個列表的左側 # 參數: # src,取出並要移除元素的列表對應的name # dst,要插入元素的列表對應的name # timeout,當src對應的列表中沒有數據時,阻塞等待其有數據的超時時間(秒),0 表示永遠阻塞

自定義增量迭代

# 因爲redis類庫中沒有提供對列表元素的增量迭代,若是想要循環name對應的列表的全部元素,那麼就須要: # 一、獲取name對應的全部列表 # 二、循環列表 # 可是,若是列表很是大,那麼就有可能在第一步時就將程序的內容撐爆,全部有必要自定義一個增量迭代的功能: def list_iter(name): """ 自定義redis列表增量迭代 :param name: redis中的name,即:迭代name對應的列表 :return: yield 返回 列表元素 """ list_count = r.llen(name) for index in xrange(list_count): yield r.lindex(name, index) # 使用 for item in list_iter('pp'): print item

Set 操做,Set 集合就是不容許重複的列表

sadd(name,values)

# name對應的集合中添加元素

scard(name)

# 獲取name對應的集合中元素個數

sdiff(keys, *args)

# 在第一個name對應的集合中且不在其餘name對應的集合的元素集合

sdiffstore(dest, keys, *args)

# 獲取第一個name對應的集合中且不在其餘name對應的集合,再將其新加入到dest對應的集合中

sinter(keys, *args)

# 獲取多一個name對應集合的並集

sinterstore(dest, keys, *args)

# 獲取多一個name對應集合的並集,再講其加入到dest對應的集合中

sismember(name, value)

# 檢查value是不是name對應的集合的成員

smembers(name)

# 獲取name對應的集合的全部成員

smove(src, dst, value)

# 將某個成員從一個集合中移動到另一個集合

spop(name)

# 從集合的右側(尾部)移除一個成員,並將其返回

srandmember(name, numbers)

# 從name對應的集合中隨機獲取 numbers 個元素

srem(name, values)

# 在name對應的集合中刪除某些值

sunion(keys, *args)

# 獲取多一個name對應的集合的並集

sunionstore(dest,keys, *args)

# 獲取多一個name對應的集合的並集,並將結果保存到dest對應的集合中

sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)

# 同字符串的操做,用於增量迭代分批獲取元素,避免內存消耗太大

有序集合,在集合的基礎上,爲每一個元素排序;元素的排序須要根據另一個值來進行比較,因此對於有序集合,每個元素有兩個值:值和分數,分數是專門來作排序的。

zadd(name, *args, **kwargs)
# 在name對應的有序集合中添加元素 # 如: # zadd('zz', 'n1', 1, 'n2', 2) ## zadd('zz', n1=11, n2=22)

 zcard(name)

# 獲取name對應的有序集合元素的數量

zcount(name, min, max)

# 獲取name對應的有序集合中分數 在 [min,max] 之間的個數

zincrby(name, value, amount)

# 自增name對應的有序集合的 name 對應的分數

r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)

# 按照索引範圍獲取name對應的有序集合的元素 # 參數: # name,redis的name # start,有序集合索引發始位置(非分數) # end,有序集合索引結束位置(非分數) # desc,排序規則,默認按照分數從小到大排序 # withscores,是否獲取元素的分數,默認只獲取元素的值 # score_cast_func,對分數進行數據轉換的函數 # 更多: # 從大到小排序 # zrevrange(name, start, end, withscores=False, score_cast_func=float) # 按照分數範圍獲取name對應的有序集合的元素 # zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float) # 從大到小排序 # zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)

zrank(name, value)

# 獲取某個值在 name對應的有序集合中的排行(從 0 開始) # 更多: # zrevrank(name, value),從大到小排序

zrangebylex(name, min, max, start=None, num=None)

# 當有序集合的全部成員都具備相同的分值時,有序集合的元素會根據成員的 值 (lexicographical ordering)來進行排序,而這個命令則能夠返回給定的有序集合鍵 key 中, 元素的值介於 min 和 max 之間的成員 # 對集合中的每一個成員進行逐個字節的對比(byte-by-byte compare), 並按照從低到高的順序, 返回排序後的集合成員。 若是兩個字符串有一部份內容是相同的話, 那麼命令會認爲較長的字符串比較短的字符串要大 # 參數: # name,redis的name # min,左區間(值)。 + 表示正無限; - 表示負無限; ( 表示開區間; [ 則表示閉區間 # min,右區間(值) # start,對結果進行分片處理,索引位置 # num,對結果進行分片處理,索引後面的num個元素 # 如: # ZADD myzset 0 aa 0 ba 0 ca 0 da 0 ea 0 fa 0 ga # r.zrangebylex('myzset', "-", "[ca") 結果爲:['aa', 'ba', 'ca'] # 更多: # 從大到小排序 # zrevrangebylex(name, max, min, start=None, num=None)

zrem(name, values)

# 刪除name對應的有序集合中值是values的成員 # 如:zrem('zz', ['s1', 's2'])

zremrangebyrank(name, min, max)

# 根據排行範圍刪除

zremrangebyscore(name, min, max)

# 根據分數範圍刪除

zremrangebylex(name, min, max)

# 根據值返回刪除

zscore(name, value)

# 獲取name對應有序集合中 value 對應的分數

zinterstore(dest, keys, aggregate=None)

# 獲取兩個有序集合的交集,若是遇到相同值不一樣分數,則按照aggregate進行操做 # aggregate的值爲: SUM MIN MAX

zunionstore(dest, keys, aggregate=None)

# 獲取兩個有序集合的並集,若是遇到相同值不一樣分數,則按照aggregate進行操做 # aggregate的值爲: SUM MIN MAX

zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)

# 同字符串類似,相較於字符串新增score_cast_func,用來對分數進行操做

其它 經常使用操做

delete(*names)
# 根據刪除redis中的任意數據類型

 exists(name)

# 檢測redis的name是否存在

keys(pattern='*')

# 根據模型獲取redis的name # 更多: # KEYS * 匹配數據庫中全部 key 。 # KEYS h?llo 匹配 hello , hallo 和 hxllo 等。 # KEYS h*llo 匹配 hllo 和 heeeeello 等。 # KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo

expire(name ,time)

# 爲某個redis的某個name設置超時時間

rename(src, dst)

# 對redis的name重命名爲

move(name, db))

# 將redis的某個值移動到指定的db下

randomkey()

# 隨機獲取一個redis的name(不刪除)

type(name)

# 獲取name對應值的類型

scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)

# 同字符串操做,用於增量迭代獲取key

3,管道

默認狀況下,redis-py 每次在執行請求時都會建立和斷開一次鏈接操做(鏈接池申請鏈接,歸還鏈接池),若是想要在一次請求中執行多個命令,則可使用 pipline 實現一次請求執行多個命令,而且默認狀況下 pipline 是原子性操做。

見如下實例:

import redis
 
pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
 
r = redis.Redis(connection_pool=pool)
 
# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)
 
r.set('name', 'nick')
r.set('age', '18')
 
pipe.execute()
 

4,發佈和訂閱

發佈者:服務器

訂閱者:Dashboad 和數據處理

發佈訂閱的 Demo 以下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import redis


class RedisHelper:

    def __init__(self):
        self.__conn = redis.Redis(host='10.211.55.4')
        self.chan_sub = 'fm104.5'
        self.chan_pub = 'fm104.5'

    def public(self, msg):
        self.__conn.publish(self.chan_pub, msg)
        return True

    def subscribe(self):
        pub = self.__conn.pubsub()
        pub.subscribe(self.chan_sub)
        pub.parse_response()
        return pub
RedisHelper

訂閱者:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
redis_sub = obj.subscribe()
 
while True:
    msg = redis_sub.parse_response()
    print(msg)

發佈者:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
obj.public('hello')

更多參見:https://github.com/andymccurdy/redis-py/   

     http://doc.redisfans.com/

RabbitMQ

RabbitMQ 是信息傳輸的中間者。本質上,從生產者(producers)接收消息,轉發這些消息給消費者(consumers)。換句話說,可以根據指定的規則進行消息轉發,緩衝,和持久化

Windows下安裝

下載Erlang,地址:http://www.erlang.org/download/otp_win32_R15B.exe ,雙擊安裝便可(首先裝)

下載RabbitMQ,地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.4/rabbitmq-server-3.3.4.exe ,雙擊安裝便可

# python使用RabbitMQ, 使用python的類庫 pika
下載pika, 地址: pip install -i pika http://pypi.douban.com/simple/

RabbitMQ之Windows環境啓動

# 一、以應用方式啓動
# 後臺啓動
rabbitmq-server -detached

# 直接啓動,若是你關閉窗口或者須要在改窗口使用其餘命令時應用就會中止
Rabbitmq - server

關閉: rabbitmqctl stop

# 二、以服務方式啓動(安裝完以後在任務管理器中服務一欄能看到RabbtiMq)

rabbitmq-service install # 安裝服務

rabbitmq-service start   # 開始服務

Rabbitmq-service stop    # 中止服務

Rabbitmq-service enable  # 使服務有效

Rabbitmq-service disable # 使服務無效

rabbitmq-service help    # 幫助

# 三、Rabbitmq管理插件啓動

rabbitmq-plugins enable rabbitmq_management  # 啓動

rabbitmq-plugins disable rabbitmq_management # 關閉

# 四、Rabbitmq節點管理方式
rabbitmqctl
# 查看隊列
rabbitmqctl list_queues


"""
當rabbitmq-service install以後默認服務是enable的,
    若是這時設置服務爲disable的話,rabbitmq-service start就會報錯。
當rabbitmq-service start正常啓動服務以後,使用disable是沒有效果的
"""
View Code

Linux下安裝

安裝配置epel源
   $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
 
安裝erlang
   $ yum -y install erlang
 
安裝RabbitMQ
   $ yum -y install rabbitmq-server

注意:service rabbitmq-server start/stop

sudo rabbitmqctl add_user alex 123
# 設置用戶爲administrator角色
sudo rabbitmqctl set_user_tags alex administrator
# 設置權限
sudo rabbitmqctl set_permissions -p "/" alex '.''.''.'

# 而後重啓rabbiMQ服務
sudo /etc/init.d/rabbitmq-server restart
 
# 而後可使用剛纔的用戶遠程鏈接rabbitmq server了。
Linux添加radditmq User並設定權限方可登錄

A,基本操做

Windows

import pika

# ######################### 生產者 #########################

# 鏈接到RabbitMQ服務器,本地測試使用localhost
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  # 創建了rabbit 協議的通道

# 聲明queue
channel.queue_declare(queue='hello')

# 發送消息到上面聲明的hello隊列,
# 其中exchange表示交換器,能精確指定消息應該發送到哪一個隊列,
# routing_key設置爲隊列的名稱,body就是發送的內容
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')

print(" [x] Sent 'Hello World!'")
connection.close()
sender.py
import pika
import time


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print("received msg...start processing....",body)
    time.sleep(20)
    print(" [x] msg process done....",body)


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive.py

rabbitmqctl list_queues        服務端執行命令查看隊列及消息

Linux

在windows下編寫程序遠程鏈接Liunx RabbitMQ服務

import pika


credentials = pika.PlainCredentials('linux_username', 'linux_password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'host_ip_addr', credentials=credentials))

channel = connection.channel() # 創建了rabbit 協議的通道

# 聲明queue
channel.queue_declare(queue='hello')

# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
sender.py
import pika
import time

credentials = pika.PlainCredentials('linux_username', 'linux_password')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'host_ip_addr', credentials=credentials))


channel = connection.channel()

channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):

    print("received msg...start processing....",body)
    time.sleep(20)
    print(" [x] msg process done....",body)


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive.py

B,acknowledgment   消息反饋機制 確認(消息不丟失)

no-ack = False,若是消費者因爲某些狀況宕了,那 RabbitMQ 會從新將該任務放入隊列中。

在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其餘意外)的狀況,這種狀況下就可能會致使消息丟失。爲了不這種狀況發生,咱們能夠要求消費者在消費完消息後發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)後纔將該消息從Queue中移除;若是RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ鏈接斷開,則RabbitMQ會將該消息發送給其餘消費者(若是存在多個消費者)進行處理。這裏不存在timeout概念,一個消費者處理消息時間再長也不會致使該消息被髮送給其餘消費者,除非它的RabbitMQ鏈接斷開。
這裏會產生另一個問題,若是咱們的開發人員在處理完業務邏輯後,忘記發送回執給RabbitMQ,這將會致使嚴重的bug——Queue中堆積的消息會愈來愈多;消費者重啓後會重複消費這些消息並重復執行業務邏輯…..

import pika
import time


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print("received msg...start processing....",body)
    time.sleep(20)
    print(" [x] msg process done....", body)


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive.py

ch.basic_ack(delivery_tag = method.delivery_tag)

去除no_ack=True參數或者設置爲False也能夠

消息確認就是當工做者完成任務後,會反饋給rabbitmq。修改worker.py中的回調函數:

def callback(ch, method, properties, body):
    print("received msg...start processing....",body)
    time.sleep(20)
    print(" [x] msg process done....", body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
callback

C,durable   消息持久化存儲(隊列,消息不丟失)

雖然有了消息反饋機制,可是若是rabbitmq自身掛掉的話,那麼任務仍是會丟失。因此須要將任務持久化存儲起來。聲明持久化存儲:

將隊列(Queue)與消息(Message)都設置爲可持久化的(durable),這樣能夠保證絕大部分狀況下咱們的RabbitMQ消息不會丟失。但依然解決不了小几率丟失事件的發生(好比RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),若是須要對這種小几率事件也要管理起來,那麼要用到事務。因爲這裏僅爲RabbitMQ的簡單介紹,因此不講解RabbitMQ相關的事務。

import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  # 創建了rabbit 協議的通道

# durable=True 聲明持久化存儲
channel.queue_declare(queue='task_queue', durable=True)


channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body='Hello World!',
                      # 在發送任務的時候,用delivery_mode=2來標記消息爲持久化存儲
                      properties=pika.BasicProperties(
                          delivery_mode=2,  
                      ))

print(" [x] Sent 'Hello World!'")
connection.close()
sender.py
import pika
import time


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)


def callback(ch, method, properties, body):
    print("received msg...start processing....",body)
    time.sleep(20)
    print(" [x] msg process done....", body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    callback,
    queue='task_queue',
    no_ack=False  # 默認爲False
)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive.py

D,Fair dispatch(公平調度)

上面實例中,雖然每一個工做者是依次分配到任務,可是每一個任務不必定同樣。可能有的任務比較重,執行時間比較久;有的任務比較輕,執行時間比較短。若是能公平調度就最好了,使用basic_qos設置prefetch_count=1,使得rabbitmq不會在同一時間給工做者分配多個任務,即只有工做者完成任務以後,纔會再次接收到任務。

import pika
import time


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)


def callback(ch, method, properties, body):
    print("received msg...start processing....",body)
    time.sleep(20)
    print(" [x] msg process done....", body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    callback,
    queue='task_queue',
    no_ack=False  # 默認爲False
)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive.py

E,發佈訂閱

發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。

若是須要將消息廣播出去,讓每一個接收端都能收到,那麼就要使用交換機。

交換機的工做原理:消息發送端先將消息發送給交換機,交換機再將消息發送到綁定的消息隊列,然後每一個接收端都能從各自的消息隊列裏接收到信息。

  • exchange若是爲空,表示是使用匿名的交換機。
  • routing_key在使用匿名交換機的時候才須要指定,表示發送到哪一個隊列的意思。

轉發器可分爲三種類型,以下

I,fanout

白話來說,相似於收聽廣播,具備實時性。

較於以前的sender.py,做了倆個改動

  • 定義交換機
  • 不是將消息發送到hello隊列,而是發送到交換機
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 定義交換機
channel.exchange_declare(exchange='messages', type='fanout')

# 將消息發送到交換機
channel.basic_publish(exchange='messages', routing_key='', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
sender.py

上示代碼中, basic_publish方法的參數exchange被設定爲相應交換機,由於是要廣播出去,發送到全部隊列,因此routing_key就不須要設定了。

rabbitmqctl list_exchanges 命令 來查看交換機信息

較於以前的recevie.py,也主要做了倆處改動

  • 定義交換機
  • 不使用hello隊列了,隨機生成一個臨時隊列,並綁定到交換機上
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# 定義交換機
channel.exchange_declare(exchange='messages', type='fanout')

# 隨機生成隊列,並綁定到交換機上
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='messages', queue=queue_name)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))


channel.basic_consume(callback, queue=queue_name, no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive.py

上示代碼中,queue_declare的參數exclusive=True表示當接收端退出時,銷燬臨時產生的隊列,這樣就不會佔用資源。

打開另一個終端,執行send.py,能夠觀察到receive.py接收到了消息。若是有多個終端執行receive.py,那麼每一個receive.py都會接收到消息。

II,direct

上述實例中,說明了關於交換機的使用,已經能實現給全部接收端發送消息,可是若是須要自由定製,有的消息發給其中一些接收端,有些消息發送給另一些接收端,要怎麼辦呢?這種狀況下就要用到路由鍵了。

路由鍵的工做原理:每一個接收端的消息隊列在綁定交換機的時候,能夠設定相應的路由鍵。發送端經過交換機發送信息時,能夠指明路由鍵 ,交換機會根據路由鍵把消息發送到相應的消息隊列,這樣接收端就能接收到消息了。

實例的功能: 將info、warning、error三種級別的信息發送到不一樣的接收端。

較於fanout類型send.py,做了倆處改動:

  • 設定交換機的類型(type)爲direct。上一篇是設置爲fanout,表示廣播的意思,會將消息發送到全部接收端,這裏設置爲direct表示要根據設定的路由鍵來發送消息。
  • 發送信息時設置發送的路由鍵。
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 定義交換機,設置類型爲direct
channel.exchange_declare(exchange='messages', type='direct')

# 定義三個路由鍵
routings = ['info', 'warning', 'error']

# 將消息依次發送到交換機,並設置路由鍵
for routing in routings:
    message = '%s message.' % routing
    channel.basic_publish(exchange='messages',
                          routing_key=routing,
                          body=message)
    print(message)

connection.close()
send.py

較於fanout類型receive.py,做了三處改動:

  • 設定交換機的類型(type)爲direct。
  • 增長命令行獲取參數功能,參數即爲路由鍵。
  • 將隊列綁定到交換機上時,設定路由鍵。
import pika, sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 定義交換機,設置類型爲direct
channel.exchange_declare(exchange='messages', type='direct')

# 從命令行獲取路由鍵參數,若是沒有,則設置爲info
routings = sys.argv[1:]
if not routings:
    routings = ['info']

# 生成臨時隊列,並綁定到交換機上,設置路由鍵
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for routing in routings:
    channel.queue_bind(exchange='messages',
                       queue=queue_name,
                       routing_key=routing)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))


channel.basic_consume(callback, queue=queue_name, no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive.py

打開兩個終端,一個運行代碼python receive.py info warning,表示只接收info和warning的消息。另一個終端運行send.py,能夠觀察到接收終端只接收到了info和warning的消息。若是打開多個終端運行receive.py,並傳入不一樣的路由鍵參數,能夠觀察到更明顯的效果。

當接收端正在運行時,可使用rabbitmqctl list_bindings來查看綁定狀況。

III,topic

direct模式以將消息發送到相應的隊列,這裏的路由鍵是要徹底匹配,好比info消息的只能發到路由鍵爲info的消息隊列。

路由鍵模糊匹配,就是可使用正則表達式,和經常使用的正則表示式不一樣,這裏的話「#」表示全部、所有的意思;「*」只匹配到一個詞。

本次將不做比較了,經過上述倆種模式,此種模式相對容易理解

要進行路由鍵模糊匹配,因此交換機的類型要設置爲topic,設置爲topic,就可使用#,*的匹配符號了。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 定義交換機,設置類型爲topic
channel.exchange_declare(exchange='messages', type='topic')

# 定義路由鍵   定義了四種類型的消息
routings = ['happy.work', 'happy.life', 'sad.work', 'sad.life']

# 將消息依次發送到交換機,並設定路由鍵
for routing in routings:
    message = '%s message.' % routing
    channel.basic_publish(exchange='messages',
                          routing_key=routing,
                          body=message)
    print(message)

connection.close()
send.py
import pika, sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# 定義交換機,設置類型爲topic
channel.exchange_declare(exchange='messages', type='topic')

# 從命令行獲取路由參數,若是沒有,則報錯退出
routings = sys.argv[1:]
if not routings:
    print('>>', sys.stderr, "Usage: %s [routing_key]..." % (sys.argv[0],))
    exit()

# 生成臨時隊列,並綁定到交換機上,設置路由鍵
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for routing in routings:
    channel.queue_bind(exchange='messages',
                       queue=queue_name,
                       routing_key=routing)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))


channel.basic_consume(callback, queue=queue_name, no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
receive.py

執行流程

# 打開四個終端,一個運行以下,表示任何事情均可以和她說:

python receive.py #

# 另一個終端 運行以下,表示能夠和她分享開心的事:

python receive.py happy.*

# 第三個運行以下,表示工做上的事情能夠和她分享:

python receive.py *.work

# 最後一個運行python send.py
result

寫在此模式最後:

一、發送信息時,若是不設置路由鍵,那麼路由鍵設置爲 * 的接收端是否能接收消息

發送信息時,若是不設置路由鍵,默認是表示廣播出去,理論上全部接收端均可以收到消息,可是筆者試了下,路由鍵設置爲"*"的接收端收不到任何消息。  只有發送消息時,設置路由鍵爲一個詞,路由鍵設置爲"*"的接收端才能收到消息。在這裏,每一個詞使用"."符號分開的。

二、發送消息時,若是路由鍵設置爲 ..,那麼路由鍵設置爲 #.* 的接收端是否能接收到消息?若是發送消息時,路由鍵設置爲一個詞呢

經測試,ok

三、a.*.# 和 a.# 的區別

"a.#"只要字符串開頭的一個詞是a就能夠了,好比a、a.haha、a.haha.haha。而這樣的詞是不行的,如abs、abc、abc.haha。 "a.*.#"必需要知足a.*的字符串才能夠,好比a.、a.haha、a.haha.haha。而這樣的詞是不行的,如a。

F,遠程結果返回(交互式)

前面的例子都有個共同點,就是發送端發送消息出去後沒有結果返回。若是隻是單純發送消息,固然沒有問題了,可是在實際中,經常會須要接收端將收到的消息進行處理以後,返回給發送端。

處理方法描述:發送端在發送信息前,產生一個接收消息的臨時隊列,該隊列用來接收返回的結果。

DEMO1

假設有一個控制中心和一個計算節點,控制中心會將一個天然數N發送給計算節點,計算節點將N值加1後,返回給控制中心。這裏用sender.py模擬控制中心,recevie.py模擬計算節點。

import pika


class Center(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))

        self.channel = self.connection.channel()

        # 定義接收返回消息的隊列
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response,
                                   no_ack=True,
                                   queue=self.callback_queue)

    # 定義接收到返回消息的處理方法
    def on_response(self, ch, method, props, body):
        self.response = body

    def request(self, n):
        self.response = None
        # 發送計算請求,並聲明返回隊列
        self.channel.basic_publish(exchange='',
                                   routing_key='compute_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                   ),
                                   body=str(n))
        # 接收返回的數據
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


center = Center()

print(" [x] Requesting increase(30)")
response = center.request(30)
print(" [.] Got %r" % (response,))
sender.py
import pika, time

# 鏈接rabbitmq服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

# 定義隊列
channel.queue_declare(queue='compute_queue')
print(' [*] Waiting for n')


# 將n值加1
def increase(n):
    time.sleep(10)
    return n + 1


# 定義接收到消息的處理方法
def request(ch, method, properties, body):
    print(" [.] increase(%s)" % (body,))

    response = increase(int(body))

    # 將計算結果發送回控制中心
    ch.basic_publish(exchange='',
                     routing_key=properties.reply_to,
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(request, queue='compute_queue')

channel.start_consuming()
receive.py

上例代碼定義了接收返回數據的隊列和處理方法,而且在發送請求的時候將該隊列賦值給reply_to,在計算節點代碼中就是經過這個參數來獲取返回隊列的。

DEMO2

假設有多個計算節點,控制中心開啓多個線程,往這些計算節點發送數字,要求計算結果並返回,可是控制中心只開啓了一個隊列,全部線程都是從這個隊列裏獲取消息,每一個線程如何肯定收到的消息就是該線程對應的呢?這個就是correlation id的用處了。correlation翻譯成中文就是相互關聯,也表達了這個意思。

correlation id運行原理:控制中心發送計算請求時設置correlation id,然後計算節點將計算結果,連同接收到的correlation id一塊兒返回,這樣控制中心就能經過correlation id來標識請求。其實correlation id也能夠理解爲請求的惟一標識碼。

相關文章
相關標籤/搜索