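Theory:
1. Every process that is created contains one thread by default.
2. A process is a unit of resources; the threads inside it are the units that the CPU schedules.
3. The threads inside one process share that process's resources.
4. Creating a thread costs less than creating a process. No new memory space has to be requested; a new "pipeline" is simply opened inside the space the current process already owns, so threads are created faster than processes.
5. Processes mostly compete with one another, because they are isolated from each other; threads of the same process cooperate.

Differences between threads and processes
1. Threads in the same process share the address space of the process that created them, i.e. they share its resources. Every process has its own address space, i.e. a parent process and its child process have completely independent address spaces.
2. A thread can access its process's data directly. On Linux, when the main process creates a child process, the child receives a complete copy of the parent's state as its initial state, but any data the child creates or changes while it runs no longer has anything to do with the parent.
3. Threads of the same process can communicate with each other directly. Processes have isolated memory, so to communicate they need a shared memory area; sharing means competition, so locking is required. What we want is shared memory that also handles the locking automatically: a queue. A queue is one IPC mechanism for communication between processes; a pipe is similar, except that with a pipe you have to handle the locking yourself, so a queue is more convenient. Threads have no need for IPC, because they share the memory of their process by default, but contention still exists, so they have to lock or use the thread-level queue.
4. With the same resources, more threads can be started than processes: threads are cheap and quick to create, so more of them can be created.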
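Two ways to start a thread
1. Pass the target function to Thread

from threading import Thread

def task():
    print('is running')

if __name__=='__main__':
    t=Thread(target=task,)
    t.start()
    print('main')
#is running
#main

t.start() sends a signal to the operating system, but the thread is opened directly inside the space the current process already owns. The process came into being when the first line of code started running, so by the time t.start() runs its space has existed for a long while. start() does not need to request any space and only opens a new pipeline. The overhead is small, which is why 'is running' usually appears first.

from multiprocessing import Process

def task():
    print('is running')

if __name__ == '__main__':
    t=Process(target=task,)
    t.start()
    print('main')
#main
#is running

Starting a process is expensive: the parent's state has to be copied, which takes time. After t.start() sends the request, the operating system has to allocate space, build the process and create a thread inside it. During that time print('main') has already run; 'is running' is printed once the child process exists.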
2. Subclass Thread and override run()

from threading import Thread

class MyThread(Thread):
    def run(self):
        print('is running')

if __name__ == '__main__':
    t=MyThread()
    t.start()
    print('main')

If arguments have to be passed:

from threading import Thread

class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name

    def run(self):
        print('%s is running' % self.name)

if __name__ == '__main__':
    t=MyThread('egon')
    t.start()
    print('main')
Threads and processes

from threading import Thread
import os

def task():
    print('%s is running' % os.getpid())

if __name__=='__main__':
    t1=Thread(target=task,)
    t2=Thread(target=task,)
    t1.start()
    t2.start()
    print('main',os.getpid())
#1376 is running
#1376 is running
#main 1376

The child threads and the main thread all see the same pid, because all of these threads live inside one process.

from multiprocessing import Process
import os

def task():
    print('%s is running' % os.getpid())

if __name__=='__main__':
    t1=Process(target=task,)
    t2=Process(target=task,)
    t1.start()
    t2.start()
    print('main',os.getpid())
#main 4136          <- pid of the main process
#5588 is running    <- pid of a child process
#6532 is running    <- pid of a child process
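Threads share the resources of their process

from multiprocessing import Process

n=100
def work():
    global n
    n=0

if __name__=='__main__':
    p=Process(target=work,)
    p.start()
    p.join()
    print('main',n)
#main 100

The main process looks at its own n, which was never changed. When the child process is started, the data is copied into the child; changing the global variable changes the child's copy. The child's n becomes 0, but the main process's n is still 100. The child and the main process have completely independent memory spaces.

from threading import Thread

n=100
def work():
    global n
    n=0

if __name__=='__main__':
    t=Thread(target=work,)
    t.start()
    t.join()
    print('main',n)
#main 0

Threads share the address space of one process. The global n that the thread changes is the process's own n, so it is modified directly.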
Exercise: threads sharing the address space of one process

from threading import Thread

msg_l=[]
format_l=[]

def talk():
    '''Append every line the user types to msg_l.'''
    while True:
        msg=input('>>: ').strip()
        msg_l.append(msg)

def format():
    '''Pop a message, upper-case it and append it to format_l.'''
    while True:
        if msg_l:
            data=msg_l.pop()
            format_l.append(data.upper())

def save():
    '''Pop formatted data and append it to a file.'''
    while True:
        if format_l:                     # there is data
            data=format_l.pop()
            with open('db.txt','a') as f:
                f.write('%s\n'%data)

# The three tasks have to share data, so threads are used.
if __name__=='__main__':
    t1=Thread(target=talk,)
    t2=Thread(target=format,)
    t3=Thread(target=save,)
    t1.start()
    t2.start()
    t3.start()

So whenever several concurrent tasks need to share data, they can be implemented with threads.
Other attributes and methods of Thread objects

from threading import Thread

def task():
    print('is running')

if __name__=='__main__':
    t=Thread(target=task,)
    t.start()
    t.join()              # the main thread waits for the child thread to finish
    print(t.is_alive())   # is the thread still alive?
    print('main')
#is running
#False
#main

from threading import Thread

def task():
    print('is running')

if __name__=='__main__':
    t=Thread(target=task,)
    t.start()
    print(t.is_alive())
    print('main')
    print(t.is_alive())
#is running
#True
#main
#True

from threading import Thread

def task():
    print('is running')

if __name__=='__main__':
    t=Thread(target=task,)
    t.start()
    print(t.is_alive())
    print(t.name)         # t.getName() is deprecated since Python 3.10
    print('main')
    print(t.is_alive())
#is running
#False
#Thread-1                (Python 3.10+ prints 'Thread-1 (task)')
#main
#False
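So without join(), you cannot predict whether is_alive() returns True or False. It depends on whether the thread has already finished at the moment of the call, and that is decided by how the operating system schedules it.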
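The effect of join() on is_alive() can be made deterministic with a timeout. The following is a minimal sketch; worker and alive_after_join are hypothetical helper names introduced only for this illustration, and the delays are arbitrary.

```python
import threading
import time


def worker(delay):
    """Pretend to work for `delay` seconds."""
    time.sleep(delay)


def alive_after_join(delay, timeout=None):
    """Start a worker, join it (optionally with a timeout) and report is_alive()."""
    t = threading.Thread(target=worker, args=(delay,))
    t.start()
    t.join(timeout)        # returns when the thread ends or the timeout expires
    alive = t.is_alive()
    t.join()               # always wait for the thread before returning
    return alive


if __name__ == '__main__':
    # join() without a timeout waits until the thread has finished
    print(alive_after_join(0.05))
    # the timeout expires long before the worker finishes
    print(alive_after_join(0.5, timeout=0.05))
```

After a plain join() the thread is guaranteed to be dead; after a join() that timed out, is_alive() is the only way to tell whether the thread finished.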
from threading import Thread,active_count   # activeCount is the deprecated alias

def task():
    print('is running')

if __name__=='__main__':
    t=Thread(target=task,)
    t.start()
    print('main')
    print(active_count())
#is running
#main
#2    -----> number of live threads: the main thread and the thread it started

from threading import Thread,active_count,enumerate

def task():
    print('is running')

if __name__=='__main__':
    t=Thread(target=task,)
    t.start()
    print('main')
    print(active_count())
    print(enumerate())   # ---> the thread objects that are currently alive
#is running
#main
#1
#[<_MainThread(MainThread, started 5588)>]

from threading import Thread,enumerate
import time

def task():
    print('is running')
    time.sleep(2)        # make sure the thread stays alive for 2 s

if __name__=='__main__':
    t=Thread(target=task,)
    t.start()
    print(enumerate())
    print('main')
#is running
#[<_MainThread(MainThread, started 1060)>, <Thread(Thread-1, started 4496)>]
#main
One main thread and one Thread-1 thread.

# now with join()
from threading import Thread,enumerate
import time

def task():
    print('is running')
    time.sleep(2)        # make sure the thread stays alive for 2 s

if __name__=='__main__':
    t=Thread(target=task,)
    t.start()
    t.join()
    print(enumerate())
    print('main')
#is running
#[<_MainThread(MainThread, started 6172)>]
#main
Only the main thread is left.

from threading import Thread,current_thread
import time

def task():
    print('%s is running'%current_thread().name)   # the current thread object
    time.sleep(2)

if __name__=='__main__':
    t=Thread(target=task,)
    t.start()
    t.join()
    print('main')
#Thread-1 is running
Verifying that a new process has a main thread by default
from threading import Thread,current_thread
from multiprocessing import Process
import time
print(current_thread())
#<_MainThread(MainThread, started 6192)>
Running the script creates a process. A process is not a unit of execution, only a unit of resources.
When the main process "runs", it is really the main thread inside the main process that runs, so whenever execution is discussed, think in terms of threads.

from threading import current_thread
from multiprocessing import Process
import time

def task():
    print('%s is running'% current_thread().name)   # the child process's MainThread
    time.sleep(2)

if __name__=='__main__':
    p=Process(target=task,)   # the main thread of this process is already executing the code
    p.start()                 # start a child process, which contains its own main thread
    print(current_thread())   # the parent process's main thread
#<_MainThread(MainThread, started 5056)>
#MainThread is running

In terms of execution, the main thread represents the execution of the process it belongs to.

from threading import Thread,current_thread
import time

def task():
    print('%s is running'% current_thread().name)
    time.sleep(2)

if __name__=='__main__':
    t1=Thread(target=task,)
    t2=Thread(target=task,)
    t3=Thread(target=task,)
    t1.start()
    t2.start()
    t3.start()
    print(current_thread().name)
#Thread-1 is running
#Thread-2 is running
#Thread-3 is running
#MainThread

A process has exactly one main thread; all the others are threads that it started.
Daemon threads
When the main thread dies, its daemon threads die with it.

# first, look at daemon processes
from multiprocessing import Process
import time

def task():
    print('123')
    time.sleep(2)
    print('123done')

if __name__ == '__main__':
    p=Process(target=task,)
    p.start()
    print('main')
#main
#123
#123done

Even after the main process has run all of its code, it waits for the child process to finish before it exits.

from multiprocessing import Process
import time

def task():
    print('123')
    time.sleep(2)
    print('123done')

if __name__ == '__main__':
    p=Process(target=task,)
    p.daemon=True
    p.start()
    print('main')
#main

As soon as the main process is finished, the daemon process dies. When does the main process count as finished? When its code has finished running.
'main' is printed, and the child process is killed before it even had time to start.

from multiprocessing import Process
import time

def task1():
    print('123')
    time.sleep(1)
    print('123done')

def task2():
    print('456')
    time.sleep(10)
    print('456done')

if __name__ == '__main__':
    p1=Process(target=task1,)
    p2=Process(target=task2,)
    p1.daemon=True
    p1.start()
    p2.start()
    print('main')
#main
#456
#456done
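With 10 s available, '123' and '123done' should have been printed as well, so why were they not?
Because p1 was killed the moment the main process's code finished, before p1 had time to start.
The code of the main process is finished, but the main process still waits for the child process p2 to finish.
On a very fast machine, p1 might already be running by the time p1.start() and p2.start() have returned, in which case '123' could appear on the screen. But 1 s is a long time for the operating system, more than enough to print 'main', and after that p1 is killed.
For the main process, "finished" means that the code of the main process has finished running.
For the main thread, "finished" means that every non-daemon thread in its process has finished; only then does the main thread count as finished.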
from threading import Thread
import time

def task1():
    print('123')
    time.sleep(1)
    print('123done')

if __name__ == '__main__':
    t=Thread(target=task1,)
    t.start()
    print('main')
#123
#main
#123done

from threading import Thread
import time

def task1():
    print('123')
    time.sleep(1)
    print('123done')

if __name__ == '__main__':
    t=Thread(target=task1,)
    t.daemon=True
    t.start()
    print('main')
#123
#main

from threading import Thread
import time

def task1():
    print('123')
    time.sleep(1)
    print('123done')

def task2():
    print('456')
    time.sleep(10)    # the non-daemon thread runs for 10 s
    print('456done')

if __name__ == '__main__':
    t1=Thread(target=task1,)
    t2=Thread(target=task2,)
    t1.daemon=True
    t1.start()
    t2.start()
    print('main')
#123
#456
#main
#123done
#456done
Besides the main thread, the process contains another non-daemon thread. The main thread has to wait for the non-daemon thread task2 to finish, which takes 10 s; 10 s is also enough for the daemon thread task1 to finish, so '123done' is printed as well.

from threading import Thread
import time

def task1():
    print('123')
    time.sleep(10)
    print('123done')

def task2():
    print('456')
    time.sleep(1)
    print('456done')

if __name__ == '__main__':
    t1=Thread(target=task1,)
    t2=Thread(target=task2,)
    t1.daemon=True
    t1.start()
    t2.start()
    print('main')
#123
#456
#main
#456done
Mutex locks for threads

from threading import Thread,Lock
import time

n=100
def work():
    global n
    mutex.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    mutex.release()

if __name__ == '__main__':
    mutex=Lock()
    l=[]
    start=time.time()
    for i in range(100):
        t=Thread(target=work,)
        l.append(t)
        t.start()

    for t in l:
        t.join()
    print('run time:%s value:%s'%(time.time()-start,n))
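A lock can also be used as a context manager, which releases it even if the protected code raises an exception. The following is a minimal sketch of the same counter idea; run_counter and its parameters are hypothetical names chosen for this illustration.

```python
import threading


def run_counter(n_threads=8, n_increments=10_000):
    """Increment a shared counter from several threads, guarded by a Lock."""
    counter = 0
    lock = threading.Lock()

    def work():
        nonlocal counter
        for _ in range(n_increments):
            # `with lock:` acquires the lock and always releases it again
            with lock:
                counter += 1

    threads = [threading.Thread(target=work) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


if __name__ == '__main__':
    print(run_counter())   # n_threads * n_increments
```

Only the read-modify-write of the counter sits inside the lock; everything else in the thread still runs concurrently.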
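Difference between a mutex and join

from threading import Thread
import time

n=100
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1

if __name__ == '__main__':
    start=time.time()
    for i in range(100):
        t=Thread(target=work,)
        t.start()
        t.join()   # join right after start: the whole body of each thread runs serially,
                   # whereas a mutex serializes only the code between acquire() and release()
    print('run time:%s value:%s'%(time.time()-start,n))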
Deadlock and recursive locks

# deadlock
from threading import Thread,Lock
import time

mutexA=Lock()
mutexB=Lock()

class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s acquired lock A'%self.name)
        mutexB.acquire()
        print('%s acquired lock B'%self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print('%s acquired lock B'%self.name)
        time.sleep(1)
        mutexA.acquire()
        print('%s acquired lock A'%self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(20):
        t=Mythread()
        t.start()
#Thread-1 acquired lock A
#Thread-1 acquired lock B
#Thread-1 acquired lock B
#Thread-2 acquired lock A
# Thread-1 now holds B and waits for A, Thread-2 holds A and waits for B: deadlock
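# recursive lock
from threading import Thread,RLock
import time

# Both names refer to one and the same RLock. The thread that owns an RLock
# may acquire it again; other threads wait until it has been released as
# many times as it was acquired, so the deadlock above cannot occur.
mutexA=mutexB=RLock()

class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print('%s acquired lock A'%self.name)
        mutexB.acquire()
        print('%s acquired lock B'%self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print('%s acquired lock B'%self.name)
        time.sleep(1)
        mutexA.acquire()
        print('%s acquired lock A'%self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    for i in range(20):
        t=Mythread()
        t.start()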
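The difference between Lock and RLock can be checked directly with a non-blocking second acquire from the same thread. This is a small sketch; can_reacquire is a hypothetical helper name used only here.

```python
import threading


def can_reacquire(lock):
    """Return True if the current thread can acquire `lock` twice in a row."""
    lock.acquire()
    # blocking=False returns immediately instead of waiting forever
    second = lock.acquire(blocking=False)
    if second:
        lock.release()   # undo the second acquire
    lock.release()       # undo the first acquire
    return second


if __name__ == '__main__':
    print('Lock :', can_reacquire(threading.Lock()))
    print('RLock:', can_reacquire(threading.RLock()))
```

A plain Lock refuses the second acquire even for the thread that holds it, which is exactly what makes nested acquisition deadlock; an RLock counts the acquisitions of its owner instead.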
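Introduction to I/O models
Synchronous and asynchronous describe the way a task is submitted or called.
Blocking describes the execution state of a thread.
A read such as recv() goes through two stages:
1. Waiting for the data to be ready
2. Copying the data from the kernel into the process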
1. Blocking I/O
Blocking I/O server

from socket import *

server=socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8080))
server.listen(5)

while True:
    conn,addr = server.accept()
    print(addr)
    while True:
        try:
            data=conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()
server.close()
Non-blocking I/O
Server

from socket import *

server=socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8080))
server.listen(5)
server.setblocking(False)   # True by default; False makes the socket non-blocking

conns=[]
del_l=[]
while True:
    try:
        print(conns)
        conn,addr = server.accept()
        conns.append(conn)
    except BlockingIOError:
        # no new connection right now: serve the existing ones
        for conn in conns:
            try:
                data=conn.recv(1024)
                if not data:            # the client closed the connection
                    conn.close()
                    del_l.append(conn)
                    continue
                conn.send(data.upper())
            except BlockingIOError:
                pass
            except ConnectionResetError:
                conn.close()
                del_l.append(conn)
        for conn in del_l:
            conns.remove(conn)
        del_l=[]

Client

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))
I/O multiplexing (recommended)
Implementing the I/O multiplexing model with select
Server

from socket import *
import select

server=socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8080))
server.listen(5)
server.setblocking(False)

reads=[server,]
while True:
    rl,_,_=select.select(reads,[],[])   # blocks until at least one socket is readable
    for obj in rl:
        if obj == server:
            conn,addr=obj.accept()
            reads.append(conn)
        else:
            try:
                data=obj.recv(1024)
                if not data:
                    obj.close()
                    reads.remove(obj)
                    continue
                obj.send(data.upper())
            except Exception:
                obj.close()
                reads.remove(obj)
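The readiness loop can be tried without a real network by using socket.socketpair(). The following is a minimal sketch under that assumption; echo_upper_once is a hypothetical helper name, and the payloads are assumed to be small enough to arrive in a single recv().

```python
import select
import socket


def echo_upper_once(payloads):
    """Send each payload over its own socket pair; one select() loop upper-cases them."""
    pairs = [socket.socketpair() for _ in payloads]
    for (client, _), data in zip(pairs, payloads):
        client.sendall(data)

    pending = {server_end for _, server_end in pairs}
    while pending:
        # select() returns only the sockets that have data ready to read
        readable, _, _ = select.select(list(pending), [], [])
        for sock in readable:
            sock.sendall(sock.recv(1024).upper())
            pending.discard(sock)

    replies = [client.recv(1024) for client, _ in pairs]
    for client, server_end in pairs:
        client.close()
        server_end.close()
    return replies


if __name__ == '__main__':
    print(echo_upper_once([b'hello', b'world']))
```

A single thread serves all connections here: it sleeps inside select() and touches a socket only once the first stage of the read (waiting for data) is already over.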
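Coroutines
Achieving concurrency within a single thread
1. Concurrency based on yield

import time

def consumer():
    while True:
        res=yield

def producer():
    g=consumer()
    next(g)
    for i in range(1000000):
        g.send(i)

start=time.time()
producer()
print(time.time()-start)

Switching without having hit any I/O is just switching for its own sake, and it actually lowers efficiency.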
import time

def consumer(res):
    print('consumer')
    time.sleep(10)

def producer():
    res=[]
    for i in range(1000000):
        res.append(i)
    return res

start=time.time()
res=producer()
consumer(res)
print(time.time()-start)

import time

def consumer():
    while True:
        res=yield
        print('consumer',res)
        time.sleep(10)

def producer():
    g=consumer()
    next(g)
    for i in range(10000):
        print('producer',i)
        g.send(i)

start=time.time()
producer()
print(time.time()-start)

yield only switches between the functions; it does nothing for efficiency, because the sleep still blocks the whole thread.
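To make the idea of "plain switching" concrete, here is a minimal round-robin scheduler built only on generators. It is a sketch for illustration; task and run_round_robin are hypothetical names, and no I/O is involved.

```python
from collections import deque


def task(name, steps, log):
    """A generator that records one step, then yields control back to the scheduler."""
    for i in range(steps):
        log.append(f'{name} step {i}')
        yield


def run_round_robin(tasks):
    """Resume each generator in turn until all of them are exhausted."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)          # run the task up to its next yield
        except StopIteration:
            continue               # the task is finished, drop it
        ready.append(current)      # otherwise put it at the back of the queue


if __name__ == '__main__':
    log = []
    run_round_robin([task('a', 2, log), task('b', 2, log)])
    print(log)
```

The steps of the two tasks interleave even though only one thread exists. The switching points are fixed by where each task yields, not by whether it is waiting on I/O, which is the limitation the following modules address.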
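The greenlet module
This is still plain switching. Its only advantage is that switching is more convenient than with yield; it still does not switch automatically when I/O is encountered.

from greenlet import greenlet
import time

def eat(name):
    print('%s eat 1'% name)
    g2.switch('tom')    # pause and switch to play()
    print('%s eat 2'% name)
    g2.switch()

def play(name):
    print('%s play 1'% name)
    time.sleep(10)      # while sleeping, control does not move to another function such as eat(); it just waits
    g1.switch()         # arguments are only needed on the first switch; pause and resume the rest of eat()
    print('%s play 2'% name)

g1=greenlet(eat)
g2=greenlet(play)
g1.switch('tom')        # switch to eat()
# tom eat 1
# tom play 1
# tom eat 2
# tom play 2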
The gevent module

import gevent

def eat(name):
    print('%s eat 1' % name)
    gevent.sleep(3)
    print('%s eat 2' % name)

def play(name):
    print('%s play 1' % name)
    gevent.sleep(2)
    print('%s play 2' % name)

g1=gevent.spawn(eat, 'tom')    # spawn() submits the task asynchronously; it does not wait for it to run
g2=gevent.spawn(play, 'tom')   # to see the tasks run, you have to wait for them (join())
gevent.joinall([g1,g2])
# tom eat 1
# tom play 1
# tom play 2    play() finishes sleeping first (2 s), so 'play 2' is printed first
# tom eat 2

The I/O in these functions is simulated with gevent.sleep(). What about time.sleep()? On its own, time.sleep() is not recognised by gevent.

from gevent import monkey;monkey.patch_all()
# Patches the blocking I/O operations used below this line so that gevent
# can recognise them; without it, time.sleep() makes the tasks run serially.
import gevent
import time

def eat(name):
    print('%s eat 1' % name)
    time.sleep(1)
    print('%s eat 2' % name)

def play(name):
    print('%s play 1' % name)
    time.sleep(2)
    print('%s play 2' % name)

g1=gevent.spawn(eat, 'tom')
g2=gevent.spawn(play, 'tom')
gevent.joinall([g1,g2])
#tom eat 1
#tom play 1
#tom eat 2
#tom play 2
This is concurrency within a single thread, in other words coroutines. Coroutines are scheduled by the user program itself and are invisible to the operating system; with the gevent module, the I/O waits are hidden from it. Switching between coroutines is also cheaper than switching between threads.

Concurrent socket communication based on coroutines

# client
from socket import *

client=socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

# server
from gevent import monkey;monkey.patch_all()   # without the patch, accept() and recv() block the whole thread
from socket import *
import gevent

def server(server_ip, port):
    s=socket(AF_INET, SOCK_STREAM)
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    s.bind((server_ip, port))
    s.listen(5)
    while True:
        conn,addr = s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn, addr):
    try:
        while True:
            res = conn.recv(1024)
            if not res:break                   # the client closed the connection
            print('client %s:%s msg:%s'% (addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == '__main__':
    server('127.0.0.1', 8080)

# client that opens 500 concurrent connections
from threading import Thread
from socket import *

def client():
    client=socket(AF_INET, SOCK_STREAM)
    client.connect(('127.0.0.1', 8080))
    while True:
        client.send('hello'.encode('utf-8'))
        msg=client.recv(1024)
        print(msg.decode('utf-8'))

if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client,)
        t.start()
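Drawbacks of coroutines:
They are single-threaded by nature and cannot make use of multiple cores, so to get the most out of the program you should keep its blocking I/O to an absolute minimum.
All coroutines run inside one thread: as soon as one coroutine blocks, the whole thread is blocked.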
Process pools and thread pools
The concurrent.futures module provides process pools and thread pools behind a higher-level interface for executing calls asynchronously.

# process pool
from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def work(n):
    print('%s is running'%os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':
    p=ProcessPoolExecutor()   # the default number of workers is the number of CPUs
    objs=[]
    for i in range(10):
        obj=p.submit(work, i)
        objs.append(obj)
    p.shutdown()              # stop accepting tasks and wait for the submitted ones
    for obj in objs:
        print(obj.result())

# thread pool
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
import random

def work(n):
    print('%s is running'%current_thread().name)
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':
    p=ThreadPoolExecutor()    # default: min(32, cpu count + 4) since Python 3.8; cpu count * 5 before that
    objs=[]
    for i in range(21):
        obj=p.submit(work, i)
        objs.append(obj)
    p.shutdown()
    for obj in objs:
        print(obj.result())
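Two other common ways of collecting results are Executor.map(), which keeps the input order, and as_completed(), which yields futures as they finish. The sketch below uses a with block so that shutdown() is called automatically; square, squares_in_order and squares_as_completed are hypothetical names for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(n):
    return n ** 2


def squares_in_order(numbers, max_workers=4):
    """map() returns the results in the order of the inputs."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(square, numbers))


def squares_as_completed(numbers, max_workers=4):
    """as_completed() yields each future as soon as it has a result."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(square, n): n for n in numbers}
        return {futures[f]: f.result() for f in as_completed(futures)}


if __name__ == '__main__':
    print(squares_in_order(range(5)))
    print(squares_as_completed([2, 3]))
```

Leaving the with block is equivalent to calling shutdown(wait=True), so no explicit shutdown() is needed.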
Event

from threading import Thread,current_thread,Event
import time

event=Event()

def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise ConnectionError('connection failed')
        print('%s waiting to connect to mysql, attempt %s'%(current_thread().name,count))
        event.wait(0.5)   # the internal flag starts as False; wait here until it becomes True or the timeout expires
        count+=1
    print('%s connected' % current_thread().name)

def check_mysql():
    print('%s checking the state of mysql' % current_thread().name)
    time.sleep(1)
    event.set()           # set the internal flag to True

if __name__ == '__main__':
    t1 = Thread(target=conn_mysql)
    t2 = Thread(target=conn_mysql)
    check = Thread(target=check_mysql)
    t1.start()
    t2.start()
    check.start()
Timer

from threading import Timer

def hello(n):
    print('hello,world',n)

t = Timer(3, hello,args=(11,))   # run after 3 s
t.start()
#hello,world 11
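A Timer can be cancelled before it fires, and it combines naturally with an Event. The following sketch uses a Timer to set an Event after a delay and reports whether a waiter saw the signal in time; wait_for_signal is a hypothetical helper name and the delays are arbitrary.

```python
import threading


def wait_for_signal(set_after, wait_timeout):
    """Set an Event from a Timer after `set_after` seconds; wait at most `wait_timeout`."""
    event = threading.Event()
    timer = threading.Timer(set_after, event.set)
    timer.start()
    signalled = event.wait(wait_timeout)   # True if the flag was set in time
    timer.cancel()                         # harmless if the timer has already fired
    timer.join()
    return signalled


if __name__ == '__main__':
    print(wait_for_signal(0.05, 1.0))   # the timer fires before the wait times out
    print(wait_for_signal(1.0, 0.05))   # the wait gives up first and the timer is cancelled
```

Event.wait() returns the state of the flag, so the caller can tell a real signal from a timeout without a separate is_set() call.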
Thread queue (for reference)

import queue

q=queue.Queue(3)           # a queue: first in, first out
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
#1
#2
#3

import queue

q=queue.LifoQueue(3)       # a stack: last in, first out
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
#3
#2
#1

import queue

q=queue.PriorityQueue(3)   # the smaller the number, the higher the priority
q.put((10, 'data1'))       # (priority, data)
q.put((11, 'data2'))
q.put((9, 'data3'))
print(q.get())
print(q.get())
print(q.get())
#(9, 'data3')
#(10, 'data1')
#(11, 'data2')
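These queues do their own locking, which is what makes them the convenient way to pass data between threads. Below is a minimal producer/consumer sketch; produce_and_consume is a hypothetical name, and None is used as an assumed end-of-data marker.

```python
import queue
import threading


def produce_and_consume(items):
    """Pass items from the calling thread to a consumer thread through a Queue."""
    q = queue.Queue(maxsize=3)   # put() blocks while the queue is full
    results = []

    def consumer():
        while True:
            item = q.get()       # blocks until an item is available
            if item is None:     # sentinel: no more data
                break
            results.append(item.upper())

    t = threading.Thread(target=consumer)
    t.start()
    for item in items:
        q.put(item)
    q.put(None)                  # tell the consumer to stop
    t.join()
    return results


if __name__ == '__main__':
    print(produce_and_consume(['a', 'b', 'c', 'd']))
```

Neither side needs an explicit Lock: get() and put() block and wake up as needed, so no busy-waiting loop is required either.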