import socket

import requests

# Approach 1: let the requests library perform the whole HTTP exchange.
ret = requests.get('https://www.baidu.com/s?wd=alex')

# Approach 2: speak HTTP by hand over a raw, blocking TCP socket.
# The context manager closes the socket even if one of the calls raises.
with socket.socket() as client:
    # connect() blocks until the connection to Baidu is established.
    client.connect(('www.baidu.com', 80))
    # Tell the server what we want.
    client.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')
    # recv() blocks until data arrives; an empty result means the server
    # closed the connection, which is how this loop detects the end.
    chunk_list = []
    while True:
        chunk = client.recv(8096)
        if not chunk:
            break
        chunk_list.append(chunk)

body = b''.join(chunk_list)
# errors='replace' keeps the demo from crashing on a stray non-UTF-8 byte.
print(body.decode('utf-8', errors='replace'))
# by luffycity.com
import socket
import threading
from urllib.parse import quote

import requests

# ---------- Concurrency, take 1: a single thread, one request at a time ----------
# Approach 1: requests
key_list = ['alex', 'db', 'sb']
for item in key_list:
    ret = requests.get('https://www.baidu.com/s?wd=%s' % item)


# Approach 2: raw blocking socket
def get_data(key):
    """Fetch the Baidu search page for ``key`` over a raw socket and print it.

    :param key: search keyword; it is URL-quoted before being placed in the
        request line.
    """
    request = 'GET /s?wd=%s HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n' % quote(key)
    # The context manager closes the socket even if a call below raises.
    with socket.socket() as client:
        # connect() blocks until the connection is established.
        client.connect(('www.baidu.com', 80))
        client.sendall(request.encode('utf-8'))
        # recv() blocks until data arrives; b'' means the server closed the
        # connection, which ends the loop.
        chunk_list = []
        while True:
            chunk = client.recv(8096)
            if not chunk:
                break
            chunk_list.append(chunk)
    body = b''.join(chunk_list)
    print(body.decode('utf-8', errors='replace'))


key_list = ['alex', 'db', 'sb']
for item in key_list:
    get_data(item)

# ---------- Concurrency, take 2: one thread per request ----------
key_list = ['alex', 'db', 'sb']
for item in key_list:
    t = threading.Thread(target=get_data, args=(item,))
    t.start()

# ---------- Concurrency, take 3: a single thread that never waits on IO ----------
# Open questions for this approach:
#   - how do we issue an IO request without waiting for it?
#   - how do we find out that the data has come back?
# by luffycity.com
import select
import socket

# The context manager closes the socket even if a call below raises.
with socket.socket() as client:
    # Non-blocking mode: calls that used to wait now raise BlockingIOError
    # when they cannot complete immediately.
    client.setblocking(False)

    try:
        client.connect(('www.baidu.com', 80))
    except BlockingIOError:
        # Expected: the connect has been started but is not finished yet.
        pass

    # Wait until the connect attempt has finished. The socket is also watched
    # for exceptional conditions because some platforms report a failed
    # connect there rather than as "writable".
    select.select([], [client], [client])
    err = client.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err:
        raise OSError(err, 'connect to www.baidu.com failed')

    # Tell the server what we want.
    client.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')

    chunk_list = []
    while True:
        # Wait for data first; recv() on a non-blocking socket with nothing
        # to read would raise BlockingIOError.
        select.select([client], [], [])
        chunk = client.recv(8096)
        if not chunk:  # b'' means the server closed the connection
            break
        chunk_list.append(chunk)

body = b''.join(chunk_list)
print(body.decode('utf-8', errors='replace'))
一、IO多路複用作用:
檢測多個socket是否已經發生變化(是否已經鏈接成功/是否已經獲取數據)(可讀/可寫)
可以監聽所有的IO請求狀態
client.setblocking(False) # switch the socket to non-blocking mode: spots that used to block now raise an error instead of waiting
二、基於事件循環實現的異步非阻塞框架:Twisted 非阻塞:不等待。 異步:執行完某個任務後自動調用我給它的函數(回調)。 Python中開源的、基於事件循環實現的異步非阻塞框架:Twisted
三、單線程的併發
基於IO多路複用+socket實現併發請求(一個線程100個請求)
IO多路複用
socket非阻塞
import select  # this module can tell us whether data has arrived

# socket_list: sockets checked for "has the server sent data back?"
# conn_list:   sockets checked for "has the connection been established?"
rlist, wlist, elist = select.select(
    socket_list,
    conn_list,
    [],
    0.005,
)
import select
import socket


def _connect(host):
    """Start a non-blocking connect to ``host`` on port 80 and return the socket."""
    sk = socket.socket()
    sk.setblocking(False)
    try:
        sk.connect((host, 80))
    except BlockingIOError:
        # Expected in non-blocking mode: the connect is still in progress.
        pass
    return sk


