RabbitMq與Redis的使用

<!DOCTYPE html>


RabbitMQRedisMysql




css

RabbitMQ 消息隊列

python裏有threading QUEUE 只用於線程間交互,進程QUEUE 用於父進程與子進程或者是兄弟進程

RabbitMQ採用消息輪詢的方式發送消息。一個一個的給每一個消費者

應用之間使用socket實現數據共享

連接幾個應用的中間商著名的有:
html

  1. RabbitMQ
  2. ZeroMQ
  3. ActiveMQ

    RabbitMQ使用

    生產者:
  4. 引用pika模塊

    import pika
  5. 創建socket

    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  6. 聲明一個管道

    channel=connection.channel()
  7. 在管道中聲明隊列

    channel.queue_declare(queue='隊列名')
  8. 經過管道發送消息 rountingkey就是隊列名字

    channel.basic_publish(exchange='',routing_key='隊列名',body='Hello World!')
  9. 關閉隊列,不用關閉管道

    connection.close()

    消費者(多是其餘機器,能夠跨機器)
  10. 引用pika模塊

    import pika
  11. 創建socket

    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  12. 聲明一個管道

    channel=connection.channel()
  13. 在管道中聲明隊列

    channel.queue_declare(queue='隊列名')
  14. 定義函數,標準的處理消息的函數都會帶下面四個參數ch管道聲明的對象地址,method包含你要發消息給誰的信息,properties包含發消息端的設置信息

    def callback(ch,method,properties,body):
    print(ch,method,properties,body)
    print("[x] Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)手動確認消息處理完,否則消息一直不銷燬

  15. 消費消息,定義函數的目的,若是收到消息,就用定義的函數處理消息noack參數消息確認,當爲True時消息不等消費者確認消息隊列就銷燬消息,爲False時須要等待消費者處理完消息的確認消息隊消息隊列才銷燬消息(判斷的是socket是否斷開)

    channel.basic_consume('定義的函數名',queue='隊列名',no_ack=True)
  16. 啓動管道接收消息,啓動後一直處於開啓狀態,沒有消息就等待。

    channel.start_consuming()


    RabbitMQ消息分發輪詢


    採用輪詢的方式,依次的發給每一個消費者

    生產者會等待消費者肯定處理完消息的回覆纔會銷燬消息。

    當消息執行到一半的時候,消費者斷開,消息會保留髮送給下一個消費者處理
    python


    消息持久化


    消息必須等消費者手動肯定後,才銷燬ch.basicack(deliverytag=method.delivery_tag)手動確認消息處理完,否則消息一直不銷燬

    當RabbitMQ服務中止,服務裏的消息隊列會銷燬。

    若是想保持消息隊列的持久化,必須在聲明隊列的時候設置,durable=True。這樣當RabbitMQ服務斷開再重啓,消息隊列仍是存在,消息會銷燬

    channel.queue_declare(queue='隊列名',durable=True)
    消息也持久化

    channel.basic_publish(exchange='',routing_key='隊列名',body='Hello World!',properties=pika.BasicProperties(delivery_mode=2))
    mysql


    廣播模式


    消費者端加channel.basicqos(prefetchcount=1),加過這句話實現了,不是按順序分發,而是看哪個是空閒的,才分發給空閒的消費者消息。多大本事幹多少活。

    廣播是生產者發消息,全部消費者都收到。

    用exchange實現廣播。
    fanout:全部bind到此exchange的queue均可以接收消息

    direct:經過routingkey和exchange決定的那個惟一的queue能夠接收消息

    topic:全部符合routingkey(此時能夠是一個表達式)的routingkey所bind的queue能夠接受消息。
    git


    fanout純廣播

    設置管道的時候設置

    channel.exchange_declare(exchange='logs',type='fanout')

    不聲明queue

    生產者:
    github

  17. 引用pika模塊

    import pika
  18. 創建socket

    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  19. 聲明一個管道

    channel=connection.channel()
  20. 設置管道的時候設置

    channel.exchange_declare(exchange='logs',exchange_type='fanout')
  21. 經過管道發送消息,廣播不須要管道名

    channel.basic_publish(exchange='logs',routing_key='',body='Hello World!')
  22. 關閉隊列,不用關閉管道

    connection.close()

    消費者(多是其餘機器,能夠跨機器)
  23. 引用pika模塊

    import pika
  24. 創建socket

    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  25. 聲明一個管道

    channel=connection.channel()
  26. 設置管道的時候設置

    channel.exchange_declare(exchange='logs',exchange_type='fanout')
  27. 生成隨機queue與exchange轉發器綁定。
result=channel.queue<em>declare(exclusive=True)exclusive排他的,單獨的,生成隨街queue,綁定再exchange上
queue</em>name=result.method.queue<br />
channel.queue<em>bind(exchange='logs',queue=queue</em>name)綁定轉發器,接受轉發器裏的消息,exchange與隨機生成的queue綁定  
</p>
<p><code>6. 定義函數,標準的處理消息的函數都會帶下面四個參數ch管道聲明的對象地址,method包含你要發消息給誰的信息,properties</code>
def callback(ch,method,properties,body):
        print(ch,method,properties,body)
        print(&quot;[x] Received %r&quot;%body)<br />
        ch.basic<em>ack(delivery</em>tag=method.delivery<em>tag)手動確認消息處理完,否則消息一直不銷燬
  1. 消費消息,定義函數的目的,若是收到消息,就用定義的函數處理消息noack參數消息確認,當爲True時消息不等消費者確認消息隊列就銷燬消息,爲False時須要等待消費者處理完消息的確認消息隊消息隊列才銷燬消息(判斷的是socket是否斷開)

    channel.basic_consume('定義的函數名',queue='隊列名',no_ack=True)
  2. 啓動管道接收消息,啓動後一直處於開啓狀態,沒有消息就等待。

    channel.start_consuming()


    direct廣播 info warning error 劃分消息

    生產者:
    web

  3. 引用pika模塊

    import pika
  4. 創建socket

    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  5. 聲明一個管道

    channel=connection.channel()
  6. 設置管道的時候設置

    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
  7. 接受分發消息的級別

    severity=sys.argv[1] if len(sys.argv)>1 else 'info'
    message=' '.join(sys.argv[2:]) or 'hello world!'
  8. 經過管道發送消息,廣播不須要管道名

    channel.basic_publish(exchange='direct_logs',
    routing_key=severity,#相似指定queue
    body=message)

  9. 關閉隊列,不用關閉管道

    connection.close()

    消費者
  10. 引用pika模塊

    import pika
  11. 創建socket

    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  12. 聲明一個管道

    channel=connection.channel()
  13. 設置管道的時候設置

    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
  14. 生成隨機queue與exchange轉發器綁定。
result=channel.queue<em>declare(exclusive=True)exclusive排他的,單獨的,生成隨街queue,綁定再exchange上
queue</em>name=result.method.queue<br />
severities = sys.argv[1:]
if not severities:
    sys.stderr.write(&quot;Usage: %s [info] [warning] [error]\n&quot; % sys.argv[0])
    sys.exit(1)</p>
<p>for severity in severities:
    channel.queue<em>bind(exchange='direct</em>logs',queue=queue<em>name,routing</em>key=severity)</p>
<p><code>6. 定義函數,標準的處理消息的函數都會帶下面四個參數ch管道聲明的對象地址,method包含你要發消息給誰的信息,properties</code>
def callback(ch,method,properties,body):
        print(ch,method,properties,body)
        print(&quot;[x] Received %r&quot;%body)<br />
        ch.basic<em>ack(delivery</em>tag=method.delivery<em>tag)手動確認消息處理完,否則消息一直不銷燬
  1. 消費消息,定義函數的目的,若是收到消息,就用定義的函數處理消息noack參數消息確認,當爲True時消息不等消費者確認消息隊列就銷燬消息,爲False時須要等待消費者處理完消息的確認消息隊消息隊列才銷燬消息(判斷的是socket是否斷開)

    channel.basic_consume('定義的函數名',queue='隊列名',no_ack=True)
  2. 啓動管道接收消息,啓動後一直處於開啓狀態,沒有消息就等待。

    channel.start_consuming()


    topic廣播

    更細緻的消息過濾,包括應用程序,#收全部消息,.info接受帶有.info的消息,mysql.接受帶mysql的消息

    生產者

    import pika import sys</p> <p>connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel()</p> <p>channel.exchange<em>declare(exchange='topic</em>logs', type='topic')</p> <p>routing<em>key = sys.argv[1] if len(sys.argv) &gt; 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic</em>publish(exchange='topic<em>logs', routing<em>key=routing</em>key, body=message) print(&quot; [x] Sent %r:%r&quot; % (routing</em>key, message)) connection.close() <code>消費者</code> import pika import sys</p> <p>connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel()</p> <p>channel.exchange<em>declare(exchange='topic</em>logs', type='topic')</p> <p>result = channel.queue<em>declare(exclusive=True) queue</em>name = result.method.queue</p> <p>binding<em>keys = sys.argv[1:] if not binding</em>keys: sys.stderr.write(&quot;Usage: %s [binding_key]...\n&quot; % sys.argv[0]) sys.exit(1)</p> <p>for binding<em>key in binding</em>keys: channel.queue<em>bind(exchange='topic<em>logs', queue=queue</em>name, routing</em>key=binding_key)</p> <p>print(' [*] Waiting for logs. To exit press CTRL+C') </p> <p>def callback(ch, method, properties, body): print(&quot; [x] %r:%r&quot; % (method.routing_key, body)) </p> <p>channel.basic<em>consume(callback, queue=queue</em>name, no_ack=True) </p> <p>channel.start_consuming() <br />redis


    RabbitMQ rpc(remote procedure call)遠程調用一個方法


    即便生產者又是消費者

    startcosuming爲阻塞模式,rpc不用阻塞,rpc是執行一會這個再去執行另外一個。processdata_events()非阻塞方法,能收到消息就收,沒有消息不阻塞繼續往下執行

    服務器端

    ```sql


    coding:utf-8


    author = 'Alex Li'
    import pika
    import time
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))mongodb


    channel = connection.channel()


    channel.queuedeclare(queue='rpcqueue')


    def fib(n):
    if n == 0:
    return 0
    elif n == 1:
    return 1
    else:
    return fib(n-1) + fib(n-2)


    def on_request(ch, method, props, body):
    n = int(body)


    print(" [.] fib(%s)" % n)

    response = fib(n)

ch.basic_publish(exchange='',

routing_key=props.reply_to,

properties=pika.BasicProperties(correlation_id = <br/>
props.correlation_id),

body=str(response))

ch.basic_ack(delivery_tag = method.delivery_tag)


channel.basicqos(prefetchcount=1) channel.basic_consume(onrequest, queue='rpcqueue')

print(" [x] Awaiting RPC requests") channel.start_consuming() 客戶端 import pika import uuid

class FibonacciRpcClient(object): def init(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))

self.channel = self.connection.channel()

result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(

replyto = self.callbackqueue,#客戶端發送消息是,把接收返回消息的管道也告訴給服務器端 correlationid = self.corrid,#用於判斷服務器端返回的結果和個人請求是不是同一條。目的就是爲了客戶端能夠同時發兩條消息,當服務器端返回結果時,須要有判斷關於哪條請求的結果 ), body=str(n)) while self.response is None: self.connection.processdataevents() return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response) ```

Redis

緩存中間商,用socket,例如:mongodb,redis,memcache
* 鏈接方式
* 鏈接池
* 操做
- String 操做
- Hash 操做
- List 操做
- Set 操做
- Sort Set 操做
* 管道
* 發佈訂閱

String操做
  1. set(name, value, ex=None, px=None, nx=False, xx=False)
    在Redis中設置值,默認,不存在則建立,存在則修改
    參數:
    ex,過時時間(秒)
    px,過時時間(毫秒)
    nx,若是設置爲True,則只有name不存在時,當前set操做才執行
    xx,若是設置爲True,則只有name存在時,崗前set操做才執行
  2. setnx(name, value) 設置值,只有name不存在時,執行設置操做(添加)
  3. setex(name, value, time) 設置值 參數:time,過時時間(數字秒 或 timedelta對象)
  4. psetex(name, timems, value) 設置值 參數:timems,過時時間(數字毫秒 或 timedelta對象)
  5. mset(*args, **kwargs) 批量設置值 如:mset(k1='v1', k2='v2')或mget({'k1': 'v1', 'k2': 'v2'})
  6. get(name) 獲取值
  7. mget(keys, *args) 批量獲取 如:mget('ylr', 'wupeiqi')或r.mget(['ylr', 'wupeiqi'])
  8. getset(name, value) 設置新值並獲取原來的值
  9. getrange(key, start, end) 獲取子序列(根據字節獲取,非字符) 參數: name,Redis 的 name start,起始位置(字節) end,結束位置(字節) 如: "武沛齊" ,0-3表示 "武"
  10. setrange(name, offset, value) 修改字符串內容,從指定字符串索引開始向後替換(新值太長時,則向後添加) 參數: offset,字符串的索引,字節(一個漢字三個字節) value,要設置的值
  11. setbit(name, offset, value) BITCOUNT 統計二進制有多少個1 對name對應值的二進制表示的位進行操做 參數: name,redis的name offset,位的索引(將值變換成二進制後再進行索引) value,值只能是 1 或 0 注:若是在Redis中有一個對應: n1 = "foo", 那麼字符串foo的二進制表示爲:01100110 01101111 01101111 因此,若是執行 setbit('n1', 7, 1),則就會將第7位設置爲1, 那麼最終二進制則變成 01100111 01101111 01101111,即:"goo" 擴展,轉換二進制表示: source = "武沛齊" source = "foo" for i in source: num = ord(i) print bin(num).replace('b','') 特別的,若是source是漢字 "武沛齊"怎麼辦? 答:對於utf-8,每個漢字佔 3 個字節,那麼 "武沛齊" 則有 9個字節 對於漢字,for循環時候會按照 字節 迭代,那麼在迭代時,將每個字節轉換 十進制數,而後再將十進制數轉換成二進制 11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000 -------------------------- ----------------------------- ----------------------------- 武 沛 齊
  12. getbit(name, offset) 獲取name對應的值的二進制表示中的某位的值 (0或1)
  13. bitcount(key, start=None, end=None) 獲取name對應的值的二進制表示中 1 的個數 參數: key,Redis的name start,位起始位置 end,位結束位置
  14. bitop(operation, dest, *keys) 獲取多個值,並將值作位運算,將最後的結果保存至新的name對應的值 參數: operation,AND(並) 、 OR(或) 、 NOT(非) 、 XOR(異或) dest, 新的Redis的name *keys,要查找的Redis的name 如: bitop("AND", 'newname', 'n1', 'n2', 'n3') 獲取Redis中n1,n2,n3對應的值,而後講全部的值作位運算(求並集),而後將結果保存 newname 對應的值中
  15. strlen(name) 返回name對應值的字節長度(一個漢字3個字節)
  16. incr(self, name, amount=1) 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。 參數: name,Redis的name amount,自增數(必須是整數) 注:同incrby
  17. incrbyfloat(self, name, amount=1.0) 自增 name對應的值,當name不存在時,則建立name=amount,不然,則自增。 參數: name,Redis的name amount,自增數(浮點型)
  18. decr(self, name, amount=1) 自減 name對應的值,當name不存在時,則建立name=amount,不然,則自減。 參數: name,Redis的name amount,自減數(整數)
  19. append(key, value) 在redis name對應的值後面追加內容 參數: key, redis的name value, 要追加的字符串   
相關文章
相關標籤/搜索