【python之路36】進程、線程、協程相關

線程詳細用法請參考:http://www.cnblogs.com/sunshuhai/articles/6618894.html

1、初始多線程html

經過下面兩個例子的運行效率,能夠得知多線程的速度比單線程的要快不少python

#!usr/bin/env python
# -*- coding:utf-8 -*-
import time
def print_num(arg):
    time.sleep(1)
    print(arg)
#每秒打印一個數字,速度很是慢
for i in range(10):
    print_num(i)
#!usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
def print_num(arg):
    time.sleep(1)
    print(arg)
#多線程打印速度很快
for i in range(10):
    t = threading.Thread(target=print_num, args=(i,))
    t.start()

 線程是電腦處理事務的最小單元,線程屬於進程,每一個進程單獨開闢的內存空間,進程越多消耗的內存就越大,每一個進程內能夠建立多個線程,可是CPU調度的時候有線程鎖,每一個進程只能調度一個線程,因此原則上,計算密集型的用進程,IO密集型的用線程。linux

當t.start()後,默認是等待線程運行結束後(即默認t.setDaemon(False)),主程序才結束的,若是使用:t.setDaemon(True)則程序直接結束,後臺運行線程。windows

#!usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
def f1():
    time.sleep(5)
def f2(f1, num):
    print('開始打印....' + num)
    f1()
    print('結束打印....' + num)
t = threading.Thread(target=f2, args=(f1,'1111'))
t.setDaemon(True) #程序不等待線程運行結束,主程序運行完畢後會直接終止程序
t.start()

t1 = threading.Thread(target=f2, args=(f1,'2222'))
t1.setDaemon(True) #程序不等待線程運行結束,主程序運行完畢後會直接終止程序
t1.start()

t2 = threading.Thread(target=f2, args=(f1,'3333'))
#t2.setDaemon(True) #程序不等待線程運行結束,主程序運行完畢後會直接終止程序
t2.start()

 t.join(2) ,若是參數爲空,則默認當改線程執行結束後才繼續向後執行,若是參數爲2,則最多等待2秒,若是1秒運行完,則1秒後繼續運行,若是兩秒內未運行完畢,則也再也不等待,主程序繼續向下運行,原來的線程後臺繼續運行。以下面程序運行4秒後會直接結束:數組

#!usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
def f1():
    time.sleep(100)
    print('程序結束....')

t = threading.Thread(target=f1)
t.setDaemon(True)
t.start()
t.join(2)  #最多等待2秒,後繼續向下運行

t1 = threading.Thread(target=f1)
t1.setDaemon(True)
t1.start()
t1.join(2)  #最多等待2秒,後繼續向下運行

2、線程鎖,rlock,若是有一個變量num = 0,多個線程去讓這個變量自增1,那麼可能兩個線程同時去拿num的初始值,都自增了1。因此就須要線程鎖,只有一條線程能處理這個變量,其餘線程等待,等這個線程處理完畢後,其餘線程才能處理。緩存

#!usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

lock = threading.RLock() #建立線程鎖
num = 0
def func():
    lock.acquire() #得到鎖
    global num
    num += 1
    time.sleep(1)
    print(num)
    lock.release() #釋放鎖


t = threading.Thread(target=func)
t.start()

t1 = threading.Thread(target=func)
t1.start()
#結果1秒鐘輸出1,第2秒鐘輸出2

3、線程事件,threading.Event多線程

Event是線程間通訊最間的機制之一:一個線程發送一個event信號,其餘的線程則等待這個信號。用於主線程控制其餘線程的執行。 Events 管理一個flag,這個flag可使用set()設置成True或者使用clear()重置爲False,wait()則用於阻塞,在flag爲True以前。flag默認爲False。併發

Event.wait([timeout]) : 堵塞線程,直到Event對象內部標識位被設爲True或超時(若是提供了參數timeout)。app

Event.set() :將標識位設爲Ture異步

Event.clear() : 將標識伴設爲False。

Event.isSet() :判斷標識位是否爲Ture。

其實就至關於Event.wait()就至關於遇到了紅燈,當用Event.set()設置爲True時全部的線程暫停在這,一旦用Event.clear()設置爲False,至關於遇到了綠燈,全部線程繼續暫停的地方繼續向下運行

#!usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
obj_event = threading.Event()
def func(event):
    print('start')
    event.wait()
    print('end')

obj_event.clear() #標識位設置爲True至關於打開紅燈
for i in range(10):
    t = threading.Thread(target=func, args=(obj_event,))
    t.start()

