線程是最小的工做單位,一個應用程序至少有一個進程,一個進程至少有一個線程。
應用場景:
IO密集型:使用線程
計算密集型:使用進程
GIL: 全局解釋器鎖,保證同一進程中只有一個線程同時被調度。python
線程的基本使用:git
def task(arg): time.sleep(arg) print(arg) for i in range(5): t = threading.Thread(target=task,args=[i,]) # t.setDaemon(True) # 主線程終止,不等待子線程 # t.setDaemon(False) t.start() # t.join() # 一直等 # t.join(1) # 等待最大時間
鎖:
1.只有一我的使用鎖:github
import threading import time lock = threading.Lock() v =10 def task(arg): time.sleep(1) lock.acquire() global v v -= 1 print(v) lock.release() for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start()
2.多人使用鎖:併發
lock = threading.BoundedSemaphore(3) v =10 def task(arg): lock.acquire() time.sleep(1) global v v -= 1 print(v) lock.release() for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start()
3.事件鎖(全部人解脫鎖的限制)app
lock = threading.Event() def task(arg): time.sleep(1) lock.wait() print(arg) for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start() while True: value = input(">>>") if value == '1': lock.set()
4.自定義解鎖個數(爲所欲爲鎖)socket
lock = threading.Condition() def task(arg): time.sleep(1) lock.acquire() lock.wait() print("線程:%s" % arg) lock.release() for i in range(10): t = threading.Thread(target=task,args=[i,]) t.start() while True: value = input(">>") lock.acquire() lock.notify(int(value)) lock.release()
線程池:
模式一: 直接處理ui
def task(url): """ 任務執行兩個操做:下載;保存本地 """ # response中封裝了Http請求響應的全部數據 # - response.url 請求的URL # - response.status_code 響應狀態碼 # - response.text 響應內容(字符串格式) # - response.content 響應內容(字節格式) # 下載 response = requests.get(url) # 下載內容保存至本地 f = open('a.log','wb') f.write(response.content) f.close() pool = ThreadPoolExecutor(2) url_list = [ 'http://www.oldboyedu.com', 'http://www.autohome.com.cn', 'http://www.baidu.com', ] for url in url_list: print('開始請求',url) # 去鏈接池中獲取連接 pool.submit(task,url)
模式二:分步處理url
def save(future): """ 只作保存 # future中包含response """ response = future.result() # 下載內容保存至本地 f = open('a.log','wb') f.write(response.content) f.close() def task(url): """ 只作下載 requests """ # response中封裝了Http請求響應的全部數據 # - response.url 請求的URL # - response.status_code 響應狀態碼 # - response.text 響應內容(字符串格式) # - response.content 響應內容(字節格式) # 下載 response = requests.get(url) return response pool = ThreadPoolExecutor(2) url_list = [ 'http://www.oldboyedu.com', 'http://www.autohome.com.cn', 'http://www.baidu.com', ] for url in url_list: print('開始請求',url) # 去鏈接池中獲取連接 # future中包含response future = pool.submit(task,url) # 下載成功後,自動調用save方法 future.add_done_callback(save)
進程的基本使用:spa
from multiprocessing import Process import time def task(arg): time.sleep(arg) print(arg) if __name__ == '__main__': for i in range(10): p = Process(target=task,args=(i,)) p.daemon = True # p.daemon = False p.start() p.join(1) print('主進程最後...')
進程之間的數據共享:經過Array(‘類型’,長度) 或者Manager().list() / Manager().dict()線程
from multiprocessing import Process,Array,Manager from threading import Thread ''' # 驗證進程之間數據不共享 def task(num,li): li.append(num) print(li) if __name__ == "__main__": v = [] for i in range(10): p = Process(target=task,args=(i,v,)) p.start() ''' ''' # 進程間數據共享方式一: def task(num,li): li[num] = 1 print(list(li)) if __name__ == "__main__": v = Array('i',10) for i in range(10): p = Process(target=task,args=(i,v,)) p.start() ''' # 進程間數據共享方式二:經過socket def task(num,li): li.append(num) print(li) if __name__ == '__main__': v = Manager().list() # v = Manager().dict() for i in range(10): p = Process(target=task,args=(i,v,)) p.start() p.join()
進程池的使用:
from concurrent.futures import ProcessPoolExecutor def call(arg): data = arg.result() print(data) def task(arg): return arg + 100 if __name__ == "__main__": pool = ProcessPoolExecutor(10) for i in range(100): obj = pool.submit(task,i) obj.add_done_callback(call)
協程永遠是一個進程在執行,是對線程的分片處理。
greenlet: python自帶的協程模塊
# 協程 from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() # 根據協程二次開發:協程+IO from gevent import monkey; monkey.patch_all() import gevent import requests def f(url): response = requests.get(url) print(response.url,response.status_code) gevent.joinall([ gevent.spawn(f, 'http://www.oldboyedu.com/'), gevent.spawn(f, 'http://www.baidu.com/'), gevent.spawn(f, 'http://github.com/'), ])
IO多路複用,用於監聽多個socket對象是否變化(可讀,可寫,發送錯誤)
import socket import select # IO多路複用:8002,8001 # ############### 基於select實現服務端的「僞」併發 ############### sk1 = socket.socket() sk1.bind(('127.0.0.1',8001,)) sk1.listen(5) sk2 = socket.socket() sk2.bind(('127.0.0.1',8002,)) sk2.listen(5) inputs = [sk1,sk2,] w_inputs = [] while True: # IO多路複用,同時監聽多個socket對象 # - select,內部進行循環操做(1024) 主動查看 # - poll, 內部進行循環操做 主動查看 # - epoll, 被動告知 r,w,e = select.select(inputs,w_inputs,inputs,0.05) # r = [sk2,] # r = [sk1,] # r = [sk1,sk2] # r = [] # r = [conn,] # r = [sk1,Wconn] #######? for obj in r: if obj in [sk1,sk2]: # 新鏈接撿來了... print('新鏈接來了:',obj) conn,addr = obj.accept() inputs.append(conn) else: # 有鏈接用戶發送消息來了.. print('有用戶發送數據了:',obj) try: data = obj.recv(1024) except Exception as ex: data = "" if data: w_inputs.append(obj) # obj.sendall(data) else: obj.close() inputs.remove(obj) w_inputs.remove(obj) for obj in w: obj.sendall(b'ok') w_inputs.remove(obj)
模擬socketserver:
import socket import select import threading def process_request(conn): while True: v = conn.recv(1024) conn.sendall(b'1111') sk1 = socket.socket() sk1.bind(('127.0.0.1',8001,)) sk1.listen(5) inputs=[sk1,] while True: # IO多路複用,同時監聽多個socket對象 # - select,內部進行循環操做(1024) 主動查看 # - poll, 內部進行循環操做 主動查看 # - epoll, 被動告知 r,w,e = select.select(inputs,[],inputs,0.05) for obj in r: if obj in sk1: # conn客戶端的socket conn,addr = obj.accept() t = threading.Thread(target=process_request,args=(conn,)) t.start()