Celery的實踐指南

 
Celery的實踐指南
celery原理:
celery其實是實現了一個典型的生產者-消費者模型的消息處理/任務調度統,消費者(worker)和生產者(client)均可以有任意個,他們經過消息系統(broker)來通訊。
典型的場景爲:
  1. 客戶端啓動一個進程(生產者),當用戶的某些操做耗時較長或者比較頻繁時,考慮接入本消息系統,發送一個task任務給broker。
  2. 後臺啓動一個worker進程(消費者),當發現broker中保存有某個任務到了該執行的時間,他就會拿過來,根據task類型和參數執行。
 
實踐中的典型場景:
  1. 簡單的定時任務:
    1. 替換crontab的celery寫法:

      1. from celery import Celery
        from celery.schedules import crontab

        app = Celery("tasks", backend="redis://localhost", broker="redis://localhost")

        app.conf.update(CELERYBEAT_SCHEDULE = {
            "add": {
                "task": "celery_demo.add",
                "schedule": crontab(minute="*"),
                "args": (16, 16)
            },
        })

        @app.task
        def add(x, y):
            return x + y

    2. 運行celery的worker,讓他做爲consumer運行,自動從broker上得到任務並執行。
      1. `celery -A celery_demo worker`
    3. 運行celery的client,讓其根據schedule,自動生產出task msg,併發布到broker上。
      1. `celery -A celery_demo beat`
    4. 安裝並運行flower,方便監控task的運行狀態
      1. `celery flower -A celery_demo`
      2. 或者設置登陸密碼 `
        celery flower -A celery_demo --basic_auth=user1:password1,user2:password2
  2. 多同步任務-鏈式任務-
  3. 失敗自動重試的task
    1. 失敗重試方法: 將task代碼函數參數增長self,同時綁定bind。
    2. demo代碼:
      1. @app.task(bind=True, default_retry_delay=300, max_retries=5)
        def my_task_A(self):
            try:
                print("doing stuff here...")
            except SomeNetworkException as e:
                print("maybe do some clenup here....")
                self.retry(e)
    3. 自動重試後,是否將任務從新入queue後排隊,仍是等待指定的時間?能夠經過self.retry()參數來指定。
  4. 派發到不一樣Queue隊列的task
    1. 一個task自動映射到多個queue中的方法, 經過配置task和queue的routing_key命名模式。
      1. 好比:把queue的exchange和routing_key配置成通用模式:
      2. 再定義task的routing_key的名稱:
    2. 可用的不一樣exchange策略:
      1. direct:直接根據定義routing_key
      2. topic:exchange會根據通配符來將一個消息推送到多個queue。
      3. fanout:將消息拆分,分別推送到不一樣queue,一般用於超大任務,耗時任務。
    3. 參考:http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
  5. 高級配置
    1. result是否保存
    2. 失敗郵件通知:
    3. 關閉rate limit:
  6. auto_reload方法(*nix系統):
    1. celery經過監控源代碼目錄的改動,自動地進行reload
    2. 使用方法:1.依賴inotify(Linux) 2. kqueue(OS X / BSD)
    3. 安裝依賴:
      $ pip install pyinotify
    4. (可選) 指定fsNotify的依賴:
      $ env CELERYD_FSNOTIFY=stat celery worker -l info --autoreload
    5. 啓動: celery -A appname worker --autoreload
  7. auto-scale方法:
    1. 啓用auto-scale
    2. 臨時增長worker進程數量(增長consumer):
      $ celery -A proj control add_consumer foo -d worker1.local
    3. 臨時減小worker進程數量(減小consumer):
  8. 將scheduled task的配置從app.conf變成DB的方法:
    1. 須要在啓動時指定custom schedule 類名,好比默認的是: celery.beat.PersistentScheduler 。
      1.  celery -A proj beat -S djcelery.schedulers.DatabaseScheduler
  9. 啓動中止worker的方法:
    1. 啓動 as daemon : http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing
      1. root用戶可使用celeryd
      2. 非特權用戶:celery multi start worker1 -A appName  —autoreload  --pidfile="$HOME/run/celery/%n.pid"  --logfile="$HOME/log/celery/%n.log"
      3. 或者 celery worker —detach
    2. 中止
    3. ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
  10. 與Flask集成的方法
    1. 集成後flask將充當producer來建立併發送task給broker,在celery啓動的獨立worker進程將從broker中得到task並執行,同時將結果返回。
    2. flask中異步地得到task結果的方法:add.delay(x,y),有時須要對參數進行命名後傳遞 或者 add.apply_async(args=(x,y), countdown=30)
    3. flask得到
  11. 與flask集成後的啓動問題
    1. 因爲celery的默認routing_key是根據生產者在代碼中的import級別來設定的,因此worker端在啓動時應該注意其啓動目錄應該在項目頂級目錄上,否者會出現KeyError。
  12. 性能提高: eventlet 和 greenlet
 
 
 
相關文章
相關標籤/搜索