使用redis原生list結構做爲消息隊列取代celery框架。

一、web後臺對大批量的繁重的io任務須要解耦使用分佈式異步技術,不然會使接口阻塞,併發延遲,通常就選celery好了。此篇的取代主要是針對取代celery的worker模式。沒有涉及到週期和定時模式。web

 二、對我來講celery提供了  分佈式,任務路由,超時殺死,任務過時丟棄,任務限速,併發模型選擇,併發池大小這些功能。redis

 

三、此篇除了併發模型固定爲了線程模式,其他的特色都實現了。基本上的代碼複用了以前使用celery框架的代碼,只有任務調度變了,因此從celery改成自定義只花了3小時就改過來了。json

四、具體是先實現基本骨架,而後使用23種設計模式中的模板模式繼承基類,實現其中一個方法。也就是原來被celery 的@app.task裝飾的東西,如今改成了繼承和重寫基類方法。設計模式

 

 五、若是須要使用celery的進程工做模式,能夠在import以後加一行ThreadPoolExcutor = ProcessPoolExecutor,就能很容易換成進程模式了。api

若是須要使用celery的gevent工做模式,能夠import gevent ,而後monkey.patch_all()架構

 

 

# -*- coding: utf-8 -*-
# @Author  : ydf
"""
用來取代celery框架的,
改成使用自定義架構
"""
import typing
import abc
import threading
from multiprocessing import Process
import json
import time

from app.utils_ydf import BoundedThreadPoolExecutor, RedisMixin, decorators, LoggerMixin, LogManager
from app.apis.list_page_live_price.live_price_celery_app import live_price_deco, bulk_price_live_deco
from app.constant import icon_list


from app.apis.cnbooking.cnbooking_core import CnbookingHotelPriceQuerier, CnbookingHotelPriceQuerierInternational
from app.apis.daolv.hotel_detail import query_hoteldetail_price
# from app.apis.elong.elong_detail_priceinfo2 import detail_priceinfo
from app.apis.elongin.elong_in_detail import elong
from app.apis.haoqiao.core import search2
from app.apis.jltour.jl_price import get_jl_tour_price
from app.apis.qunar.core import getPrice_in, getPrice
from app.apis.yingli.core import get_detial


# from app.apis.expedia.expedia_hotel_price import get_expedia_price
from app.apis.ctrip.ctriphotelm import ctripPriceIn, ctripPrice

# 導入批量獲取比價的函數
from app.apis.elong.elong_cn_bulk_request import elong_cn_bulk_request_price
from app.apis.jltour.jl_bulk_price_querier import JltourBulkPriceQuerier
from app.apis.daolv.daolv_bulk_price_querier import DaolvBulkPriceQuerier

QUENEN_NAME_ELONG = 'compare.quenen.elong'
QUENEN_NAME_QUNAR = 'compare.quenen.qunar'
QUENEN_NAME_DAOLV = 'compare.quenen.daolv'
QUENEN_NAME_HAOQIAO = 'compare.quenen.haoqiao'
QUENEN_NAME_CNBOOKING = 'compare.quenen.cnbooking'
QUENEN_NAME_PROFIT = 'compare.quenen.profit'
QUENEN_NAME_JLTOUR = 'compare.quenen.jltour'
QUENEN_NAME_CTRIP = 'compare.quenen.ctrip'
QUENEN_NAME_ELONG_CN = 'compare.quenen.elong_cn'

TASK_EXPIRE_TIME = 15  # 任務過時時間,消費時候比提交任務時候晚了15秒則不執行這個任務
TASK_TIMEOUT = 20  # 任務(函數)運行超時,自動殺死的時間配置

logger_redis = LogManager('logger_redis').get_logger_and_add_handlers(5, is_add_stream_handler=False, log_filename='logger_redis.log')


