進程&線程&協程

進程:python

優勢:同時利用多個CPU,同時進行多個操做git

缺點:耗費資源(須要從新開闢內存空間)github

 

線程:數據庫

優勢:共享內存,IO操做時,創造併發操做windows

缺點:搶佔資源數組

 

總結:1.進程並非越多越好,CPU=進程個數,   線程也不是越多越好,如請求上下文切換耗時緩存

   2.進程線程的目的提升執行效率安全

   3.計算機中最小的執行單位是線程多線程

   4.IO操做利用CPU併發

   A:IO密集型(不用CPU) 使用多線程

   B:計算密集型(用CPU)多進程

GIL:全局解釋器鎖爲了鎖線程,   做用就是保證同一時刻只有一個線程能夠執行代碼,所以形成了咱們使用多線程的時候沒法實現並行。

線程鎖:若是多個線程同時修改某個數據,爲了防止錯誤,須要使用鎖

 

主線程等待,子線程執行:

join()

join(2)能夠傳入參數最多等2s

import threading
import time
globals_num = 0

lock = threading.RLock()

def fun():
    lock.acquire()#得到鎖
    global globals_num
    globals_num += 1
    time.sleep(1)
    print(globals_num)
    lock.release()#釋放鎖
for i in range(10):
    t = threading.Thread(target=fun)
    t.start()
#打印:

1
2
3
4
5
6
7
8
9
10

 

 

#建立線程
import time
import threading

def f0():
    pass
def f1(a1,a2):
    time.sleep(10)
    f0()
    
t = threading.Thread(target=f1,args=(123,456))#建立線程執行f1函數,把123,4546傳給f1
t.setDaemon(True)#設爲True直接執行
# t.setDaemon(False)#設爲False等候10秒
t.start()

t = threading.Thread(target=f1,args=(123,456))#建立線程執行f1函數,把123,4546傳給f1
t.setDaemon(True)
# t.setDaemon(False)
t.start()

t = threading.Thread(target=f1,args=(123,456))#建立線程執行f1函數,把123,4546傳給f1
t.setDaemon(True)
# t.setDaemon(False)
t.start()

Event:

線程間的通信,一個線程發送一個event,其它線程等待這個信號,用於主線程控制其它線程執行

event.wait():堵塞線程

evnet.set():標識wei位設未True

event.clear():標識位設未False

event.isSet():判斷標識位是否爲True

import threading

def do(event):
    print("start")
    event.wait()#紅燈wait等待,  綠燈執行
    print("execute")

event_obj = threading.Event()
for i in range(5):
    t = threading.Thread(target=do, args=(event_obj,))
    t.start()

event_obj.clear()#event默認爲False,  讓燈變紅
inp = input("input:")
if inp == "true":
    event_obj.set()#讓燈變綠執行
#線程執行的時候,若是flag爲False,則線程阻塞,爲True,線程不會阻塞,提供本地和遠程的併發性
# start
# start
# start
# start
# start
# input:true
# execute
# execute
# execute
# execute
# execute
threading.Condition: 條件變量condition內部是含有鎖的邏輯,否則沒法保證線程之間同步
import queue#隊列,線程安全,這個模型也叫生產者-消費者模型
import threading

message = queue.Queue(10)#數值小於或者等於0,隊列大小沒有限制。

def producer(i):#生產者
    print("put:",i)
    # while True:
    message.put(i)

def consumer(i):#消費者
    # while True:
        msg = message.get()
        print(msg)

for i in range(12):
    t = threading.Thread(target=producer,args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer,args=(i,))
    t.start()
# put: 0
# put: 1
# put: 2
# put: 3
# put: 4
# put: 5
# put: 6
# put: 7
# put: 8
# put: 9
# put: 10
# put: 11
# 0
# 1
# 2
# 3
# 4
# 5
# 6
# 7
# 8
# 9
get,等
get_nowait,不等
 
