import pika def rabbitmq_conn(): credentials = pika.PlainCredentials(mq_config["user"], mq_config["passwd"]) parameters = pika.ConnectionParameters(mq_config["host"],mq_config["port"],'/',credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() return channel #######生產者 body = 'body' channel.basic_publish(exchange='', routing_key='hello', body=body) print " [x] Sent %s" %body connection.close() #########消費者 def deal_msg_callback(ch, method, properties, body): print body ch.basic_ack(delivery_tag = method.delivery_tag) channel=rabbitmq_conn() channel.basic_consume(deal_msg_callback,queue='123456') channel.start_consuming() ##################################################################### 一個rabbitmq的類 import kafka from pika import connection,BlockingConnection,ConnectionParameters,credentials,BasicProperties #單例類 class _RmqMgrSingleton(type): def __init__(self, name, bases, dict): super(_RmqMgrSingleton, self).__init__(name, bases, dict) self._instance = {} def __call__(self, host,port,user,passwd): if not self._instance.has_key((host,port,user,passwd)): self._instance[(host,port,user,passwd)] = super(_RmqMgrSingleton, self).__call__(host,port,user,passwd) return self._instance[(host,port,user,passwd )] class RmqMgr(object): "rabbit mq 隊列的一個抽象" __metaclass__ = _RmqMgrSingleton def __init__(self,host,port,user,passwd): self.host=host self.port=port self.user,self.passwd=user,passwd def _get_rmq_channel(self ,host,port,user,passwd,withconn=False): "獲取鏈接到rabbitmq的通道對象" conn=BlockingConnection(ConnectionParameters(host=host,port=port,credentials=credentials.PlainCredentials(username=user,password=passwd))) if withconn==False: return conn.channel() else: return conn.channel(),conn def send_message(self ,exchange,routeing_key,body): "發送消息到隊列" if type(body)==unicode: body=body.encode("utf-8") assert type(body)==str try: self.__channel.basic_publish(exchange=exchange,routing_key=routeing_key,body=body,properties=BasicProperties(delivery_mode=2)) except: self.__channel,self.__conn=self._get_rmq_channel(self.host,self.port,self.user,self.passwd,withconn=True) self.__channel.basic_publish(exchange=exchange,routing_key=routeing_key,body=body,properties=BasicProperties(delivery_mode=2)) return True def __del__(self): self.__conn.close() import requests def get_rabbitmq_info(): url = "http://"+RMQ["user"]+":"+RMQ["passwd"]+"@"+RMQ["host"]+":15672/api/queues/" for i in requests.get(url).json(): if i["name"]=="v3baninfo_precheck": if int(i["messages"]) > 50000: return True else: return False