Python下操做Memcache/Redis/RabbitMQ說明

 

1、Memcache
Memcache是一套分佈式的高速緩存系統,由LiveJournal的Brad Fitzpatrick開發,但目前被許多網站使用以提高網站的訪問速度,尤爲對於一些大型的、須要頻繁訪問數據庫的網站訪問速度提高效果十分顯著。node

MemCache的工做流程以下:
先檢查客戶端請求數據是否在memcached中,若有,直接把請求數據返回,再也不對數據庫進行任何操做;若是請求的數據不在memcached中,就去查數據庫,把從數據庫中獲取的數據返回給客戶端,同時把數據緩存一份到memcached中(memcached客戶端不負責,須要程序明確實現);每次更新數據庫的同時更新memcached中的數據,保證一致性;當分配給memcached內存空間用完以後,會使用LRU(Least Recently Used,最近最少使用)策略加上到期失效策略,失效數據首先被替換,而後再替換掉最近未使用的數據。python

Memcache是一個高性能的分佈式的內存對象緩存系統,經過在內存裏維護一個統一的巨大的hash表,它可以用來存儲各類格式的數據,包括圖像、視頻、文件以及數據庫檢索的結果等。簡單的說就是將數據調用到內存中,而後從內存中讀取,從而大大提升讀取速度。linux

Memcached是以守護程序(監聽)方式運行於一個或多個服務器中,隨時會接收客戶端的鏈接和操做,特性以下:
-  在Memcached中能夠保存的item數據量是沒有限制的,只要內存足夠 。
-  Memcached單進程在32位系統中最大使用內存爲2G,若在64位系統則沒有限制,這是因爲32位系統限制單進程最多可以使用2G內存,要使用更多內存,能夠分多個端口開啓多個  Memcached進程 ,
-  最大30天的數據過時時間,設置爲永久的也會在這個時間過時,常量REALTIME_MAXDELTA
-  60*60*24*30控制
-  最大鍵長爲250字節,大於該長度沒法存儲,常量KEY_MAX_LENGTH 250控制
-  單個item最大數據是1MB,超過1MB數據不予存儲,常量POWER_BLOCK 1048576進行控制,
-  它是默認的slab大小
-  最大同時鏈接數是200,經過 conn_init()中的freetotal進行控制,最大軟鏈接數是1024,經過 settings.maxconns=1024 進行控制
-  跟空間佔用相關的參數:settings.factor=1.25, settings.chunk_size=48, 影響slab的數據佔用和步進方式 redis

memcached是一種無阻塞的socket通訊方式服務,基於libevent庫,因爲無阻塞通訊,對內存讀寫速度很是之快。
memcached分服務器端和客戶端,能夠配置多個服務器端和客戶端,應用於分佈式的服務很是普遍。
memcached做爲小規模的數據分佈式平臺是十分有效果的。
memcached是鍵值一一對應,key默認最大不能超過128個字 節,value默認大小是1M,也就是一個slabs,若是要存2M的值(連續的),不能用兩個slabs,由於兩個slabs不是連續的,沒法在內存中 存儲,故須要修改slabs的大小,多個key和value進行存儲時,即便這個slabs沒有利用完,那麼也不會存放別的數據。
memcached已經能夠支持C/C++、Perl、PHP、Python、Ruby、Java、C#、Postgres、Chicken Scheme、Lua、MySQL和Protocol等語言客戶端。數據庫

Memcache的安裝centos

[root@linux-node1 ~]#  yum install libevent-devel -y
[root@linux-node1 ~]#  yum install memcached -y

Memcache的 啓動緩存

[root@linux-node1 ~]# memcached -d -m 10 -u root -l 192.168.56.11 -p 11211 -c 256 -P /tmp/memcached.pid
[root@linux-node1 ~]# netstat -antlp|grep 11211
tcp        0      0 192.168.56.11:11211     0.0.0.0:*               LISTEN      2251/memcached 

參數說明
-d 是啓動一個守護進程
-m 是分配給Memcache使用的內存數量,單位是MB
-u 是運行Memcache的用戶
-l 是監聽的服務器IP地址
-p 是設置Memcache監聽的端口,最好是1024以上的端口
-c 選項是最大運行的併發鏈接數,默認是1024,按照你服務器的負載量來設定
-P 是設置保存Memcache的pid文件

使用python操做Memcached
安裝APIbash