#建立進程
import multiprocessing
import time

def f1(a1):
    time.sleep(2)
    print(a1)

if __name__ == "__main__":#windows下運行進程必須加if __name__ == "__main__":
    t = multiprocessing.Process(target=f1, args=(11,))
    # t.daemon = True#默認False, 定義爲True主進程終止所有結束
    t.start()
    t.join()#與線程join相似,主線程等待,子線程執行
    t2 = multiprocessing.Process(target=f1, args=(12,))
    # t2.daemon = True
    t2.start()
    print("end")#主進程
from multiprocessing import Process
li = []

def foo(i):
    li.append(i)
    print("zc",li)
if __name__ == "__main__":
    for i in range(10):
        p = Process(target=foo,args=(i,))
        p.start()
# 每一個進程建立本身的列表,進程之間數據,內存不能共享,先調那個由CPU決定因此結果是無序的
# zc [0]
# zc [1]
# zc [2]
# zc [3]
# zc [4]
# zc [5]
# zc [6]
# zc [7]
# zc [8]
# zc [9]

 

import threading
li = []

def foo(i):
    li.append(i)
    print("zc",li)
if __name__ == "__main__":
    for i in range(10):
        p = threading.Thread(target=foo,args=(i,))#threading.Thread線程內存共享,是共同一個li
        p.start()

# zc [0]
# zc [0, 1]
# zc [0, 1, 2]
# zc [0, 1, 2, 3]
# zc [0, 1, 2, 3, 4]
# zc [0, 1, 2, 3, 4, 5]
# zc [0, 1, 2, 3, 4, 5, 6]
# zc [0, 1, 2, 3, 4, 5, 6, 7]
# zc [0, 1, 2, 3, 4, 5, 6, 7, 8]
# zc [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
 
from multiprocessing import Process
#多進程 Multiprocessing 模塊

def f(name):
    print("hello",name)

if __name__ == "__main__":
    p = Process(target=f, args=("bob",))
# Process類進程對象,建立子進程的時候,只須要傳入一個執行函數和函數的參數便可完成
#target 函數名,須要調用的函數
#args 函數須要的參數,以 tuple 的形式傳入
    p.start()#star() 方法啓動進程
    p.join()#join() 方法實現進程間的同步,等待全部進程退出。
# p.close()#阻止多餘的進程涌入進程池 Pool 形成進程阻塞。
import multiprocessing
import os

def run_proc(name):
    print('Child process {0} {1} Running '.format(name, os.getpid()))
# os.getpid()獲取當前進程id     os.getppid()獲取父進程id
if __name__ == '__main__':
    print('Parent process {0} is Running'.format(os.getpid()))
    for i in range(5):
        p = multiprocessing.Process(target=run_proc, args=(str(i),))
        print('process start')
        p.start()
    p.join()
    print('Process close')
# Parent process 27428 is Running
# process start
# process start
# process start
# process start
# process start
# Child process 0 27176 Running 
# Child process 1 23384 Running 
# Child process 3 11524 Running 
# Child process 2 11560 Running 
# Child process 4 24904 Running 
# Process close

 

 
#進程間內存數據共享方式1
from multiprocessing import Process,Value,Array
#Value(內存數據共享),Array(數組,與列表類似)

def f(n,a):
    n.value = 3.1415
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == "__main__":
    num = Value("d",0.0)
    arr = Array("i",range(10))

    p = Process(target=f,args=(num,arr))#進程1
    a = Process(target=f,args=(num,arr))#進程2
    p.start()
    a.start()
    p.join()
    a.join()

    print(num.value)
    print(arr[:])
# 3.1415
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 數據共享後負負得正







#進程間內存數據共享方式2
from multiprocessing import Process, Manager
def f(d,l):
    d[l] = "1"
    d["2"] = 2
    d[0.26] = None
    l.reverse()