client1 = _connect('www.baidu.com')
client2 = _connect('www.sogou.com')
client3 = _connect('www.oldboyedu.com')

# Request to send on each socket once its connection is ready.
request_by_socket = {
    client1: b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n',
    client2: b'GET /web?query=fdf HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n',
    client3: b'GET /s?wd=alex HTTP/1.0\r\nhost:www.oldboyedu.com\r\n\r\n',
}
# Response data received so far, kept per socket so that a response arriving
# in several pieces is not cut off after the first piece.
chunks_by_socket = {sk: [] for sk in request_by_socket}

socket_list = [client1, client2, client3]  # still waiting for a response
conn_list = [client1, client2, client3]    # still waiting for the connect


def _finish(sk):
    """Print everything received on ``sk``, close it and stop watching it."""
    body = b''.join(chunks_by_socket[sk])
    # print(body.decode('utf-8'))
    print('------------>', body)
    sk.close()
    socket_list.remove(sk)


while socket_list:
    rlist, wlist, elist = select.select(socket_list, conn_list, [], 0.005)

    # Sockets in wlist have finished connecting: send their request once.
    for sk in wlist:
        conn_list.remove(sk)
        try:
            sk.sendall(request_by_socket[sk])
        except OSError:
            # The connect failed or the peer went away; nothing to wait for.
            _finish(sk)

    # Sockets in rlist have data (or EOF) waiting.
    for sk in rlist:
        if sk not in socket_list:  # already finished above in this round
            continue
        done = False
        while True:
            try:
                chunk = sk.recv(8096)
            except BlockingIOError:
                # Drained for now; keep the socket open, more may follow.
                break
            except OSError:
                done = True
                break
            if not chunk:  # b'' means the server closed the connection
                done = True
                break
            chunks_by_socket[sk].append(chunk)
        if done:
            _finish(sk)
基於事件循環實現的異步非阻塞框架:Twisted
非阻塞:不等待
異步:執行完某個任務後自動調用我給它的函數(回調)。
Python中開源的、基於事件循環實現的異步非阻塞框架:Twisted
# by luffycity.com
import select
import socket


class Req(object):
    """One in-flight HTTP request: its socket, its target and its callback.

    select.select() calls fileno() on every object in the lists it is given,
    so exposing fileno() lets Req instances be passed to select() directly in
    place of raw sockets.

    :param sk: the (non-blocking) socket used for this request.
    :param func: callback invoked with the raw response bytes when done.
    :param host: value for the ``host`` header.
    :param path: request target placed in the request line.
    """

    def __init__(self, sk, func, host='www.baidu.com', path='/s?wd=alex'):
        self.sock = sk
        self.func = func
        self.host = host
        self.path = path
        self.chunks = []  # response data received so far

    def fileno(self):
        """Return the underlying socket's file descriptor (used by select)."""
        return self.sock.fileno()

    def request_bytes(self):
        """Return the HTTP/1.0 GET request for this object's host and path."""
        text = 'GET %s HTTP/1.0\r\nhost:%s\r\n\r\n' % (self.path, self.host)
        return text.encode('utf-8')


class Nb(object):
    """Run several HTTP requests concurrently in one thread using select()."""

    def __init__(self):
        self.conn_list = []    # requests still waiting for their connect
        self.socket_list = []  # requests still waiting for their response

    def add(self, url, func, path='/s?wd=alex'):
        """Start a non-blocking connect to ``url``:80 and register ``func``.

        :param url: host name to connect to; also sent as the ``host`` header.
        :param func: callback called with the response bytes.
        :param path: request target; the default is the one this demo has
            always sent.
        """
        client = socket.socket()
        client.setblocking(False)  # non-blocking
        try:
            client.connect((url, 80))
        except BlockingIOError:
            # Expected in non-blocking mode: the connect is still in progress.
            pass
        obj = Req(client, func, url, path)
        self.conn_list.append(obj)
        self.socket_list.append(obj)

    def run(self):
        """Drive all registered requests until each callback has been called."""
        while self.socket_list:
            rlist, wlist, elist = select.select(
                self.socket_list, self.conn_list, [], 0.005)
            # Req objects in wlist have finished connecting: send once.
            for req in wlist:
                self.conn_list.remove(req)
                try:
                    req.sock.sendall(req.request_bytes())
                except OSError:
                    # Connect failed or peer went away; report what we have.
                    self._finish(req)
            # Req objects in rlist have data (or EOF) waiting.
            for req in rlist:
                if req not in self.socket_list:  # finished above this round
                    continue
                if self._drain(req):
                    self._finish(req)

    def _drain(self, req):
        """Read what is available now; return True once the response is over."""
        while True:
            try:
                chunk = req.sock.recv(8096)
            except BlockingIOError:
                # Nothing more right now; keep the socket open for the rest.
                return False
            except OSError:
                return True
            if not chunk:  # b'' means the server closed the connection
                return True
            req.chunks.append(chunk)

    def _finish(self, req):
        """Hand the collected body to the callback and release the socket."""
        body = b''.join(req.chunks)
        # print(body.decode('utf-8'))
        req.func(body)
        req.sock.close()
        self.socket_list.remove(req)


