The project consists of three files: tasks.py, config.py, and generate_task.py.
Start Celery Flower with:
celery -A tasks flower --loglevel=info
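Flower's monitoring web UI then listens on http://localhost:5555 by default.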
Start the Celery worker with:
celery -A tasks worker --loglevel=info
Generate tasks with:
python generate_task.py
tasks.py:
#!/usr/bin/env python
# encoding: utf-8
import logging
import time
import json

from celery import Celery

app = Celery('reyun')
app.config_from_object("config")

# logging setup
datefmt = "%Y-%m-%d %H:%M:%S"
log_format = "%(asctime)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format, datefmt=datefmt)


# @app.task(utc=True, expires=10)
@app.task(utc=True, name='tasks.crawl', rate_limit='12/m')
def crawl(request_data):
    try:
        request_data = json.loads(request_data)
    except Exception as e:
        logging.error(e, exc_info=True)
        return  # bail out on malformed JSON instead of falling through
    if not request_data:
        return
    url = request_data.get('url', None)
    if not url:
        logging.info("url is empty", exc_info=True)
        return
    headers = request_data.get('headers', None)
    if not headers:
        logging.info("headers is empty", exc_info=True)
        return
    method = request_data.get('method', None)
    if not method:
        logging.info("method is empty", exc_info=True)
        return
    params = request_data.get('params', None)
    if not params:
        logging.info("params is empty", exc_info=True)
        return
    cat = request_data.get('cat', None)
    if not cat:
        logging.info("cat is empty", exc_info=True)
        return
    data = request_data.get('data', None)
    cookies = request_data.get('cookies', None)
    json_data = request_data.get('json_data', None)
    timeout = request_data.get('timeout', None)
    # post_json() and send() are the project's HTTP helpers (not shown in this post)
    if json_data:
        result = post_json(url, headers, json_data, cookies=cookies, timeout=timeout)
    else:
        result = send(url, headers, params=params, data=data, method=method,
                      timeout=timeout, cookies=cookies)
    return time.time()
    # spider = self.cat_spider_dict.get(cat)
    # if spider:
    #     spider.process_res(result=result, request_data=request_data)
    # else:
    #     logging.error("spider is None cat is %s " % cat)
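The task above calls two HTTP helpers, send() and post_json(), that are not listed in this post. A minimal sketch of what they might look like, assuming the standard requests library and the call signatures used above (both names and signatures are assumptions, not the project's actual code):

# Hypothetical HTTP helpers assumed by tasks.crawl; the real project may differ.
import requests


def send(url, headers, params=None, data=None, method='GET',
         timeout=None, cookies=None):
    # Generic request: method is an HTTP verb such as 'GET' or 'POST'.
    resp = requests.request(method, url, headers=headers, params=params,
                            data=data, timeout=timeout, cookies=cookies)
    return resp.text


def post_json(url, headers, json_data, cookies=None, timeout=None):
    # POST a JSON body and return the raw response text.
    resp = requests.post(url, headers=headers, json=json_data,
                         cookies=cookies, timeout=timeout)
    return resp.text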
config.py:
#!/usr/bin/env python
# encoding: utf-8
from kombu import Exchange, Queue

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/4'
BROKER_URL = 'redis://127.0.0.1:6379/5'
CELERY_TIMEZONE = 'Asia/Shanghai'

CELERY_QUEUES = (
    Queue("craw_queue", Exchange("craw_queue"), routing_key="crawl"),
)

CELERY_ROUTES = {
    'tasks.crawl': {"queue": "craw_queue", "routing_key": "craw_queue"}
}
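generate_task.py itself is not listed in this post. A minimal sketch of how such a producer could enqueue a crawl task, assuming a JSON payload with the fields that tasks.crawl reads (url, headers, method, params, cat, plus the optional data/cookies/json_data/timeout); the real script may build its payloads differently:

#!/usr/bin/env python
# encoding: utf-8
# Hypothetical producer sketch; the actual generate_task.py is not shown in the post.
import json

from tasks import crawl

payload = {
    "url": "http://example.com/page",
    "headers": {"User-Agent": "Mozilla/5.0"},
    "method": "GET",
    "params": {"page": 1},
    "cat": "example",  # category used downstream to pick a spider
}

# CELERY_ROUTES in config.py routes tasks.crawl to craw_queue,
# so a plain delay() call is enough to land the task on that queue.
crawl.delay(json.dumps(payload))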