if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f,args=(d,l))#建立進程處理函數裏面的d,l變量
        p.start()
        p.join()

        print(d)
        print(l)
#{<ListProxy object, typeid 'list' at 0x24626a370b8>: '1', '2': 2, 0.26: None}
# [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
 

進程池:

python提供了進程池,Pool

from multiprocessing import Pool
import time
def f1(a):
    time.sleep(1)
    print(a)
    return 1000

def f2(arg):
    print(arg)#arg值是f1的返回值

if __name__ == "__main__":
    Pool = Pool(5)#建立5個進程池
    for i in range(40):#5個5個執行
         Pool.apply_async(func=f1, args=(i,),callback=f2)
         #1.每一個任務併發執行,先執行5個當有進程的時候再執行5個.內部沒有join()方法須要定義以下:
         #2.能夠設置回調函數callback
         print("1111111111111111")
         # Pool.apply(func=f1, args=(i,))
         #一個一個申請執行,一個執行完才執行下一個,內部有join()方法,不用定義
    Pool.close()#執行完後終止
    # Pool.terminate()#當即終止
    Pool.join()#進程池的join方法一個一個執行,join方法前面必須先定義close,terminate方法

 簡單版線程池:

 

import queue
import threading
import time

class ThreadPool(object):  #建立線程池類

    def __init__(self, max_num=20):  #建立一個最大長度爲20的隊列
        self.queue = queue.Queue(max_num)  #建立一個隊列
        for i in range(max_num):  #循環把線程對象加入到隊列中
            self.queue.put(threading.Thread)  #把線程的類名放進去,執行完這個Queue,20個隊列指向同一個Thread類

    def get_thread(self):  #定義方法從隊列裏獲取線程
        return self.queue.get()  #在隊列中獲取值

    def add_thread(self):  #線程執行完任務後,在隊列裏添加線程
        self.queue.put(threading.Thread)


def func(pool,a1):
    time.sleep(1)
    print(a1)
    pool.add_thread()  #線程執行完任務後,隊列裏再加一個線程

p = ThreadPool(10)  #執行init方法;  一次最多執行10個線程

for i in range(100):
    thread = p.get_thread()  #線程池10個線程,每一次循環拿走一個拿到類名,沒有就等待
    t = thread(target=func, args=(p, i,))  #建立線程;  線程執行func函數的這個任務;args是給函數傳入參數
    t.start()  #激活線程

#輸出無序的0-99數
# 對象等於類後面加括號
# 對象是線程

 

複雜版線程池:

線程池要點:
1,建立線程池時,是在須要執行線程的時候建立線程,而不是建立好最大隊列等待執行
2,建立一個回調函數,檢查出剩餘隊列的任務,當線程執行完函數的時候通知線程池,
3,使用線程池時讓其循環獲取任務,並執行
4,線程池,讓其自行的去激活線程,執行完成後,關閉退出

import queue
import threading
import time
import contextlib

StopEvent = object()

