day11 rabbitmq redis rpc命令端

1、Rabbit MQ

一、工做隊列

工做隊列就是多個work共同按順序接收同一個queue裏面的任務,還能夠設置basic_qos來確保當前的任務執行完畢後才繼續接收任務。python

 

import pika

# 鏈接
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 申明隊列
channel.queue_declare(queue="work_queue", durable=True)     # durable 持久化,rabbit重啓這個queue也不會丟失

messages = ["apple", "pear", "cherry", "banana", "watermelon"]

for message in messages:
    # 發送消息,routing表示要發送到那個queue,body就是發送的消息內容,properties是其餘的一些配置,能夠設置多個
    channel.basic_publish(exchange="", routing_key="work_queue", body=message, properties=pika.BasicProperties(
        delivery_mode=2     # 發送的消息持久化,前提是queue也是持久化到的
    ))
    print("send {message} ok".format(message=message))

# channel.queue_delete(queue="work_queue")    # 刪除queue
# 關閉鏈接
conn.close()

  

import pika
import time

# 鏈接
cred = pika.PlainCredentials("Glen", "Glen[1234]")  # 用戶名密碼等信息
# conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672, virtual_host="/", credentials=cred))
channel = conn.channel()

# 回調函數
def callbak(ch, method, properties, body):
    print("body:", body)
    time.sleep(1)
    print("done..")
    print("method.delivery_tag", method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)      # 這裏的功能和no_ack相似,忽然終端queue會將任務繼續分配給下一個work

"""
使用basic_qos設置prefetch_count=1,使得rabbitmq不會在同一時間給工做者分配多個任務,
即只有工做者完成任務以後,纔會再次接收到任務。
"""
channel.basic_qos(prefetch_count=1)
# channel.queue_declare(queue="work_queue")
channel.basic_consume(callbak, queue="work_queue", no_ack=False)    # no_ack 默認使False,須要等待callback執行完畢纔算這個消息處理完畢
channel.start_consuming()

"""
這裏多個work會按順序接收producer發佈的任務,處理完成後才繼續接收
"""

  

二、交換機  

producer先將消息發送到交換機exchange,而後exchange再將消息發送給全部幫綁定的queue,即將消息廣播出去mysql

 

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義交換機
"""
fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
"""
channel.exchange_declare(exchange="message", exchange_type="fanout")

while True:
    message = input(">>")
    # 直接發送到exchange,接收端使用隨機的queue來綁定exchange,而後接收
    channel.basic_publish(exchange="message", routing_key="", body=message)
    print("send {message} ok".format(message=message))

  

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義交換機
channel.exchange_declare(exchange="message", exchange_type="fanout")

# 生成隨機的queue,並綁定到交換機
result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = result.method.queue    # 獲取隨機勝場的queue名字

# 將隨機的queue綁定到exchange
channel.queue_bind(exchange="message", queue=queue_name)