inp = input('請輸入False改變標識位:')
if inp == 'False':
    obj_event.set()  ##標識位設置爲False至關於打開綠燈

 4、生產者消費者模型及隊列queue

舉例來講,賣包子,廚師作好包子後推動管道內,顧客從另外一端取走,這個模型叫生產消費者模型,優勢是:廚師能夠不斷生產,至關於緩存,經常使用方法:

put  get  get_nowait不等待

import queue

q = queue.Queue(maxsize=0)  # 構造一個先進顯出隊列,maxsize指定隊列長度,爲0 時,表示隊列長度無限制。

q.join()    # 等到隊列爲kong的時候,在執行別的操做
q.qsize()   # 返回隊列的大小 (不可靠,由於隊列時刻在發生變化)
q.empty()   # 當隊列爲空的時候,返回True 不然返回False (不可靠,由於隊列時刻在發生變化)
q.full()    # 當隊列滿的時候,返回True,不然返回False (不可靠,由於隊列時刻在發生變化)
q.put(item, block=True, timeout=None) #  將item放入Queue尾部,item必須存在,能夠參數block默認爲True,表示當隊列滿時,會等待隊列給出可用位置,
                      #爲False時爲非阻塞,此時若是隊列已滿,會引起queue.Full 異常。 可選參數timeout,表示 會阻塞設置的時間,事後,
                      #若是隊列沒法給出放入item的位置,則引起 queue.Full 異常
q.get(block=True, timeout=None) #   移除並返回隊列頭部的一個值,可選參數block默認爲True,表示獲取值的時候,若是隊列爲空,則阻塞,爲False時,不阻塞,
                      若此時隊列爲空,則引起 queue.Empty異常。 可選參數timeout,表示會阻塞設置的時候,事後,若是隊列爲空,則引起Empty異常。
q.put_nowait(item) #   等效於 put(item,block=False)
q.get_nowait() #    等效於 get(item,block=False)
#!usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading

q = queue.Queue(10)
def product(i):
    q.put(i)
    print(i)
def customer(i):
    q.get(i)
    print(i)

for i in range(12):
    t = threading.Thread(target=product, args=(i,))
    t.start()
for i in range(9):
    t1 = threading.Thread(target=customer, args=(i,))
    t1.start()

 之後寫程序的時候須要建立多少線程呢?例若有5000個任務,那麼就須要建立5000個線程麼,答案確定不是的。例如你的電腦開10個線程是最優的,那麼每次最多開10個線程。這就引入線程池的概念,最多建立10個線程,若是大於10個線程,那麼先處理10個線程,哪一個線程處理完畢後,再建立一個線程繼續處理下一個任務。

python內部沒有提供線程池,須要咱們自定義線程池,之後使用中不多本身建立線程,通常都使用線程池。

 

5、進程相關multiprocessing

在windows中是不支持進程的,要想使用進程必須加到if __name__ =="__main__":裏面,不然會報錯,但不可能全部的程序都加這個,因此建議進程程序在linux下進行調試.

代碼從上向下解釋是主進程中的主線程作的。

一、建立進程

#!usr/bin/env python
# -*- coding:utf-8 -*-
import multiprocessing
import time
def func(i):
    time.sleep(15)
    print(i)

if __name__ == "__main__":
    p = multiprocessing.Process(target=func, args=(11,))
    p.daemon = True #True表示不等待直接結束主程序;默認爲False,等待進程中的線程運行完畢才繼續運行
    p.start()
    p.join(2) #表示最多等待2秒

二、進程至關因而從新創造一個房間,房間內的資源所有從新建立一份,即在內存中從新開闢一塊內存,其全部變量及對象從新建立一份。

#!usr/bin/env python
# -*- coding:utf-8 -*-
import multiprocessing
li = []
def func(i):
    li.append(i)
    print(li)

if __name__ == "__main__":
    for i in range(10):
        p = multiprocessing.Process(target=func, args=(i,))
        p.start()

#結果輸出以下:
# [1]
# [0]
# [2]
# [3]
# [4]
# [6]
# [7]
# [5]
# [8]
# [9]

 三、雖然默認進程之間的數據是不共享的,但能夠經過建立數據類型,來實現數據共享

(1)數組的Array特性:數組的類型必須指定,數組的個數指定後不能超出範圍

      此方法由於限制太多通常不用

#!usr/bin/env python
# -*- coding:utf-8 -*-

import multiprocessing

def Foo(i,temp):
    temp[i] = 100 + i
    print(list(temp))

