11.多線程、多進程和線程池編程

1.1.線程同步Lock和Rlock

(1)Lockhtml

  • 用鎖會影響性能
  • 用鎖會產生死鎖
import threading
from threading import Lock

total = 0
lock = Lock()

def add():
    global total
    global local
    for i in range(100000):
        lock.acquire()
        # lock.acquire()   #若是再加把鎖會產生死鎖
        total += 1
        lock.release()

def desc():
    global total
    global local
    for i in range(100000):
        lock.acquire()     #獲取鎖
        total -= 1
        lock.release()     #釋放鎖

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)   #0

(2)RLock多線程

RLock:在同一個線程裏面,能夠連續屢次調用acquire,必定要注意acquire和release的次數相等併發

import threading
from threading import Lock,RLock

total = 0
lock = RLock()

def add():
    global total
    global local
    for i in range(100000):
        #用RLock在同一線程裏面,能夠屢次調用acquire,不會產生死鎖
        lock.acquire()
        lock.acquire()
        total += 1
        #release的次數和acquire的次數相等
        lock.release()
        lock.release()

def desc():
    global total
    global local
    for i in range(100000):
        lock.acquire()     #獲取鎖
        total -= 1
        lock.release()     #釋放鎖

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)   #0

1.2.線程同步 - condition 

使用condition模擬對話app

import threading
from threading import Condition

 #經過condition,完成協同讀詩
class XiaoAi(threading.Thread):
    def __init__(self,cond):
        super().__init__(name='小愛')
        self.cond = cond

    def run(self):
        with self.cond:
            #等待
            self.cond.wait()
            print("{} : 在".format(self.name))
            #通知
            self.cond.notify()

            self.cond.wait()
            print("{} : 好啊".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 君住長江尾".format(self.name))
            self.cond.notify()

class TianMao(threading.Thread):
    def __init__(self,cond):
        super().__init__(name="天貓精靈")
        self.cond = cond

    def run(self):
        with self.cond:
            print("{} : 小愛同窗".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 咱們來對古詩吧".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我在長江頭".format(self.name))
            self.cond.notify()
            self.cond.wait()

if __name__ == '__main__':
    cond = threading.Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)

    xiaoai.start()
    tianmao.start()

結果:async

 

1.3.線程同步 - Semaphore 

控制線程併發數量ide

#samaphore是用於控制進入數量的鎖

import threading
import time

class htmlSpider(threading.Thread):
    def __init__(self,url,sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("got html text success!")
        self.sem.release()   #釋放鎖

class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem
    def run(self):
        for i in range(20):
            self.sem.acquire()    #加鎖
            html_htread = htmlSpider("baidu.com/{}".format(i), self.sem)
            html_htread.start()

if __name__ == '__main__':
    #控制線程併發數量爲3
    sem = threading.Semaphore(3)
    url_producer = UrlProducer(sem)
    url_producer.start()

11.4.ThreadPoolExecutor線程池

線程池函數

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

#爲何要線程池
#主線程中能夠獲取某一個線程的狀態或者某一個任務的狀態,以及返回值
#當一個線程完成的時候,主線程立馬知道
#futures能夠讓多線程和多進程編碼接口一致

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)

#經過submit提交執行的函數到線程池中,sumbit是當即返回
task1 = executor.submit(get_html, (3))    #函數和參數

#done方法用於斷定某個任務是否完成
print(task1.done())      #False
time.sleep(4)
print(task1.done())      #True
#result方法查看task函數執行的結構
print(task1.result())    #3

用as_completed獲取任務結束的返回性能

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

#爲何要線程池
#主線程中能夠獲取某一個線程的狀態或者某一個任務的狀態,以及返回值
#當一個線程完成的時候,主線程立馬知道
#futures能夠讓多線程和多進程編碼接口一致

# def get_html(times):
#     time.sleep(times)
#     print("get page {} success".format(times))
#     return times
#
# executor = ThreadPoolExecutor(max_workers=2)
#
# #經過submit提交執行的函數到線程池中,sumbit是當即返回
# task1 = executor.submit(get_html, (3))    #函數和參數
#
# #done方法用於斷定某個任務是否完成
# print(task1.done())      #False
# time.sleep(4)
# print(task1.done())      #True
# #result方法查看task函數執行的結構
# print(task1.result())    #3

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)

#獲取已經成功的task的返回
urls = [3,2,4]
all_task = [executor.submit(get_html, (url)) for url in urls]

for future in as_completed(all_task):
    data = future.result()
    print(data)   #已經成功的task函數的return

11.5.進程間通訊 - Queue

Queueui

import time

from multiprocessing import Process, Queue

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == '__main__':
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))

    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

11.6.進程間通訊 - Manager

Manger編碼

import time

from multiprocessing import Process, Queue, Manager,Pool

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == '__main__':
    #pool中的進程間通訊須要使用manger中的queue
    queue = Manager().Queue(10)
    pool = Pool(2)   #建立進程池

    pool.apply_async(producer, args=(queue, ))
    pool.apply_async(consumer, args=(queue, ))

    pool.close()
    pool.join()

11.7.進程間通訊 - Pipe

pipe實現進程間通訊(只能兩個進程之間)

#Pipe進程間通訊
from multiprocessing import Process, Pipe

def producer(pipe):
    pipe.send("derek")

def consumer(pipe):
    print(pipe.recv())

if __name__ == '__main__':
    receive_pipe, send_pipe = Pipe()
    my_producer = Process(target=producer, args=(send_pipe, ))
    my_consumer = Process(target=consumer, args=(receive_pipe, ))

    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_producer.join()
相關文章
相關標籤/搜索