4.4 線程

線程

相關概念

 進程線程關係圖

threading 模塊

線程建立

  方法形式建立線程python

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    print('主線程')
View Code

   類形式建立線程(必須內含有 run方法 以及繼承 Thread)mysql

from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('egon')
    t.start()
    print('主線程')
View Code

相關方法

Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
  # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

   join 方法實例sql

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主線程')
    print(t.is_alive())
    '''
    egon say hello
    主線程
    False
    '''
View Code

  守護線程實例數據庫

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必須在t.start()以前設置
    t.start()

    print('主線程')
    print(t.is_alive())
    '''
    主線程
    True
    '''
View Code
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")
View Code

同步鎖  

  用法和場景同 進程 編程

import threading
R=threading.Lock()
R.acquire()
'''
對公共數據的操做
'''
R.release()

死鎖/重入鎖

  死鎖實例多線程

from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()

  科學家吃麪死鎖問題app

import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麪條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麪'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了麪條' % name)
    print('%s 吃麪' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','egon','yuan']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()
View Code

  解決方式:重入鎖dom

import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麪條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麪'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了麪條' % name)
    print('%s 吃麪' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','egon','yuan']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()
View Code

信號量 Semaphore 

   信號量 Semaphore 舉例異步

from threading import Thread,Semaphore,currentThread
import time,random
sm = Semaphore(5) #運行的時候有5我的
def task():
    sm.acquire()
    print('\033[42m %s上廁所'%currentThread().getName())
    time.sleep(random.randint(1,3))
    print('\033[31m %s上完廁所走了'%currentThread().getName())
    sm.release()
if __name__ == '__main__':
    for i in range(20):  #開了10個線程 ,這20人都要上廁所
        t = Thread(target=task)
        t.start()
View Code

 

Event 事件

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。async

若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。

爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。

在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。

一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。

若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行

簡單來講:

  當存在多個線程之間有這狀態依附時,使用 Event

  Event 對象的 初始 狀態值 爲 False 當 isSet 判斷狀態時就一直 wait 阻塞

  當達到需求的狀態時,經過 set 將狀態置 True (或者經過 clear 恢復成 False)

  爲 True 時才能夠正確下面的代碼執行

  

from threading import Event
Event.isSet() #返回event的狀態值
Event.wait() #若是 event.isSet()==False將阻塞線程;
Event.set() #設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;
Event.clear() #恢復

  連接數據庫實例 

# 首先定義兩個函數,一個是鏈接數據庫
# 一個是檢測數據庫
from threading import Thread, Event, currentThread
import time

e = Event()


def conn_mysql():
    '''連接數據庫'''
    count = 1
    while not e.is_set():  # 當沒有檢測到時候 is_set 爲 False
        if count > 3:  # 若是嘗試次數大於3,就主動拋異常
            raise ConnectionError('嘗試連接的次數過多')
        print('\033[45m%s 第%s次嘗試' % (currentThread(), count))
        e.wait(timeout=1)  # 等待檢測(裏面的參數是超時1秒)
        count += 1
    print('\033[44m%s 開始連接...' % (currentThread().getName()))


def check_mysql():
    '''檢測數據庫'''
    print('\033[42m%s 檢測mysql...' % (currentThread().getName()))
    time.sleep(5)
    e.set()     # 檢測成功後設置爲 True


if __name__ == '__main__':
    for i in range(3):  # 三個去連接
        t = Thread(target=conn_mysql)
        t.start()
    t = Thread(target=check_mysql)
    t.start()
View Code

   紅綠燈例子

from  threading import Thread,Event,currentThread
import time
e = Event()
def traffic_lights():
    '''紅綠燈'''
    time.sleep(5)
    e.set()
def car():
    ''''''
    print('\033[42m %s 等綠燈\033[0m'%currentThread().getName())
    e.wait()
    print('\033[44m %s 車開始通行' % currentThread().getName())
if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=car)  #10輛車
        t.start()
    traffic_thread = Thread(target=traffic_lights)  #一個紅綠燈
    traffic_thread.start()