def callback(ch, method, properties, body):
    print(body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

  

三、路由器

direct和路由器相似,發送小時的時候須要指定目的地routing_key,只有對應的queue纔會接收git

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義路由鍵
"""
fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
"""
channel.exchange_declare(exchange="message2", exchange_type="direct")

while True:
    message, routing = input(">>").split()
    # 發送消息的時候同時指定routing_key,只有對應routing_key的consumer纔會接收到
    # 發送消息示例:info_message info
    channel.basic_publish(exchange="message2", routing_key=routing, body=message)   # 發送的每一個消息都要指明路由
    print("send {message} {routing} ok".format(message=message, routing=routing))

  

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義交換機
channel.exchange_declare(exchange="message2", exchange_type="direct")

# 生成隨機的queue,並綁定到交換機
result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = result.method.queue    # 獲取隨機勝場的queue名字
# channel.queue_bind(exchange="message2", routing_key="info", queue=queue_name)
channel.queue_bind(exchange="message2", routing_key="warning", queue=queue_name)    # 綁定不一樣的routing_key
# channel.queue_bind(exchange="message2", routing_key="error", queue=queue_name)

def callback(ch, method, properties, body):
    print(body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

  

四、路由模糊匹配

producer發送消息的時候能夠模糊地指定接收的queue,若有多個queue, mysql.error  redis.eror  mysql.info redis.info,指定不一樣的routing_key能夠匹配到不一樣的queue,mysql.* 能夠匹配到mysql.error,mysql.info, *.error能夠匹配redis.error,mysql.error。「#」表示全部、所有的意思;「*」只匹配到一個詞。redis

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義路由鍵
"""
fanout: 全部bind到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
"""
channel.exchange_declare(exchange="message3", exchange_type="topic")

"""
發送的消息以下:
a happy.work
b happy.life
c sad.work
d sad.life 
"""
while True:
    message, routing = input(">>").split()
    channel.basic_publish(exchange="message3", routing_key=routing, body=message)   # 發送的每一個消息都要指明路由
    print("send {message} {routing} ok".format(message=message, routing=routing))

  

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義交換機
channel.exchange_declare(exchange="message3", exchange_type="topic")

# 生成隨機的queue,並綁定到交換機
result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
queue_name = result.method.queue    # 獲取隨機勝場的queue名字
# channel.queue_bind(exchange="message3", routing_key="#", queue=queue_name)    # 能夠接收任何消息
# channel.queue_bind(exchange="message3", routing_key="happy.*", queue=queue_name)    # 綁定不一樣的routing_key
channel.queue_bind(exchange="message3", routing_key="*.work", queue=queue_name)

def callback(ch, method, properties, body):
    print(body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

  

五、rpc遠程調用返回

遠程調用至關於有一個控制中心和多個計算節點,控制中心發指令調用遠程的計算節點的函數進行計算,而後將結果返回給計算中心,pika模塊也實現了該功能sql

import pika
import time

# 建立鏈接
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義隊列
channel.queue_declare(queue="rpc_queue")

# 執行的函數
def mul(n):
    time.sleep(5)
    return n * n

# 定義接收到消息的處理方法
def message_handle(ch, method, properties, body):
    print("{body} * {body} = ?".format(body=body))
    response = mul(int(body))
    # 將計算結果返回
    ch.basic_publish(exchange="", routing_key=properties.reply_to, body=str(response))
    # 返回執行成功
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(message_handle, queue="rpc_queue")
channel.start_consuming()

  

import pika
import threading


class Center(object):
    def __init__(self):
        self.response = ""
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71"))
        self.channel = self.connection.channel()
        # 定義接收返回消息的隊列 而後在發送命令的時候做爲參數傳遞過去,rpc執行完畢後將消息發送到這個queue裏面
        self.callback_queue = self.channel.queue_declare(exclusive=True).method.queue
        self.channel.basic_consume(self.response_hand, no_ack=True, queue=self.callback_queue)

    # 定義處理返回消息的函數
    def response_hand(self, ch, method, properties, body):
        self.response = body
        print(body)

    def request(self, n):
        self.response = ""
        # 發送計算請求,同時加上返回隊列名
        self.channel.basic_publish(body=str(n), exchange="", routing_key="rpc_queue", properties=pika.BasicProperties(
            reply_to=self.callback_queue
        ))
        # 等待接收返回數據
        while self.response is "":
            self.connection.process_data_events()
        return int(self.response)


while True:
    message = input(">>")
    if not message.isdigit():
        continue
    center = Center()
    t = threading.Thread(target=center.request, args=(int(message), ))      # 啓用多線程,能夠不阻塞執行命令
    t.start()

  

 2、Redis

redis一共有string、list、set、zset、hash這五種經常使用集合,下面對經常使用命令進行整理,參考文檔http://doc.redisfans.com/shell

一、鏈接方法

import redis

"""
redis-py提供兩個類Redis和StrictRedis用於實現Redis的命令,
StrictRedis用於實現大部分官方的命令,並使用官方的語法和命令
(好比,SET命令對應與StrictRedis.set方法)。Redis是StrictRedis的子類,
用於向後兼容舊版本的redis-py。 簡單說,官方推薦使用StrictRedis方法。 
"""
# redis = redis.Redis(host="192.169.120.71", port=6379)
# 鏈接池
# pool = redis.ConnectionPool(host="192.168.120.71", port=6379)
# 鏈接redis
# redis = redis.Redis(connection_pool=pool)


# 使用默認方式鏈接到數據庫
# redis = redis.StrictRedis(host='192.168.120.71', port=6379, db=0)
# 使用url方式鏈接到數據庫
# redis = redis.StrictRedis.from_url('redis://@192.168.120.71:6379/0')
# 鏈接池
# pool = redis.ConnectionPool(host="192.168.120.71", port=6379)
"""
有三種構造url的方法
redis://[:password]@host:port/db    # TCP鏈接
rediss://[:password]@host:port/db   # Redis TCP+SSL 鏈接
unix://[:password]@/path/to/socket.sock?db=db    # Redis Unix Socket 鏈接
"""
pool = redis.ConnectionPool.from_url("redis://@192.168.120.71:6379/0")
redis = redis.StrictRedis(connection_pool=pool)
name = redis.get("name")
print(name)

  

 

二、key

 

三、string

 

四、list

 

 

五、set

 

 

六、zset

 

 

七、hash

 

 

八、發佈、訂閱、管道

import redis

pool = redis.ConnectionPool(host="192.168.120.71", port=6379)

# r = redis.StrictRedis(connection_pool=pool)
# pipe = r.pipeline(transaction=True)   # 生成管道
# pipe.set("status", 1)
# pipe.set("message", "hello")
# pipe.execute()                        # 上面兩條一塊兒執行,其中一條執行失敗則都失敗

class RedisPubSub(object):
    def __init__(self, channel_sub="fm110", channel_pub="fm110"):
        self.__conn = redis.StrictRedis(connection_pool=pool)
        self.channel_sub = channel_sub
        self.channel_pub = channel_pub

    def pub(self, message):
        self.__conn.publish(message=message, channel=self.channel_pub)
        # return True

    def sub(self):
        sub = self.__conn.pubsub()
        sub.subscribe(self.channel_sub)
        sub.parse_response()
        return sub

  

from day11.pub_sub_pipe import *


r = RedisPubSub(channel_sub="fm110", channel_pub="fm110")
r.pub("hello")

  

from day11.pub_sub_pipe import *

r = RedisPubSub(channel_pub="fm110", channel_sub="fm110")
redis_sub = r.sub()

while True:
    msg = redis_sub.parse_response()
    print(msg)

  

3、rpc命令端數據庫

import pika
import threading
import uuid


class Center(object):
    def __init__(self, remote_host):
        self.remote_host = remote_host
        self.response = {}
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71"))
        self.channel = self.connection.channel()
        # self.channel.exchange_declare(exchange="work", exchange_type="fanout")
        self.channel.queue_declare(queue=remote_host)
        # 定義接收返回消息的隊列 而後在發送命令的時候做爲參數傳遞過去,rpc執行完畢後將消息發送到這個queue裏面
        self.callback_queue = self.channel.queue_declare(exclusive=True).method.queue
        self.channel.basic_consume(self.response_hand, no_ack=True, queue=self.callback_queue)

    # 定義處理返回消息的函數
    def response_hand(self, ch, method, properties, body):
        self.response[properties.correlation_id] = eval(body.decode("utf"))
        print(self.remote_host, properties.correlation_id, self.response[properties.correlation_id]["stdout"], end="")


    def request(self, n):
        rpcid = str(uuid.uuid4())   # 使用UUID生成標記,隨消息一塊兒發送,rpc處理後再把這個id傳遞回來
        print(self.remote_host, rpcid, n)             # 這樣及時再同一個隊列裏面的消息執行結果也不會混亂
        self.response[rpcid] = ""
        # 發送計算請求,同時加上返回隊列名
        self.channel.basic_publish(body=str(n), exchange="", routing_key=self.remote_host, properties=pika.BasicProperties(
            reply_to=self.callback_queue,
            correlation_id=rpcid    # 發送任務時添加任務id
        ))
        # 等待接收返回數據
        while self.response[rpcid] is "":
            self.connection.process_data_events()
        return self.response[rpcid]


while True:
    message = input(">>").split()   # cmd ip1,ip2,ip3
    if not message:
        continue
    hosts = message[1].split(",")
    for host in hosts:
        center = Center(host)
        t = threading.Thread(target=center.request, args=(message[0], ))      # 啓用多線程,能夠不阻塞執行命令
        t.start()

  

import pika
import subprocess


# 建立鏈接
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.120.71", port=5672))
channel = conn.channel()

# 定義隊列

# 執行的函數
def cmd_handel(cmd_str):
    print(cmd_str)
    re = {}
    p = subprocess.run(cmd_str, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    re["stdout"] = p.stdout.decode("utf8")
    re["stderr"] = p.stderr.decode("utf8")
    re["code"] = p.returncode
    re["host"] = "1.1.1.1"
    print(re["stdout"])
    return re

# 定義接收到消息的處理方法
def message_handle(ch, method, properties, body):
    print(body.decode("utf8"))
    response = cmd_handel(body.decode("utf8"))
    # 將計算結果返回
    ch.basic_publish(exchange="", routing_key=properties.reply_to, body=str(response), properties=pika.BasicProperties(
        correlation_id=properties.correlation_id    # 返回消息時一塊兒返回任務id
    ))
    # 返回執行成功
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.queue_declare(queue="2.2.2.2")
queue_name = channel.queue_declare(exclusive=True).method.queue
channel.basic_consume(message_handle, queue="2.2.2.2")
channel.start_consuming()
相關文章
相關標籤/搜索