[root@linux-node1 ~]#  yum install -y python-memcached
[root@linux-node1 ~]# python
Python 2.7.5 (default, Jun 24 2015, 00:41:19) 
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import memcache
>>> quit()

1)Memcache的第一次服務器

[root@linux-node1 ~]# python
Python 2.7.5 (default, Jun 24 2015, 00:41:19) 
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import memcache
>>> mc =memcache.Client(['192.168.56.11:11211'],debug=True)
>>> mc.set("foo","bar")
True
>>> ret = mc.get('foo')
>>> print ret

舒適提示:上面的debug=True;是表示運行出現錯誤的時候,顯示錯誤信息,上線後,移除該參數。數據結構

2)Memcache和集羣是一對好基友
python-memcached模塊原生支持集羣的操做,其原理是在內存維護一個主機列表

#!/usr/bin/env python
# coding:utf-8
import memcache

mc = memcache.Client([('192.168.56.11',1),('192.168.56.12',2)],debug=True)
# 主機列表:li = [192.168.56.11:11211,192.168.56.12:11211,192.168.56.13:11211,]

mc.set("foo","234")
# 1:將"foo"字符串轉換成數字
# 2:將數字和主機列表長度求餘數
# 3:li[餘數],能夠獲得主機和IP
# 4:鏈接主機ip 將foo = 234 保存到mem上

ret = mc.get('foo')
print ret

3)add
添加一條鍵值對,若是已經存在的key,重複執行add操做異常。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache

mc = memcache.Client(['192.168.56.11:11211'],debug=True)
mc.add('k1','v1')

屢次執行會報錯

[root@linux-node1 test]# python 1.py 
[root@linux-node1 test]# python 1.py 
MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED'

4)replace
修改某個key的值,若是key值不存在,則異常

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache

mc = memcache.Client(['192.168.56.11:11211'],debug=True)
# 若是存在k1,則替換成功;
mc.replace('k1','v1v1v1')

不存在則報錯

[root@linux-node1 test]# cat 2.py 
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache

mc = memcache.Client(['192.168.56.11:11211'],debug=True)
# 若是存在k1,則替換成功;
mc.replace('k1k1','v1v1v1')

[root@linux-node1 test]# python 2.py 
MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED'

5)set和set_multi
set                  設置一個鍵值對,若是key不存在,則建立,若是key存在,則修改
set_multi        設置多個鍵值對,若是key不存在,則建立,若是key存在,則修改

[root@linux-node1 test]# cat 3.py 
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache

mc = memcache.Client(['192.168.56.11:11211'],debug=True)
mc.set('key1','ccccc')
# 一個鍵值對

mc.set_multi({'key2': 'ggggg','key3': 'tttt'})

[root@linux-node1 test]# python 3.py

6)delete和delete_multi
delete 在Memcached中刪除指定的一個鍵值對
delete_multi 在Memcached中刪除指定的多個鍵值對

[root@linux-node1 test]# cat 4.py 
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache

mc = memcache.Client(['192.168.56.11:11211'],debug=True)
mc.delete('key1')
# 一個鍵值對

mc.delete_multi(['key2','key3'])

[root@linux-node1 test]# python 4.py

7)get和get_multi
get              獲取一個鍵值對
get_multi    取多一個鍵值對

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client(['192.168.56.11:11211], debug=True)
 
val = mc.get('key1')
item_dict = mc.get_multi(["key2", "key3"])

8)append和prepend
append     修改指定key的值,在該值 後面 追加內容
prepend    修改指定key的值,在該值 前面 插入內容

[root@linux-node1 test]# cat 5.py 
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache

mc = memcache.Client(['192.168.56.11:11211'],debug=True)
# 假設k1 = v1

mc.append('k1','after')
# 在v1的後面加after

mc.prepend('k1','before')
# 在v1的前面加before

9)decr和incr
incr    自增,將Memcached中的某一個值增長 N ( N默認爲1 )
decr  自減,將Memcached中的某一個值減小 N ( N默認爲1 )

#!/usr/bin/env python
# coding:utf-8
import memcache

mc = memcache.Client(['192.168.56.11',1],debug=True)

mc.set('k1','123')

# 默認變化的值爲1 
mc.incr('k1',10)
# k1的值會變成133

mc.decr('k1',23)
# k1的值會變成110

10)若是用終端,同時操做一組數據的話,會出現非正常數據,使用gets和cas來解決

