# RabbitMQ examples in Python

import pika
def rabbitmq_conn():
    """Open a blocking RabbitMQ connection and return a channel on it.

    Host, port, user name and password are read from the global
    ``mq_config`` mapping (defined outside this function); the virtual host
    is always ``'/'``.

    Returns:
        The channel created on the new connection. The connection object
        itself is not returned to the caller.
    """
    auth = pika.PlainCredentials(mq_config["user"], mq_config["passwd"])
    conn_params = pika.ConnectionParameters(
        host=mq_config["host"],
        port=mq_config["port"],
        virtual_host='/',
        credentials=auth,
    )
    return pika.BlockingConnection(conn_params).channel()

####### Producer
# Publish a single message with routing key 'hello' through the default
# exchange (exchange=''), report it, then close the connection.
body = 'body'
# rabbitmq_conn() hands back only the channel; the owning connection is
# reached through channel.connection when it is time to close it.
channel = rabbitmq_conn()
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=body)
# Parenthesised single-argument form prints the same text on Python 2 and 3.
print(" [x] Sent %s" % body)
channel.connection.close()
######### Consumer
def deal_msg_callback(ch, method, properties, body):
    """Print a delivered message body and acknowledge the delivery.

    Args:
        ch: Channel the message arrived on; used to send the ack.
        method: Delivery metadata; only ``delivery_tag`` is read.
        properties: Message properties (unused here).
        body: Message payload, printed as-is.
    """
    # Parenthesised form is valid on both Python 2 and Python 3, unlike the
    # bare ``print body`` statement.
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Open a channel, register deal_msg_callback as the consumer for queue
# '123456', then start consuming.
# NOTE(review): the (callback, queue=...) argument order looks like the
# pre-1.0 pika API; pika 1.x expects basic_consume(queue, on_message_callback)
# -- confirm against the installed pika version.
channel=rabbitmq_conn()
channel.basic_consume(deal_msg_callback,queue='123456')
channel.start_consuming()




#####################################################################
# A RabbitMQ wrapper class
import  kafka
from pika import  connection,BlockingConnection,ConnectionParameters,credentials,BasicProperties
#單例類
class _RmqMgrSingleton(type):
    def __init__(self, name, bases, dict):
        super(_RmqMgrSingleton, self).__init__(name, bases, dict)
        self._instance = {}
    def __call__(self, host,port,user,passwd):
        if not self._instance.has_key((host,port,user,passwd)):
            self._instance[(host,port,user,passwd)] = super(_RmqMgrSingleton, self).__call__(host,port,user,passwd)
        return self._instance[(host,port,user,passwd )]
class RmqMgr(object):
    """An abstraction over a RabbitMQ queue connection.

    Stores the broker address and credentials, connects lazily on the first
    ``send_message`` call, and reconnects once if a publish fails.

    NOTE(review): ``__metaclass__`` is only honoured by Python 2; on Python 3
    the attribute is ignored and instances are not cached per argument set.
    """
    __metaclass__ = _RmqMgrSingleton

    def __init__(self, host, port, user, passwd):
        self.host = host
        self.port = port
        self.user, self.passwd = user, passwd
        # Connected lazily by send_message(). Initialised here so __del__ and
        # the first publish never hit a missing attribute.
        self.__channel = None
        self.__conn = None

    def _get_rmq_channel(self, host, port, user, passwd, withconn=False):
        """Open a new blocking connection and return a channel on it.

        Args:
            host, port: Broker address.
            user, passwd: Plain credentials for the broker.
            withconn: When true, also return the connection object.

        Returns:
            The channel, or ``(channel, connection)`` when ``withconn`` is
            true.
        """
        conn = BlockingConnection(ConnectionParameters(
            host=host,
            port=port,
            credentials=credentials.PlainCredentials(username=user,
                                                     password=passwd)))
        if withconn:
            return conn.channel(), conn
        return conn.channel()

    def _connect(self):
        """(Re)create the channel/connection pair used for publishing."""
        self.__channel, self.__conn = self._get_rmq_channel(
            self.host, self.port, self.user, self.passwd, withconn=True)

    def _close_connection(self):
        """Drop the current connection, closing it on a best-effort basis."""
        conn = self.__conn
        self.__channel = None
        self.__conn = None
        if conn is None:
            return
        try:
            if conn.is_open:
                conn.close()
        except Exception:
            # The connection is being discarded anyway; a failure to close
            # a broken socket must not mask the caller's real operation.
            pass

    def _publish(self, exchange, routeing_key, body):
        """Publish one message with delivery_mode=2 on the current channel."""
        self.__channel.basic_publish(
            exchange=exchange,
            routing_key=routeing_key,
            body=body,
            properties=BasicProperties(delivery_mode=2))

    def send_message(self, exchange, routeing_key, body):
        """Send a message to the queue.

        Args:
            exchange: Exchange to publish to.
            routeing_key: Routing key for the message.
            body: Payload; ``unicode`` is encoded to UTF-8, otherwise it
                must already be a ``str``.

        Returns:
            True once the message has been handed to ``basic_publish``.

        Raises:
            AssertionError: If ``body`` is neither ``unicode`` nor ``str``.
            Exception: Whatever the reconnect or the second publish attempt
                raises, if the retry fails as well.
        """
        if isinstance(body, unicode):
            body = body.encode("utf-8")
        assert isinstance(body, str)
        if self.__channel is None:
            self._connect()
        try:
            self._publish(exchange, routeing_key, body)
        except Exception:
            # Retry exactly once on a fresh connection. ``Exception`` rather
            # than a bare ``except`` so that KeyboardInterrupt/SystemExit are
            # not swallowed into a reconnect.
            self._close_connection()
            self._connect()
            self._publish(exchange, routeing_key, body)
        return True

    def __del__(self):
        # Safe even when send_message() was never called (no connection yet).
        self._close_connection()
        
        
import requests
def get_rabbitmq_info(queue_name="v3baninfo_precheck", threshold=50000, timeout=10):
    """Report whether a queue's message count exceeds a threshold.

    Fetches ``/api/queues/`` on port 15672 of the host named in the global
    ``RMQ`` mapping (keys ``host``, ``user``, ``passwd``) and looks for the
    entry whose ``name`` equals ``queue_name``.

    Args:
        queue_name: Name of the queue to inspect.
        threshold: Message count above which the queue counts as backed up.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        True if the queue's ``messages`` value is greater than ``threshold``,
        False if it is not, and None if no queue with that name is listed.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status.
    """
    url = "http://" + RMQ["host"] + ":15672/api/queues/"
    # Credentials go through ``auth`` instead of being spliced into the URL,
    # so characters such as '@', ':' or '/' in them cannot corrupt the URL.
    response = requests.get(url, auth=(RMQ["user"], RMQ["passwd"]),
                            timeout=timeout)
    # An error payload is not a list of queues; fail clearly here rather
    # than with a confusing TypeError inside the loop below.
    response.raise_for_status()
    for queue in response.json():
        if queue["name"] == queue_name:
            return int(queue["messages"]) > threshold
    return None
# Related articles
# Related tags / search