1、socket服務端實現併發
服務端:
import socket
from threading import Thread
"""
服務端:
一、固定的ip和port
二、24小時不間斷提供服務
三、支持高併發
"""
server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen(5) # 半鏈接池
def communicate(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0:break
print(data)
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
while True:
conn,addr = server.accept()
print(addr)
t = Thread(target=communicate,args=(conn,))
t.start()
客戶端:
import socket
client = socket.socket()
client.connect(('127.0.0.1',8080))
while True:
msg = input('>>>:').encode('utf-8')
if len(msg) == 0:continue
client.send(msg)
data = client.recv(1024)
print(data)
2、進程池線程池介紹
池:
爲了減緩計算機硬件的壓力,避免計算機硬件設施崩潰
雖然減輕了計算機的壓力,但必定程度上下降了持續的效率
進程池線程池:
爲了限制開設的進程數和線程數,從而保證計算機硬件的安全
使用方法:
concurrent.futures模塊導入
線程池建立(線程數=CPU核數*5左右)
submit提交任務(提交任務的兩種方式)
異步提交的submit返回值對象
shutdown關閉池並等待全部任務運行結束
對象獲取任務返回值
進程池的使用,驗證進程池在建立的時候裏面固定有指定的進程數
異步提交回調函數的使用
進程池:
from concurrent.futures import ProcessPoolExecutor
import time
import os
pool = ProcessPoolExecutor(5) # 建立一個池子,池子裏面有5個進程
def task(n):
print(n,os.getpid())
time.sleep(2)
return n**2
def call_back(n):
print('拿到告終果:%s'%n.result())
"""
提交任務的方式
同步:提交任務以後,原地等待任務的返回結果,再繼續執行下一步代碼
異步:提交任務以後,不等待任務的返回結果(經過回調函數拿到返回結果並處理),直接執行下一步操做
"""
if __name__ == '__main__':
for i in range(20):
future = pool.submit(task,i).add_done_callback(call_back)
print('主')
線程池:
from concurrent.futures import ThreadPoolExecutor
import time
import os
pool = ThreadPoolExecutor(5) # 建立一個池子,池子裏面有5個線程
def task(n):
print(n,os.getpid())
time.sleep(2)
return n**2
def call_back(n):
print('拿到告終果:%s'%n.result())
"""
提交任務的方式
同步:提交任務以後,原地等待任務的返回結果,再繼續執行下一步代碼
異步:提交任務以後,不等待任務的返回結果(經過回調函數拿到返回結果並處理),直接執行下一步操做
"""
if __name__ == '__main__':
for i in range(20):
future = pool.submit(task,i).add_done_callback(call_back)
print('主')
3、協程
進程:資源單位
線程:執行單位
協程:單線程下實現併發(可以在多個任務之間切換和保存狀態來節省IO),
這裏注意區分操做系統的切換+保存狀態是針對多個線程而言,
而咱們如今是想在單個線程下本身手動實現操做系統的切換+保存狀態的功能
注意:協程這個概念徹底是程序員本身想出來的東西,它對於操做系統來講根本不存在
操做系統只知道進程和線程,並非單個線程下實現切換+保存狀態就能提高效率,
若是沒有遇到io操做反而會下降效率
高併發:
多進程下開多線程,多線程下用協程
實現併發的手段:
yield可以實現保存上次運行狀態,可是沒法識別遇到io才切
gevent模塊:
一個spawn就是一個幫你管理任務的對象
from gevent import monkey;monkey.patch_all() # 檢測全部的io行爲
from gevent import spawn,joinall # joinall列表裏放多個對象,實現join效果
import time
def play(name):
print('%s play 1' % name)
time.sleep(5)
print('%s play 2' % name)
def eat(name):
print('%s eat 1' %name)
time.sleep(3)
print('%s eat 2' % name)
start = time.time()
g1 = spawn(play,'lucas')
g2 = spawn(eat,'lucas')
joinall([g1,g2])
print('主',time.time()-start)
4、協程實現服務端客戶端通訊
連接和通訊都是io密集型操做,咱們只須要在這二者之間來回切換其實就能實現併發的效果
服務端監測連接和通訊任務,客戶端起多線程同時連接服務端
服務端:
from gevent import monkey;monkey.patch_all()
from gevent import spawn
import socket
def communicate(conn):
while True:
while True:
try:
data = conn.recv(1024)
if len(data) == 0:break
print(data)
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def sever():
server = socket.socket()
server.bind(('127.0.0.1',8080))
server.listen(5)
while True:
conn,addr = server.accept()
spawn(communicate,conn)
if __name__ == '__main__':
s1 = spawn(sever)
s1.join()
客戶端:
from threading import Thread,current_thread
import socket
def client():
client = socket.socket()
client.connect(('127.0.0.1',8080))
n = 1
while True:
data = '%s %s' % (current_thread().name,n)
n += 1
client.send(data.encode('utf-8'))
info = client.recv(1024)
print(info)
if __name__ == '__main__':
for i in range(500):
t = Thread(target=client)
t.start()
5、IO模型
阻塞IO
非阻塞IO(服務端通訊針對accept用s.setblocking(False)加異常捕獲,cpu佔用率太高)
IO多路複用
異步IO