WEEK11:redis緩存、rabbitMQ隊列

  • RabbitMQ 消息隊列 
    python調用須要安裝pika模塊
    實現最簡單的隊列通訊,消息代理就是一箇中間軟件模塊,把消息從一個軟件服務傳遞到另一個軟件服務上去。
    • 簡易消費者生產者模型
      • 消費者
         1 import pika
         2 
         3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
         4 #聲明一個管道
         5 channel = connection.channel()
         6 # 聲明隊列
         7 channel.queue_declare(queue='hello',durable=True)#durable=True使隊列持久化
         8 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
         9 channel.basic_publish(exchange='', #定義
        routing_key='hello',
        body='Hello World!',
        properties=pike.BasicProperties(delivery_mode=2,abc='alex')
        )#delivery_mode=2使消息持久化 10 print(" [x] Sent 'Hello World!'") 11 connection.close()
      • 生產者
         1 import pika
         2 
         3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
         4 channel = connection.channel()
         5 #若是確認這個隊列聲明過了,則能夠不在此聲明。若是不肯定消費者或者生產者先運行,那麼能夠在二者中都進行聲明。
         6 channel.queue_declare(queue='hello',durable=True)#durable=True使隊列持久化
         7 
         8 def callback(ch, method, properties, body):
         9     #ch是管道內存對象地址
        10     #method是包含給接收消息者等屬性
        11     print(ch,method,properties)
        12     print(" [x] Received %r" % body)
        #客戶端對服務端的消息進行手動確認
        ch.basic_ack(delivery_tag=method.delivery_tag)
        channel.basic_qos(prefetch_count=1)#實現廣播效果,消息公平分發
        13 #消費消息 14 channel.basic_consume(callback,#若是收到消息,就調用callback函數來處理消息 15 queue='hello')#從哪個隊列裏接收消息17 18 print(' [*] Waiting for messages. To exit press CTRL+C') 19 #開始接收消息 20 channel.start_consuming()
    • 五種隊列(https://www.cnblogs.com/ysocean/p/9251884.html)
      • 簡單隊列
        一個生產者對應一個消費者。注意這裏消費者有自動確認消息和手動確認消息兩種模式
      • work模式(競爭消費者模式)
        一個生產者對應多個消費之,但只能有一個消費者得到消息。效率高的消費者消費消息多,能夠用來進行負載均衡。
      • 發佈訂閱模式
        一個消費者將消息首先發送到交換器,交換器綁定到多個隊列,而後被監聽該隊列的消費者所接受並消費。X表示交換器,在RabbitMQ中,交換器主要有四種類型:direct、fanout、topic、headers,這裏的交換器是 fanout。
      • 路由模式
        生產者將消息發送到direct交換器,在綁定隊列和交換器的時候有一個路由key,生產者發送的消息會指定一個路由key,那麼消息只會發送到相應key相同的隊列,接着監聽該隊列的消費者消費消息。也就是讓消費者有選擇的接收消息。
      • 主題模式
        上面的路由模式是根據路由key進行完整的匹配(徹底相等才發送消息),這裏的通配符模式通俗的來說就是模糊匹配。
        符號「#」表示匹配一個或多個詞,符號「*」表示匹配一個詞。
    • 四種交換機
      交換器分爲四種,分別是:direct、fanout、topic和 headers。前面三種分別對應路由模式、發佈訂閱模式和通配符模式,headers 交換器容許匹配 AMQP 消息的 header 而非路由鍵,除此以外,header 交換器和 direct 交換器徹底一致,可是性能卻差不少,所以基本上不會用到該交換器。
      • direct
        若是路由鍵徹底匹配的話,消息纔會被投放到相應的隊列。(經過routingKey和exchange決定的那個惟一的queue能夠接收消息)
        • 生產者javascript

           1 import pika
           2 import sys
           3 
           4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
           5 channel = connection.channel()
           6 
           7 channel.exchange_declare(exchange='direct_logs',
           8                          type='direct')
           9 
          10 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
          11 message = ' '.join(sys.argv[2:]) or 'Hello World!'
          12 channel.basic_publish(exchange='direct_logs',
          13                       routing_key=severity,
          14                       body=message)
          15 print(" [x] Sent %r:%r" % (severity, message))
          16 connection.close()
          生產者
        • 消費者 html

           1 import pika
           2 import sys
           3 
           4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
           5 channel = connection.channel()
           6 channel.exchange_declare(exchange='direct_logs',
           7                          type='direct')
           8 
           9 result = channel.queue_declare(exclusive=True)
          10 queue_name = result.method.queue
          11 
          12 severities = sys.argv[1:]
          13 if not severities:
          14     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
          15     sys.exit(1)
          16 
          17 for severity in severities:
          18     channel.queue_bind(exchange='direct_logs',
          19                        queue=queue_name,
          20                        routing_key=severity)
          21 
          22 print(' [*] Waiting for logs. To exit press CTRL+C')
          23 
          24 
          25 def callback(ch, method, properties, body):
          26     print(" [x] %r:%r" % (method.routing_key, body))
          27 
          28 
          29 channel.basic_consume(callback,
          30                       queue=queue_name,
          31                       no_ack=True)
          32 
          33 channel.start_consuming()
          消費者
      • fanout
        當發送一條消息到fanout交換器上時,它會把消息投放到全部附加在此交換器上的隊列。(全部綁定到此exchange的隊列均可以接收消息(消費者相似收音機,生產者相似電臺))
        • 生產者
           1 import pika
           2 import sys
           3 
           4 connection = pika.BlockingConnection(pika.ConnectionParameters(
           5     host='localhost'))
           6 channel = connection.channel()
           7 channel.exchange_declare(exchange='logs',#轉發器
           8                          type='fanout')#設置模式
           9 
          10 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
          11 channel.basic_publish(exchange='logs',
          12                       routing_key='',
          13                       body=message)
          14 print(" [x] Sent %r" % message)
          15 connection.close()
          生產者
        • 消費者 
           1 import pika
           2 
           3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
           4 channel = connection.channel()
           5 
           6 channel.exchange_declare(exchange='logs',#轉發器
           7                          type='fanout')
           8 #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
           9 result = channel.queue_declare(exclusive=True)  # exclusive排他,惟一的
          10 
          11 queue_name = result.method.queue #獲取隊列名字
          12 
          13 channel.queue_bind(exchange='logs',queue=queue_name) #將隊列綁定到名爲logs的exchange
          14 
          15 print(' [*] Waiting for logs. To exit press CTRL+C')
          16 
          17 def callback(ch, method, properties, body):
          18     print(" [x] %r" % body)
          19 
          20 channel.basic_consume(callback,queue=queue_name,no_ack=True)
          21 channel.start_consuming()
          消費者
      • topic
        設置模糊的綁定方式,「*」操做符將「.」視爲分隔符,匹配單個字符;「#」操做符沒有分塊的概念,它將任意「.」均視爲關鍵字的匹配部分,可以匹配多個字符。(全部符合routingKey(此時能夠是一個表達式)的bind的queue能夠接收消息)
        • 生產者java

           1 import pika
           2 import sys
           3 
           4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
           5 channel = connection.channel()
           6 channel.exchange_declare(exchange='topic_logs',
           7                          type='topic')
           8 
           9 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
          10 message = ' '.join(sys.argv[2:]) or 'Hello World!'
          11 channel.basic_publish(exchange='topic_logs',
          12                       routing_key=routing_key,
          13                       body=message)
          14 print(" [x] Sent %r:%r" % (routing_key, message))
          15 connection.close()
          生產者
        • 消費者 python

           1 import pika
           2 import sys
           3 
           4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
           5 channel = connection.channel()
           6 channel.exchange_declare(exchange='topic_logs',
           7                          type='topic')
           8 
           9 result = channel.queue_declare(exclusive=True)
          10 queue_name = result.method.queue
          11 
          12 binding_keys = sys.argv[1:]
          13 # #號 表明收全部消息
          14 # "kern.*" 表明收以kern開頭的消息
          15 # "*.critical" 表明收全部critical結尾的消息
          16 # "kern.*" "*.critical" 同時收以上二者
          17 if not binding_keys:
          18     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
          19     sys.exit(1)
          20 
          21 for binding_key in binding_keys:
          22     channel.queue_bind(exchange='topic_logs',
          23                        queue=queue_name,
          24                        routing_key=binding_key)
          25 
          26 print(' [*] Waiting for logs. To exit press CTRL+C')
          27 
          28 
          29 def callback(ch, method, properties, body):
          30     print(" [x] %r:%r" % (method.routing_key, body))
          31 
          32 
          33 channel.basic_consume(callback,
          34                       queue=queue_name,
          35                       no_ack=True)
          36 
          37 channel.start_consuming()
          View Code

           

  • redis緩存
    • 使用yum安裝好redis以後,使用redis-cli進入
      • 存儲 
        set name alex
        set age 22
        set name jack ex 2 #只存活2s的時間
      • 查詢目前全部的鍵
        keys *
      • 查詢值
        get name
        get age
    • python調用
      • 鏈接
         1 import redis
         2 #>>>>>>>>>>>>>>>>>>>>>鏈接方法1
         3 #鏈接
         4 r = redis.Redis(host='10.211.55.4', port=6379)
         5 #存儲值
         6 r.set('foo', 'Bar')
         7 #獲取值
         8 print(r.get('foo'))
         9 
        10 #>>>>>>>>>>>>>>>>>>>>>鏈接方法2
        11 #建立鏈接池
        12 pool=redis.ConnectionPool(host='10.211.55.4', port=6379)
        13 #鏈接
        14 r=redis.Redis(connection_pool=pool)
        15 #存儲值
        16 r.set('foo', 'Bar')
        17 #獲取值
        18 print(r.get('foo'))
      • 操做
        • string操做
          redis中的String在在內存中按照一個name對應一個value來存儲
          在Redis中設置值,不存在則建立,存在則修改
          • set(name, value, ex=None, px=None, nx=False, xx=False)
            • ex,過時時間(秒)
            • px,過時時間(毫秒)
            • nx,若是設置爲True,則只有name不存在時,當前set操做才執行
            • xx,若是設置爲True,則只有name存在時,當前set操做才執行
          • setnx(name, value)
            設置值,只有name不存在時,執行設置操做(添加)
          • setex(name, value, time)
            time,過時時間(數字秒 或 timedelta對象)
          • psetex(name, time_ms, value)
            time_ms,過時時間(數字毫秒 或 timedelta對象)
          • mset(*args, **kwargs)
            批量設置值  mset(k1='v1', k2='v2')或者mget({'k1''v1''k2''v2'})
          • get(name)
            獲取值
          • mget(keys, *args)
            批量獲取  mget('ylr''wupeiqi')或者r.mget(['ylr''wupeiqi'])
          • getset(name, value)
            設置新值並獲取返回原來的值(這個須要以前存在)
          • getrange(name, start, end)
            獲取子序列(根據字節獲取,非字符),能夠對字符串進行切片
            start,起始位置(字節);end,結束位置(字節)
          • setrange(name, offset, value)
            修改字符串內容,從指定字符串索引開始向後替換(新值太長時,則向後添加)
            offset,字符串的索引,字節(一個漢字三個字節)
            value,要設置的值
          • setbit(name, offset, value)
            對name對應值的二進制表示的位進行操做
            offset,位的索引(將值變換成二進制後再進行索引)
            value,值只能是 1 或 0
          • getbit(name, offset)
            獲取name對應的值的二進制表示中的某位的值 (0或1)
          • bitcount(name, start=None, end=None)
            獲取name對應的值的二進制表示中 1 的個數
            start,位起始位置; end,位結束位置
          • strlen(name)
            返回name對應值的字節長度(一個漢字3個字節)
          • incr(self, name, amount=1)
            自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增
            amount,自增數(必須是整數)
          • incrbyfloat(self, name, amount=1.0)
            自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增
            amount,自增數(浮點型)
          • decr(self, name, amount=1)
            自減 name對應的值,當name不存在時,則建立name=amount,不然,則自減
            amount,自減數(整數)
          • append(key, value)
            在redis name對應的值後面追加內容
            value, 要追加的字符串
        • hash操做
          hash表現形式上有些像pyhton中的dict,能夠存儲一組關聯性較強的數據
          • hset(name, key, value)
            name對應的hash中設置一個鍵值對(不存在,則建立;不然,修改)
            key,name對應的hash中的key;value,name對應的hash中的value
            hsetnx(name, key, value),當name對應的hash中不存在當前key時則建立(至關於添加)
          • hmset(name, mapping)
            在name對應的hash中批量設置鍵值對
            mapping,字典,如:{'k1':'v1', 'k2': 'v2'},例如hmset('xx', {'k1':'v1', 'k2': 'v2'})
          • hget(name,key)
            在name對應的hash中獲取根據key獲取value
          • hmget(name, keys, *args)
            在name對應的hash中獲取多個key的值
            keys,要獲取key集合,如:['k1', 'k2', 'k3'];*args,要獲取的key,如:k1,k2,k3,例如r.hmget('xx', ['k1', 'k2'])或者r.hmget('xx', 'k1', 'k2')
          • hgetall(name)
            獲取name對應 hash 的全部鍵值
          • hlen(name)
            獲取name對應的hash中鍵值對的個數
          • hkeys(name)
            獲取name對應的hash中全部的key的值
          • hvals(name)
            獲取name對應的hash中全部的value的值
          • exists(name, key)
            檢查name對應的hash是否存在當前傳入的key
          • hdel(name,*keys)
            將name對應的hash中指定key的鍵值對刪除
          • hincrby(name, key, amount=1)
            自增name對應的hash中的指定key的值,不存在則建立key=amount
          • hincrbyfloat(name, key, amount=1.0)
            自增name對應的hash中的指定key的值,不存在則建立key=amount
          • hscan(name, cursor=0, match=None, count=None)
            增量式迭代獲取,對於數據大的數據很是有用,hscan能夠實現分片的獲取數據,並不是一次性將數據所有獲取完,從而防止內存溢出
            cursor,遊標(基於遊標分批取獲取數據);match,匹配指定key,默認None 表示全部的key;count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
          • hscan_iter(name, match=None, count=None)
            利用yield封裝hscan建立生成器,實現分批去redis中獲取數據
            match,匹配指定key,默認None 表示全部的key;count,每次分片最少獲取個數,默認None表示採用Redis的默認分片個數
        • list操做
          List操做,redis中的List在在內存中按照一個name對應一個List來存儲
          • lpush(name,values)
            在name對應的list中添加元素,每一個新的元素都添加到列表的最左邊
            r.lpush('oo', 11,22,33),保存順序爲: 33,22,11
            rpush(name, values) 表示從右向左操做
          • lpushx(name,value)
            在name對應的list中添加元素,只有name已經存在時,值添加到列表的最左邊
            rpushx(name, value) 表示從右向左操做
          • llen(name)
            name對應的list元素的個數
          • linsert(name, where, refvalue, value))
            在name對應的列表的某一個值前或後插入一個新值
            where,BEFORE或AFTER;refvalue,標杆值,即:在它先後插入數據;value,要插入的數據
          • lset(name, index, value)
            對name對應的list中的某一個索引位置從新賦值
            index,list的索引位置;value,要設置的值
          • lrem(name, value, num)
            在name對應的list中刪除指定的值
            value,要刪除的值;num=0,刪除列表中全部的指定值;num=2,從前到後,刪除2個;num=-2,從後向前,刪除2個
          • lpop(name)
            在name對應的列表的左側獲取第一個元素並在列表中移除,返回值則是第一個元素
            rpop(name) 表示從右向左操做
          • lindex(name, index)
            在name對應的列表中根據索引獲取列表元素
          • lrange(name, start, end)
            在name對應的列表分片獲取數據
          • ltrim(name, start, end)
            在name對應的列表中移除沒有在start-end索引之間的值
          • rpoplpush(src, dst)
            從一個列表取出最右邊的元素,同時將其添加至另外一個列表的最左邊
          • blpop(keys, timeout)
            將多個列表排列,按照從左到右去pop對應列表的元素
            timeout,超時時間,當元素全部列表的元素獲取完以後,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞
            brpop(keys, timeout),從右向左獲取數據
          • brpoplpush(src, dst, timeout=0)
            從一個列表的右側移除一個元素並將其添加到另外一個列表的左側
        • set操做
          Set操做,Set集合就是不容許重複的列表
          • sadd(name,values)
            name對應的集合中添加元素
          • scard(name)
            獲取name對應的集合中元素個數
          • sdiff(keys, *args)
            在第一個name對應的集合中且不在其餘name對應的集合的元素集合
          • sdiffstore(dest, keys, *args)
            獲取第一個name對應的集合中且不在其餘name對應的集合,再將其新加入到dest對應的集合中
          • sinter(keys, *args)
            獲取多一個name對應集合的並集
          • sinterstore(dest, keys, *args)
            獲取多一個name對應集合的並集,再講其加入到dest對應的集合中
          • sismember(name, value)
            檢查value是不是name對應的集合的成員
          • smembers(name)
            獲取name對應的集合的全部成員
          • smove(src, dst, value)
            將某個成員從一個集合中移動到另一個集合
          • spop(name)
            從集合的右側(尾部)移除一個成員,並將其返回
          • srandmember(name, numbers)
            從name對應的集合中隨機獲取 numbers 個元素
          • srem(name, values)
            在name對應的集合中刪除某些值
          • sunion(keys, *args)
            獲取多一個name對應的集合的並集
          • sunionstore(dest,keys, *args)
            獲取多一個name對應的集合的並集,並將結果保存到dest對應的集合中
          • sscan(name, cursor=0, match=None, count=None)或sscan_iter(name, match=None, count=None)
            同字符串的操做,用於增量迭代分批獲取元素,避免內存消耗太大
            >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
            有序集合,在集合的基礎上,爲每元素排序;元素的排序須要根據另一個值來進行比較,因此,對於有序集合,每個元素有兩個值,即:值和分數,分數專門用來作排序。
          • zadd(name, *args, **kwargs)
            在name對應的有序集合中添加元素,例如:zadd('zz', 'n1', 1, 'n2', 2)或zadd('zz', n1=11, n2=22)
          • zcard(name)
            獲取name對應的有序集合元素的數量
          • zcount(name, min, max)
            獲取name對應的有序集合中分數 在 [min,max] 之間的個數
          • zincrby(name, value, amount)
            自增name對應的有序集合的 name 對應的分數
          • zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)
            按照索引範圍獲取name對應的有序集合的元素
            start,有序集合索引發始位置(非分數);end,有序集合索引結束位置(非分數);desc,排序規則,默認按照分數從小到大排序;withscores,是否獲取元素的分數,默認只獲取元素的值;score_cast_func,對分數進行數據轉換的函數
            從大到小排序:zrevrange(name, start, end, withscores=False, score_cast_func=float)
            按照分數範圍獲取name對應的有序集合的元素:zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
            從大到小排序:zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)
          • zrank(name, value)
            獲取某個值在 name對應的有序集合中的排行(從 0 開始)
            zrevrank(name, value),從大到小排序
          • zrem(name, values)
            刪除name對應的有序集合中值是values的成員
          • zremrangebyrank(name, min, max)
            根據排行範圍刪除
          • zremrangebyscore(name, min, max)
            根據分數範圍刪除
          • zscore(name, value)
            獲取name對應有序集合中 value 對應的分數
          • zinterstore(dest, keys, aggregate=None)
            獲取兩個有序集合的交集,若是遇到相同值不一樣分數,則按照aggregate進行操做
            aggregate的值爲:  SUM  MIN  MAX
          • zunionstore(dest, keys, aggregate=None)
            獲取兩個有序集合的並集,若是遇到相同值不一樣分數,則按照aggregate進行操做
          • zscan(name, cursor=0, match=None, count=None, score_cast_func=float)或zscan_iter(name, match=None, count=None,score_cast_func=float)
            同字符串類似,相較於字符串新增score_cast_func,用來對分數進行操做
        • 其餘經常使用操做
          • delete(*names)
            根據刪除redis中的任意數據類型
          • exists(name)
            檢測redis的name是否存在
          • keys(pattern='*')
            根據模型獲取redis的name
            KEYS * 匹配數據庫中全部 key;KEYS h?llo 匹配 hello , hallo 和 hxllo 等;KEYS h*llo 匹配 hllo 和 heeeeello 等;KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo
          • expire(name ,time)
            爲某個redis的某個name設置超時時間
          • rename(src, dst)
            對redis的name重命名爲
          • move(name, db)
            將redis的某個值移動到指定的db下
          • randomkey()
            隨機獲取一個redis的name(不刪除)
          • type(name)
            獲取name對應值的類型
          • scan(cursor=0, match=None, count=None)或scan_iter(match=None, count=None)
            同字符串操做,用於增量迭代獲取key
          • select 2
            切換到數據庫2,默認只有16(0~15)張表
          • get db_number 1
            切換到數據庫1
      • 管道
        redis-py默認在執行每次請求都會建立(鏈接池申請鏈接)和斷開(歸還鏈接池)一次鏈接操做,若是想要在一次請求中指定多個命令,則可使用pipline實現一次請求指定多個命令,而且默認狀況下一次pipline 是原子性操做
         1 import redis,time
         2 #鏈接redis
         3 pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
         4 r = redis.Redis(connection_pool=pool)
         5 
         6 #啓動管道
         7 pipe = r.pipeline() #或者pipe=r.pipeline()
         8 
         9 #執行命令,兩條命令一塊兒執行
        10 pipe.set('name', 'alex')
        time.sleep(30)
        11 pipe.set('role', 'sb') 12 13 pipe.execute()transaction=True

         

      • 發佈訂閱
        • Demo
           1 #redishelper.py
           2 import redis
           3 
           4 class RedisHelper:
           5     def __init__(self):
           6         self.__conn = redis.Redis(host='10.211.55.4')
           7         self.chan_sub = 'fm104.5'
           8         self.chan_pub = 'fm104.5'
           9 
          10     def public(self, msg):#發佈者
          11         self.__conn.publish(self.chan_pub, msg)
          12         return True
          13 
          14     def subscribe(self):#訂閱者
          15         pub = self.__conn.pubsub()#打開收音機
          16         pub.subscribe(self.chan_sub)#調頻道
          17         pub.parse_response()#準備接收
          18         return pub
          Demo
        • 訂閱者
          1 #訂閱者.py
          2 from redishelper import RedisHelper
          3 
          4 obj = RedisHelper()
          5 redis_sub = obj.subscribe()
          6 
          7 while True:
          8     msg = redis_sub.parse_response()
          9     print(msg)
          訂閱者.py
        • 發佈者
          1 #發佈者.py
          2 from redishelper import RedisHelper
          3 obj = RedisHelper()
          4 obj.public('hello')
          發佈者.py

           

  • 基於RabbitMQ的RPC實現
    • 服務端
       1 import pika
       2 
       3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
       4 channel = connection.channel()
       5 channel.queue_declare(queue='rpc_queue')
       6 
       7 def fib(n): #求斐波那契數列
       8     if n == 0:
       9         return 0
      10     elif n == 1:
      11         return 1
      12     else:
      13         return fib(n - 1) + fib(n - 2)
      14 
      15 
      16 def on_request(ch, method, props, body):
      17     n = int(body)
      18     print(" [.] fib(%s)" % n)
      19     response = fib(n)
      20     ch.basic_publish(exchange='',
      21                      routing_key=props.reply_to,
      22                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
      23                      body=str(response))
      24     ch.basic_ack(delivery_tag=method.delivery_tag)
      25 
      26 channel.basic_qos(prefetch_count=1)
      27 channel.basic_consume(on_request, queue='rpc_queue')
      28 print(" [x] Awaiting RPC requests")
      29 channel.start_consuming()
      服務端
    • 客戶端
       1 import pika,uuid
       2 
       3 class FibonacciRpcClient(object):
       4     def __init__(self):
       5         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
       6         self.channel = self.connection.channel()
       7         result = self.channel.queue_declare(exclusive=True)
       8         self.callback_queue = result.method.queue
       9         self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)
      10 
      11     def on_response(self, ch, method, props, body):
      12         if self.corr_id == props.correlation_id:
      13             self.response = body
      14 
      15     def call(self, n):
      16         self.response = None
      17         self.corr_id = str(uuid.uuid4())
      18         self.channel.basic_publish(exchange='',
      19                                    routing_key='rpc_queue',
      20                                    properties=pika.BasicProperties(
      21                                        reply_to=self.callback_queue,
      22                                        correlation_id=self.corr_id,
      23                                    ),
      24                                    body=str(n))
      25         while self.response is None:
      26             self.connection.process_data_events()
      27         return int(self.response)
      28 
      29 
      30 fibonacci_rpc = FibonacciRpcClient()
      31 print(" [x] Requesting fib(30)")
      32 response = fibonacci_rpc.call(30)
      33 print(" [.] Got %r" % response)
      客戶端
相關文章
相關標籤/搜索