class BaseExecuor(RedisMixin, LoggerMixin):
    """
    單個酒店查詢的基類
    """

    def __init__(self, redis_list_key_name, thread_pool_nums, every_request_interval_time, platfrom_name):
        """

        :param redis_list_key_name: 每一個平臺的redis任務鍵
        :param thread_pool_nums: 線程池最大數量
        :param every_request_interval_time: 每隔多少秒方任務到線程池,用於限制頻率
        :param platfrom_name: 平臺名字
        """
        self._redis_list_key_name = redis_list_key_name
        self._thread_pool_nums = thread_pool_nums
        self._every_request_interval_time = every_request_interval_time
        self._platfrom_name = platfrom_name
        self._pool = BoundedThreadPoolExecutor(self._thread_pool_nums)
        self._t0 = time.time()
        self._count_per_second = 0
        self._lock = threading.Lock()
        self.logger_with_file.debug(f'監聽的隊列是 {self._redis_list_key_name}')

    def _shedul_a_task(self, redis_task: str):
        hotel_map_item, arrival_date, departure_date, adults, children_str, timestamp = redis_task.split('@@')
        hotel_map_item = json.loads(hotel_map_item)
        adults = int(adults)
        children_str = '' if children_str in (0, '0') else children_str  # 空的會出現4個@符號在一塊兒,split出錯
        if time.time() - float(timestamp) < TASK_EXPIRE_TIME:
            self.logger_with_file.debug(f'未過時,執行這個任務 {redis_task} ')
            time.sleep(self._every_request_interval_time)
            lowest_price_key = 'lowestprice_' + hotel_map_item['_id'] + '_' + arrival_date + '_' + departure_date + '_' + str(adults) + '_' + str(children_str)
            if not self.redis_db_hotel.exists(lowest_price_key):  # TODO 若是此馬踏飛燕id不存在最低價則請求
                self._pool.submit(self.execute_specific_task, hotel_map_item, arrival_date, departure_date, adults, children_str)
            else:
                self.logger_with_file.warning(f'此馬踏飛燕酒店 {hotel_map_item["_id"]} 已經有最低價了,這次不請求 {self._platfrom_name} 這個平臺')
        else:
            self.logger_with_file.warning(f'時間超過 {TASK_EXPIRE_TIME},放棄這個任務 {redis_task}')

    def start(self):
        while True:
            try:
                time_redis_0 = time.time()
                redis_task_bytes = self.redis_db_hotel.rpop(self._redis_list_key_name)  # 獲得一個鍵hotel_map_item,arrival_date, departure_date, adults, children_str,timestamp
                if redis_task_bytes:
                    redis_task = redis_task_bytes.decode('utf8')
                    self.logger_with_file.debug(f'從 {self._redis_list_key_name} 鍵取出的內容是-->  {redis_task}  redis取出耗時 {time.time() - time_redis_0}')
                    self._shedul_a_task(redis_task)
                else:
                    if time.time() - self._t0 > 5:  # 爲了避免頻繁寫這個日誌主要是
                        self._t0 = time.time()
                        self.logger.debug(f'平臺  {self._platfrom_name}  {self._redis_list_key_name} 隊列中沒有任務, redis耗時 {time.time() - time_redis_0}')
                    time.sleep(self._every_request_interval_time)
            except Exception as e:
                self.logger_with_file.exception(e)
                time.sleep(self._every_request_interval_time)

    @abc.abstractmethod
    def execute_specific_task(self, hotel_map_item_or_list: typing.Union[dict, list], arrival_date__, departure_date__, adults__, children_str__):
        raise NotImplemented


class BaseBulkExcutor(BaseExecuor):
    """批量查詢的基類"""

    def execute_specific_task(self, hotel_map_item_or_list: typing.Union[dict, list], arrival_date__, departure_date__, adults__, children_str__):
        pass

    def _shedul_a_task(self, redis_task: str):
        redis_task = json.loads(redis_task)
        hotel_map_item_list = redis_task['id_list']
        arrival_date, departure_date, adults, children_str, timestamp = redis_task['arrival_date'], redis_task['departure_date'], redis_task['adults'], redis_task['children_str'], redis_task['timestamp']
        adults = int(adults)
        children_str = '' if children_str in (0, '0') else children_str  # 空的會出現4個@符號在一塊兒,split出錯,用了0代替空字符串
        if time.time() - float(timestamp) < TASK_EXPIRE_TIME:
            self.logger_with_file.debug(f'未過時,執行這個任務 {redis_task} ')
            time.sleep(self._every_request_interval_time)
            self._pool.submit(self.execute_specific_task, hotel_map_item_list, arrival_date, departure_date, adults, children_str)
        else:
            self.logger_with_file.warning(f'時間超過 {TASK_EXPIRE_TIME},放棄這個任務 {redis_task}')


class QunarExecutor(BaseExecuor):
    def execute_specific_task(self, *args, **kwargs):
        @decorators.timeout(TASK_TIMEOUT)
        @live_price_deco('qunar', icon_list.ICON_QUNAR)
        def qunar_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
            if not hotel_map_item['_id'].startswith('IN'):
                return getPrice(hotel_map_item['qunar_id'], arrival_date, departure_date)
            else:
                if children_str:
                    qunar_children_age = children_str.replace(",", "|")
                    qunar_children = len(children_str.split(","))
                else:
                    qunar_children_age = ''
                    qunar_children = 0
                return getPrice_in(hotel_map_item['qunar_id'], arrival_date, departure_date, adults, qunar_children, qunar_children_age)

        qunar_live(*args, **kwargs)


