celery 和 haystack

celery  是分佈式異步框架 python

haystack  是全文檢索  只能在Django中用。程序員

 

 

1、什麼是celery?     ---->它是Python寫的,因此只支持Python使用。可是消息隊列是通用的任何語言均可以用。redis

  celery英文翻譯是芹菜。數據庫

  celery是一個簡單、靈活且可靠的、處理大量消息的分佈式系統。django

  專一於實時處理的異步任務隊列。windows

  同時也支持任務調度。後端

celery 架構  由消息中間件broker、任務執行單元worker、任務執行結果存儲task result store,這三部分組成。api

 

user表明我們程序員寫的代碼,我們提交一些執行的任務,好比說發個郵件。---提交到celery框架--->amqp broker 消息隊列----->celery worker 具體執行------>task result store 結果的存儲。緩存

用戶提交一些任務,任務就是一些函數-----提交到消息隊列,排隊等着去執行。   生產者消費者模型session

amqp broker 消息隊列------>消息中間件--->celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件組成。包括。rabbitMQ,Redis(能夠作數據庫,能夠作緩存,能夠作數據隊列...咱們會詳細介紹這個redis)等等

celery worker 具體執行------>任務執行單元--->worker是celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。

task result store 結果的存儲------>任務執行存儲結果存儲--->用來存儲worker執行的任務的結果,celery支持以不一樣方式存儲任務的結果,包括AMOB,redis等。

 

什麼是消息隊列?

什麼是異步處理?

 

Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
celery 版本支持狀況 

2、使用場景

  異步任務:將耗時操做任務提交給celery去異步執行,好比發送短信/郵件、消息推送、音視頻處理等等

  定時任務:定時執行某件事情,好比天天數據統計。

 

安裝

pip源或者pip3 install celery == 版本號

 

 

 

3、基本使用

        -1.生成一個celery對象(傳參數),cel.task裝飾器,裝飾函數(這個函數就能夠被提交到任務隊列中)
        -2.把函數提交到任務隊列中 函數.delay(參數)
        -3.啓動worker:celery worker -A celery_task -l info -P eventlet
        -4.查詢任務執行結果
-簡單使用

一、首先須要生成celery對象(傳參數),並寫個任務(函數)---->提交任務---->查詢任務結果---->開啓worker---->查詢結果

建立項目celerytest

建立py文件:celery_app_task.py


import celery
import time
# broker='redis://127.0.0.1:6379/2' 不加密碼
backend='redis://:123456@127.0.0.1:6379/1'
broker='redis://:123456@127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def add(x,y):
    return x+y

 

建立py文件:add_task.py,添加任務

from celery_app_task import add
result = add.delay(4,5)
print(result.id)

 

建立py文件:run.py,執行任務,或者使用命令執行:celery worker -A celery_app_task -l info

注:windows下:celery worker -A celery_app_task -l info -P eventlet

from celery_app_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')

 

rom celery.result import AsyncResult
from celery_app_task import cel

async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

 

 

4、多任務結構

 pro_cel
    ├── celery_task# celery相關文件夾
    │   ├── celery.py   # celery鏈接和配置相關文件,必須叫這個名字
    │   └── tasks1.py    #  全部任務函數
    │    └── tasks2.py    #  全部任務函數
    ├── check_result.py # 檢查結果
    └── send_task.py    # 觸發任務

 

from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # 包含如下兩個任務文件,去相應的py文件中找任務,對多個任務作分類
             include=['celery_task.tasks1',
                      'celery_task.tasks2'
                      ])

# 時區
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False
celery.py
import time
from celery_task.celery import cel

@cel.task
def test_celery(res):
    time.sleep(5)
    return "test_celery任務結果:%s"%res
task1.py
import time
from celery_task.celery import cel
@cel.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2任務結果:%s"%res
task2.py
from celery.result import AsyncResult
from celery_task.celery import cel

async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除,執行完成,結果不會自動刪除
    # async.revoke(terminate=True)  # 不管如今是何時,都要終止
    # async.revoke(terminate=False) # 若是任務尚未開始執行呢,那麼就能夠終止。
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')
check_result.py
from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2

# 當即告知celery去執行test_celery任務,並傳入一個參數
result = test_celery.delay('第一個的執行')
print(result.id)
result = test_celery2.delay('第二個的執行')
print(result.id)
send_task.py

添加任務(執行send_task.py),開啓work:celery worker -A celery_task(這是文件夾) -l info  -P  eventlet,檢查任務執行結果(執行check_result.py)

 

 

5、設定延時時間讓celery執行一個任務

