python線程實現異步任務

瞭解異步編程

樓主在工做中遇到了如下問題,開發接口爬取數據代碼完成以後要寫入redis緩存,可是在寫入緩存的過程花費2-3s,進行這樣就大大影響了接口的性能,因而想到了使用異步存儲。html

傳統的同步編程是一種請求響應模型,調用一個方法,等待其響應返回.
異步編程就是要從新考慮是否須要響應的問題,也就是縮小須要響應的地方。由於越快得到響應,就是越同步化,順序化,事務化,性能差化。python

線程實現異步

思路:經過線程調用的方式,來達到異步非阻塞的效果,也就是說主程序無需等待線程執行完畢,仍然能夠繼續向下執行。redis

1.threading模塊和thread模塊

Python經過兩個標準庫thread和threading提供對線程的支持。thread提供了低級別的、原始的線程以及一個簡單的鎖。編程

threading 模塊提供的其餘方法:flask

  • threading.currentThread(): 返回當前的線程變量。
  • threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
  • threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

除了使用方法外,線程模塊一樣提供了Thread類來處理線程,Thread類提供瞭如下方法:緩存

  • run(): 用以表示線程活動的方法。
  • start():啓動線程活動。
  • join([time]): 等待至線程停止。這阻塞調用線程直至線程的join() 方法被調用停止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
  • isAlive(): 返回線程是否活動的。
  • getName(): 返回線程名。
  • setName(): 設置線程名。

同步阻塞:app

 1 import  threading,time
 2 
 3 def thead(num):
 4     time.sleep(1)
 5     print("阻塞程序%s開始執行"%num)
 6     time.sleep(3)
 7     print("阻塞程序%s執行完畢"%num)
 8 
 9 def main():
10     print("主方法開始執行")
11 
12     for i in range(1,3):
13         thead(i)
14 
15     print("主方法執行完畢")
16     return
17 
18 if __name__ == '__main__':
19     print(time.ctime())
20     num = main()
21     print("返回結果爲%s"%num)
22     print(time.ctime())
Wed Nov 21 09:22:56 2018
主方法開始執行
阻塞程序1開始執行
阻塞程序1執行完畢
阻塞程序2開始執行
阻塞程序2執行完畢
主方法執行完畢
返回結果爲None
Wed Nov 21 09:23:04 2018

異步,無需等待線程執行異步

 
 
import  threading,time

def thead(num):
# time.sleep(1)
print("線程%s開始執行"%num)
time.sleep(3)
print("線程%s執行完畢"%num)

def main():
print("主方法開始執行")

#建立2個線程
poll = []#線程池
for i in range(1,3):
thead_one = threading.Thread(target=thead, args=(i,))
poll.append(thead_one) #線程池添加線程
for n in poll:
n.start() #準備就緒,等待cpu執行

print("主方法執行完畢")
return

if __name__ == '__main__':
print(time.ctime())
num = main()
print("返回結果爲%s"%num)
print(time.ctime())



 

Wed Nov 21 09:48:00 2018
主方法開始執行
主方法執行完畢
返回結果爲None
Wed Nov 21 09:48:00 2018
線程1開始執行
線程2開始執行
線程1執行完畢
線程2執行完畢async

 2.concurrent.futures模塊

concurrent.futures模塊實現了對threading(線程)multiprocessing(進程)的更高級的抽象,對編寫線程池/進程池提供了直接的支持。 異步編程

從Python3.2開始,標準庫爲咱們提供了concurrent.futures模塊,它提供了ThreadPoolExecutorProcessPoolExecutor兩個類,ThreadPoolExecutor和ProcessPoolExecutor繼承了Executor,分別被用來建立線程池和進程池的代碼。(暫時只介紹線程池的使用)

concurrent.futures模塊的基礎是Exectuor,Executor是一個抽象類,它不能被直接使用。可是它提供的兩個子類ThreadPoolExecutor和ProcessPoolExecutor倒是很是有用,顧名思義二者分別被用來建立線程池和進程池的代碼。咱們能夠將相應的tasks直接放入線程池/進程池,不須要維護Queue來操心死鎖的問題,線程池/進程池會自動幫咱們調度。

Future這個概念你能夠把它理解爲一個在將來完成的操做,這是異步編程的基礎,傳統編程模式下好比咱們操做queue.get的時候,在等待返回結果以前會產生阻塞,cpu不能讓出來作其餘事情,而Future的引入幫助咱們在等待的這段時間能夠完成其餘的操做。

  • Future Objects:Future類封裝了可調用的異步執行.Future 實例經過 Executor.submit()方法建立。

  • submit(fn, *args, **kwargs):調度可調用的fn,做爲fn(args kwargs)執行,並返回一個表示可調用的執行的Future對象。
  • ThreadPoolExecutor:ThreadPoolExecutor是一個Executor的子類,它使用線程池來異步執行調用。

  • concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=''):Executor子類,使用max_workers規格的線程池來執行異步調用。

在Flask應用中使用異步redis:

from flask import Flask
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()
app = Flask(__name__)


@app.route('/')
def update_redis():
    executor.submit(do_update)
    return 'ok'


def do_update():
    time.sleep(3)
    print('start update cache')
    time.sleep(1)
    print("end")


if __name__ == '__main__':
    app.run(debug=True)

「ok「在更新緩存前已經返回。

 

本文到這裏就結束了,着重介紹了線程實現異步的方法。固然還有其餘的方法,好比yied實現,還有asyncio模塊,後續會繼續更新異步編程的文章。

舒適提示

  • 本文代碼是在python3.5版本測試運行。
  • 若是您對本文有疑問,請在評論部分留言,我會在最短期回覆。
  • 若是本文幫助了您,也請評論關注,做爲對個人一份鼓勵。
  • 若是您感受我寫的有問題,也請批評指正,我會盡可能修改。
  • 本文爲原創,轉載請註明出處。
相關文章
相關標籤/搜索