class CnbookingExecutor(BaseExecuor):
    def execute_specific_task(self, *args, **kwargs):
        @decorators.timeout(TASK_TIMEOUT)
        @live_price_deco('cnbooking', icon_list.ICON_LONGTENG)
        def cnbooking_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
            if not hotel_map_item['_id'].startswith('IN'):
                return CnbookingHotelPriceQuerier(hotel_map_item['cnbooking_id'], arrival_date, departure_date, adults, children_str).get_result()
            else:
                return CnbookingHotelPriceQuerierInternational(hotel_map_item['cnbooking_id'], arrival_date, departure_date, adults, children_str).get_result()

        cnbooking_live(*args, **kwargs)


class ElongExecutor(BaseExecuor):
    def execute_specific_task(self, *args, **kwargs):
        @decorators.timeout(TASK_TIMEOUT)
        @live_price_deco('elong', icon_list.ICON_MASHANGZHU)
        def elong_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
            if not hotel_map_item['_id'].startswith('IN'):
                pass
                # return detail_priceinfo(arrival_date, departure_date, hotel_map_item['elong_id'])
            else:
                return elong(arrival_date, departure_date, hotel_map_item['elong_id'], adults, children_str)

        elong_live(*args, **kwargs)


class DaolvExecutor(BaseExecuor):
    def execute_specific_task(self, *args, **kwargs):
        @decorators.timeout(TASK_TIMEOUT)
        @live_price_deco('daolv', icon_list.ICON_DAOLV)
        def daolv_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
            """不須要區分國內外"""
            return query_hoteldetail_price(hotel_map_item['daolv_id'], arrival_date, departure_date, adults, children_str)

        daolv_live(*args, **kwargs)


class JltourExecutor(BaseExecuor):
    def execute_specific_task(self, *args, **kwargs):
        # noinspection PyUnusedLocal
        @decorators.timeout(TASK_TIMEOUT)
        @live_price_deco('jltour', icon_list.ICON_JLTOUR)
        def jltour_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
            """不須要區分國內外"""
            return get_jl_tour_price(hotel_map_item['jltour_id'], arrival_date, departure_date, adults)

        jltour_live(*args, **kwargs)


class HaoqiaoExecutor(BaseExecuor):
    def execute_specific_task(self, *args, **kwargs):
        @decorators.timeout(TASK_TIMEOUT)
        @live_price_deco('haoqiao', icon_list.ICON_HQ)
        def haoqiao_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
            """不須要區分國內外"""
            haoqiao = hotel_map_item['haoqiao_id']
            hotel_id = haoqiao['hotel_id']
            city_id = haoqiao['city_id']
            return search2(hotel_id, city_id, arrival_date, departure_date, children_str, adults)

        haoqiao_live(*args, **kwargs)


class YingliExecutor(BaseExecuor):
    def execute_specific_task(self, *args, **kwargs):
        # noinspection PyUnusedLocal,PyUnusedLocal
        @decorators.timeout(TASK_TIMEOUT)
        @live_price_deco('yingli', icon_list.ICON_JUYOUHUI)
        def yingli_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
            """不須要區分國內外"""
            return get_detial(hotel_map_item['yingli_id'], arrival_date, departure_date)

        yingli_live(*args, **kwargs)


class CtripExecutor(BaseExecuor):
    def execute_specific_task(self, *args, **kwargs):
        @decorators.timeout(TASK_TIMEOUT)
        @live_price_deco('ctrip', icon_list.ICON_CTRIP)
        def ctrip_live(hotel_map_item, arrival_date, departure_date, adults, children_str):
            if hotel_map_item['_id'].startswith('IN'):
                return ctripPriceIn(hotel_map_item['ctrip_id'], arrival_date, departure_date, adults, children_str)
            else:
                return ctripPrice(hotel_map_item['ctrip_id'], arrival_date, departure_date)

        ctrip_live(*args, **kwargs)


# ###########################################################批量查詢######################################################################################
class JltourBulkExecutor(BaseBulkExcutor):
    def execute_specific_task(self, *args, **kwargs):
        @decorators.timeout(TASK_TIMEOUT)
        @bulk_price_live_deco(platform_name='jltour', platform_icon=icon_list.ICON_JLTOUR, platform_hotel_id_key='jltour_id')
        def jltour_bulk_request_price_live(hotel_map_item_list, arrival_date, departure_date, adults, children_str):
            hotel_id_list = [hotel_map_item['jltour_id'] for hotel_map_item in hotel_map_item_list]
            return JltourBulkPriceQuerier(hotel_id_list, arrival_date, departure_date, adults, children_str).get_result_list()

        jltour_bulk_request_price_live(*args, **kwargs)


