基於python實現的三方組件----Celery

一.基於python實現的三方組件----Celery

1.做用

用於異步週期任務的處理python

2.Celery的組成

(1)任務 app
(2)記錄任務的緩存(一般用redis或rabbitMQ)
    任務記錄 -broker
    任務返回記錄-backend
(3)Worker 員工
    主動執行任務
    主動反饋結果

3.celery簡單實例

s1.pyredis

from celery import Celery
import time

#使用redis鏈接url格式 :redis://:password@hostname:port/db_number
my_task=Celery("my_task",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379") #括號中task表明你對任務在內部的稱呼

@my_task.task
def my_func1(a,b):
    time.sleep(10)
    return a+b

@my_task.task
def my_func2():
    time.sleep(10)
    return 2

@my_task.task
def my_func3():
    time.sleep(10)
    return 3

命令行運行json

Linux:Linux - celery worker -A s1 -l INFO 
Windows:celery worker -A s1 -l INFO -P eventlet
#Windows下須要下載eventlet模塊模塊,不然celery4的版本不支持windows
#l:日誌輸出
#c:數量

s2.pyflask

from s1 import my_func1,my_func2,my_func3
pid=my_func1.delay(10,20)
print(pid)
pid=my_func2.delay()
print(pid)
pid=my_func3.delay()
print(pid)

s3.pywindows

from celery.result import AsyncResult
from s1 import my_task
#運行s2.py獲得的pid
res=AsyncResult(id='2b36d20f-da07-42fe-b203-1e56fbaafd5e',app=my_task)
if res.successful():
    print(res.get())
else:
    print("任務正在進行中")

4.爬蟲簡單應用

在caiji.py中緩存

from flask import Flask,request as requ,jsonify,render_template,send_file
import pymongo
import json
import time
import  urllib
import requests
import re
from urllib import request
import uuid
from celery import Celery
import time

#使用redis鏈接url格式 :redis://:password@hostname:port/db_number
my_task=Celery("my_task",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379") #括號中task表明你對任務在內部的稱呼

#獲取各類分類的歌曲列表
@my_task.task
def getcontent():
        # content=requ.form.get("content")
        # print(content)
        headers={"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.92 Safari/537.36"}
        url="https://www.ximalaya.com/ertong/ertongwenxue"
        request=urllib.request.Request(url,headers=headers)
        response=urllib.request.urlopen(request)

        response_text=response.read().decode("utf-8")
        title_id_list=re.findall('"album-title line-2 lg.+?" title="(.+?)" href="/ertong/(\d+?)/">',response_text)

        anthor_list=re.findall('"album-author.+?" title="(.+?)" href',response_text)
        response_list=[]
        i = 0
        for i in range(len(title_id_list)):
            response_dict={}
            response_dict={
                "title":title_id_list[i][0],
                "id":title_id_list[i][1],
                "author":anthor_list[i]
            }
            response_list.append(response_dict)


        # print("返回",response_list)
        return response_list


#獲取music的二進制文件
@my_task.task
def getmusic(id):
    print(id)
    url="http://m.ximalaya.com/ertong/"+id+"/"
    response=requests.get(url)
    response.encoding="utf-8"
    path=re.findall('"isCopyright":.+?"src":"(.+?)","albumId"',response.text)[0]
    print("res",path)
    d_data = requests.get(path)
    get_str=str(uuid.uuid4())
    print(get_str)
    name="./music/"+get_str + ".mp3"
    with open(name,"wb") as f:
        f.write(d_data.content)
    return send_file(name)
# getcontent()

在results.py中app

from caiji import getcontent,getmusic
res1=getcontent.delay()
print(res1)
for i in res1.get():
    res2 = getmusic.delay(i["id"])
    print(res2)

5.定時任務(十秒鐘後執行函數)

在s4.py中異步

from celery import Celery
import time
my_task=Celery("task",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379") #括號中表明你對任務在內部的稱呼
​
@my_task.task
def my_func1(a,b):
    return 1

在s5.py中async

import datetime
import time
from s4 import my_func1
​
tp = time.time()
utc_time = datetime.datetime.utcfromtimestamp(tp)
add_time = datetime.timedelta(seconds=10)
utc_time = utc_time + add_time
res = my_func1.apply_async(args=(2,3),eta=utc_time)
print(res)

6.週期任務

task_one.py函數

from celery import Celery
import time
​
my_task=Celery("my_task",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379") 
@my_task.task
def my_func1():
    time.sleep(10)
    return "十秒鐘執行的"

task_two.py

import time
from task_one import my_task
@my_task.task
def my_func2():
    time.sleep(5)
    return "五秒鐘執行的"

s6.py

from celery import Celery
from celery.schedules import crontab

celery_task = Celery("task",
                     broker="redis://127.0.0.1:6379",
                     backend="redis://127.0.0.1:6379",
                     include=["task_one","task_two"])

#我要要對beat任務生產作一個配置,這個配置的意思就是每10秒執行一次Celery_task.task_one任務參數是(10,10)
celery_task.conf.beat_schedule={
    "each10s_task":{
        "task":"task_one.my_func1",
        "schedule":10, # 每10秒鐘執行一次
        # "args":(10,20)
    },
    "each5s_task": {
        "task": "task_two.my_func2",
        "schedule":5, # 每5秒
        # "args": (50, 60)
    },

}

# celery beat -A Celery_task
# celery worker -A Celery_task -l INFO -P eventlet

celery beat -A Celery_task

7.celery項目目錄

在selery.py中

from celery import Celery
my_task = Celery("task",
                 broker="redis://127.0.0.1:6379",
                 backend="redis://127.0.0.1:6379",
                 include=["Celery_task.task_one","Celery_task.task_two"])

在task_one.py中

from Celery_task.celery import my_task

@my_task.task
def func1():
    return 1

@my_task.task
def func3():
    return 3

在task_two.py中

from Celery_task.celery import my_task

@my_task.task
def func2():
    return 2

celery worker -A Celery_task -l INFO -P eventlet

相關文章
相關標籤/搜索