if __name__ == "__main__":
    temp = multiprocessing.Array('i', [11, 22, 33, 44])
    for i in range(4):
        p = multiprocessing.Process(target=Foo, args=(i,temp))
        p.start()

(2)經過建立特殊的字典Manager,來實現不一樣進程之間的數據共享,此字典和python普通的字典幾乎是同樣的(能取值和賦值),但也有區別

 

#!usr/bin/env python
# -*- coding:utf-8 -*-

import multiprocessing
import time
def Foo(i,dic):
    dic[i] = 100 + i
    print(len(dic))

if __name__ == "__main__":
    m = multiprocessing.Manager()
    dic = m.dict()
    for item in range(3):
        p = multiprocessing.Process(target=Foo, args=(item,dic,))
        p.start()
        #p.join()
    #進程之間數據共享,基層是經過進程之間數據鏈接實現的,因此若是主進程執行到最後,等待子進程結束,
    #當主進程hold住的時候(即執行到最後),其自進程就沒法進行數據共享了,此時用join,或sleep能夠解決
    time.sleep(3)

 四、進程池Pool

進程池主要有兩個方法:

區別:apply和apply_async,apply是每次只能一個進程進行工做每個任務都是排隊進行,至關於進程.join()

        apply_async經過異步的方式,每次進程能夠同時工做,併發進行,能夠設置回調函數

#!usr/bin/env python
# -*- coding:utf-8 -*-
import multiprocessing
import time
def func(args):
    time.sleep(1)
    print(args + 100)
    return(args + 100)
def f1(args):
    print(args)
if __name__ == "__main__":
    pool = multiprocessing.Pool(5)
    for i in range(50):
        # pool.apply(func, args=(i,))  #逐個進程運行,由於apply只支持一個進程同時工做
        pool.apply_async(func, args=(i,),callback=f1) #同時運行5個進程,異步操做
        #注意上面參數callback是回調函數,調用完func後,再調用f1函數並將func函數的返回值做爲參數傳給f1
        print('11')  #先打印50個11說明for循環50次是所有執行完畢的,而後寫入進程等待
    pool.close() #在進程池.join以前必須加pool.close()-等待進程池全部進程處理完畢或pool.terminate()不等待直接結束
    pool.join()#此句表示進程池中的進程執行完畢後再關閉,若是註釋,那麼程序直接關閉

6、自定義線程池

一、自定義low版線程池

#!usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import time
from threading import Thread

class Thread_Pool:
    def __init__(self, maxsize):
        self.que = queue.Queue(maxsize)  #定義隊列賦值給對象的變量que
        for i in range(maxsize):   #循環將線程類放進隊列
            self.que.put(Thread)
    def get_thread(self):
        return self.que.get()
    def put_thread(self):
        self.que.put(Thread)
def func(t,args):
    time.sleep(1)
    print(args)
    t.put_thread()


t = Thread_Pool(5)
for i in range(50):
    Thread_class = t.get_thread()
    thread = Thread_class(target=func, args=(t,i))  #把t線程池對象傳給func,以便拿走線程後隨時給線程池補充線程
    thread.start()

二、low版的線程池與進程池相比有幾個問題:

一、線程池隊列中無論用不用,線程類都被建立,可能存在大於5個線程的狀況,由於,拿走1個線程,運行任務,而後增長1個線程,可能拿走的線程沒有運行完畢

二、須要手動獲得線程類,而後再根據線程類建立線程對象,最後還須要運行線程對象

三、沒有回調函數

四、線程池補充是手動補回的

絕版高大上線程,能夠直接做爲線程池模塊使用:

