# Celeryhtml
## 1.什麼是Celerypython
Celery是一個簡單、靈活且可靠的,處理大量消息的分佈式系統git
專一於實時處理的異步任務隊列github
同時也支持任務調度redis
### Celery架構數據庫
![20150314100608_187](C:\Users\Administrator\Desktop\celery和hystack\20150314100608_187.png)django
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。windows
#### 消息中間件後端
Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等api
#### 任務執行單元
Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。
#### 任務結果存儲
Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis等
### 版本支持狀況
```
Celery version 4.0 runs on
Python ❨2.7, 3.4, 3.5❩
PyPy ❨5.4, 5.5❩
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
If you’re running an older version of Python, you need to be running an older version of Celery:
Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
```
## 2.使用場景
異步任務:將耗時操做任務提交給Celery去異步執行,好比發送短信/郵件、消息推送、音視頻處理等等
定時任務:定時執行某件事情,好比天天數據統計
## 3.Celery的安裝配置
pip install celery
消息中間件:RabbitMQ/Redis
app=Celery('任務名',backend='xxx',broker='xxx')
## 4.Celery執行異步任務
#### 基本使用
建立項目celerytest
建立py文件:celery_app_task.py
```python
import celery
import time
# broker='redis://127.0.0.1:6379/2' 不加密碼
backend='redis://:123456@127.0.0.1:6379/1'
broker='redis://:123456@127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def add(x,y):
return x+y
```
建立py文件:add_task.py,添加任務
```python
from celery_app_task import add
result = add.delay(4,5)
print(result.id)
```
建立py文件:run.py,執行任務,或者使用命令執行:celery worker -A celery_app_task -l info
注:windows下:celery worker -A celery_app_task -l info -P eventlet
```python
from celery_app_task import cel
if __name__ == '__main__':
cel.worker_main()
# cel.worker_main(argv=['--loglevel=info')
```
建立py文件:result.py,查看任務執行結果
```python
from celery.result import AsyncResult
from celery_app_task import cel
async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)
if async.successful():
result = async.get()
print(result)
# result.forget() # 將結果刪除
elif async.failed():
print('執行失敗')
elif async.status == 'PENDING':
print('任務等待中被執行')
elif async.status == 'RETRY':
print('任務異常後正在重試')
elif async.status == 'STARTED':
print('任務已經開始被執行')
```
執行 add_task.py,添加任務,並獲取任務ID
執行 run.py ,或者執行命令:celery worker -A celery_app_task -l info
執行 result.py,檢查任務狀態並獲取結果
#### 多任務結構
```python
pro_cel
├── celery_task# celery相關文件夾
│ ├── celery.py # celery鏈接和配置相關文件,必須叫這個名字
│ └── tasks1.py # 全部任務函數
│ └── tasks2.py # 全部任務函數
├── check_result.py # 檢查結果
└── send_task.py # 觸發任務
```
celery.py
```python
from celery import Celery
cel = Celery('celery_demo',
broker='redis://127.0.0.1:6379/1',
backend='redis://127.0.0.1:6379/2',
# 包含如下兩個任務文件,去相應的py文件中找任務,對多個任務作分類
include=['celery_task.tasks1',
'celery_task.tasks2'
])
# 時區
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False
```
tasks1.py
```python
import time
from celery_task.celery import cel
@cel.task
def test_celery(res):
time.sleep(5)
return "test_celery任務結果:%s"%res
```
tasks2.py
```python
import time
from celery_task.celery import cel
@cel.task
def test_celery2(res):
time.sleep(5)
return "test_celery2任務結果:%s"%res
```
check_result.py
```python
from celery.result import AsyncResult
from celery_task.celery import cel
async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)
if async.successful():
result = async.get()
print(result)
# result.forget() # 將結果刪除,執行完成,結果不會自動刪除
# async.revoke(terminate=True) # 不管如今是何時,都要終止
# async.revoke(terminate=False) # 若是任務尚未開始執行呢,那麼就能夠終止。
elif async.failed():
print('執行失敗')
elif async.status == 'PENDING':
print('任務等待中被執行')
elif async.status == 'RETRY':
print('任務異常後正在重試')
elif async.status == 'STARTED':
print('任務已經開始被執行')
```
send_task.py
```python
from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2
# 當即告知celery去執行test_celery任務,並傳入一個參數
result = test_celery.delay('第一個的執行')
print(result.id)
result = test_celery2.delay('第二個的執行')
print(result.id)
```
添加任務(執行send_task.py),開啓work:celery worker -A celery_task -l info -P eventlet,檢查任務執行結果(執行check_result.py)
## 5.Celery執行定時任務
#### 設定時間讓celery執行一個任務
add_task.py
```python
from celery_app_task import add
from datetime import datetime
# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)
# 方式二
ctime = datetime.now()
# 默認用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
# 使用apply_async並設定時間
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
```
#### 相似於contab的定時任務
多任務結構中celery.py修改以下
```python
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab
cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
'celery_task.tasks1',
'celery_task.tasks2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False
cel.conf.beat_schedule = {
# 名字隨意命名
'add-every-10-seconds': {
# 執行tasks1下的test_celery函數
'task': 'celery_task.tasks1.test_celery',
# 每隔2秒執行一次
# 'schedule': 1.0,
# 'schedule': crontab(minute="*/1"),
'schedule': timedelta(seconds=2),
# 傳遞參數
'args': ('test',)
},
# 'add-every-12-seconds': {
# 'task': 'celery_task.tasks1.test_celery',
# 每一年4月11號,8點42分執行
# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
# 'args': (16, 16)
# },
}
```
啓動一個beat:celery beat -A celery_task -l info
啓動work執行:celery worker -A celery_task -l info -P eventlet
## 6.Django中使用Celery
安裝包
```python
celery==3.1.25
django-celery==3.1.20
```
在項目目錄下建立celeryconfig.py
```python
import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
'app01.tasks',
)
#有些狀況能夠防止死鎖
CELERYD_FORCE_EXECV=True
# 設置併發worker數量
CELERYD_CONCURRENCY=4
#容許重試
CELERY_ACKS_LATE=True
# 每一個worker最多執行100個任務被銷燬,能夠防止內存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超時時間
CELERYD_TASK_TIME_LIMIT=12*30
```
在app01目錄下建立tasks.py
```python
from celery import task
@task
def add(a,b):
with open('a.text', 'a', encoding='utf-8') as f:
f.write('a')
print(a+b)
```
視圖函數views.py
```python
from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
# result=add.delay(2,3)
ctime = datetime.now()
# 默認用utc時間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=5)
task_time = utc_ctime + time_delay
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
return HttpResponse('ok')
```
settings.py
```python
INSTALLED_APPS = [
...
'djcelery',
'app01'
]
...
from djagocele import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
```
# Haystack
## 1.什麼是Haystack
Haystack是django的開源全文搜索框架(全文檢索不一樣於特定字段的模糊查詢,使用全文檢索的效率更高 ),該框架支持**Solr**,**Elasticsearch**,**Whoosh**, ***Xapian*搜索引擎它是一個可插拔的後端(很像Django的數據庫層),因此幾乎你全部寫的代碼均可以在不一樣搜索引擎之間便捷切換
- 全文檢索不一樣於特定字段的模糊查詢,使用全文檢索的效率更高,而且可以對於中文進行分詞處理
- haystack:django的一個包,能夠方便地對model裏面的內容進行索引、搜索,設計爲支持whoosh,solr,Xapian,Elasticsearc四種全文檢索引擎後端,屬於一種全文檢索的框架
- whoosh:純Python編寫的全文搜索引擎,雖然性能比不上sphinx、xapian、Elasticsearc等,可是無二進制包,程序不會莫名其妙的崩潰,對於小型的站點,whoosh已經足夠使用
- jieba:一款免費的中文分詞包,若是以爲很差用可使用一些收費產品
## 2.安裝
```python
pip install django-haystack
pip install whoosh
pip install jieba
```
## 3.配置
###添加Haystack到`INSTALLED_APPS`
跟大多數Django的應用同樣,你應該在你的設置文件(一般是`settings.py`)添加Haystack到`INSTALLED_APPS`. 示例:
```python
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.sites',
# 添加
'haystack',
# 你的app
'blog',
]
```
###修改`settings.py`
在你的`settings.py`中,你須要添加一個設置來指示站點配置文件正在使用的後端,以及其它的後端設置。 `HAYSTACK——CONNECTIONS`是必需的設置,而且應該至少是如下的一種:
#### Solr示例
```python
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'haystack.backends.solr_backend.SolrEngine',
'URL': 'http://127.0.0.1:8983/solr'
# ...or for multicore...
# 'URL': 'http://127.0.0.1:8983/solr/mysite',
},
}
```
#### Elasticsearch示例
```python
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'haystack.backends.elasticsearch_backend.ElasticsearchSearchEngine',
'URL': 'http://127.0.0.1:9200/',
'INDEX_NAME': 'haystack',
},
}
```
#### Whoosh示例
```python
#須要設置PATH到你的Whoosh索引的文件系統位置
import os
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'haystack.backends.whoosh_backend.WhooshEngine',
'PATH': os.path.join(os.path.dirname(__file__), 'whoosh_index'),
},
}
# 自動更新索引
HAYSTACK_SIGNAL_PROCESSOR = 'haystack.signals.RealtimeSignalProcessor'
```
#### Xapian示例
```python
#首先安裝Xapian後端(http://github.com/notanumber/xapian-haystack/tree/master)
#須要設置PATH到你的Xapian索引的文件系統位置。
import os
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'xapian_backend.XapianEngine',
'PATH': os.path.join(os.path.dirname(__file__), 'xapian_index'),
},
}
```
## 4.處理數據
### 建立索引
若是你想針對某個app例如blog作全文檢索,則必須在blog的目錄下面創建`search_indexes.py`文件,文件名不能修改
```python
from haystack import indexes
from app01.models import Article
class ArticleIndex(indexes.SearchIndex, indexes.Indexable):
#類名必須爲須要檢索的Model_name+Index,這裏須要檢索Article,因此建立ArticleIndex
text = indexes.CharField(document=True, use_template=True)#建立一個text字段
#其它字段
desc = indexes.CharField(model_attr='desc')
content = indexes.CharField(model_attr='content')
def get_model(self):#重載get_model方法,必需要有!
return Article
def index_queryset(self, using=None):
return self.get_model().objects.all()
```
爲何要建立索引?索引就像是一本書的目錄,能夠爲讀者提供更快速的導航與查找。在這裏也是一樣的道理,當數據量很是大的時候,若要從這些數據裏找出全部的知足搜索條件的幾乎是不太可能的,將會給服務器帶來極大的負擔。因此咱們須要爲指定的數據添加一個索引(目錄),在這裏是爲Note建立一個索引,索引的實現細節是咱們不須要關心的,至於爲它的哪些字段建立索引,怎麼指定 ,下面開始講解
每一個索引裏面必須有且只能有一個字段爲 document=True,這表明haystack 和搜索引擎將使用此字段的內容做爲索引進行檢索(primary field)。其餘的字段只是附屬的屬性,方便調用,並不做爲檢索數據
```python
注意:若是使用一個字段設置了document=True,則通常約定此字段名爲text,這是在ArticleIndex類裏面一向的命名,以防止後臺混亂,固然名字你也能夠隨便改,不過不建議改。
```
另外,咱們在`text`字段上提供了`use_template=True`。這容許咱們使用一個數據模板(而不是容易出錯的級聯)來構建文檔搜索引擎索引。
你應該在模板目錄下創建新的模板`search/indexes/blog/article_text.txt`,並將下面內容放在裏面。
```python
#在目錄「templates/search/indexes/應用名稱/」下建立「模型類名稱_text.txt」文件
{{ object.title }}
{{ object.desc }}
{{ object.content }}
```
這個數據模板的做用是對`Note.title`, `Note.user.get_full_name`,`Note.body`這三個字段創建索引,當檢索的時候會對這三個字段作全文檢索匹配
## 5.設置視圖
### 添加`SearchView`到你的`URLconf`
在你的`URLconf`中添加下面一行:
```python
(r'^search/', include('haystack.urls')),
```
這會拉取Haystack的默認URLconf,它由單獨指向`SearchView`實例的URLconf組成。你能夠經過傳遞幾個關鍵參數或者徹底從新它來改變這個類的行爲。
### 搜索模板
你的搜索模板(默認在`search/search.html`)將可能很是簡單。下面的足夠讓你的搜索運行(你的`template/block`應該會不一樣)
```python
<!DOCTYPE html>
<html>
<head>
<title></title>
<style>
span.highlighted {
color: red;
}
</style>
</head>
<body>
{% load highlight %}
{% if query %}
<h3>搜索結果以下:</h3>
{% for result in page.object_list %}
{# <a href="/{{ result.object.id }}/">{{ result.object.title }}</a><br/>#}
<a href="/{{ result.object.id }}/">{% highlight result.object.title with query max_length 2%}</a><br/>
<p>{{ result.object.content|safe }}</p>
<p>{% highlight result.content with query %}</p>
{% empty %}
<p>啥也沒找到</p>
{% endfor %}
{% if page.has_previous or page.has_next %}
<div>
{% if page.has_previous %}
<a href="?q={{ query }}&page={{ page.previous_page_number }}">{% endif %}« 上一頁
{% if page.has_previous %}</a>{% endif %}
|
{% if page.has_next %}<a href="?q={{ query }}&page={{ page.next_page_number }}">{% endif %}下一頁 »
{% if page.has_next %}</a>{% endif %}
</div>
{% endif %}
{% endif %}
</body>
</html>
```
須要注意的是`page.object_list`其實是`SearchResult`對象的列表。這些對象返回索引的全部數據。它們能夠經過`{{result.object}}`來訪問。
因此`{{ result.object.title}}`實際使用的是數據庫中Article對象來訪問`title`字段的。
### 重建索引
如今你已經配置好了全部的事情,是時候把數據庫中的數據放入索引了。Haystack附帶的一個命令行管理工具使它變得很容易。
簡單的運行`./manage.py rebuild_index`。你會獲得有多少模型進行了處理並放進索引的統計。
## 6.使用jieba分詞
```python
#創建ChineseAnalyzer.py文件
#保存在haystack的安裝文件夾下,路徑如「D:\python3\Lib\site-packages\haystack\backends」
import jieba
from whoosh.analysis import Tokenizer, Token
class ChineseTokenizer(Tokenizer):
def __call__(self, value, positions=False, chars=False,
keeporiginal=False, removestops=True,
start_pos=0, start_char=0, mode='', **kwargs):
t = Token(positions, chars, removestops=removestops, mode=mode,
**kwargs)
seglist = jieba.cut(value, cut_all=True)
for w in seglist:
t.original = t.text = w
t.boost = 1.0
if positions:
t.pos = start_pos + value.find(w)
if chars:
t.startchar = start_char + value.find(w)
t.endchar = start_char + value.find(w) + len(w)
yield t
def ChineseAnalyzer():
return ChineseTokenizer()
```
```python
#複製whoosh_backend.py文件,更名爲whoosh_cn_backend.py
#注意:複製出來的文件名,末尾會有一個空格,記得要刪除這個空格
from .ChineseAnalyzer import ChineseAnalyzer
查找
analyzer=StemmingAnalyzer()
改成
analyzer=ChineseAnalyzer()
```
## 7.在模版中建立搜索欄
```html
<form method='get' action="/search/" target="_blank">
<input type="text" name="q">
<input type="submit" value="查詢">
</form>
```
## 8.其它配置
### 增長更多變量
```python
from haystack.views import SearchView
from .models import *
class MySeachView(SearchView):
def extra_context(self): #重載extra_context來添加額外的context內容
context = super(MySeachView,self).extra_context()
side_list = Topic.objects.filter(kind='major').order_by('add_date')[:8]
context['side_list'] = side_list
return context
#路由修改
url(r'^search/', search_views.MySeachView(), name='haystack_search'),
```
### 高亮顯示
```python
{% highlight result.summary with query %}
# 這裏能夠限制最終{{ result.summary }}被高亮處理後的長度
{% highlight result.summary with query max_length 40 %}
#html中 <style> span.highlighted { color: red; } </style>```