class ThreadPool(object):

    def __init__(self, max_num):
        self.q = queue.Queue()  # 最多建立的線程數(線程池最大容量)
        self.max_num = max_num

        self.terminal = False  #若是爲True 終止全部線程,不在獲取新任務
        self.generate_list = []  # 真實建立的線程列表
        self.free_list = []# 空閒線程數量

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數)
        :return: 若是線程池已經終止,則返回True不然None
        """

        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()  #建立線程
        w = (func, args, callback,)  #把參數封裝成元祖
        self.q.put(w)  #添加到任務隊列

    def generate_thread(self):
        """
        建立一個線程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.currentThread  # 獲取當前線程
        self.generate_list.append(current_thread)  #添加到已經建立的線程裏

        event = self.q.get()  # 取任務並執行
        while event != StopEvent:  # 是元組=》是任務;若是不爲中止信號  執行任務

            func, arguments, callback = event  #解開任務包; 分別取出值
            try:
                result = func(*arguments)  #運行函數,把結果賦值給result
                status = True  #運行結果是否正常
            except Exception as e:
                status = False  #表示運行不正常
                result = e  #結果爲錯誤信息

            if callback is not None:  #是否存在回調函數
                try:
                    callback(status, result)  #執行回調函數
                except Exception as e:
                    pass

            if self.terminal:  # 默認爲False,若是調用terminal方法
                event = StopEvent  #等於全局變量,表示中止信號
            else:
                # self.free_list.append(current_thread)  #執行完畢任務,添加到閒置列表
                # event = self.q.get()  #獲取任務
                # self.free_list.remove(current_thread)  # 獲取到任務以後,從閒置列表中刪除;不是元組,就不是任務
                with self.worker_state(self.free_list, current_thread):
                    event = self.q.get()

        else:
            self.generate_list.remove(current_thread)  #若是收到終止信號,就從已經建立的線程列表中刪除

    def close(self):  #終止線程
        num = len(self.generate_list)  #獲取總共建立的線程數
        while num:
            self.q.put(StopEvent)  #添加中止信號,有多少線程添加多少表示終止的信號
            num -= 1


    def terminate(self):   #終止線程(清空隊列)

        self.terminal = True  #把默認的False更改爲True

        while self.generate_list:  #若是有已經建立線程存活
            self.q.put(StopEvent)  #有幾個線程就發幾個終止信號
        self.q.empty()  #清空隊列

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)

def work(i):
    print(i)

pool = ThreadPool(10)
for item in range(50):
    pool.run(func=work, args=(item,))
# 將任務放在隊列中
#      着手開始處理任務
#         - 建立線程
#                 - 有空閒線程,擇再也不建立線程
#                 - 不能高於線程池的限制
#                 - 根據任務個數判斷
#         - 線程去隊列中取任務

pool.terminate()

 協程:

 Python的 greenlet就至關於手動切換,去執行別的子程序,在「別的子程序」中又主動切換回來

greenlet協程例子:
# 協程就是:把線程分塊,不讓線程等待,讓線程遇到IO請求先執行1,或先執行2,或先執行3叫作協程

from greenlet import greenlet
# greenlet 其實就是手動切換;gevent是對greenlet的封裝,能夠實現自動切換
# import gevent
def test1():
    print("123")
    gr2.switch()   # 切換去執行test2
    print("456")
    gr2.switch()   # 切換回test2以前執行到的位置,接着執行

def test2():
    print("789")
    gr1.switch()   # 切換回test1以前執行到的位置,接着執行
    print("666")


gr1 = greenlet(test1)   # 建立的協程,啓動一個協程 注意test1不要加()
gr2 = greenlet(test2)   #
gr1.switch()

# 123
# 789
# 456
# 666

gevent 實現協程:

  Gevent 是一個第三方庫,能夠輕鬆經過gevent實現協程程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

  gevent會主動識別程序內部的IO操做,當子程序遇到IO後,切換到別的子程序。若是全部的子程序都進入IO,則阻塞。

協程之gevent例子:

import gevent

def func1():
    print("func1 running")
    gevent.sleep(2)             # 內部函數實現io操做
    print("switch func1")

def func2():
    print("func2 running")
    gevent.sleep(1)
    print("switch func2")

def func3():
    print("func3  running")
    gevent.sleep(0)
    print("func3 done..")

gevent.joinall([gevent.spawn(func1),
                gevent.spawn(func2),
                gevent.spawn(func3),
                ])

# func1 running
# func2 running
# func3  running
# func3 done..
# switch func2
# switch func1

同步與異步性能區別:

同步:   發一個請求須要等待返回, 全部的操做都作完,才返回給用戶結果。即寫完數據庫以後,在響應用戶,用戶體驗很差。使用場景:銀行轉帳,數據庫保存操做