#!/usr/bin/env python
# coding:utf-8
import memcache

mc = memcache.Client(['192.168.56.11',1],debug=True)
# 假設k1=1111
mc.gets('k1')
# 此時A用戶gets了數據。
# 進行了減法操做
# 來了B用戶,也對k1進行了操做,這時候,執行下面的操做就會報錯
mc.cas('product_count', "1110")

解析:
本質上每次執行gets時,會從memcache中獲取一個自增的數字,經過cas去修改gets的值時,會攜帶以前獲取的自增值和memcache中的自增值進行比較,若是相等,則能夠提交,若是不想等,那表示在gets和cas執行之間,又有其餘人執行了gets(獲取了緩衝的指定值), 如此一來有可能出現非正常數據,則不容許修改。

2、Redis
簡介:
Redis是一個key-value存儲系統。和Memcached相似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操做,並且這些操做都是原子性的。在此基礎上,redis支持各類不一樣方式的排序。與memcached同樣,爲了保證效率,數據都是緩存在內存中。區別的是redis會週期性的把更新的數據寫入磁盤或者把修改操做寫入追加的記錄文件,而且在此基礎上實現了master-slave(主從)同步。

Redis支持主從同步。數據能夠從主服務器向任意數量的從服務器上同步,從服務器能夠是關聯其餘從服務器的主服務器。這使得Redis可執行單層樹複製。存盤能夠有意無心的對數據進行寫操做。因爲徹底實現了發佈/訂閱機制,使得從數據庫在任何地方同步樹時,可訂閱一個頻道並接收主服務器完整的消息發佈記錄。同步對讀取操做的可擴展性和數據冗餘頗有幫助。

Redis與memcache的比較
Redis 是一個高性能的key-value數據庫。 redis的出現,很大程度補償了memcached這類key/value存儲的不足,在部 分場合能夠對關係數據庫起到很好的補充做用。它提供了Java,C/C++,C#,PHP,JavaScript,Perl,Object-C,Python,Ruby,Erlang等客戶端,使用很方便。

數據模型:
Redis的外圍由一個鍵、值映射的字典構成。與其餘非關係型數據庫主要不一樣在於:Redis中值的類型
不只限於字符串,還支持以下抽象數據類型:
-  字符串列表
-  無序不重複的字符串集合
-  有序不重複的字符串集合
-  鍵、值都爲字符串的哈希表
-  值的類型決定了值自己支持的操做。Redis支持不一樣無序、有序的列表,無序、有序的集合間的交集、並集等高級服務器端原子操做。

數據結構:
redis提供五種數據類型:string,hash,list,set及zset(sorted set)。

存儲:
-  redis使用了兩種文件格式:全量數據和增量請求。
-  全量數據格式是把內存中的數據寫入磁盤,便於下次讀取文件進行加載;
-  增量請求文件則是把內存中的數據序列化爲操做請求,用於讀取文件進行replay獲得數據,序列化的操做包括SET、RPUSH、SADD、ZADD。
-  redis的存儲分爲內存存儲、磁盤存儲和log文件三部分,配置文件中有三個參數對其進行配置。
-  save seconds updates,save配置,指出在多長時間內,有多少次更新操做,就將數據同步到數據文件。這個能夠多個條件配合,好比默認配置文件中的設置,就設置了三個條件。
-  appendonly yes/no ,appendonly配置,指出是否在每次更新操做後進行日誌記錄,若是不開啓,可能會在斷電時致使一段時間內的數據丟失。由於redis自己同步數據文件是按上面的save條件來同步的,因此有的數據會在一段時間內只存在於內存中。
-  appendfsync no/always/everysec ,appendfsync配置,no表示等操做系統進行數據緩存同步到磁盤,always表示每次更新操做後手動調用fsync()將數據寫到磁盤,everysec表示每秒同步一次。

安裝啓動:
直接yum安裝:

[root@linux-node1 ~]#  yum -y install redis
[root@linux-node1 ~]# systemctl start redis
[root@linux-node1 ~]# ps -ef |grep redis
redis     16359      1  0 16:55 ?        00:00:00 /usr/bin/redis-server 127.0.0.1:6379
root      16363  16335  0 16:56 pts/0    00:00:00 grep --color=auto redis

啓動客戶端:

[root@linux-node1 ~]# redis-cli 
127.0.0.1:6379> set foo bar
OK
127.0.0.1:6379> get foo
"bar"

使用python操做redis
首先安裝組件:

