rabbitmq 生產者 消費者(多個線程消費同一個隊列裏面的任務。) 一個通用rabbitmq消費確認,快速併發運行的框架。

rabbitmq做爲消息隊列能夠有消息消費確認機制,以前寫個基於redis的通用生產者 消費者 併發框架,redis的list結構能夠簡單充當消息隊列,但不具有消費確認機制,隨意關停程序,會丟失一部分正在程序中處理但還沒執行完的消息。基於redis的與基於rabbitmq相比對消息消費速度和消息數量沒有自然的支持。redis

使用rabbitmq的最經常使用庫pika多線程

 

 無論是寫代碼仍是運行起來都比celery使用更簡單,基本可以知足絕大多數場景使用,用來取代celery  worker模式(celery有三個模式,worker模式最經常使用,其他是定時和間隔時間兩種模式)的後臺異步的做用。併發

 

 

# coding=utf-8
"""
一個通用的rabbitmq生產者和消費者。使用多個線程消費同一個消息隊列。
"""
from collections import Callable
import functools
import time
from threading import Lock
from pika import BasicProperties
# noinspection PyUnresolvedReferences
from app.utils_ydf import (LoggerMixin, LogManager, decorators, RabbitMqHelper, BoundedThreadPoolExecutor)


class RabbitmqPublisher(LoggerMixin):
    def __init__(self, queue_name, log_level_int=1):
        self._queue_name = queue_name
        self.logger.setLevel(log_level_int * 10)
        channel = RabbitMqHelper().creat_a_channel()
        channel.queue_declare(queue=queue_name, durable=True)
        self.channel = channel
        self.lock = Lock()
        self._current_time = None
        self.count_per_minute = None
        self._init_count()
        self.logger.info(f'{self.__class__} 被實例化了')

    def _init_count(self):
        self._current_time = time.time()
        self.count_per_minute = 0

    def publish(self, msg):
        with self.lock:  # 親測pika多線程publish會出錯。
            self.channel.basic_publish(exchange='',
                                       routing_key=self._queue_name,
                                       body=msg,
                                       properties=BasicProperties(
                                           delivery_mode=2,  # make message persistent
                                       )
                                       )
            self.logger.debug(f'放入 {msg} 到 {self._queue_name} 隊列中')
            self.count_per_minute += 1
            if time.time() - self._current_time > 60:
                self._init_count()
                self.logger.info(f'一分鐘內推送了 {self.count_per_minute} 條消息到 {self.channel.connection} 中')


class RabbitmqConsumer(LoggerMixin):
    def __init__(self, queue_name, consuming_function: Callable = None, threads_num=100, max_retry_times=3, log_level=1, is_print_detail_exception=True):
        """
        :param queue_name:
        :param consuming_function: 處理消息的函數,函數有且只能有一個參數,參數表示消息。是爲了簡單,放棄策略和模板來強制參數。
        :param threads_num:
        :param max_retry_times:
        :param log_level:
        :param is_print_detail_exception:
        """
        self._queue_name = queue_name
        self.consuming_function = consuming_function
        self.threadpool = BoundedThreadPoolExecutor(threads_num)
        self._max_retry_times = max_retry_times
        self.logger.setLevel(log_level * 10)
        self.logger.info(f'{self.__class__} 被實例化')
        self._is_print_detail_exception = is_print_detail_exception
        self.rabbitmq_helper = RabbitMqHelper(heartbeat_interval=30)
        channel = self.rabbitmq_helper.creat_a_channel()
        channel.queue_declare(queue=self._queue_name, durable=True)
        channel.basic_qos(prefetch_count=threads_num)
        self.channel = channel
        LogManager('pika.heartbeat').get_logger_and_add_handlers(1)

    @decorators.keep_circulating(1)  # 是爲了保證不管rabbitmq異常中斷多久,無需重啓程序就能保證恢復後,程序正常。
    def start_consuming_message(self):
        def callback(ch, method, properties, body):
            msg = body.decode()
            self.logger.debug(f'從rabbitmq取出的消息是:  {msg}')
            # ch.basic_ack(delivery_tag=method.delivery_tag)
            self.threadpool.submit(self.__consuming_function, ch, method, properties, msg)

        self.channel.basic_consume(callback,
                                   queue=self._queue_name,
                                   # no_ack=True
                                   )
        self.channel.start_consuming()

    @staticmethod
    def ack_message(channelx, delivery_tagx):
        """Note that `channel` must be the same pika channel instance via which
        the message being ACKed was retrieved (AMQP protocol constraint).
        """
        if channelx.is_open:
            channelx.basic_ack(delivery_tagx)
        else:
            # Channel is already closed, so we can't ACK this message;
            # log and/or do something that makes sense for your app in this case.
            pass

    def __consuming_function(self, ch, method, properties, msg, current_retry_times=0):
        if current_retry_times < self._max_retry_times:
            # noinspection PyBroadException
            try:
                self.consuming_function(msg)
                # ch.basic_ack(delivery_tag=method.delivery_tag)
                self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag))
            except Exception as e:
                self.logger.error(f'函數 {self.consuming_function}  第{current_retry_times+1}次發生錯誤,\n 緣由是{e}', exc_info=self._is_print_detail_exception)
                self.__consuming_function(ch, method, properties, msg, current_retry_times + 1)
        else:
            self.logger.critical(f'達到最大重試次數 {self._max_retry_times} 後,仍然失敗')
            # ch.basic_ack(delivery_tag=method.delivery_tag)
            self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag))


if __name__ == '__main__':
    rabbitmq_publisher = RabbitmqPublisher('queue_test')
    [rabbitmq_publisher.publish(str(i)) for i in range(1000)]


    def f(msg):
        print('....  ', msg)
        time.sleep(10)  # 模擬作某事須要10秒種。


    rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=20)
    rabbitmq_consumer.start_consuming_message()

 

 

一、放入任務 (圖片鼠標右鍵點擊新標籤打開查看原圖)app

  /二、框架

二、開啓消費者,寫一個函數傳給消費者類。異步

 

 三、併發運行效果。函數

 

 

rabbitmq這個專業的消息中間件就是比redis做爲消息中間件專業了不少。fetch

相關文章
相關標籤/搜索