rabbitmq做爲消息隊列能夠有消息消費確認機制,以前寫個基於redis的通用生產者 消費者 併發框架,redis的list結構能夠簡單充當消息隊列,但不具有消費確認機制,隨意關停程序,會丟失一部分正在程序中處理但還沒執行完的消息。基於redis的與基於rabbitmq相比對消息消費速度和消息數量沒有自然的支持。redis
使用rabbitmq的最經常使用庫pika多線程
無論是寫代碼仍是運行起來都比celery使用更簡單,基本可以知足絕大多數場景使用,用來取代celery worker模式(celery有三個模式,worker模式最經常使用,其他是定時和間隔時間兩種模式)的後臺異步的做用。併發
# coding=utf-8 """ 一個通用的rabbitmq生產者和消費者。使用多個線程消費同一個消息隊列。 """ from collections import Callable import functools import time from threading import Lock from pika import BasicProperties # noinspection PyUnresolvedReferences from app.utils_ydf import (LoggerMixin, LogManager, decorators, RabbitMqHelper, BoundedThreadPoolExecutor) class RabbitmqPublisher(LoggerMixin): def __init__(self, queue_name, log_level_int=1): self._queue_name = queue_name self.logger.setLevel(log_level_int * 10) channel = RabbitMqHelper().creat_a_channel() channel.queue_declare(queue=queue_name, durable=True) self.channel = channel self.lock = Lock() self._current_time = None self.count_per_minute = None self._init_count() self.logger.info(f'{self.__class__} 被實例化了') def _init_count(self): self._current_time = time.time() self.count_per_minute = 0 def publish(self, msg): with self.lock: # 親測pika多線程publish會出錯。 self.channel.basic_publish(exchange='', routing_key=self._queue_name, body=msg, properties=BasicProperties( delivery_mode=2, # make message persistent ) ) self.logger.debug(f'放入 {msg} 到 {self._queue_name} 隊列中') self.count_per_minute += 1 if time.time() - self._current_time > 60: self._init_count() self.logger.info(f'一分鐘內推送了 {self.count_per_minute} 條消息到 {self.channel.connection} 中') class RabbitmqConsumer(LoggerMixin): def __init__(self, queue_name, consuming_function: Callable = None, threads_num=100, max_retry_times=3, log_level=1, is_print_detail_exception=True): """ :param queue_name: :param consuming_function: 處理消息的函數,函數有且只能有一個參數,參數表示消息。是爲了簡單,放棄策略和模板來強制參數。 :param threads_num: :param max_retry_times: :param log_level: :param is_print_detail_exception: """ self._queue_name = queue_name self.consuming_function = consuming_function self.threadpool = BoundedThreadPoolExecutor(threads_num) self._max_retry_times = max_retry_times self.logger.setLevel(log_level * 10) self.logger.info(f'{self.__class__} 被實例化') self._is_print_detail_exception = is_print_detail_exception self.rabbitmq_helper = RabbitMqHelper(heartbeat_interval=30) channel = self.rabbitmq_helper.creat_a_channel() channel.queue_declare(queue=self._queue_name, durable=True) channel.basic_qos(prefetch_count=threads_num) self.channel = channel LogManager('pika.heartbeat').get_logger_and_add_handlers(1) @decorators.keep_circulating(1) # 是爲了保證不管rabbitmq異常中斷多久,無需重啓程序就能保證恢復後,程序正常。 def start_consuming_message(self): def callback(ch, method, properties, body): msg = body.decode() self.logger.debug(f'從rabbitmq取出的消息是: {msg}') # ch.basic_ack(delivery_tag=method.delivery_tag) self.threadpool.submit(self.__consuming_function, ch, method, properties, msg) self.channel.basic_consume(callback, queue=self._queue_name, # no_ack=True ) self.channel.start_consuming() @staticmethod def ack_message(channelx, delivery_tagx): """Note that `channel` must be the same pika channel instance via which the message being ACKed was retrieved (AMQP protocol constraint). """ if channelx.is_open: channelx.basic_ack(delivery_tagx) else: # Channel is already closed, so we can't ACK this message; # log and/or do something that makes sense for your app in this case. pass def __consuming_function(self, ch, method, properties, msg, current_retry_times=0): if current_retry_times < self._max_retry_times: # noinspection PyBroadException try: self.consuming_function(msg) # ch.basic_ack(delivery_tag=method.delivery_tag) self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag)) except Exception as e: self.logger.error(f'函數 {self.consuming_function} 第{current_retry_times+1}次發生錯誤,\n 緣由是{e}', exc_info=self._is_print_detail_exception) self.__consuming_function(ch, method, properties, msg, current_retry_times + 1) else: self.logger.critical(f'達到最大重試次數 {self._max_retry_times} 後,仍然失敗') # ch.basic_ack(delivery_tag=method.delivery_tag) self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag)) if __name__ == '__main__': rabbitmq_publisher = RabbitmqPublisher('queue_test') [rabbitmq_publisher.publish(str(i)) for i in range(1000)] def f(msg): print('.... ', msg) time.sleep(10) # 模擬作某事須要10秒種。 rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=20) rabbitmq_consumer.start_consuming_message()
一、放入任務 (圖片鼠標右鍵點擊新標籤打開查看原圖)app
/二、框架
二、開啓消費者,寫一個函數傳給消費者類。異步
三、併發運行效果。函數
rabbitmq這個專業的消息中間件就是比redis做爲消息中間件專業了不少。fetch