因爲線程是cpu工做的最小單元,建立線程能夠利用多核優點實現並行操做(Java/C#)。python
注意:線程是爲了工做。react
進程和進程之間作數據隔離(Java/C#)。注意:進程是爲了提供環境讓線程工做。git
併發,僞,因爲執行速度特別塊,人感受不到停頓。程序員
並行,真,建立10我的同時操做github
print("666")web
Python本身沒有這玩意,Python中調用的操做系統的線程和進程。安全
一個應用程序(軟件),能夠有多個進程(默認只有一個),一個進程中能夠建立多個線程(默認一個)。多線程
import threading def func(s,arg): print(arg) t1 = threading.Thread(target=func,args=(5,)) t1.start() t2 = threading.Thread(target=func,args=(88,)) t2.start() print("666")
GIL鎖,全局解釋器鎖。用於限制一個進程中同一時刻只有一個線程被cpu調度。併發
擴展:默認GIL鎖在執行100個cpu指令(過時時間)。app
aPython中存在一個GIL鎖
形成:多線程沒法利用多核優點
解決:開多進程處理(浪費資源)
總結:
IO密集型:多線程
計算密集型:多進程
- Thread
# 多線程方式1(常見) def func(arg): print(arg) t1 = threading.Thread(target=func,args=(888,)) t1.start() t2 = threading.Thread(target=func,args=(777,)) t2.start() print("666")
- MyThread
# 多線程方式2 (面向對象) class MyThread(threading.Thread): def run(self): print(self._args,self._kwargs) t1 = MyThread(args=(666,555,444)) t1.start() t2 = MyThread(args=(111,222,333)) t2.start() print("hahaha")
- join
# 開發者能夠控制主線程等待子線程(最多等待時間) def func(arg): time.sleep(10) print(arg) t1 = threading.Thread(target=func,args=(5,)) t1.start() # 無參數,讓主線程在這裏等着,等到子線程t1執行完畢,才能夠繼續往下走。 # 有參數,讓主線程在這裏最多等待n秒,不管是否執行完畢,會繼續往下走。 t1.join(1) t2 = threading.Thread(target=func,args=(88,)) t2.start() t1.join(1) print("666")
- setDeanon
# 主線程再也不等,主線程終止則全部子線程終止 def func(arg): time.sleep(2) print(arg) t1 = threading.Thread(target=func,args=(5,)) t1.setDaemon(True) t1.start() t2 = threading.Thread(target=func,args=(88,)) t2.setDaemon(True) t2.start() print("666")
- setName
- threading.current_thread()
# 線程名稱 def func(arg): # 獲取當前執行該函數的線程的對象 t = threading.current_thread() # 根據當前線程對象獲取當前線程名稱 name = t.getName() print(t,name,arg) t1 = threading.Thread(target=func,args=(5,)) t1.setName("qwer") # 線程命名 t1.start()
t2 = threading.Thread(target=func,args=(88,)) t2.setName("asdf") # 線程命名 t2.start()
print("666")
-得到
lock.acquire() # 加鎖,此區域的代碼同一時刻只能有一個線程執行
-釋放
lock.release() # 釋放鎖
1. 鎖:Lock (1次放1個)
線程安全,多線程操做時,內部會讓全部線程排隊處理。如:list/dict/Queue
線程不安全 + 人 => 排隊處理。
鎖一個代碼塊:
#Lock鎖 1次放1個 v = [] lock = threading.Lock() def func(arg): lock.acquire() # 加鎖 v.append(arg) time.sleep(0.01) m= v[-1] print(arg,m) lock.release() # 解鎖 for i in range(10): t = threading.Thread(target=func,args=(i,)) t.start()
2. 鎖:RLock (1次放1個)
#RLock鎖 1次放1個 v = [] lock = threading.RLock() def func(arg): lock.acquire() lock.acquire() v.append(arg) time.sleep(0.01) m = v[-1] print(arg,m) lock.release() lock.release() for i in range(10): t = threading.Thread(target=func,args=(i,)) t.start()
3. 鎖:BoundedSemaphore(1次放N個)信號量
#semaphore鎖 1次放n個 lock = threading.BoundedSemaphore(3) def func(arg): lock.acquire() print(arg) time.sleep(1) lock.release() for i in range(20): t = threading.Thread(target=func,args=(i,)) t.start()
4. 鎖:Condition(1次方法x個)
#Condition鎖 (1次放x個) lock = threading.Condition() def func(arg): print("線程進來了") lock.acquire() lock.wait() # 加鎖 print(arg) time.sleep(1) lock.release() for i in range(10): t = threading.Thread(target=func,args=(i,)) t.start() while True: inp = int(input(">>>")) lock.acquire() lock.notify(inp) lock.release()
5. 鎖:Event(1次放全部)
#Event鎖 1次放全部 lock = threading.Event() def func(arg): print("線程進來了") lock.wait() # 加鎖,紅燈 print(arg) for i in range(10): t = threading.Thread(target=func,args=(i,)) t.start() input("1>>>") lock.set() # 綠燈 lock.clear() # 再次變紅燈 for i in range(10): t = threading.Thread(target=func,args=(i,)) t.start() input("2>>>") lock.set()
總結:
線程安全,列表和字典線程安全;
爲何要加鎖?
非線程安全
控制一段代碼
6. threading.local
做用:
內部自動爲每一個線程維護一個空間(字典),用於當前存取屬於本身的值。保證線程之間的數據隔離。
import time import threading DATA_DICT = {} # # threading.local 內部自動爲每一個線程維護一個空間(字典),用於當前存取屬於本身的值。保證線程之間的數據隔離。 def func(arg): ident = threading.get_ident() DATA_DICT[ident] = arg time.sleep(1) print(DATA_DICT[ident],arg) for i in range(10): t = threading.Thread(target=func,args=(i,)) t.start()
內部原理
INFO = {} class Local(object): def __getattr__(self, item): ident = threading.get_ident() return INFO[ident][item] def __setattr__(self, key, value): ident = threading.get_ident() if ident in INFO: INFO[ident][key] = value else: INFO[ident] = {key:value} obj = Local() def func(arg): obj.phone = arg # 調用__setattr__方法 time.sleep(2) print(obj.phone,arg) for i in range(10): t = threading.Thread(target=func,args=(i,)) t.start()
1. 進程
- 進程間數據不共享
data_list = [] def task(arg): data_list.append(arg) print(data_list) def run(): for i in range(1,10): p = multiprocessing.Process(target=task,args=(i,5)) # p = threading.Thread(target=task,args=(i,4)) p.start() p.join() if __name__ == '__main__': run()
2.經常使用功能:
- join- deamon
- name
- multiprocessing.current_process()
- multiprocessing.current_process().ident/pid
3. 類繼承方式建立進程
class MyProcess(multiprocessing.Process): def run(self): print('當前進程',multiprocessing.current_process()) def run(): p1 = MyProcess() p1.start() p2 = MyProcess() p2.start() if __name__ == '__main__': run()
4.進程鎖
import time import threading import multiprocessing lock = multiprocessing.RLock() def task(arg): print('鬼子來了') lock.acquire() time.sleep(2) print(arg) lock.release() if __name__ == '__main__': p1 = multiprocessing.Process(target=task,args=(1,)) p1.start() p2 = multiprocessing.Process(target=task, args=(2,)) p2.start()
5.進程池
import time from concurrent.futures import ProcessPoolExecutor def task(arg): time.sleep(2) print(arg) if __name__ == '__main__': pool = ProcessPoolExecutor(5) for i in range(10): pool.submit(task,i)
進程和線程的區別?
第一:
進程是cpu資源分配的最小單元。
線程是cpu計算的最小單元。
第二:
一個進程中能夠有多個線程。
第三:
對於Python來講他的進程和線程和其餘語言有差別,是有GIL鎖。
GIL鎖保證一個進程中同一時刻只有一個線程被cpu調度。
注意:IO密集型操做可使用多線程;計算密集型可使用多進程
概念:
進程,操做系統中存在
線程,操做系統中存在
協程,是由程序員創造出來的一個不是真實存在的東西
協程:
是微線程,對一個線程進程分片,使得線程在代碼塊之間進行來回切換執行,而不是在原來逐行執行。
協程 + 遇到 IO 就切換 >>>gevent
from gevent import monkey monkey.patch_all() # 之後代碼中遇到IO都會自動執行greenlet的switch進行切換 import requests import gevent def get_page1(url): ret = requests.get(url) print(url,ret.content) def get_page2(url): ret = requests.get(url) print(url,ret.content) def get_page3(url): ret = requests.get(url) print(url,ret.content) gevent.joinall([ gevent.spawn(get_page1, 'https://www.python.org/'), # 協程1 gevent.spawn(get_page2, 'https://www.yahoo.com/'), # 協程2 gevent.spawn(get_page3, 'https://github.com/'), # 協程3 ])
>>>twisted
from twisted.web.client import getPage, defer from twisted.internet import reactor def all_done(arg): reactor.stop() def callback(contents): print(contents) deferred_list = [] url_list = ['http://www.bing.com', 'http://www.baidu.com', ] for url in url_list: deferred = getPage(bytes(url, encoding='utf8')) deferred.addCallback(callback) deferred_list.append(deferred) dlist = defer.DeferredList(deferred_list) dlist.addBoth(all_done) reactor.run()
IO多路複用做用:
檢測多個socket是否已經發生變化(是否已經鏈接成功/是否已經獲取數據)(可讀/可寫)
操做系統檢測socket是否發生變化,有三種模式:
select:最多1024個socket;循環去檢測。
poll:不限制監聽socket個數;循環去檢測(水平觸發)。
epoll:不限制監聽socket個數;回調方式(邊緣觸發)。
基於IO多路複用+socket實現併發請求(一個線程100個請求)
IO多路複用
socket非阻塞
# by luffycity.com import socket import select client1 = socket.socket() client1.setblocking(False) # 百度建立鏈接: 非阻塞 try: client1.connect(('www.baidu.com',80)) except BlockingIOError as e: pass client2 = socket.socket() client2.setblocking(False) # 百度建立鏈接: 非阻塞 try: client2.connect(('www.sogou.com',80)) except BlockingIOError as e: pass client3 = socket.socket() client3.setblocking(False) # 百度建立鏈接: 非阻塞 try: client3.connect(('www.oldboyedu.com',80)) except BlockingIOError as e: pass socket_list = [client1,client2,client3] conn_list = [client1,client2,client3] while True: rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005) # wlist中表示已經鏈接成功的socket對象 for sk in wlist: if sk == client1: sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n') elif sk==client2: sk.sendall(b'GET /web?query=fdf HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n') else: sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.oldboyedu.com\r\n\r\n') conn_list.remove(sk) for sk in rlist: chunk_list = [] while True: try: chunk = sk.recv(8096) if not chunk: break chunk_list.append(chunk) except BlockingIOError as e: break body = b''.join(chunk_list) # print(body.decode('utf-8')) print('------------>',body) sk.close() socket_list.remove(sk) if not socket_list: break
基於事件循環實現的異步非阻塞框架:
非阻塞:不等待
異步:執行完某我的物後自動調用我給他的函數。
1.什麼是異步非阻塞?
- 非阻塞,不等待。好比建立socket對某個地址進行connect、獲取接收數據recv時默認都會等待(鏈接成功或接收到數據),才執行後續操做。
若是設置setblocking(False),以上兩個過程就再也不等待,可是會報BlockingIOError的錯誤,只要捕獲便可。
- 異步,通知,執行完成以後自動執行回調函數或自動執行某些操做(通知)。好比作爬蟲中向某個地址baidu.com發送請求,當請求執行完成以後自執行回調函數。
2.什麼是同步阻塞?
- 阻塞:等
- 同步:按照順序逐步執行