Python自動化開發課堂筆記【Day10】 - Python進階(線程)

線程python

定義:一條流水線的執行過程是一個線程,一條流水線必須屬於一個車間,一個車間的運行過程就是一個進程(一個進程內至少有一個線程)
          進程是資源單位,而線程纔是CPU上的執行單位,線程建立的開銷遠遠小於進程

多線程:一個車間內有多條流水線,多個流水線共享該車間的資源(多線程共享一個進程的資源)

爲什麼要建立多線程:
  1. 資源共享
  2. 建立開銷小

開啓線程的兩種方式:mysql

方式一:

from threading import Thread

def work(name):
    print('%s say hello' % name)
    
if __name__ == '__main__':
    t = Thread(target=work,args=('Albert',))
    t.start()
    print('main thread')

方式二: from threading import Thread

class MyThread(Thread):

    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        print('%s say hello' % self.name)

if __name__ == '__main__':

    t = MyThread('Albert')
    t.start()
    print('main thread')

P.S. 主進程和主線程公用同一個PID, 驗證:git

from threading import Thread
import os

def work():
    print('%s say hello' % os.getpid())

if __name__ == '__main__':
    t = Thread(target=work,)
    t.start()
    print('main thread:%s' % os.getpid())

多線程練習:github

練習一:

服務端:

import socket
import threading

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def action(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            print(data)
            conn.send(data.upper())
        except Exception:
            break

if __name__ == '__main__':
    while True:
        conn,addr = server.accept()
        p = threading.Thread(target=action,args=(conn,))
        p.start()
        
客戶端:

import socket

client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect(('127.0.0.1',8080))

while True:
    msg = input('>>>:').strip()
    if not msg: continue
    client.send(msg.encode())
    back_msg = client.recv(1024)
    print(back_msg.decode())

練習二: from threading import Thread

data_l = []
format_data_l = []

def inp():
    while True:
        data = input('>>>:').strip()
        if not data:continue
        data_l.append(data)

def format():
    while True:
        if data_l:
            data = data_l.pop()
            format_data = data.upper()
            format_data_l.append(format_data)

def write():
    while True:
        if format_data_l:
            data = format_data_l.pop()
            with open('c.txt','a',encoding='utf-8') as f:
                f.write(data + '\n')

if __name__ == '__main__':

    t1 = Thread(target=inp)
    t2 = Thread(target=format)
    t3 = Thread(target=write)

    t1.start()
    t2.start()
    t3.start()

線程一些其餘屬性web

import threading
from threading import Thread
import os

def work():
    print('%s say hello' % threading.current_thread().getName())

if __name__ == '__main__':
    t = Thread(target=work,)
    t.setDaemon(True) #線程的守護進程
    t.start()
    t.join()
    print(threading.enumerate()) #以列表形式顯示當前活躍線程
    print(threading.active_count()) #活躍線程數量統計
    print('main thread:%s' % threading.current_thread().getName()) #獲取當前線程名稱

GIL鎖sql

因爲python GIL的存在,在Cpython解釋器中,同一個進程下開啓的多線程,同一時刻只能有一個線程執行,沒法利用多核優點。
GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念
有了GIL的存在,同一時刻統一進程中只有一個線程被執行

結論:
  對計算來講,cpu越多越好,可是對於I/O來講,再多的cpu也沒用
  固然對於一個程序來講,不會是純計算或者純I/O,咱們只能相對的去看一個程序究竟是計算密集型仍是I/O密集型,從而進一步分析python的多線程有無用武之地
如今的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提高,甚至不如串行(沒有大量切換),可是,對於IO密集型的任務效率仍是有顯著提高的。

應用:
多線程用於IO密集型,如socket,爬蟲,web
多進程用於計算密集型,如金融分析

注意:
GIL 與Lock是兩把鎖,保護的數據不同,前者是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),
後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock 多線程

示例:
I/O密集型

from threading import Thread
from multiprocessing import Process
import time
import os

def work():
    time.sleep(1)
    print(os.getpid())

if __name__ == '__main__':
    tp_l = []
    start_time = time.time()
    for i in range(100):
        tp = Thread(target=work) #run_time is 1.0190582275390625
        # tp = Process(target=work) #run_time is 10.807618141174316
        tp_l.append(tp)
        tp.start()

    for tp in tp_l:
        tp.join()

    stop_time = time.time()
    print('run_time is %s' % (stop_time - start_time))
    
    
計算密集型 from threading import Thread
from multiprocessing import Process
import os
import time

def work():
    res = 0
    for i in range(100000):
        res+=i

if __name__ == '__main__':
    tp_l = []
    start_time = time.time()
    for i in range(300):
        # tp = Thread(target=work) # run_time is 4.402251720428467
        tp = Process(target=work) # run_time is 28.153610229492188
        tp_l.append(tp)
        tp.start()

    for tp in tp_l:
        tp.join()

    stop_time = time.time()
    print('run_time is %s' % (stop_time - start_time))