def baidu_repsonse(body):
    """Callback for the Baidu request."""
    print('百度下載結果:', body)


def sogou_repsonse(body):
    """Callback for the Sogou request."""
    print('搜狗下載結果:', body)


def oldboyedu_repsonse(body):
    """Callback for the oldboyedu request."""
    print('老男孩下載結果:', body)


t1 = Nb()
t1.add('www.baidu.com', baidu_repsonse)
t1.add('www.sogou.com', sogou_repsonse)
t1.add('www.oldboyedu.com', oldboyedu_repsonse)
t1.run()
總結:
1. socket默認是否是阻塞的?阻塞體現在哪裏?
   是阻塞的,體現在:連接(connect)的時候、accept、recv。
2. 如何讓socket編程非阻塞?
   .setblocking(False)
3. IO多路複用作用?
   檢測多個socket是否發生變化。
   操作系統檢測socket是否發生變化,有三種模式:
   select:最多1024個socket;循環去檢測。
   poll:不限制監聽socket個數;循環去檢測(水平觸發)。
   epoll:不限制監聽socket個數;回調方式(默認水平觸發,也支持邊緣觸發)。
   Python模塊: select.select select.epoll
4. 提升併發方案:
   - 多進程
   - 多線程
   - 異步非阻塞模塊(Twisted) scrapy框架(單線程完成併發)
5. 什麼是異步非阻塞?
   - 非阻塞,不等待。
     比如建立socket對某個地址進行connect、獲取接收數據recv時默認都會等待(鏈接成功或接收到數據),才執行後續操作。
     如果設置setblocking(False),以上兩個過程就不再等待,但是會報BlockingIOError的錯誤,只要捕獲即可。
   - 異步,通知,執行完成之後自動執行回調函數或自動執行某些操作(通知)。
     比如做爬蟲時向某個地址baidu.com發送請求,當請求執行完成之後自動執行回調函數。
6. 什麼是同步阻塞?
   - 阻塞:等
   - 同步:按照順序逐步執行
     key_list = ['alex','db','sb']
     for item in key_list:
         ret = requests.get('https://www.baidu.com/s?wd=%s' %item)
         print(ret.text)
7. 概念
   之前:
# "Before": plain nested lists. (Original note: "the code you wrote: 7000w".)
v = [
    [11, 22],  # every inner list has its own append method
    [22, 33],  # every inner list has its own append method
    [33, 44],  # every inner list has its own append method
]

# (Original note: "Wang Sicong".)
# Show that each element exposes append().
for item in v:
    print(item.append)
以後:
class Foo(object):
    """Wrap a list so it can carry extra data while still offering append().

    :param data: the list being wrapped; stored as ``row``.
    :param girl: extra value attached to the row; stored as ``girl``.
    """

    def __init__(self, data, girl):
        self.row = data
        self.girl = girl

    def append(self, item):
        """Append ``item`` to the wrapped list."""
        self.row.append(item)


v = [
    Foo([11, 22], '雪梨'),  # every element still has an append method
    Foo([22, 33], '冰糖'),  # every element still has an append method
    Foo([33, 44], '糖寶'),  # every element still has an append method
]

for item in v:
    print(item.append)
    # The wrapper also makes the extra attribute reachable; print it so the
    # access is visible instead of being a statement with no effect.
    print(item.girl)
概念:
進程,操作系統中存在;
線程,操作系統中存在;
協程,是由程序員創造出來的一個不是真實存在的東西;
協程:是微線程,對一個線程進行分片,使得線程在代碼塊之間進行來回切換執行,而不是像原來那樣逐行執行。
import greenlet


def f1():
    """Print 11, hand control to gr2, then print 22 and hand over again."""
    print(11)
    gr2.switch()
    print(22)
    gr2.switch()


def f2():
    """Print 33, hand control back to gr1, then print 44."""
    print(33)
    gr1.switch()
    print(44)


# coroutine gr1
gr1 = greenlet.greenlet(f1)
# coroutine gr2
gr2 = greenlet.greenlet(f2)

gr1.switch()