[root@linux-node1 ~]# yum -y install python-pip
[root@linux-node1 ~]# pip install redis

經常使用操做
1)操做模式
redis-py提供兩個類Redis和StictRedis用於實現Redis命令,StrictRedis用於實現大部分官方的功能,並使用官方的語法和命令,Redis是StricRedis的子類,用於向後兼容舊版本的redis-py。

[root@linux-node1 test]# cat 6.py 
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis
r = redis.Redis(host='127.0.0.1',port=6379)
r.set('foo','bar')
print r.get('foo')
[root@linux-node1 test]# python 6.py 
bar

2)鏈接池
redis-py使用connection pool來管理對一個redis server的全部鏈接,避免每次創建、釋放鏈接的開銷。默認,每一個Redis實例都會維護一個本身的鏈接池。能夠直接創建一個鏈接池,而後做爲參數Redis,這樣就能夠實現多個Redis實例共享一個鏈接池。

[root@linux-node1 test]# cat 6.py 
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis
pool = redis.ConnectionPool(host='127.0.0.1',port=6379)
r=redis.Redis(connection_pool=pool)
r.set('foo','bar')
print r.get('foo')
[root@linux-node1 test]# python 6.py 
bar

3)管道
redis-py默認在執行每次請求都會建立(鏈接池申請鏈接)和斷開(歸還鏈接池)一次鏈接操做,若是想要在一次請求中指定多個命令,則可使用pipline實現一次請求指定多個命令,而且默認狀況下一次pipline 是原子性操做。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis
pool = redis.ConnectionPool(host='127.0.0.1',port=6379)
r=redis.Redis(connection_pool=pool)

#pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)
r.set('foo','bar')
r.set('foo111','bar111')
ret = pipe.execute()

4)發佈訂閱

頻道:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis
class RedisHelper:

    def __init__(self):
        self.__conn = redis.Redis(host='10.211.55.4')
        self.chan_sub = 'fm104.5'
        self.chan_pub = 'fm104.5'

    def public(self, msg):
        self.__conn.publish(self.chan_pub, msg)
        return True

    def subscribe(self):
        pub = self.__conn.pubsub()
        pub.subscribe(self.chan_sub)
        pub.parse_response()
        return pub

訂閱者

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from monitor.RedisHelper import RedisHelper
obj = RedisHelper()
redis_sub = obj.subscribe()

while True:
    msg= redis_sub.parse_response()
    print msg

發佈者

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from monitor.RedisHelper import RedisHelpe
obj = RedisHelper()
obj.public('hello')

3、RabbitMQ
RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。

MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消 息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。

RabbitMQ的安裝

### 安裝配置epel源,若是你是centos7,請換epel源
   $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
### 安裝erlang
   $ yum -y install erlang
### 安裝RabbitMQ
   $ yum -y install rabbitmq-server
### 啓動
[root@linux-node1 test]# /usr/sbin/rabbitmq-server start

[root@linux-node1 test]# ps -ef |grep rabbitmq-server 
root      17096  16335  0 18:36 pts/0    00:00:00 /bin/sh /usr/sbin/rabbitmq-server start 
root      17105  17096  0 18:36 pts/0    00:00:00 su rabbitmq -s /bin/sh -c /usr/lib/rabbitmq/bin/rabbitmq-server  "start" 
root      17181  16510  0 18:36 pts/3    00:00:00 grep --color=auto rabbitmq-server

安裝API

[root@linux-node1 test]# pip install pika

使用API操做RabbitMQ
基於Queue實現生產者消費者模型

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading
 
 
message = Queue.Queue(10)
 
 
def producer(i):
    while True:
        message.put(i)
 
 
def consumer(i):
    while True:
        msg = message.get()
 
 
for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()
 
for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()

對於RabbitMQ來講,生產和消費再也不針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。
生產者:

#!/usr/bin/env python
# coding:utf-8
import pika
# 建立一個鏈接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
# 打開一個通道
channel = connection.channel()
# 隊列聲明叫hello
channel.queue_declare(queue='hello')
# 向隊列中插入數據,往hello對隊列中插入hello world
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

消費者:

#!/usr/bin/env python
# coding:utf-8
import pika
# 建立一個鏈接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
# 打開一個通道,建立頻道
channel = connection.channel()
# 隊列聲明叫hello
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
# 消費隊列中的內容,同時執行callback函數,callback函數中的body就是拿取到的內容
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)      # 出現斷開的狀況,數據就會丟失;這種方式發佈者和聽衆是沒有相互應答的
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