View Code

定時器

  指定n秒後執行某操做

from threading import Timer
def func(n):
    print('hello,world',n)
t = Timer(3,func,args=(123,))  #等待三秒後執行func函數,由於func函數有參數,那就再傳一個參數進去
t.start()

GIL 全局解釋器鎖

概念

  在Cpython解釋器中,由於有GIL鎖的存在同一個進程下開啓的多線程,同一時刻只能有一個線程執行,沒法利用多核優點。 

  同一時刻同一進程中只有一個線程被執行

線程池

建立方法

from  concurrent.futures import ThreadPoolExecutor
from threading import currentThread
import os,time,random
def task(n):
    print('%s:%s is running'%(currentThread().getName(),os.getpid()))  #看到的pid都是同樣的,由於線程是共享了一個進程
    time.sleep(random.randint(1,3))  #I/O密集型的,,通常用線程,用了進程耗時長
    return n**2
if __name__ == '__main__':
    start = time.time()
    p = ThreadPoolExecutor() #線程池 #若是不給定值,默認cup*5
    l = []
    for i in range(10):  #10個任務 # 線程池效率高了
        obj  = p.submit(task,i)  #至關於apply_async異步方法
        l.append(obj)
    p.shutdown()  #默認有個參數wite=True (至關於close和join)
    print('='*30)
    print([obj.result() for obj in l])
    print(time.time() - start)  #3.001171827316284

"""
ThreadPoolExecutor-0_0:12816 is running
ThreadPoolExecutor-0_1:12816 is running
ThreadPoolExecutor-0_2:12816 is running
ThreadPoolExecutor-0_3:12816 is running
ThreadPoolExecutor-0_4:12816 is running
ThreadPoolExecutor-0_5:12816 is running
ThreadPoolExecutor-0_6:12816 is running
ThreadPoolExecutor-0_7:12816 is running
ThreadPoolExecutor-0_8:12816 is running
ThreadPoolExecutor-0_9:12816 is running
==============================
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
3.0191218852996826

"""

線程池實例

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import requests
import time,os
def get_page(url):
    print('<%s> is getting [%s]'%(os.getpid(),url))
    response = requests.get(url)
    if response.status_code==200:  
        return {'url':url,'text':response.text}
def parse_page(res):
    res = res.result() # 須要用 result 拿到對象結果
    print('<%s> is getting [%s]'%(os.getpid(),res['url']))
    with open('db.txt','a') as f:
        parse_res = 'url:%s size:%s\n'%(res['url'],len(res['text']))
        f.write(parse_res)
if __name__ == '__main__':
    # p = ThreadPoolExecutor()
    p = ProcessPoolExecutor()
    l = [
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
        'http://www.baidu.com',
    ]
    for url in l:
        res = p.submit(get_page,url).add_done_callback(parse_page) #這裏的回調函數拿到的是一個對象。
    #  須要先把返回的res獲得一個結果。即在前面加上一個res.result() ,誰好了誰去掉用回調函數
    #  回調函數也是一種編程思想。不只開線程池用,開線程池也用
    p.shutdown()  #至關於進程池裏的close和join
    print('',os.getpid())

map 應用實例

    map 也能夠回調函數,因此功能上能夠替換

# 咱們的那個p.submit(task,i)和map函數的原理相似。咱們就
# 能夠用map函數去代替。更減縮了代碼
from  concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random
def task(n):
    print('[%s] is running'%os.getpid())
    time.sleep(random.randint(1,3))  #I/O密集型的,,通常用線程,用了進程耗時長
    return n**2
if __name__ == '__main__':
    p = ProcessPoolExecutor()
    obj = p.map(task,range(10))
    p.shutdown()  #至關於close和join方法
    print('='*30)
    print(obj)  #返回的是一個迭代器
    print(list(obj))

# map函數應用
相關文章
相關標籤/搜索