from celery_app_task import add
from datetime import datetime
eta 時間對象爲打他time類型
# 方式一
#經過這個方式拿到一個時間對象,就是參數傳的時間
# v1 = datetime(2019, 2, 13, 18, 19, 56)     #2019年2月13號18點19分56秒
# print(v1)
#把當地時間轉成UTC時間
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 默認用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
#取一個10秒以後的時間
time_delay = timedelta(seconds=10)
#這個時間是當前時間日後推10秒
task_time = utc_ctime + time_delay

# 使用apply_async並設定時間
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
add_task.py

6、設定時間讓celery執行一個任務

 相似於contab的定時任務

多任務結構中celery.py修改以下

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
    'celery_task.tasks1',
    'celery_task.tasks2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字隨意命名
    'add-every-10-seconds': {
        # 執行tasks1下的test_celery函數
        'task': 'celery_task.tasks1.test_celery',
        # 每隔2秒執行一次
        # 'schedule': 2.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # 傳遞參數
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.tasks2.test_celery2',
    #     每一年4月11號,8點42分執行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (‘test2’,)
    # },
}

第一步:在celery.py裏面加

from datetime import timedelta
from celery.schedules import crontab
cel.conf.beat_schedule = {
    # 名字隨意命名
    'add-every-10-seconds': {
        # 執行tasks1下的test_celery函數
        'task': 'celery_task.tasks1.test_celery',
        # 每隔2秒執行一次 
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # 傳遞參數
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.tasks1.test_celery',
    #     每一年4月11號,8點42分執行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}
加的內容

第二步:啓動一個beat:celery beat -A celery_task -l info     ---》啓動一個beat往隊列裏提交任務

第三步:啓動work執行:celery worker -A celery_task -l info -P eventlet      ----》啓動worker

 

 

 

在Django中使用celery

Django-celery版本的要求很是苛刻 不只跟celery版本有關係還跟Django版本有關係。

 

怎麼使用?  也須要本身啓動worker

第一步:在項目根目錄下建立celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些狀況能夠防止死鎖
CELERYD_FORCE_EXECV=True
# 設置併發worker數量
CELERYD_CONCURRENCY=4
#容許重試
CELERY_ACKS_LATE=True
# 每一個worker最多執行100個任務被銷燬,能夠防止內存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超時時間
CELERYD_TASK_TIME_LIMIT=12*30
celeryconfig.py

第二步:在app01目錄下建立task.py

from celery import task
@task
def add(a,b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a+b)
View Code

第三步:視圖函數view.py 

from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
    # result=add.delay(2,3)
    ctime = datetime.now()
    # 默認用utc時間
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')
View Code

第四步:setting.py中配置

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

from djagocele import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2
View Code

 

啓動worker

python3 manage.py celery worker 

 

 

 

haystack

一、什麼是haystack?

  是Django的開源全文搜索框架。底層依附於solr、elasticsearch、whoosh...這些庫  

- 全文檢索不一樣於特定字段的模糊查詢,使用全文檢索的效率更高,而且可以對於中文進行分詞處理
- haystack:django的一個包,能夠方便地對model裏面的內容進行索引、搜索,設計爲支持whoosh,solr,Xapian,Elasticsearc四種全文檢索引擎後端,屬於一種全文檢索的框架
- whoosh:純Python編寫的全文搜索引擎,雖然性能比不上sphinx、xapian、Elasticsearc等,可是無二進制包,程序不會莫名其妙的崩潰,對於小型的站點,whoosh已經足夠使用
- jieba:一款免費的中文分詞包,若是以爲很差用可使用一些收費產品

 

 

二、安裝:

 

pip install django-haystack
pip install whoosh
pip install jieba

 

 

三、配置

添加Haystack到INSTALLED_APPS 跟大多數Django的應用同樣,你應該在你的設置文件(一般是settings.py)添加Haystack到INSTALLED_APPS. 示例:

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.sites',

    # 添加
    'haystack',

    # 你的app
    'blog',
]

 

修改settings.py 在你的settings.py中,你須要添加一個設置來指示站點配置文件正在使用的後端,以及其它的後端設置。 HAYSTACK——CONNECTIONS是必需的設置,而且應該至少是如下的一種:

#須要設置PATH到你的Whoosh索引的文件系統位置
import os
HAYSTACK_CONNECTIONS = {
    'default': {
        'ENGINE': 'haystack.backends.whoosh_backend.WhooshEngine',
        'PATH': os.path.join(os.path.dirname(__file__), 'whoosh_index'),
    },
}

# 自動更新索引
HAYSTACK_SIGNAL_PROCESSOR = 'haystack.signals.RealtimeSignalProcessor'
whoosh示例

 

 

四、處理數據

建立索引

相關文章
相關標籤/搜索