#!usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import time
StopEvent = object()
class TreadPool:
    def __init__(self, max_num, max_tast_num = 0):
        self.max_num = max_num  #最大線程數
        if max_tast_num:  #根據是否制定最大任務數來指定隊列程度
            self.q = queue.Queue()  #隊列不限定長度
        else:
            self.q = queue.Queue(max_tast_num)  #根據用戶指定長度建立隊列
        self.generate_list = []   #記錄生成的線程
        self.free_list = []   #記錄空閒的線程
        self.terminal = False
    def run(self, target, args, callback=None):
        """運行該函數,調用線程池"""
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            #沒有空閒線程而且當前建立線程數量小於最大容許線程數量時容許建立線程
            self.creat_thread() #調用建立線程函數
        tast = (target, args, callback)  #將任務打包成爲元組放入隊列
        self.q.put(tast)

    def creat_thread(self):
        """建立線程,而且運行,這時調用call函數,全部實現均在call函數內"""
        thread = threading.Thread(target=self.call)
        thread.start()
    def call(self):
        """線程調用該函數"""
        current_thread = threading.currentThread()  #獲取執行該函數的當前線程
        self.generate_list.append(current_thread)  #將線程加入生成的線程列表

        tast = self.q.get()  #從隊列中取出一個任務包

        while tast != StopEvent:
            target, args, backcall = tast  #將元組人物包,賦值給變量
            try:
                result = target(*args)  #執行函數,並將返回值賦值給result
            except Exception as e:
                result = None

            if backcall:
                try:
                    backcall(result)  #執行回調函數,並將result做爲參數傳給回調函數
                except Exception as e:
                    pass

            self.free_list.append(current_thread) #執行完畢,將當前線程對象加入空閒列表
            if self.terminal: #是否強制終止
                tast = StopEvent
            else:
                tast = self.q.get()  #等待那任務,若是有任務直接循環執行,若是沒有則等待,一旦run方法中放入任務則繼續執行任務,無需再建立線程
            self.free_list.remove(current_thread) #拿到任務後,清除空閒線程
        else:
            self.generate_list.remove(current_thread)
    def close(self):
        """全部線程所有運行完畢後,中止線程
        call函數運行完畢後,全部的線程此時都在等待拿任務,此時,只要給隊列裏面添加StopEvent對象則線程就會結束"""
        generate_size = len(self.generate_list)
        while generate_size:
            self.q.put(StopEvent)
            generate_size -= 1
    def terminate(self):
        """不等待線程所有運行完畢任務,直接終止"""
        self.terminal = True  #正在運行的線程運行完畢後會終止
        generate_size = len(self.generate_list)
        while generate_size:  #終止已經在等待的那一部分線程
            self.q.put(StopEvent)
            generate_size -= 1


def func(i):
    time.sleep(1)
    print(i)


pool = TreadPool(5)
for i in range(100):
    pool.run(target=func, args=(i,))
pool.close()

7、協程

什麼是協程呢,舉個例子:

從網站向下抓圖片,請求發出後,2秒後可以返回請求,網站一共100張圖片,若是用5個線程,那麼這5個線程中2秒共請求5張圖片。

若是用一個線程,請求後繼續請求下張圖片,請求返回後接收圖片,那麼可能0.01秒就能請求一個,這用能夠少用線程,而且效率也很高。

一、協程的基礎greenlet

首先用 pip3 install greenlet  安裝greelet模塊,而後測試下面代碼

#!usr/bin/env python
# -*- coding:utf-8 -*-

from greenlet import greenlet
def f1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def f2():
    print(45)
    gr1.switch()
    print(56)

gr1 = greenlet(f1)
gr2 = greenlet(f2)

gr1.switch() #遇到swith切換到f1函數

#輸出結果爲:
# 12
# 45
# 34
# 56

二、gevent是基於greenlet進行封裝的協程處理模塊,系統自動的進行控制,以下面代碼

經過:pip3 install gevent 導入gevent模塊。

gevent當遇到運行較慢時自動會執行下一個任務,當執行完畢後會回來繼續執行。

 

#!usr/bin/env python
# -*- coding:utf-8 -*-

import gevent
def foo():
    print('Runing in foo')
    gevent.sleep(0) #切換爲下一個任務
    print('Explicit context switch to foo again')
def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch to bar')

gevent.joinall([gevent.spawn(foo),
                gevent.spawn(bar)])

#運行結果以下:
# Runing in foo
# Explicit context to bar
# Explicit context switch to foo again
# Implicit context switch to bar

 

爬蟲應用gevent,最主要的應用場景:

#!usr/bin/env python
# -*- coding:utf-8 -*-
from gevent import monkey;monkey.patch_all()
import gevent
import requests
def f(url):
    print('GET:%s' % url)
    resp = requests.get(url)
    data = resp.text
    print(url, len(data))

gevent.joinall([gevent.spawn(f, 'https://www.baidu.com/'),
                gevent.spawn(f,'https://www.yahoo.com/'),
                gevent.spawn(f,'https://taobao.com/'),
                gevent.spawn(f,'http://www.people.com.cn/')])

#返回結果:
# GET:https://www.baidu.com/
# GET:https://www.yahoo.com/
# GET:https://taobao.com/
# GET:http://www.people.com.cn/
# http://www.people.com.cn/ 135707
# https://www.baidu.com/ 2443
# https://taobao.com/ 134324
# https://www.yahoo.com/ 416613
相關文章
相關標籤/搜索