1)acknowledgment 消息不丟失(消費者失聯)
no-ack = False,若是生產者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會從新將該任務添加到隊列中。在消費者中操做

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)
# 這裏使用False

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

2)durable消息不丟失
若是是生產者掛掉了呢,你說你咋整?消息隊列能夠作持久化,哈哈哈
生產者

#!/usr/bin/env python
# coding:utf-8
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

# 聲明這個消息隊列的時候就要使用durable來聲明持久化
channel.queue_declare(queue='hello', durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
                      ))    # 指定消息的時候,也是要持久化
print(" [x] Sent 'Hello World!'")
connection.close()

消費者

#!/usr/bin/env python
# coding:utf-8
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()

# 這個無關緊要
channel.queue_declare(queue='hello', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

已經存在的消息隊列,沒法進行持久化,只能從新生成隊列才行。

3)消息獲取順序
默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。
channel.basic_qos(prefetch_count=1) 表示誰來誰取,再也不按照奇偶數排列。這個須要在消費者上來操做。

#!/usr/bin/env python
# coding:utf-8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)
# 誰來誰取
channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

4)發佈訂閱
發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。
exchange type = fanout 表示能夠給多個隊列發數據,能夠理解爲廣播
發佈者

#!/usr/bin/env python
# coding:utf-8
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=((‘
127.0.0.1
'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',  # logs隨便起名字
                         type='fanout') # 能夠給多個隊列發數據

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

訂閱者一

#!/usr/bin/env python
# coding:utf-8
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='127.0.0.1'))
channel = connection.channel()
# 定義聲明,爲logs
channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 生成了一個隨機名稱

# 讓隊列和exchange進行綁定,之後生產者發送來數據,exchange就會往queue中轉發數據
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

# 一直去取數據
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

訂閱者二

#!/usr/bin/env python
# coding:utf-8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='127.0.0.1'))
channel = connection.channel()
# 定義聲明,爲logs
channel.exchange_declare(exchange='logs',
                         type='fanout')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 生成了一個隨機名稱
 
# 讓隊列和exchange進行綁定,之後生產者發送來數據,exchange就會往queue中轉發數據
channel.queue_bind(exchange='logs',
                   queue=queue_name)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
 
# 一直去取數據
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

能夠同時使用兩個來測試,同時都能接收到數據

5)關鍵字發送
exchange type = direct
以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。
訂閱者一:

#!/usr/bin/env python
# coding:utf-8
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='127.0.0.1'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
channel.queue_bind(exchange='direct_logs',
                   queue=queue_name,
                   routing_key='aaa')
 
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='bbb')

print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

訂閱者二

#!/usr/bin/env python
# coding:utf-8
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='127.0.0.1'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
channel.queue_bind(exchange='direct_logs',
                   queue=queue_name,
                   routing_key='bbb')
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='ccc') 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

生產者

#!/usr/bin/env python
# coding:utf-8
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='127.0.0.1'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
 
message = 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key='ccc',    # 指定訂閱者的隊列關鍵字是ccc
                      body=message)
print(" [x] Sent %r" % (message))

此時,只有訂閱者二符合,會有數據收到

6)模糊匹配
exchange type = topic
在topic類型下,可讓隊列綁定幾個模糊的關鍵字,以後發送者將數據發送到exchange,exchange將傳入」路由值「和 」關鍵字「進行匹配,匹配成功,則將數據發送到指定隊列。
#  表示能夠匹配 0 個 或 多個 單詞
*  表示只能匹配 一個 單詞
訂閱者1

#!/usr/bin/env python
# coding:utf-8
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='127.0.0.1'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
channel.queue_bind(exchange='topic_logs',
                   queue=queue_name,
                   routing_key='cgt.*')
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

訂閱者2

#!/usr/bin/env python
# coding:utf-8
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='127.0.0.1'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
channel.queue_bind(exchange='topic_logs',
                   queue=queue_name,
                   routing_key='cgt.#')
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

生產者

#!/usr/bin/env python
# coding:utf-8
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='127.0.0.1'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
message = 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key='cgt.caoxiaojian.com',
                      body=message)
print(" [x] Sent %r:%r" % (message))
connection.close()

這樣測試,只有訂閱者2收到,由於他是#

相關文章
相關標籤/搜索