#SORA#celery研究筆記


最近看到celery文檔task部分,作一下小結
python


  1. 實際處理時,咱們能夠使用一個相似於logging的模塊生成日誌。數據庫

  2. 對於某些任務,你能夠設置當知足某些條件時,重試任務、拒絕任務或忽略任務json

  3. 在定義task時,@app.task(bind=True)中的bind參數能夠讓你在task中訪問請求中的內容,好比id,group之類的信息app

  4. @app.task(AGRS=VALUE),ARGS有好幾個參數能夠設置,好比name,有些和全局設置(CELERY_xxx_xxx之類的)是同樣的配置內容異步

  5. 能夠自定義任務狀態(默認有pending,started,success,failure,retry,revoked)工具

  6. 當你使用pickle做爲序列化工具時,你應該定義那些能夠被pickle的異常(我用json,直接忽略)fetch

實例化。你能夠繼承Task類定義新類,定義的__init__方法只會被調用一次,此後將持續存在。當你的task以此新類爲基類,後面對此task的調用中,__init__的做用還會存在。(用途:自定義類中維持一個對數據庫的鏈接,task能夠不用每次都建立鏈接,而是對那個存在的屬性進行操做)。url

e.g:日誌

from celery import Task

class DatabaseTask(Task):
    abstract = True
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = Database.connect()
        return self._db
@app.task(base=DatabaseTask)
def process_rows():
    for row in process_rows.db.table.all():
        …


7.定義新類時,設置爲抽象類,能夠做爲task的基類。其中又有四種handle方法(after_return,on_failure,on_retry,on_success)code

e.g:

from celery import Task

class DebugTask(Task):
    abstract = True

    def after_return(self, *args, **kwargs):
        print('Task returned: {0!r}'.format(self.request)


@app.task(base=DebugTask)
def add(x, y):
    return x + y


8.最佳實踐:

*忽略不須要的結果,或者設置CELERY_TASK_RESULT_EXPIRES

*若不須要,關閉rate limits

*避免寫出同步的task(阻塞),同時執行的同步的task,前面的會令後面的低效,應該寫成異步的方法或使用callback 

e.g:

#Bad:

@app.task
def update_page_info(url):
    page = fetch_page.delay(url).get()
    info = parse_page.delay(url, page).get()
    store_page_info.delay(url, info)

@app.task
def fetch_page(url):
    return myhttplib.get(url)

@app.task
def parse_page(url, page):
    return myparser.parse_document(page)

@app.task
def store_page_info(url, info):
    return PageInfo.objects.create(url, info)
    
    
    
    
    
#Good:

def update_page_info(url):
    # fetch_page -> parse_page -> store_page
    chain = fetch_page.s() | parse_page.s() | store_page_info.s(url)
    chain()

@app.task()
def fetch_page(url):
    return myhttplib.get(url)

@app.task()
def parse_page(page):
    return myparser.parse_document(page)

@app.task(ignore_result=True)
def store_page_info(info, url):
    PageInfo.objects.create(url=url, info=info)

*使用細粒度的任務,而不是又長又臭的long task

*儘可能把須要操做的數據放在本地,或使worker靠近數據所在,下降因IO延遲的影響



暫時如此,之後有須要再補

相關文章
相關標籤/搜索