進程:python
優勢:同時利用多個CPU,同時進行多個操做git
缺點:耗費資源(須要從新開闢內存空間)github
線程:數據庫
優勢:共享內存,IO操做時,創造併發操做windows
缺點:搶佔資源數組
總結:1.進程並非越多越好,CPU=進程個數, 線程也不是越多越好,如請求上下文切換耗時緩存
2.進程線程的目的提升執行效率安全
3.計算機中最小的執行單位是線程多線程
4.IO操做利用CPU併發
A:IO密集型(不用CPU) 使用多線程
B:計算密集型(用CPU)多進程
GIL:全局解釋器鎖爲了鎖線程, 做用就是保證同一時刻只有一個線程能夠執行代碼,所以形成了咱們使用多線程的時候沒法實現並行。
線程鎖:若是多個線程同時修改某個數據,爲了防止錯誤,須要使用鎖
主線程等待,子線程執行:
join()
join(2)能夠傳入參數最多等2s
import threading import time globals_num = 0 lock = threading.RLock() def fun(): lock.acquire()#得到鎖 global globals_num globals_num += 1 time.sleep(1) print(globals_num) lock.release()#釋放鎖 for i in range(10): t = threading.Thread(target=fun) t.start()
#打印:
1
2
3
4
5
6
7
8
9
10
#建立線程 import time import threading def f0(): pass def f1(a1,a2): time.sleep(10) f0() t = threading.Thread(target=f1,args=(123,456))#建立線程執行f1函數,把123,4546傳給f1 t.setDaemon(True)#設爲True直接執行 # t.setDaemon(False)#設爲False等候10秒 t.start() t = threading.Thread(target=f1,args=(123,456))#建立線程執行f1函數,把123,4546傳給f1 t.setDaemon(True) # t.setDaemon(False) t.start() t = threading.Thread(target=f1,args=(123,456))#建立線程執行f1函數,把123,4546傳給f1 t.setDaemon(True) # t.setDaemon(False) t.start()
Event:
線程間的通信,一個線程發送一個event,其它線程等待這個信號,用於主線程控制其它線程執行
event.wait():堵塞線程
evnet.set():標識wei位設未True
event.clear():標識位設未False
event.isSet():判斷標識位是否爲True
import threading def do(event): print("start") event.wait()#紅燈wait等待, 綠燈執行 print("execute") event_obj = threading.Event() for i in range(5): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear()#event默認爲False, 讓燈變紅 inp = input("input:") if inp == "true": event_obj.set()#讓燈變綠執行 #線程執行的時候,若是flag爲False,則線程阻塞,爲True,線程不會阻塞,提供本地和遠程的併發性 # start # start # start # start # start # input:true # execute # execute # execute # execute # execute
threading.Condition: 條件變量condition內部是含有鎖的邏輯,否則沒法保證線程之間同步
import queue#隊列,線程安全,這個模型也叫生產者-消費者模型 import threading message = queue.Queue(10)#數值小於或者等於0,隊列大小沒有限制。 def producer(i):#生產者 print("put:",i) # while True: message.put(i) def consumer(i):#消費者 # while True: msg = message.get() print(msg) for i in range(12): t = threading.Thread(target=producer,args=(i,)) t.start() for i in range(10): t = threading.Thread(target=consumer,args=(i,)) t.start() # put: 0 # put: 1 # put: 2 # put: 3 # put: 4 # put: 5 # put: 6 # put: 7 # put: 8 # put: 9 # put: 10 # put: 11 # 0 # 1 # 2 # 3 # 4 # 5 # 6 # 7 # 8 # 9
get,等
get_nowait,不等
#建立進程 import multiprocessing import time def f1(a1): time.sleep(2) print(a1) if __name__ == "__main__":#windows下運行進程必須加if __name__ == "__main__": t = multiprocessing.Process(target=f1, args=(11,)) # t.daemon = True#默認False, 定義爲True主進程終止所有結束 t.start() t.join()#與線程join相似,主線程等待,子線程執行 t2 = multiprocessing.Process(target=f1, args=(12,)) # t2.daemon = True t2.start() print("end")#主進程
from multiprocessing import Process li = [] def foo(i): li.append(i) print("zc",li) if __name__ == "__main__": for i in range(10): p = Process(target=foo,args=(i,)) p.start() # 每一個進程建立本身的列表,進程之間數據,內存不能共享,先調那個由CPU決定因此結果是無序的 # zc [0] # zc [1] # zc [2] # zc [3] # zc [4] # zc [5] # zc [6] # zc [7] # zc [8] # zc [9]
import threading li = [] def foo(i): li.append(i) print("zc",li) if __name__ == "__main__": for i in range(10): p = threading.Thread(target=foo,args=(i,))#threading.Thread線程內存共享,是共同一個li p.start() # zc [0] # zc [0, 1] # zc [0, 1, 2] # zc [0, 1, 2, 3] # zc [0, 1, 2, 3, 4] # zc [0, 1, 2, 3, 4, 5] # zc [0, 1, 2, 3, 4, 5, 6] # zc [0, 1, 2, 3, 4, 5, 6, 7] # zc [0, 1, 2, 3, 4, 5, 6, 7, 8] # zc [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
from multiprocessing import Process #多進程 Multiprocessing 模塊 def f(name): print("hello",name) if __name__ == "__main__": p = Process(target=f, args=("bob",)) # Process類進程對象,建立子進程的時候,只須要傳入一個執行函數和函數的參數便可完成 #target 函數名,須要調用的函數 #args 函數須要的參數,以 tuple 的形式傳入 p.start()#star() 方法啓動進程 p.join()#join() 方法實現進程間的同步,等待全部進程退出。 # p.close()#阻止多餘的進程涌入進程池 Pool 形成進程阻塞。
import multiprocessing import os def run_proc(name): print('Child process {0} {1} Running '.format(name, os.getpid())) # os.getpid()獲取當前進程id os.getppid()獲取父進程id if __name__ == '__main__': print('Parent process {0} is Running'.format(os.getpid())) for i in range(5): p = multiprocessing.Process(target=run_proc, args=(str(i),)) print('process start') p.start() p.join() print('Process close') # Parent process 27428 is Running # process start # process start # process start # process start # process start # Child process 0 27176 Running # Child process 1 23384 Running # Child process 3 11524 Running # Child process 2 11560 Running # Child process 4 24904 Running # Process close
#進程間內存數據共享方式1 from multiprocessing import Process,Value,Array #Value(內存數據共享),Array(數組,與列表類似) def f(n,a): n.value = 3.1415 for i in range(len(a)): a[i] = -a[i] if __name__ == "__main__": num = Value("d",0.0) arr = Array("i",range(10)) p = Process(target=f,args=(num,arr))#進程1 a = Process(target=f,args=(num,arr))#進程2 p.start() a.start() p.join() a.join() print(num.value) print(arr[:]) # 3.1415 # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 數據共享後負負得正 #進程間內存數據共享方式2 from multiprocessing import Process, Manager def f(d,l): d[l] = "1" d["2"] = 2 d[0.26] = None l.reverse() if __name__ == "__main__": with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f,args=(d,l))#建立進程處理函數裏面的d,l變量 p.start() p.join() print(d) print(l) #{<ListProxy object, typeid 'list' at 0x24626a370b8>: '1', '2': 2, 0.26: None} # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
進程池:
python提供了進程池,Pool
from multiprocessing import Pool import time def f1(a): time.sleep(1) print(a) return 1000 def f2(arg): print(arg)#arg值是f1的返回值 if __name__ == "__main__": Pool = Pool(5)#建立5個進程池 for i in range(40):#5個5個執行 Pool.apply_async(func=f1, args=(i,),callback=f2) #1.每一個任務併發執行,先執行5個當有進程的時候再執行5個.內部沒有join()方法須要定義以下: #2.能夠設置回調函數callback print("1111111111111111") # Pool.apply(func=f1, args=(i,)) #一個一個申請執行,一個執行完才執行下一個,內部有join()方法,不用定義 Pool.close()#執行完後終止 # Pool.terminate()#當即終止 Pool.join()#進程池的join方法一個一個執行,join方法前面必須先定義close,terminate方法
簡單版線程池:
import queue import threading import time class ThreadPool(object): #建立線程池類 def __init__(self, max_num=20): #建立一個最大長度爲20的隊列 self.queue = queue.Queue(max_num) #建立一個隊列 for i in range(max_num): #循環把線程對象加入到隊列中 self.queue.put(threading.Thread) #把線程的類名放進去,執行完這個Queue,20個隊列指向同一個Thread類 def get_thread(self): #定義方法從隊列裏獲取線程 return self.queue.get() #在隊列中獲取值 def add_thread(self): #線程執行完任務後,在隊列裏添加線程 self.queue.put(threading.Thread) def func(pool,a1): time.sleep(1) print(a1) pool.add_thread() #線程執行完任務後,隊列裏再加一個線程 p = ThreadPool(10) #執行init方法; 一次最多執行10個線程 for i in range(100): thread = p.get_thread() #線程池10個線程,每一次循環拿走一個拿到類名,沒有就等待 t = thread(target=func, args=(p, i,)) #建立線程; 線程執行func函數的這個任務;args是給函數傳入參數 t.start() #激活線程 #輸出無序的0-99數 # 對象等於類後面加括號 # 對象是線程
線程池要點:
1,建立線程池時,是在須要執行線程的時候建立線程,而不是建立好最大隊列等待執行
2,建立一個回調函數,檢查出剩餘隊列的任務,當線程執行完函數的時候通知線程池,
3,使用線程池時讓其循環獲取任務,並執行
4,線程池,讓其自行的去激活線程,執行完成後,關閉退出
import queue import threading import time import contextlib StopEvent = object() class ThreadPool(object): def __init__(self, max_num): self.q = queue.Queue() # 最多建立的線程數(線程池最大容量) self.max_num = max_num self.terminal = False #若是爲True 終止全部線程,不在獲取新任務 self.generate_list = [] # 真實建立的線程列表 self.free_list = []# 空閒線程數量 def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數) :return: 若是線程池已經終止,則返回True不然None """ if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() #建立線程 w = (func, args, callback,) #把參數封裝成元祖 self.q.put(w) #添加到任務隊列 def generate_thread(self): """ 建立一個線程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循環去獲取任務函數並執行任務函數 """ current_thread = threading.currentThread # 獲取當前線程 self.generate_list.append(current_thread) #添加到已經建立的線程裏 event = self.q.get() # 取任務並執行 while event != StopEvent: # 是元組=》是任務;若是不爲中止信號 執行任務 func, arguments, callback = event #解開任務包; 分別取出值 try: result = func(*arguments) #運行函數,把結果賦值給result status = True #運行結果是否正常 except Exception as e: status = False #表示運行不正常 result = e #結果爲錯誤信息 if callback is not None: #是否存在回調函數 try: callback(status, result) #執行回調函數 except Exception as e: pass if self.terminal: # 默認爲False,若是調用terminal方法 event = StopEvent #等於全局變量,表示中止信號 else: # self.free_list.append(current_thread) #執行完畢任務,添加到閒置列表 # event = self.q.get() #獲取任務 # self.free_list.remove(current_thread) # 獲取到任務以後,從閒置列表中刪除;不是元組,就不是任務 with self.worker_state(self.free_list, current_thread): event = self.q.get() else: self.generate_list.remove(current_thread) #若是收到終止信號,就從已經建立的線程列表中刪除 def close(self): #終止線程 num = len(self.generate_list) #獲取總共建立的線程數 while num: self.q.put(StopEvent) #添加中止信號,有多少線程添加多少表示終止的信號 num -= 1 def terminate(self): #終止線程(清空隊列) self.terminal = True #把默認的False更改爲True while self.generate_list: #若是有已經建立線程存活 self.q.put(StopEvent) #有幾個線程就發幾個終止信號 self.q.empty() #清空隊列 @contextlib.contextmanager def worker_state(self, state_list, worker_thread): state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread) def work(i): print(i) pool = ThreadPool(10) for item in range(50): pool.run(func=work, args=(item,)) # 將任務放在隊列中 # 着手開始處理任務 # - 建立線程 # - 有空閒線程,擇再也不建立線程 # - 不能高於線程池的限制 # - 根據任務個數判斷 # - 線程去隊列中取任務 pool.terminate()
協程:
Python的 greenlet就至關於手動切換,去執行別的子程序,在「別的子程序」中又主動切換回來
greenlet協程例子:
# 協程就是:把線程分塊,不讓線程等待,讓線程遇到IO請求先執行1,或先執行2,或先執行3叫作協程 from greenlet import greenlet # greenlet 其實就是手動切換;gevent是對greenlet的封裝,能夠實現自動切換 # import gevent def test1(): print("123") gr2.switch() # 切換去執行test2 print("456") gr2.switch() # 切換回test2以前執行到的位置,接着執行 def test2(): print("789") gr1.switch() # 切換回test1以前執行到的位置,接着執行 print("666") gr1 = greenlet(test1) # 建立的協程,啓動一個協程 注意test1不要加() gr2 = greenlet(test2) # gr1.switch() # 123 # 789 # 456 # 666
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現協程程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
gevent會主動識別程序內部的IO操做,當子程序遇到IO後,切換到別的子程序。若是全部的子程序都進入IO,則阻塞。
協程之gevent例子:
import gevent def func1(): print("func1 running") gevent.sleep(2) # 內部函數實現io操做 print("switch func1") def func2(): print("func2 running") gevent.sleep(1) print("switch func2") def func3(): print("func3 running") gevent.sleep(0) print("func3 done..") gevent.joinall([gevent.spawn(func1), gevent.spawn(func2), gevent.spawn(func3), ]) # func1 running # func2 running # func3 running # func3 done.. # switch func2 # switch func1
同步與異步性能區別:
同步: 發一個請求須要等待返回, 全部的操做都作完,才返回給用戶結果。即寫完數據庫以後,在響應用戶,用戶體驗很差。使用場景:銀行轉帳,數據庫保存操做
異步: 發一個請求不須要等待返回,不用等全部操做等作完,就響應用戶請求。即先響應用戶請求,而後慢慢去寫數據庫,用戶體驗較好。 使用場景:爲了不短期大量的數據庫操做,就使用緩存機制,也就是消息隊列。先將數據放入消息隊列,而後再慢慢寫入數據庫。
import gevent def task(pid): """ Some non-deterministic task """ gevent.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(1, 10): task(i) def asynchronous(): threads = [gevent.spawn(task, i) for i in range(10)] gevent.joinall(threads) print('Synchronous:') synchronous() print('Asynchronous:') asynchronous() # Synchronous: # Task 1 done # Task 2 done # Task 3 done # Task 4 done # Task 5 done # Task 6 done # Task 7 done # Task 8 done # Task 9 done # Asynchronous: # Task 0 done # Task 1 done # Task 2 done # Task 3 done # Task 4 done # Task 5 done # Task 6 done # Task 7 done # Task 8 done # Task 9 done
上面程序的重要部分是將task函數封裝到greenlet內部線程的gevent.spawn
。 初始化的greenlet列表存放在數組threads
中,此數組被傳給gevent.joinall
函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。
遇到Io阻塞時會切換任務之【爬蟲版】
from urllib import request import gevent,time from gevent import monkey monkey.patch_all() # 把當前程序中的全部io操做都作上標記
def spider(url): print("GET:%s" % url) resp = request.urlopen(url) data = resp.read() print("%s bytes received from %s.." % (len(data), url)) urls = [ "https://www.python.org/", "https://www.yahoo.com/", "https://github.com/" ] start_time = time.time() for url in urls: spider(url) print("同步耗時:",time.time() - start_time) async_time_start = time.time() gevent.joinall([ gevent.spawn(spider,"https://www.python.org/"), gevent.spawn(spider,"https://www.yahoo.com/"), gevent.spawn(spider,"https://github.com/"), ]) print("異步耗時:",time.time() - async_time_start) # GET:https://www.python.org/ # 48814 bytes received from https://www.python.org/.. # GET:https://www.yahoo.com/ # 492112 bytes received from https://www.yahoo.com/.. # GET:https://github.com/ # 81165 bytes received from https://github.com/.. # 同步耗時: 43.494789600372314 # GET:https://www.python.org/ # GET:https://www.yahoo.com/ # GET:https://github.com/ # 492000 bytes received from https://www.yahoo.com/.. # 59868 bytes received from https://github.com/.. # 48814 bytes received from https://www.python.org/.. # 異步耗時: 21.32669472694397
經過gevent實現【單線程】下的多socket併發
server端:
import sys import socket import time import gevent from gevent import socket, monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli) def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: conn.close() if __name__ == '__main__': server(9999)
client端:
import socket HOST = 'localhost' # The remote host PORT = 9999 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>:"), encoding="utf8") s.sendall(msg) data = s.recv(1024) # print(data) print('Received', repr(data)) s.close()
前文所述「子程序(函數)在執行過程當中能夠中斷去執行別的子程序;別的子程序也能夠中斷回來繼續執行以前的子程序」,那麼很容易想到Python的yield,顯然yield是能夠實現這種切換的。
使用yield實現協程操做例子:
def consumer(name): print("要開始啃骨頭了...") while True: print("\033[31;1m[consumer] %s\033[0m " % name) bone = yield print("[%s] 正在啃骨頭 %s" % (name, bone)) def producer(obj1, obj2): obj1.send(None) # 啓動obj1這個生成器,第一次必須用None <==> obj1.__next__() obj2.send(None) # 啓動obj2這個生成器,第一次必須用None <==> obj2.__next__() n = 0 while n < 5: n += 1 print("\033[32;1m[producer]\033[0m 正在生產骨頭 %s" % n) obj1.send(n) obj2.send(n) if __name__ == '__main__': con1 = consumer("消費者A") con2 = consumer("消費者B") producer(con1, con2)