更新 python萬能消費框架,新增7種中間件(或操做mq的包)和三種併發模式。html
框架目的是分佈式調度起一切任何函數(固然也包括調度起一切任何方法)。java
以前寫的是基於rabbitmq的,做爲專用的消息隊列好處比redis的list結構好不少。但有的人仍是強烈喜歡用redis,以及rabbitmq安裝比redis麻煩點。python
如今加入reids做爲中間件的方式。(支持僅僅修改一個字母就達到全局切換使用何種中間件,其他代碼不須要作任何一處修改就能夠 正常運行)linux
使用 模板模式 加工廠模式 加策略模式(消費者調用的函數,用戶本身寫的每個被消費的函數單元都是策略函數)redis
解釋下爲何不直接用celery呢?mongodb
一、寫法過於麻煩了,運行方式也麻煩一些。編程
二、對文件的位置要求高一點,能夠設置main來解決。但仍是沒太方便。這個能夠把消費代碼隨意移動文件到任何層級的文件夾或者移到任何地方。json
三、配置項高達200個,複雜,英文文檔2000頁,我是看了5遍,太複雜了公司推廣很麻煩,不是每一個人都喜歡反覆幾千頁看好幾回。windows
這是以前的,運行方式很古怪,不喜歡命令行方式,ide不能補全愛出錯。api
celery_main = 'app.apis.list_page_live_price.live_price_celery_app'
# -*- coding: utf-8 -*- # @Author : ydf """ 類celery的worker模式,可用於一切須要分佈式併發的地方,最好是io類型的。能夠分佈式調度起一切函數。 rabbitmq生產者和消費者框架。徹底實現了celery worker模式的所有功能,使用更簡單。支持自動重試指定次數,消費確認,指定數量的併發線程,和指定頻率控制1秒鐘只運行幾回, 同時對mongodb類型的異常作了特殊處理 最開始寫得是使用pika包,非線程安全,後來加入rabbitpy,rabbitpy包推送會丟失部分數據,推薦pika包使用 單下劃線表明保護,雙下劃線表明私有。只要關注公有方法就能夠,其他是類內部自調用方法。 3月15日 1)、新增RedisConsumer 是基於redis中間件的消費框架,不支持隨意暫停程序或者斷點,會丟失一部分正在運行中的任務,推薦使用rabbitmq的方式。 get_consumer是使用工廠模式來生成基於rabbit和reids的消費者,使用不一樣中間件的消費框架更靈活一點點,只須要修改一個數字。 3月20日 2)、增長支持函數參數過濾的功能,能夠隨時放心屢次推送相同的任務到中間件,會先檢查該任務是否須要執行,避免浪費cpu和流量,加快處理速度。 基於函數參數值的過濾,須要設置 do_task_filtering 參數爲True才生效,默認爲False。 3)、新增支持了函數的參數是多個參數,須要設置is_consuming_function_use_multi_params 爲True生效,爲了兼容老代碼默認爲False。 區別是消費函數原來須要 def f(body): # 函數有且只能有一個參數,是字典的多個鍵值對來表示參數的值。 print(body['a']) print(body['b']) 如今能夠 def f(a,b): print(a) print(b) 對於推送的部分,都是同樣的,都是推送 {"a":1,"b":2} 6月3日 1) 增長了RedisPublisher類,和增長get_publisher工廠模式 方法同mqpublisher同樣,這是爲了加強一致性,之後每一個業務的推送和消費,若是不直接使用RedisPublisher RedisConsumerer RabbitmqPublisher RabbitMQConsumer這些類,而是使用get_publisher和get_consumer來獲取發佈和消費對象,支持修改一個全局變量的broker_kind數字來切換全部平臺消費和推送的中間件種類。 2)增長指定不運行的時間的配置。例如能夠白天不運行,只在晚上運行。 3)增長了函數超時的配置,當函數運行時間超過n秒後,自動殺死函數,拋出異常。 4) 增長每分鐘函數運行次數統計,和按照最近一分鐘運行函數次數來預估多久能夠運行完成當前隊列剩餘的任務。 5) 增長一個判斷函數,阻塞判斷連續多少分鐘隊列裏面是空的。判斷任務疑似完成。 6)增長一個終止消費者的標誌,設置標誌後終止循環調度消息。 7) consumer對象增長內置一個屬性,表示相同隊列名的publisher實例。 """ # import functools import abc import copy import traceback import typing import json from collections import Callable, OrderedDict import time from concurrent.futures import ThreadPoolExecutor from functools import wraps from threading import Lock, Thread import unittest import rabbitpy from pika import BasicProperties # noinspection PyUnresolvedReferences from pika.exceptions import ChannelClosed, AMQPError # from rabbitpy.message import Properties import pika from pika.adapters.blocking_connection import BlockingChannel from pymongo.errors import PyMongoError from app.utils_ydf import (LogManager, LoggerMixin, RedisMixin, BoundedThreadPoolExecutor, RedisBulkWriteHelper, RedisOperation, decorators, time_util, LoggerLevelSetterMixin, nb_print) from app import config as app_config # LogManager('pika').get_logger_and_add_handlers(10) # LogManager('pika.heartbeat').get_logger_and_add_handlers(10) # LogManager('rabbitpy').get_logger_and_add_handlers(10) # LogManager('rabbitpy.base').get_logger_and_add_handlers(10) def delete_keys_from_dict(dictx: dict, keys: list): for dict_key in keys: dictx.pop(dict_key) class ExceptionForRetry(Exception): """爲了重試的,拋出錯誤。只是定義了一個子類,用不用均可以""" class ExceptionForRequeue(Exception): """框架檢測到此錯誤,從新放回隊列中""" class ExceptionForRabbitmqRequeue(ExceptionForRequeue): # 之後去掉這個異常,拋出上面那個異常就能夠了。 """遇到此錯誤,從新放回隊列中""" class RabbitmqClientRabbitPy: """ 使用rabbitpy包。 """ # noinspection PyUnusedLocal def __init__(self, username, password, host, port, virtual_host, heartbeat=0): rabbit_url = f'amqp://{username}:{password}@{host}:{port}/{virtual_host}?heartbeat={heartbeat}' self.connection = rabbitpy.Connection(rabbit_url) def creat_a_channel(self) -> rabbitpy.AMQP: return rabbitpy.AMQP(self.connection.channel()) # 使用適配器,使rabbitpy包的公有方法幾乎接近pika包的channel的方法。 class RabbitmqClientPika: """ 使用pika包,多線程不安全的包。 """ def __init__(self, username, password, host, port, virtual_host, heartbeat=0): """ parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F') connection = pika.SelectConnection(parameters=parameters, on_open_callback=on_open) :param username: :param password: :param host: :param port: :param virtual_host: :param heartbeat: """ credentials = pika.PlainCredentials(username, password) self.connection = pika.BlockingConnection(pika.ConnectionParameters( host, port, virtual_host, credentials, heartbeat=heartbeat)) def creat_a_channel(self) -> BlockingChannel: return self.connection.channel() class RabbitMqFactory: def __init__(self, username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS, host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT, virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60 * 10, is_use_rabbitpy=0): """ :param username: :param password: :param port: :param virtual_host: :param heartbeat: :param is_use_rabbitpy: 爲0使用pika,多線程不安全。爲1使用rabbitpy,多線程安全的包。 """ if is_use_rabbitpy: self.rabbit_client = RabbitmqClientRabbitPy(username, password, host, port, virtual_host, heartbeat) else: self.rabbit_client = RabbitmqClientPika(username, password, host, port, virtual_host, heartbeat) def get_rabbit_cleint(self): return self.rabbit_client class AbstractPublisher(LoggerLevelSetterMixin, metaclass=abc.ABCMeta, ): def __init__(self, queue_name, log_level_int=10, logger_prefix='', is_add_file_handler=True, clear_queue_within_init=False): """ :param queue_name: :param log_level_int: :param logger_prefix: :param is_add_file_handler: :param clear_queue_within_init: """ self._queue_name = queue_name if logger_prefix != '': logger_prefix += '--' logger_name = f'{logger_prefix}{self.__class__.__name__}--{queue_name}' self.logger = LogManager(logger_name).get_logger_and_add_handlers(log_level_int, log_filename=f'{logger_name}.log' if is_add_file_handler else None) # # self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=is_use_rabbitpy).get_rabbit_cleint() # self.channel = self.rabbit_client.creat_a_channel() # self.queue = self.channel.queue_declare(queue=queue_name, durable=True) self._lock_for_pika = Lock() self._lock_for_count = Lock() self._current_time = None self.count_per_minute = None self._init_count() self.init_broker() self.logger.info(f'{self.__class__} 被實例化了') self.publish_msg_num_total = 0 if clear_queue_within_init: self.clear() def _init_count(self): with self._lock_for_count: self._current_time = time.time() self.count_per_minute = 0 @abc.abstractmethod def init_broker(self): pass def publish(self, msg: typing.Union[str, dict]): if isinstance(msg, dict): msg = json.dumps(msg) t_start = time.time() decorators.handle_exception(retry_times=10, is_throw_error=True, time_sleep=0.1)(self.concrete_realization_of_publish)(msg) self.logger.debug(f'向{self._queue_name} 隊列,推送消息 耗時{round(time.time() - t_start, 5)}秒 {msg}') with self._lock_for_count: self.count_per_minute += 1 self.publish_msg_num_total += 1 if time.time() - self._current_time > 10: self.logger.info(f'10秒內推送了 {self.count_per_minute} 條消息,累計推送了 {self.publish_msg_num_total} 條消息到 {self._queue_name} 中') self._init_count() @abc.abstractmethod def concrete_realization_of_publish(self, msg): raise NotImplementedError @abc.abstractmethod def clear(self): raise NotImplementedError @abc.abstractmethod def get_message_count(self): raise NotImplementedError @abc.abstractmethod def close(self): raise NotImplementedError def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() self.logger.warning(f'with中自動關閉publisher鏈接,累計推送了 {self.publish_msg_num_total} 條消息 ') def deco_mq_conn_error(f): def _inner(self, *args, **kwargs): try: return f(self, *args, **kwargs) except AMQPError as e: self.logger.error(f'rabbitmq連接出錯 ,方法 {f.__name__} 出錯 ,{e}') self.init_broker() return f(self, *args, **kwargs) return _inner class RabbitmqPublisher(AbstractPublisher): """ 使用pika實現的。 """ # noinspection PyAttributeOutsideInit def init_broker(self): self.logger.warning(f'使用pika 連接mq') self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint() self.channel = self.rabbit_client.creat_a_channel() self.queue = self.channel.queue_declare(queue=self._queue_name, durable=True) # noinspection PyAttributeOutsideInit @deco_mq_conn_error def concrete_realization_of_publish(self, msg): with self._lock_for_pika: # 親測pika多線程publish會出錯。 # if self.channel.connection.is_closed or self.channel.is_closed: # 有時候斷了。 # self.logger.critical('發佈消息,pika連接斷了 「self.channel.connection.is_closed or self.channel.is_closed 」') # self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint() # self.channel = self.rabbit_client.creat_a_channel() # self.queue = self.channel.queue_declare(queue=self._queue_name, durable=True) # import random # if random.randint(0, 3) != 1: # raise AMQPError self.channel.basic_publish(exchange='', routing_key=self._queue_name, body=msg, properties=BasicProperties( delivery_mode=2, # make message persistent 2(1是非持久化) ) ) @deco_mq_conn_error def clear(self): self.channel.queue_purge(self._queue_name) self.logger.warning(f'清除 {self._queue_name} 隊列中的消息成功') @deco_mq_conn_error def get_message_count(self): queue = self.channel.queue_declare(queue=self._queue_name, durable=True) return queue.method.message_count # @deco_mq_conn_error def close(self): self.channel.close() self.rabbit_client.connection.close() self.logger.warning('關閉pika包 連接') class RabbitmqPublisherUsingRabbitpy(AbstractPublisher): """ 使用rabbitpy包實現的。 """ # noinspection PyAttributeOutsideInit def init_broker(self): self.logger.warning(f'使用rabbitpy包 連接mq') self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint() self.channel = self.rabbit_client.creat_a_channel() self.queue = self.channel.queue_declare(queue=self._queue_name, durable=True) @deco_mq_conn_error def concrete_realization_of_publish(self, msg): # noinspection PyTypeChecker self.channel.basic_publish( exchange='', routing_key=self._queue_name, body=msg, properties={'delivery_mode': 2}, ) @deco_mq_conn_error def clear(self): self.channel.queue_purge(self._queue_name) self.logger.warning(f'清除 {self._queue_name} 隊列中的消息成功') @deco_mq_conn_error def get_message_count(self): # noinspection PyUnresolvedReferences ch_raw_rabbity = self.channel.channel return rabbitpy.amqp_queue.Queue(ch_raw_rabbity, self._queue_name, durable=True) # @deco_mq_conn_error def close(self): self.channel.close() self.rabbit_client.connection.close() self.logger.warning('關閉rabbitpy包 連接mq') class RedisPublisher(AbstractPublisher, RedisMixin): """ 使用redis做爲中間件 """ def init_broker(self): pass def concrete_realization_of_publish(self, msg): # noinspection PyTypeChecker self.redis_db7.rpush(self._queue_name, msg) def clear(self): self.redis_db7.delete(self._queue_name) self.logger.warning(f'清除 {self._queue_name} 隊列中的消息成功') def get_message_count(self): return self.redis_db7.llen(self._queue_name) def close(self): # self.redis_db7.connection_pool.disconnect() pass class RedisFilter(RedisMixin): def __init__(self, redis_key_name): self._redis_key_name = redis_key_name @staticmethod def _get_ordered_str(value): """對json的鍵值對在redis中進行過濾,須要先把鍵值對排序,不然過濾會不許確如 {"a":1,"b":2} 和 {"b":2,"a":1}""" if isinstance(value, str): value = json.loads(value) ordered_dict = OrderedDict() for k in sorted(value): ordered_dict[k] = value[k] return json.dumps(ordered_dict) def add_a_value(self, value: typing.Union[str, dict]): self.redis_db7.sadd(self._redis_key_name, self._get_ordered_str(value)) def check_value_exists(self, value): return self.redis_db7.sismember(self._redis_key_name, self._get_ordered_str(value)) class AbstractConsumer(LoggerLevelSetterMixin, metaclass=abc.ABCMeta, ): shedual_task_thread_for_join_on_linux_multiprocessing = list() time_interval_for_check_do_not_run_time = 60 BROKER_KIND = None @property @decorators.synchronized def publisher_of_same_queue(self): if not self._publisher_of_same_queue: self._publisher_of_same_queue = get_publisher(self._queue_name, broker_kind=self.BROKER_KIND) return self._publisher_of_same_queue @classmethod def join_shedual_task_thread(cls): """ :return: """ """ def ff(): RabbitmqConsumer('queue_test', consuming_function=f3, threads_num=20, msg_schedule_time_intercal=2, log_level=10, logger_prefix='yy平臺消費', is_consuming_function_use_multi_params=True).start_consuming_message() RabbitmqConsumer('queue_test2', consuming_function=f4, threads_num=20, msg_schedule_time_intercal=4, log_level=10, logger_prefix='zz平臺消費', is_consuming_function_use_multi_params=True).start_consuming_message() AbstractConsumer.join_shedual_task_thread() # 若是開多進程啓動消費者,在linux上須要這樣寫下這一行。 if __name__ == '__main__': [Process(target=ff).start() for _ in range(4)] """ for t in cls.shedual_task_thread_for_join_on_linux_multiprocessing: t.join() def __init__(self, queue_name, *, consuming_function: Callable = None, function_timeout=0, threads_num=50, specify_threadpool: ThreadPoolExecutor = None, max_retry_times=3, log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, logger_prefix='', create_logger_file=True, do_task_filtering=False, is_consuming_function_use_multi_params=True, is_do_not_run_by_specify_time_effect=False, do_not_run_by_specify_time=('10:00:00', '22:00:00'), schedule_tasks_on_main_thread=False): """ :param queue_name: :param consuming_function: 處理消息的函數。 :param function_timeout : 超時秒數,函數運行超過這個時間,則自動殺死函數。爲0是不限制。 :param threads_num: :param specify_threadpool:使用指定的線程池,能夠多個消費者共使用一個線程池,不爲None時候。threads_num失效 :param max_retry_times: :param log_level: :param is_print_detail_exception: :param msg_schedule_time_intercal:消息調度的時間間隔,用於控頻 :param logger_prefix: 日誌前綴,可以使不一樣的消費者生成不一樣的日誌 :param create_logger_file : 是否建立文件日誌 :param do_task_filtering :是否執行基於函數參數的任務過濾 :is_consuming_function_use_multi_params 函數的參數是不是傳統的多參數,不爲單個body字典表示多個參數。 :param is_do_not_run_by_specify_time_effect :是否使不運行的時間段生效 :param do_not_run_by_specify_time :不運行的時間段 :param schedule_tasks_on_main_thread :直接在主線程調度任務,意味着不能直接在當前主線程同時開啓兩個消費者。 """ self._queue_name = queue_name self.consuming_function = consuming_function self._function_timeout = function_timeout self._threads_num = threads_num self.threadpool = specify_threadpool if specify_threadpool else BoundedThreadPoolExecutor(threads_num + 1) # 單獨加一個檢測消息數量和心跳的線程 self._max_retry_times = max_retry_times self._is_print_detail_exception = is_print_detail_exception self._msg_schedule_time_intercal = msg_schedule_time_intercal self._logger_prefix = logger_prefix self._log_level = log_level if logger_prefix != '': logger_prefix += '--' logger_name = f'{logger_prefix}{self.__class__.__name__}--{queue_name}' self.logger = LogManager(logger_name).get_logger_and_add_handlers(log_level, log_filename=f'{logger_name}.log' if create_logger_file else None) self.logger.info(f'{self.__class__} 被實例化') self._do_task_filtering = do_task_filtering self._redis_filter_key_name = f'filter:{queue_name}' self._redis_filter = RedisFilter(self._redis_filter_key_name) self._is_consuming_function_use_multi_params = is_consuming_function_use_multi_params self._lock_for_pika = Lock() self._execute_task_times_every_minute = 0 # 每分鐘執行了多少次任務。 self._lock_for_count_execute_task_times_every_minute = Lock() self._current_time_for_execute_task_times_every_minute = time.time() self._msg_num_in_broker = 0 self._last_timestamp_when_has_task_in_queue = 0 self._last_timestamp_print_msg_num = 0 self._is_do_not_run_by_specify_time_effect = is_do_not_run_by_specify_time_effect self._do_not_run_by_specify_time = do_not_run_by_specify_time # 能夠設置在指定的時間段不運行。 self._schedule_tasks_on_main_thread = schedule_tasks_on_main_thread self.stop_flag = False self._publisher_of_same_queue = None def keep_circulating(self, time_sleep=0.001, exit_if_function_run_sucsess=False, is_display_detail_exception=True): """間隔一段時間,一直循環運行某個方法的裝飾器 :param time_sleep :循環的間隔時間 :param is_display_detail_exception :param exit_if_function_run_sucsess :若是成功了就退出循環 """ def _keep_circulating(func): # noinspection PyBroadException @wraps(func) def __keep_circulating(*args, **kwargs): while 1: if self.stop_flag: break try: result = func(*args, **kwargs) if exit_if_function_run_sucsess: return result except Exception as e: msg = func.__name__ + ' 運行出錯\n ' + traceback.format_exc(limit=10) if is_display_detail_exception else str(e) self.logger.error(msg) finally: time.sleep(time_sleep) return __keep_circulating return _keep_circulating def start_consuming_message(self): # self.threadpool.submit(decorators.keep_circulating(20)(self.check_heartbeat_and_message_count)) self.threadpool.submit(self.keep_circulating(20)(self.check_heartbeat_and_message_count)) if self._schedule_tasks_on_main_thread: # decorators.keep_circulating(1)(self._shedual_task)() self.keep_circulating(1)(self._shedual_task)() else: # t = Thread(target=decorators.keep_circulating(1)(self._shedual_task)) t = Thread(target=self.keep_circulating(1)(self._shedual_task)) self.__class__.shedual_task_thread_for_join_on_linux_multiprocessing.append(t) t.start() @abc.abstractmethod def _shedual_task(self): raise NotImplementedError def _run_consuming_function_with_confirm_and_retry(self, kw: dict, current_retry_times=0): if self._do_task_filtering and self._redis_filter.check_value_exists(kw['body']): # 對函數的參數進行檢查,過濾已經執行過而且成功的任務。 self.logger.info(f'redis的 [{self._redis_filter_key_name}] 鍵 中 過濾任務 {kw["body"]}') self._confirm_consume(kw) return with self._lock_for_count_execute_task_times_every_minute: self._execute_task_times_every_minute += 1 if time.time() - self._current_time_for_execute_task_times_every_minute > 60: self.logger.info( f'一分鐘內執行了 {self._execute_task_times_every_minute} 次函數 [ {self.consuming_function.__name__} ] ,預計' f'還須要 {time_util.seconds_to_hour_minute_second(self._msg_num_in_broker / self._execute_task_times_every_minute * 60)} 時間' f'才能執行完成 {self._msg_num_in_broker}個剩餘的任務 ') self._current_time_for_execute_task_times_every_minute = time.time() self._execute_task_times_every_minute = 0 if current_retry_times < self._max_retry_times + 1: # noinspection PyBroadException t_start = time.time() try: function_run = self.consuming_function if self._function_timeout == 0 else decorators.timeout(self._function_timeout)(self.consuming_function) if self._is_consuming_function_use_multi_params: # 消費函數使用傳統的多參數形式 function_run(**kw['body']) else: function_run(kw['body']) # 消費函數使用單個參數,參數自身是一個字典,由鍵值對錶示各個參數。 self._confirm_consume(kw) if self._do_task_filtering: self._redis_filter.add_a_value(kw['body']) # 函數執行成功後,添加函數的參數排序後的鍵值對字符串到set中。 self.logger.debug(f'函數 {self.consuming_function.__name__} 第{current_retry_times + 1}次 運行, 正確了,函數運行時間是 {round(time.time() - t_start, 2)} 秒,入參是 【 {kw["body"]} 】') except Exception as e: if isinstance(e, (PyMongoError, ExceptionForRequeue)): # mongo常常維護備份時候插入不了或掛了,或者本身主動拋出一個ExceptionForRequeue類型的錯誤會從新入隊,不受指定重試次數逇約束。 self.logger.critical(f'函數 [{self.consuming_function.__name__}] 中發生錯誤 {type(e)} {e}') return self._requeue(kw) self.logger.error(f'函數 {self.consuming_function.__name__} 第{current_retry_times + 1}次發生錯誤,函數運行時間是 {round(time.time() - t_start, 2)} 秒,\n 入參是 【 {kw["body"]} 】 \n 緣由是 {type(e)} ', exc_info=self._is_print_detail_exception) self._run_consuming_function_with_confirm_and_retry(kw, current_retry_times + 1) else: self.logger.critical(f'函數 {self.consuming_function.__name__} 達到最大重試次數 {self._max_retry_times} 後,仍然失敗, 入參是 【 {kw["body"]} 】') # 錯得超過指定的次數了,就確認消費了。 self._confirm_consume(kw) @abc.abstractmethod def _confirm_consume(self, kw): """確認消費""" raise NotImplementedError # noinspection PyUnusedLocal def check_heartbeat_and_message_count(self): self._msg_num_in_broker = self.publisher_of_same_queue.get_message_count() if time.time() - self._last_timestamp_print_msg_num > 60: self.logger.info(f'[{self._queue_name}] 隊列中還有 [{self._msg_num_in_broker}] 個任務') self._last_timestamp_print_msg_num = time.time() if self._msg_num_in_broker != 0: self._last_timestamp_when_has_task_in_queue = time.time() return self._msg_num_in_broker @abc.abstractmethod def _requeue(self, kw): """從新入隊""" raise NotImplementedError def _submit_task(self, kw): if self._judge_is_daylight(): self._requeue(kw) time.sleep(self.time_interval_for_check_do_not_run_time) return self.threadpool.submit(self._run_consuming_function_with_confirm_and_retry, kw) def _judge_is_daylight(self): if self._is_do_not_run_by_specify_time_effect and self._do_not_run_by_specify_time[0] < time_util.DatetimeConverter().time_str < self._do_not_run_by_specify_time[1]: self.logger.warning(f'如今時間是 {time_util.DatetimeConverter()} ,如今時間是在 {self._do_not_run_by_specify_time} 之間,不運行') return True def wait_for_possible_has_finish_all_tasks(self, minutes: int, mannu_call_check_heartbeat_and_message_count=False, stop_flag=0): """ 因爲是異步消費,和存在隊列一邊被消費,一邊在推送,或者還有結尾少許任務還在確認消費者實際還沒完全運行完成。 但有時候須要判斷 全部任務,務是否完成,提供一個不精確的判斷,要搞清楚緣由和場景後再慎用。 :param minutes 連續多少分鐘沒任務就判斷爲消費已完成 :param mannu_call_check_heartbeat_and_message_count 若是消費者沒有執行startconsuming,須要手動調用這個方法 :param stop_flag 設置中止標誌。中止當前實例無限循環調度消息。 :return: """ if minutes <= 1: raise ValueError('疑似完成任務,判斷時間最少須要設置爲2分鐘內,每隔20秒檢測一次都是0個任務,') if mannu_call_check_heartbeat_and_message_count: self.threadpool = BoundedThreadPoolExecutor(2) self.threadpool.submit(self.keep_circulating(20)(self.check_heartbeat_and_message_count)) while True: if minutes * 60 < time.time() - self._last_timestamp_when_has_task_in_queue < 3650 * 24 * 60 * 60: # 初次時間戳是0,確保不是無限大。 # print(self._last_timestamp_print_msg_num) self.logger.warning(f'最後一次有任務的時間是{time_util.DatetimeConverter(self._last_timestamp_when_has_task_in_queue)},已經有 {minutes} 分鐘沒有任務了,疑似完成。') self.stop_flag = stop_flag if self.stop_flag: self.logger.warning('當前實例退出循環調度消息') break else: time.sleep(30) """ continuou_no_task_times = 0 check_interval_time = 10 while True: try: msg_num_in_broker = self.check_heartbeat_and_message_count() except Exception: msg_num_in_broker = 9999 if msg_num_in_broker == 0: continuou_no_task_times += 1 else: continuou_no_task_times = 0 if continuou_no_task_times >= minutes * (60//check_interval_time): break time.sleep(check_interval_time) """ class RabbitmqConsumer(AbstractConsumer): """ 使用pika包實現的。 """ BROKER_KIND = 0 def _shedual_task_old(self): channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel() channel.queue_declare(queue=self._queue_name, durable=True) channel.basic_qos(prefetch_count=self._threads_num) def callback(ch, method, properties, body): body = body.decode() self.logger.debug(f'從rabbitmq的 [{self._queue_name}] 隊列中 取出的消息是: {body}') time.sleep(self._msg_schedule_time_intercal) body = json.loads(body) kw = {'ch': ch, 'method': method, 'properties': properties, 'body': body} self._submit_task(kw) if self.stop_flag: ch.close() # 使start_consuming結束。 channel.basic_consume(callback, queue=self._queue_name, # no_ack=True ) channel.start_consuming() def _shedual_task(self): channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel() channel.queue_declare(queue=self._queue_name, durable=True) channel.basic_qos(prefetch_count=self._threads_num) while True: if self.stop_flag: return method, properties, body = channel.basic_get(self._queue_name, no_ack=False) if body is None: time.sleep(0.001) else: body = body.decode() self.logger.debug(f'從rabbitmq的 [{self._queue_name}] 隊列中 取出的消息是: {body}') body = json.loads(body) kw = {'ch': channel, 'method': method, 'properties': properties, 'body': body} self._submit_task(kw) time.sleep(self._msg_schedule_time_intercal) def _confirm_consume(self, kw): with self._lock_for_pika: kw['ch'].basic_ack(delivery_tag=kw['method'].delivery_tag) # 確認消費 def _requeue(self, kw): with self._lock_for_pika: # ch.connection.add_callback_threadsafe(functools.partial(self.__ack_message_pika, ch, method.delivery_tag)) return kw['ch'].basic_nack(delivery_tag=kw['method'].delivery_tag) # 當即從新入隊。 @staticmethod def __ack_message_pika(channelx, delivery_tagx): """Note that `channel` must be the same pika channel instance via which the message being ACKed was retrieved (AMQP protocol constraint). """ if channelx.is_open: channelx.basic_ack(delivery_tagx) else: # Channel is already closed, so we can't ACK this message; # log and/or do something that makes sense for your app in this case. pass class RabbitmqConsumerRabbitpy(AbstractConsumer): """ 使用rabbitpy實現的 """ BROKER_KIND = 1 def _shedual_task(self): # noinspection PyTypeChecker channel = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint().creat_a_channel() # type: rabbitpy.AMQP # channel.queue_declare(queue=self._queue_name, durable=True) channel.basic_qos(prefetch_count=self._threads_num) for message in channel.basic_consume(self._queue_name, no_ack=False): body = message.body.decode() self.logger.debug(f'從rabbitmq {self._queue_name} 隊列中 取出的消息是: {body}') time.sleep(self._msg_schedule_time_intercal) kw = {'message': message, 'body': json.loads(message.body.decode())} if self.stop_flag: return # channel.channel.close() self._submit_task(kw) def _confirm_consume(self, kw): kw['message'].ack() def _requeue(self, kw): kw['message'].nack(requeue=True) class RedisConsumer(AbstractConsumer, RedisMixin): """ redis做爲中間件實現的。 """ BROKER_KIND = 2 def _shedual_task_old(self): while True: t_start = time.time() task_bytes = self.redis_db7.blpop(self._queue_name)[1] # 使用db7 if task_bytes: task_dict = json.loads(task_bytes) # noinspection PyProtectedMember self.logger.debug(f'取出的任務時間是 {round(time.time() - t_start, 2)} 消息是: {task_bytes.decode()} ') time.sleep(self._msg_schedule_time_intercal) kw = {'body': task_dict} if self.stop_flag: return self._submit_task(kw) def _shedual_task(self): # 這樣容易控制退出消費循環。 while True: if self.stop_flag: return t_start = time.time() task_bytes = self.redis_db7.lpop(self._queue_name) # 使用db7 if task_bytes: task_dict = json.loads(task_bytes) # noinspection PyProtectedMember self.logger.debug(f'取出的任務時間是 {round(time.time() - t_start, 2)} 消息是: {task_bytes.decode()} ') kw = {'body': task_dict} self._submit_task(kw) else: time.sleep(0.001) time.sleep(self._msg_schedule_time_intercal) def _confirm_consume(self, kw): pass # redis沒有確認消費的功能。 def _requeue(self, kw): self.redis_db7.rpush(self._queue_name, json.dumps(kw['body'])) def get_publisher(queue_name, *, log_level_int=10, logger_prefix='', is_add_file_handler=False, clear_queue_within_init=False, broker_kind=0): """ :param queue_name: :param log_level_int: :param logger_prefix: :param is_add_file_handler: :param clear_queue_within_init: :param broker_kind: 中間件或使用包的種類。 :return: """ all_kwargs = copy.deepcopy(locals()) all_kwargs.pop('broker_kind') if broker_kind == 0: return RabbitmqPublisher(**all_kwargs) elif broker_kind == 1: return RabbitmqPublisherUsingRabbitpy(**all_kwargs) elif broker_kind == 2: return RedisPublisher(**all_kwargs) else: raise ValueError('設置的中間件種類數字不正確') def get_consumer(queue_name, *, consuming_function: Callable = None, function_timeout=0, threads_num=50, specify_threadpool: ThreadPoolExecutor = None, max_retry_times=3, log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, logger_prefix='', create_logger_file=True, do_task_filtering=False, is_consuming_function_use_multi_params=True, is_do_not_run_by_specify_time_effect=False, do_not_run_by_specify_time=('10:00:00', '22:00:00'), schedule_tasks_on_main_thread=False, broker_kind=0): """ 使用工廠模式再包一層,經過設置數字來生成基於不一樣中間件或包的consumer。 :param queue_name: :param consuming_function: 處理消息的函數。 :param function_timeout : 超時秒數,函數運行超過這個時間,則自動殺死函數。爲0是不限制。 :param threads_num: :param specify_threadpool:使用指定的線程池,能夠多個消費者共使用一個線程池,不爲None時候。threads_num失效 :param max_retry_times: :param log_level: :param is_print_detail_exception: :param msg_schedule_time_intercal:消息調度的時間間隔,用於控頻 :param logger_prefix: 日誌前綴,可以使不一樣的消費者生成不一樣的日誌 :param create_logger_file : 是否建立文件日誌 :param do_task_filtering :是否執行基於函數參數的任務過濾 :param is_consuming_function_use_multi_params 函數的參數是不是傳統的多參數,不爲單個body字典表示多個參數。 :param is_do_not_run_by_specify_time_effect :是否使不運行的時間段生效 :param do_not_run_by_specify_time :不運行的時間段 :param schedule_tasks_on_main_thread :直接在主線程調度任務,意味着不能直接在當前主線程同時開啓兩個消費者。 :param broker_kind:中間件種類 :return """ all_kwargs = copy.copy(locals()) all_kwargs.pop('broker_kind') if broker_kind == 0: return RabbitmqConsumer(**all_kwargs) elif broker_kind == 1: return RabbitmqConsumerRabbitpy(**all_kwargs) elif broker_kind == 2: return RedisConsumer(**all_kwargs) else: raise ValueError('設置的中間件種類數字不正確') # noinspection PyMethodMayBeStatic,PyShadowingNames class _Test(unittest.TestCase, LoggerMixin, RedisMixin): """ 演示一個簡單求和的例子。 """ @unittest.skip def test_publisher_with(self): """ 測試上下文管理器。 :return: """ with RabbitmqPublisher('queue_test') as rp: for i in range(1000): rp.publish(str(i)) @unittest.skip def test_publish_rabbit(self): """ 測試mq推送 :return: """ rabbitmq_publisher = RabbitmqPublisher('queue_test', log_level_int=10, logger_prefix='yy平臺推送') rabbitmq_publisher.clear() for i in range(500000): try: time.sleep(1) rabbitmq_publisher.publish({'a': i, 'b': 2 * i}) except Exception as e: print(e) rabbitmq_publisher = RabbitmqPublisher('queue_test2', log_level_int=20, logger_prefix='zz平臺推送') rabbitmq_publisher.clear() [rabbitmq_publisher.publish({'somestr_to_be_print': str(i)}) for i in range(500000)] @unittest.skip def test_publish_redis(self): # 若是須要批量推送 for i in range(10007): # 最犀利的批量操做方式,自動聚合多條redis命令,支持多種redis混合命令批量操做。 RedisBulkWriteHelper(self.redis_db7, 1000).add_task(RedisOperation('lpush', 'queue_test', json.dumps({'a': i, 'b': 2 * i}))) [self.redis_db7.lpush('queue_test', json.dumps({'a': j, 'b': 2 * j})) for j in range(500)] print('推送完畢') @unittest.skip def test_consume(self): """ 單參數表明全部傳參 :return: """ def f(body): self.logger.info(f'消費此消息 {body}') # print(body['a'] + body['b']) time.sleep(5) # 模擬作某事須要阻塞10秒種,必須用併發。 # 把消費的函數名傳給consuming_function,就這麼簡單。 rabbitmq_consumer = RabbitmqConsumer('queue_test', consuming_function=f, threads_num=20, msg_schedule_time_intercal=0.5, log_level=10, logger_prefix='yy平臺消費', is_consuming_function_use_multi_params=False) rabbitmq_consumer.start_consuming_message() @unittest.skip def test_consume2(self): """ 測試支持傳統參數形式,不是用一個字典裏面包含全部參數。 :return: """ def f2(a, b): self.logger.debug(f'a的值是 {a}') self.logger.debug(f'b的值是 {b}') print(f'{a} + {b} 的和是 {a + b}') time.sleep(3) # 模擬作某事須要阻塞10秒種,必須用併發。 # 把消費的函數名傳給consuming_function,就這麼簡單。 RabbitmqConsumer('queue_test', consuming_function=f2, threads_num=60, msg_schedule_time_intercal=5, log_level=10, logger_prefix='yy平臺消費', is_consuming_function_use_multi_params=True).start_consuming_message() @unittest.skip def test_redis_filter(self): """ 測試基於redis set結構的過濾器。 :return: """ redis_filter = RedisFilter('abcd') redis_filter.add_a_value({'a': 1, 'c': 3, 'b': 2}) redis_filter.check_value_exists({'a': 1, 'c': 3, 'b': 2}) redis_filter.check_value_exists({'a': 1, 'b': 2, 'c': 3}) with decorators.TimerContextManager(): print(redis_filter.check_value_exists('{"a": 1, "b": 2, "c": 3}')) with decorators.TimerContextManager(): # 實測百萬元素的set,過濾檢查不須要1毫秒,通常最多100萬個酒店。 print(RedisFilter('filter:mafengwo-detail_task').check_value_exists({"_id": "69873340"})) @unittest.skip def test_run_two_function(self): # 演示連續運行兩個consumer def f3(a, b): print(f'{a} + {b} = {a + b}') time.sleep(10) # 模擬作某事須要阻塞10秒種,必須用併發。 def f4(somestr_to_be_print): print(f'打印 {somestr_to_be_print}') time.sleep(20) # 模擬作某事須要阻塞10秒種,必須用併發。 RabbitmqConsumer('queue_test', consuming_function=f3, threads_num=20, msg_schedule_time_intercal=2, log_level=10, logger_prefix='yy平臺消費', is_consuming_function_use_multi_params=True).start_consuming_message() RabbitmqConsumer('queue_test2', consuming_function=f4, threads_num=20, msg_schedule_time_intercal=4, log_level=10, logger_prefix='zz平臺消費', is_consuming_function_use_multi_params=True).start_consuming_message() # AbstractConsumer.join_shedual_task_thread() # @unittest.skip def test_factory_pattern_consumer(self): """ 測試工廠模式來生成消費者 :return: """ def f2(a, b): # body_dict = json.loads(body) self.logger.info(f'消費此消息 {a} {b} ,結果是 {a+b}') # print(body_dict['a'] + body_dict['b']) time.sleep(2) # 模擬作某事須要阻塞10秒種,必須用併發。 # 把消費的函數名傳給consuming_function,就這麼簡單。 consumer = get_consumer('queue_test5', consuming_function=f2, threads_num=30, msg_schedule_time_intercal=1, log_level=10, logger_prefix='zz平臺消費', function_timeout=20, is_print_detail_exception=True, broker_kind=0) # 經過設置broker_kind,一鍵切換中間件爲mq或redis consumer.publisher_of_same_queue.clear() [consumer.publisher_of_same_queue.publish({'a': i, 'b': 2 * i}) for i in range(80)] consumer.start_consuming_message() # consumer.stop_flag = 1 # 原則是不須要關閉消費,一直在後臺等待任務,循環調度消息。若是須要關閉可使用下面。 nb_print('判斷完成阻塞中。。。') consumer.wait_for_possible_has_finish_all_tasks(2, stop_flag=1) nb_print('這一行要等疑似結束判斷,才能運行。。。') if __name__ == '__main__': # noinspection PyArgumentList unittest.main(sleep_time=1)
一、雖然實現這樣的萬能異步分佈式框架代碼很長,代碼看起來有點複雜(若是真正的懂oop,看起來就不復雜,裏面用了大量 模板模式 工廠模式 裝飾器 等)。但使用卻極其簡單。核心就是定義了一個函數,只須要把函數傳給這個Consumer類的初始化方法,並和隊列名綁定,就能夠一行代碼實現分佈式消費了。Consumer類的實例初始化參數只有2個是最本質核心,分別是隊列名字和函數,其他的參數全是輔助功能。目前框架已用於多個平臺以及線上生產項目中,至關穩定。
個人寫代碼理念是作任何事,最好儘量先設計好想好,而後抽取可複用流程或框架。我但願只麻煩、 很複雜一次,而不是使用無限複製粘貼扣字的作法來麻煩 複雜無數次。
二、使用這個框架,能大大簡化一切須要分佈式的代碼,使你在寫任何須要分佈式的項目和平臺時候,都不須要關心分佈式自己,只須要專一於寫好函數,寫完後,直接把函數和隊列名綁定,實例化一個consumer實例,而後執行start_consuming_message方法就能夠。
三、有人有疑惑爲何反覆強調的是函數?類行不行?
這個調度的本質是從中間件隊列中取到一個消息,消息是json形式,例如取到的消息是 {"a":1,"b":2},
一個函數是 def add(a,b):
print(a + b)
那麼框架自動使用 add(** {"a":1,"b":2}) 的參數來調用 add函數。若是是類,很難判斷到底哪些參數傳給實例的初始化方法,哪些傳給其餘方法。
若是是這樣的形式 Aclass(x).fun(y), 那再用函數包裝一層就能夠了,
例如 def ffff(x,y):
Aclass(x).fun(y)
而後把ffff做爲函數傳給consumer的初始化方法就能夠了。
雖然實現框架用了不少類,但消費的不使用類的緣由,
1)使用類,那就是有狀態的,分佈式最好是無狀態的,函數更好。
2)再者通常這裏面的consuming_function要簡單,每一個函數只要作一件簡單的事,而後由框架無限次循環調度。若是一個函數作的事情太大,一個函數內部啥都幹了,一個函數運行須要持續幾十分鐘幾個小時,那分佈式就成了廢物,能夠吧這個巨大的函數任務弄成邊消費邊推送,分解成不少細粒度的任務。分佈式就是要消費大量細粒度的任務,使每臺機器都有機會消費),弄錯一個細粒度的任務,重試時候不至於形成巨大代價。因此簡單細粒度的任務通常也不須要用類。
3)再者 celery裝飾器也是加在函數上,celery的任務也是函數單元,因此是不須要類的。
四、分佈式爲何重要?
即便是隻有一臺機器,作分佈式也很重要
a/ 這能夠保存未消費的消息,中止腳本能夠繼續接着運行未消耗的任務。
b/而且支持使用多進程而不須要考慮進程間通訊,
c/並且支持重複啓動同一個腳本10次,使這十個運行中的腳本都有機會消費任務
d/python有個垃圾的地方是隻能使用單核,若是隻使用單進程,那32核linux電腦原本能夠達到3200%的cpu使用率,但python因爲設計的緣由,python程序繁忙獲得天了也只能把cpu消耗到100%,這一點在linux使用top命令查看能夠證明,一個python腳本即便再忙碌運算量再大,這個linux進程絕對不會超過120%cpu使用率,java能把cpu消耗到3000%均可以,因此須要使用多進程或屢次啓動來充分使用機器cpu。若是沒有分佈式,一個腳本在消耗任務,別的腳本又不知道這個腳本還須要作哪些任務,大大的浪費cpu空閒資源,python是解釋性語言性能原本就垃圾再加上單進程不能充分使用多核優點,形成了py性能雪上加霜,比java性能差了50倍,比c語言速度差了100倍。因此python比其餘語言更須要分佈式了。
若是是winwods用戶也能夠證明,好比你電腦是i5 四核的,windows的cpu總數是100%使用率(和linux的統計不同),即便你寫個腳本反覆計算運行100次1加到10億,這毫無疑問會形成cpu很忙碌,但python的設計緣由,你的python消耗的cpu會是25%。總之就是不管 在linux仍是windows,python都不能 充分利用cpu,因此須要分佈式,便於多進程消費。
這個框架最好是用於io任務非純cpu計算的,但即便是io任務,因爲python性能不好勁,作一樣的事情要消耗比c語言更高的cpu使用率更長的運行時間,即便是io任務python也會消耗不少cpu,因此io任務一般也須要使用多進程來充分使用cpu,因此分佈式很重要。這個Conusmer類不只是能夠分佈式,還提供了標題中的另外10種功能。
運行結果是這樣。
若是設置msg_schedule_time_intervel 爲0.2和0.5,能夠發現,的確是作到了控頻。精確地控制了每秒執行5次和2次的速度。
測試任務過時
也可使用java發消息,py來運行。默認使用json來序列化和反序列化消息。因此推送的消息必須是簡單的,不要把一個自定義類型的對象做爲消費函數的入參,json鍵的值必須是簡單類型,例如 數字 字符串 數組 字典這種。不能夠是不可被json序列化的自定義類型的對象。用json序列化已經知足全部場景了, picke序列化更強,但仍然有一些自定義類型的對象的實例屬性因爲是一個不可被序列化的東西,picke解決不了,這種東西例如self.r = Redis() ,不能夠序列化,就算能序列化也是要用一串很長的東西來表示這種屬性,致使中間件要存儲很大的東西傳輸效率會下降,這種徹底可使用json來解決,例如指定ip 和端口,在消費函數內部來使用redis。因此用json必定能夠知足一切傳參場景。
若是是使用celery,因爲推送時候要讀取項目配置,java和python基本不能配合。這一點能夠從消息裏面的結構能夠證明,由於celery的消息包括了函數參數、celery項目的配置、裝飾器的參數配置。
celery主要核心使用理念是在函數上加入裝飾器,裝飾器指定任務的路由,或者在獨立的配置中指定路由。而後調用 函數名.delay(x,y),這樣消費和發佈都是自動使用同一個隊列了。很魔術,但做用也不是很大,黑魔法實在python的ide裏面代碼是不能自動補全提示的,由於用了元編程,是一種動態的,pycharm只能解析死語法。