# Note: coroutines on their own are of no use. The two plain functions below
# contain the same four print calls without any switching.
def f1():
    print(11)
    print(33)


def f2():
    print(22)
    print(44)


f1()
f2()
from gevent import monkey

# From here on, IO encountered in the code automatically triggers a greenlet
# switch. This call is made before the remaining imports, as in the original.
monkey.patch_all()

import requests
import gevent


def get_page1(url):
    """Download ``url`` and print it together with the raw response body."""
    response = requests.get(url)
    print(url, response.content)


def get_page2(url):
    """Download ``url`` and print it together with the raw response body."""
    response = requests.get(url)
    print(url, response.content)


def get_page3(url):
    """Download ``url`` and print it together with the raw response body."""
    response = requests.get(url)
    print(url, response.content)


jobs = [
    gevent.spawn(get_page1, 'https://www.python.org/'),  # coroutine 1
    gevent.spawn(get_page2, 'https://www.yahoo.com/'),   # coroutine 2
    gevent.spawn(get_page3, 'https://github.com/'),      # coroutine 3
]
gevent.joinall(jobs)
總結:
1. 什麼是協程?
   協程也可以稱爲「微線程」,就是開發者控制線程執行流程,控制先執行某段代碼然後再切換到另外的函數執行代碼...來回切換。
2. 協程可以提高併發嗎?
   協程自己本身無法實現併發(甚至性能會降低)。
   協程+IO切換性能提高。
3. 進程、線程、協程的區別?
4. 單線程提供併發:
   - 協程+IO切換:gevent
   - 基於事件循環的異步非阻塞框架:Twisted
   - 手動實現協程:yield關鍵字生成器
def f1():
    """Generator that prints 11, 22 and 33, pausing at a yield in between."""
    print(11)
    yield
    print(22)
    yield
    print(33)


def f2():
    """Generator that prints 55, 66 and 77, pausing at a yield in between."""
    print(55)
    yield
    print(66)
    yield
    print(77)


v1 = f1()
v2 = f2()

# Alternate between the two generators by hand.
next(v1)  # same as v1.send(None)
next(v2)  # same as v2.send(None)

next(v1)  # same as v1.send(None)
next(v2)  # same as v2.send(None)

# The third step runs each generator past its last yield to the end of the
# function. A bare next() would raise StopIteration here and stop the script
# before 77 is printed, so a default value is supplied.
next(v1, None)
next(v2, None)
重點總結:
1. 進程、線程、協程的區別? **********
2. 寫代碼:gevent *****
   from gevent import monkey
   monkey.patch_all() # 之後代碼中遇到IO都會自動執行greenlet的switch進行切換
   import requests
   import gevent
   def get_page1(url):
       ret = requests.get(url)
       print(url,ret.content)
   def get_page2(url):
       ret = requests.get(url)
       print(url,ret.content)
   def get_page3(url):
       ret = requests.get(url)
       print(url,ret.content)
   gevent.joinall([
       gevent.spawn(get_page1, 'https://www.python.org/'), # 協程1
       gevent.spawn(get_page2, 'https://www.yahoo.com/'), # 協程2
       gevent.spawn(get_page3, 'https://github.com/'), # 協程3
   ])
3. 寫代碼:twisted *****
   from twisted.web.client import getPage, defer
   from twisted.internet import reactor
   def all_done(arg):
       reactor.stop()
   def callback(contents):
       print(contents)
   deferred_list = []
   url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
   for url in url_list:
       deferred = getPage(bytes(url, encoding='utf8'))
       deferred.addCallback(callback)
       deferred_list.append(deferred)
   dlist = defer.DeferredList(deferred_list)
   dlist.addBoth(all_done)
   reactor.run()
4. 異步非阻塞
5. IO多路複用
   作用:可以監聽所有的IO請求的狀態。
   - socket
   I:input,O:output
   三種模式:
   - select
   - poll
   - epoll
兩週總結:
網絡編程:
1. 網絡基礎
   - 網卡
   - IP
   - ...
2. OSI 7層
3. 三次握手四次揮手
4. BS和CS架構?
5. socket基本代碼
6. 黏包
7. 斷點續傳
8. 協議
   自定義協議:{'code':10001,'data':{...}}
   Http協議:GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n
9. 面向對象+高級作業:反射/面向對象
併發編程:
1. 進程、線程、協程的區別?
2. 線程
   - 基本寫法
     - 實例化
     - 繼承
   - 鎖
     - RLock ...
   - 線程池
3. 進程
   - 基本寫法
     - 實例化
     - 繼承
   - 鎖
     - RLock ...
   - 進程池
   - 進程數據共享
4. 協程
   - 協程
   - 協程+IO:gevent
5. IO多路複用
6. 異步/同步 阻塞/非阻塞