互斥鎖併發

from threading import Thread, Lock
import time

n = 100
def work():
    with mutex:
        global n
        temp = n
        time.sleep(0.1)
        n = temp - 1

if __name__ == '__main__':
    mutex = Lock()
    t_l = []
    for i in range(100):
        t = Thread(target=work)
        t_l.append(t)
        t.start()
    for i in t_l:
        i.join()
    print(n)  

死鎖與遞歸鎖app

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。
                  此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖:socket

死鎖示例:

from threading import Thread,Lock
import time

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()
    def f1(self):
        mutexA.acquire()
        print('\033[40m%s get LockA\033[0m' % self.name)
        mutexB.acquire()
        print('\033[41m%s get LockB\033[0m' % self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        time.sleep(1)
        print('\033[41m%s get LockB\033[0m' % self.name)
        mutexA.acquire()
        print('\033[40m%s get LockA\033[0m' % self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    mutexA = Lock()
    mutexB = Lock()
    for i in range(20):
        t = MyThread()
        t.start()

如何解決死鎖問題:
  遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。
  這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。
  直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

from threading import Thread,RLock
import time

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()
    def f1(self):
        mutexA.acquire()
        print('\033[40m%s get LockA\033[0m' % self.name)
        mutexB.acquire()
        print('\033[41m%s get LockB\033[0m' % self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        time.sleep(1)
        print('\033[41m%s get LockB\033[0m' % self.name)
        mutexA.acquire()
        print('\033[40m%s get LockA\033[0m' % self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    # mutexA = Lock()
    # mutexB = Lock()
    # 同時引用爲一把鎖,不要誤認爲是兩把鎖
    mutexA = mutexB = RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止
    for i in range(20):
        t = MyThread()
        t.start()

信號量(Semaphore)

同進程的同樣
Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

from threading import Thread,Semaphore
import time

def work(id):
    with sem:
        time.sleep(2)
        print('%s say hello' %id)

if __name__ == '__main__':
    sem = Semaphore(5)
    for i in range(20):
        t = Thread(target=work,args=(i,))
        t.start()

事件(Event)

event.isSet():返回event的狀態值;
event.wait():若是 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
event.clear():恢復event的狀態值爲False。
from threading import Event ,Thread
import threading
import time

def conn_mysql():
    print('%s waiting...' % threading.current_thread().getName())
    print(e.isSet()) #False
    e.wait()
    print('%s start to connect mysql...' % threading.current_thread().getName())
    print(e.isSet()) #True
    time.sleep(2)

def check_mysql():
    print('%s is checking...' % threading.current_thread().getName())
    time.sleep(3)
    print(e.isSet()) #False
    e.set()
    print(e.isSet()) #True

if __name__ == '__main__':
    e = Event()
    t1 = Thread(target=conn_mysql)
    t2 = Thread(target=conn_mysql)
    t3 = Thread(target=conn_mysql)
    t4 = Thread(target=check_mysql)
    t1.start()
    t2.start()
    t3.start()
    t4.start()

定時器

指定n秒後執行某操做

from threading import Timer

def hello():
    print('hello, world')
t = Timer(3,hello)
t.start()

線程queue

import queue

q = queue.Queue() #先進先出--->隊列

q.put('first')
q.put('second')
q.put((1,2,3,4))

print(q.get())
print(q.get())
print(q.get())

q = queue.LifoQueue() #後進先出--->堆棧

q.put('first')
q.put('second')
q.put((1,2,3,4))

print(q.get())
print(q.get())
print(q.get())

q = queue.PriorityQueue() #優先級queue,數字越小,優先級越高

q.put((1,'a'))
q.put((4,'b'))
q.put((3,'c'))

print(q.get())
print(q.get())
print(q.get())

協程

定義:單線程下的併發,又稱微線程。協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。
          要實現協程,關鍵在於用戶程序本身控制程序切換,切換以前必須由用戶程序本身保存協程上一次調用時的狀態,如此,每次從新調用時,可以從上次的位置繼續執行
         (詳細的:協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧)

協程的定義(知足1,2,3就可稱爲協程):

  1.必須在只有一個單線程裏實現併發
  2.修改共享數據不需加鎖
  3.用戶程序裏本身保存多個控制流的上下文棧
  4.附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))

須要強調的是:
  1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程一旦遇到io就被迫交出cpu執行權限,切換其餘線程運行)
  2. 單線程內開啓協程,一旦遇到io,從應用程序級別(而非操做系統)控制切換

對比操做系統控制線程的切換,用戶在單線程內控制協程的切換,優勢以下:
  1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級
  2. 單線程內就能夠實現併發的效果,最大限度地利用cpu


yield:
1. yiled能夠保存狀態,yield的狀態保存與操做系統的保存線程狀態很像,可是yield是代碼級別控制的,更輕量級
2. send能夠把一個函數的結果傳給另一個函數,以此實現單線程內程序之間的切換

缺點:
協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程
協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程

無yield方式:

from threading import Thread
import time

def consumer(item):
    print(item)
    x = 1
    y = 2
    z = 3

def producer(target,seq):
    for item in seq:
        target(item)

s_time = time.time()
producer(consumer,range(500000))
e_time = time.time()
print('run time %s' % (e_time - s_time)) #4.764272451400757

yield方式:

from threading import Thread
import time

def consumer():
    x = 1
    y = 2
    z = 3
    while True:
        item = yield

def producer(target,seq):
    for item in seq:
        target.send(item)

g=consumer()
next(g)
s_time = time.time()
producer(g,range(500000))
e_time = time.time()
print('run time %s' % (e_time - s_time)) #run time 0.12200713157653809

Greenlet模塊

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator

from greenlet import greenlet

def test1():
    print('test1,1')
    gr2.switch()
    print('test1,2')
    gr2.switch()
def test2():
    print('test2,1')
    gr1.switch()
    print('test2,2')
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

Gevent模塊

實現單線程下遇到I/O自動切換

from gevent import monkey
monkey.patch_all()
import gevent
import time

def eat(name):
    print('%s eat food first' % name)
    # gevent.sleep(2)
    time.sleep(2)
    print('%s eat food second' % name)

def play(name):
    print('%s play phone 1' % name)
    # gevent.sleep(1)
    time.sleep(1)
    print('%s play phone 2' % name)

def drink(name):
    print('%s is drinking' % name)
    # gevent.sleep(4)
    time.sleep(4)
    print('%s is drinking' % name)

g1 = gevent.spawn(eat,'Albert')
g2 = gevent.spawn(play,'Albert')
g3 = gevent.spawn(drink,'Albert')
g1.join()
g2.join()
g3.join()
print('main thread')


協程實現併發爬取網頁 from gevent import monkey
monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print('GET Page: %s' % url)
    res = requests.get(url)
    if res.status_code == 200:
        print(res.text)

s_time = time.time()
gevent.joinall([gevent.spawn(get_page,'https://www.python.org/'),
                gevent.spawn(get_page,'https://github.com/')])
e_time = time.time()
print('run time %s' % (e_time - s_time))

單線程實現併發的socket

from gevent import monkey
monkey.patch_all()
from socket import *
import gevent

def server(ip,port):
    s = socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((ip,port))
    s.listen(5)
    while True:
        conn,addr = s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    try:
        while True:
            res = conn.recv(1024)
            print('client %s : %s msg: %s' % (addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == '__main__':
    server('127.0.0.1',8080)
服務端
from threading import Thread
from socket import *
import threading

def client(ip,port):
    c = socket(AF_INET,SOCK_STREAM)
    c.connect((ip,port))

    count = 0
    while True:
        c.send(('%s say hello %s' % (threading.current_thread().getName(),count)).encode())
        msg = c.recv(1024)
        print(msg.decode())
        count += 1

if __name__ == '__main__':

    for i in range(100):
        t = Thread(target=client,args=('127.0.0.1',8080))
        t.start()
客戶端

socketserver

import socketserver

class MyHandler(socketserver.BaseRequestHandler):
    def handle(self):
        while True:
            res = self.request.recv(1024)
            print('client %s msg: %s' % (self.client_address,res))
            self.request.send(res.upper())

if __name__ == '__main__':
    s = socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyHandler)
    s.serve_forever()
客戶端
import socket

client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect(('127.0.0.1',8080))

while True:
    msg = input('>>>:').strip()
    if not msg: continue
    client.send(msg.encode())
    back_msg = client.recv(1024)
    print(back_msg.decode())
服務端

基於UDP的socket

# 非併發效果
# from socket import *
#
# s = socket(AF_INET,SOCK_DGRAM)
# s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
# s.bind(('127.0.0.1',8080))
#
# while True:
#     msg,addr = s.recvfrom(1024)
#     print(msg)
#     s.sendto(msg.upper(),addr)

# 基於socketserver的併發效果

import socketserver

class MyUDPhandler(socketserver.BaseRequestHandler):
    def handle(self):
        client_msg,s = self.request
        s.sendto(client_msg.upper(),self.client_address)

if __name__ == '__main__':
    s = socketserver.ThreadingUDPServer(('127.0.0.1',8080),MyUDPhandler)
    s.serve_forever()
服務端
from socket import *

c = socket(AF_INET,SOCK_DGRAM)

while True:
    msg = input('>>>:').strip()
    c.sendto(msg.encode(),('127.0.0.1',8080))
    back_msg,addr= c.recvfrom(1024)
    print('from server %s:%s' % (addr,back_msg.decode()))
客戶端
相關文章
相關標籤/搜索