異步:   發一個請求不須要等待返回,不用等全部操做等作完,就響應用戶請求。即先響應用戶請求,而後慢慢去寫數據庫,用戶體驗較好。  使用場景:爲了不短期大量的數據庫操做,就使用緩存機制,也就是消息隊列。先將數據放入消息隊列,而後再慢慢寫入數據庫。

import gevent

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print('Task %s done' % pid)

def synchronous():
    for i in range(1, 10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

# Synchronous:
# Task 1 done
# Task 2 done
# Task 3 done
# Task 4 done
# Task 5 done
# Task 6 done
# Task 7 done
# Task 8 done
# Task 9 done
# Asynchronous:
# Task 0 done
# Task 1 done
# Task 2 done
# Task 3 done
# Task 4 done
# Task 5 done
# Task 6 done
# Task 7 done
# Task 8 done
# Task 9 done

上面程序的重要部分是將task函數封裝到greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。

 遇到Io阻塞時會切換任務之【爬蟲版】

from urllib import request import gevent,time from gevent import monkey monkey.patch_all() # 把當前程序中的全部io操做都作上標記

def spider(url): print("GET:%s" % url) resp = request.urlopen(url) data = resp.read() print("%s bytes received from %s.." % (len(data), url)) urls = [ "https://www.python.org/", "https://www.yahoo.com/", "https://github.com/" ] start_time = time.time() for url in urls: spider(url) print("同步耗時:",time.time() - start_time) async_time_start = time.time() gevent.joinall([ gevent.spawn(spider,"https://www.python.org/"), gevent.spawn(spider,"https://www.yahoo.com/"), gevent.spawn(spider,"https://github.com/"), ]) print("異步耗時:",time.time() - async_time_start) # GET:https://www.python.org/ # 48814 bytes received from https://www.python.org/.. # GET:https://www.yahoo.com/ # 492112 bytes received from https://www.yahoo.com/.. # GET:https://github.com/ # 81165 bytes received from https://github.com/.. # 同步耗時: 43.494789600372314 # GET:https://www.python.org/ # GET:https://www.yahoo.com/ # GET:https://github.com/ # 492000 bytes received from https://www.yahoo.com/.. # 59868 bytes received from https://github.com/.. # 48814 bytes received from https://www.python.org/.. # 異步耗時: 21.32669472694397

經過gevent實現【單線程】下的多socket併發

server端:
import sys
import socket
import time
import gevent

from gevent import socket, monkey

monkey.patch_all()

def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)

def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as  ex:
        print(ex)
    finally:
        conn.close()
if __name__ == '__main__':
    server(9999)

client端:
import socket

HOST = 'localhost'  # The remote host
PORT = 9999  # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
    msg = bytes(input(">>:"), encoding="utf8")
    s.sendall(msg)
    data = s.recv(1024)
    # print(data)

    print('Received', repr(data))
s.close()

yield實現協程  

  前文所述「子程序(函數)在執行過程當中能夠中斷去執行別的子程序;別的子程序也能夠中斷回來繼續執行以前的子程序」,那麼很容易想到Python的yield,顯然yield是能夠實現這種切換的。

使用yield實現協程操做例子:

def consumer(name):
    print("要開始啃骨頭了...")
    while True:
        print("\033[31;1m[consumer] %s\033[0m " % name)
        bone = yield
        print("[%s] 正在啃骨頭 %s" % (name, bone))


def producer(obj1, obj2):
    obj1.send(None)    # 啓動obj1這個生成器,第一次必須用None  <==> obj1.__next__()
    obj2.send(None)    # 啓動obj2這個生成器,第一次必須用None  <==> obj2.__next__()
    n = 0
    while n < 5:
        n += 1
        print("\033[32;1m[producer]\033[0m 正在生產骨頭 %s" % n)
        obj1.send(n)
        obj2.send(n)


if __name__ == '__main__':
    con1 = consumer("消費者A")
    con2 = consumer("消費者B")
    producer(con1, con2)

相關文章
相關標籤/搜索