class ElongBulkExecutor(BaseBulkExcutor):
    def execute_specific_task(self, *args, **kwargs):
        @decorators.timeout(TASK_TIMEOUT)
        @bulk_price_live_deco(platform_name='elong', platform_icon=icon_list.ICON_MASHANGZHU, platform_hotel_id_key='elong_id')
        def elong_cn_bulk_request_price_live(hotel_map_item_list, arrival_date, departure_date, adults, children_str):
            hotel_id_list = [hotel_map_item['elong_id'] for hotel_map_item in hotel_map_item_list]
            price_result_list = elong_cn_bulk_request_price(hotel_id_list, arrival_date, departure_date, adults, children_str)
            return price_result_list

        elong_cn_bulk_request_price_live(*args, **kwargs)


class DaolvBulkExecutor(JltourBulkExecutor):
    def execute_specific_task(self, *args, **kwargs):
        @decorators.timeout(TASK_TIMEOUT)
        @bulk_price_live_deco(platform_name='daolv', platform_icon=icon_list.ICON_DAOLV, platform_hotel_id_key='daolv_id')
        def daolv_bulk_request_price_live(hotel_map_item_list, arrival_date, departure_date, adults, children_str):
            hotel_id_list = [hotel_map_item['daolv_id'] for hotel_map_item in hotel_map_item_list]
            querier = DaolvBulkPriceQuerier(hotel_id_list, arrival_date, departure_date, adults, children_str)
            querier.set_is_real_time(is_real_time=False)
            return querier.get_result_list()

        daolv_bulk_request_price_live(*args, **kwargs)


def start_executor(**kwargs):
    platfrom_name = kwargs['platfrom_name']
    if platfrom_name == '去哪':
        executor_class = QunarExecutor
    elif platfrom_name == '龍騰':
        executor_class = CnbookingExecutor
    elif platfrom_name == '藝龍國際':
        executor_class = ElongExecutor
    elif platfrom_name == '道旅':
        executor_class = DaolvBulkExecutor
    elif platfrom_name == '捷旅':
        executor_class = JltourBulkExecutor
    elif platfrom_name == '好巧':
        executor_class = HaoqiaoExecutor
    elif platfrom_name == '盈利':
        executor_class = YingliExecutor
    elif platfrom_name == '攜程':
        executor_class = CtripExecutor
        CtripExecutor(**kwargs).start()
    elif platfrom_name == '藝龍國內':
        executor_class = ElongBulkExecutor
    else:
        raise ValueError('平臺名字設置不正確')
    executor_class(**kwargs).start()


if __name__ == '__main__':
    Process(target=start_executor, kwargs={'redis_list_key_name': QUENEN_NAME_QUNAR, 'thread_pool_nums': 300, 'every_request_interval_time': 0.02, 'platfrom_name': '去哪'}).start()
    Process(target=start_executor, kwargs={'redis_list_key_name': QUENEN_NAME_CNBOOKING, 'thread_pool_nums': 300, 'every_request_interval_time': 0.02, 'platfrom_name': '龍騰'}).start()
    Process(target=start_executor, kwargs={'redis_list_key_name': QUENEN_NAME_ELONG, 'thread_pool_nums': 300, 'every_request_interval_time': 0.15, 'platfrom_name': '藝龍國際'}).start()
    Process(target=start_executor, kwargs={'redis_list_key_name': QUENEN_NAME_DAOLV, 'thread_pool_nums': 300, 'every_request_interval_time': 0.02, 'platfrom_name': '道旅'}).start()
    Process(target=start_executor, kwargs={'redis_list_key_name': QUENEN_NAME_JLTOUR, 'thread_pool_nums': 300, 'every_request_interval_time': 0.1, 'platfrom_name': '捷旅'}).start()
    Process(target=start_executor, kwargs={'redis_list_key_name': QUENEN_NAME_HAOQIAO, 'thread_pool_nums': 100, 'every_request_interval_time': 0.5, 'platfrom_name': '好巧'}).start()
    Process(target=start_executor, kwargs={'redis_list_key_name': QUENEN_NAME_PROFIT, 'thread_pool_nums': 300, 'every_request_interval_time': 0.01, 'platfrom_name': '盈利'}).start()
    Process(target=start_executor, kwargs={'redis_list_key_name': QUENEN_NAME_CTRIP, 'thread_pool_nums': 500, 'every_request_interval_time': 0.01, 'platfrom_name': '攜程'}).start()
    Process(target=start_executor, kwargs={'redis_list_key_name': QUENEN_NAME_ELONG_CN, 'thread_pool_nums': 200, 'every_request_interval_time': 0.15, 'platfrom_name': '藝龍國內'}).start()
相關文章
相關標籤/搜索