socket.socket(AF.INET,socket.SOCK_STREAM)
2.1 sendall的發送方式與for i in f(局部)python
2.2 使用send、recv交替的方式作一個ack來解決黏包linux
3.1 5種不一樣的socket類nginx
3.1.1 baseserver用於繼承,不對外提供服務sql
3.1.2 tcpserver繼承baseserver類用於處理tcp鏈接shell
3.1.3 unixstreamserver繼承tcpserver使用UNIX域套接字實現面向數據流協議(sock_stream --->tcp)編程
3.1.4 udpserver繼承tcpsever用於處理udp鏈接windows
3.1.5 unixdatagramserver繼承tcpserver使用針對UNIX域套接字來處理數據報式協議(sock_dgram--->udp)安全
3.2 socketserver的實現服務器
3.2.1 step1 定義一個類,存放handler方法多線程
須要繼承socketserver.BaseRequestHandler,以後每生成一個新鏈接,都會實例化一個類,並調用這個handler方法,和客戶端全部的交互都是在這個handler裏面定義的
class myclass(socketserver.BaseRequestHandler): def handle(self): print('xxx') self.request.send(b'ok')
3.2.2 step2 定義一個實例,此實例關聯上一步的handler、並綁定ip和及端口,監聽端口處理新發起的鏈接,將新發起的鏈接實例化對像,並交給handler處理。
此處咱們要處理tcp,因此是tcpserver。考慮到併發,此處使用threading多線程實現,或者可使用ForkingServer來處理
s1=socketserver.ThreadingTCPServer(('localhost',9999),myclass)
s2=socketserver.FuckingTCPServer(('localhost',8888),myclass)
3.2.3 step3 server.forever()
s1.serve_forever()
s2.serve_forever()
3.2.4 step4 客戶端對接測試
import socketserver class myclass(socketserver.BaseRequestHandler): def handle(self): print('xxx') self.request.send(b'ok') s1=socketserver.ThreadingTCPServer(('localhost',9999),myclass) s2=socketserver.FockingTCPServer(('localhost',8888),myclass) s1.serve_forever() s2.serve_forever() ------------------------------- import socket c=socket.socket() c.connect(('localhost',8888)) print(c.recv(1024).decode())
經過byte的方式讀取文件結合使用ab的方式續寫文件的方式來實現斷點續傳,其核心的思路就是,得到當前半成品文件的字節數,使用seek將被操做文件的句柄移動到此處再日後讀取,以此實現斷點續傳
import socket c1=socket.socket() c1.connect(('localhost',6666)) ''' f=open(r'E:\L.exe','rb') count = 0 for i in f: c1.send(i) count+=1 if count > 10: break ''' x=input('seek到哪一個位置?') f=open(r'E:\L.exe','rb') f.seek(int(x)) print('開始傳輸') count=0 for i in f: print('\r%s'%count) count+=1 c1.send(i) print('ok') --------------------- import socket import os s1=socket.socket() s1.bind(('localhost',6666)) s1.listen() print('----begin----') conn,addr=s1.accept() print('鏈接創建') print(os.path.getsize(r'E:\\xxxx.exe')) f=open(r'E:\\xxxx.exe','ab') while True: data=conn.recv(1024) f.write(data) if not data: print('end') break ''' f=open(r'E:\\xxxx.exe','wb') while True: data=conn.recv(1024) f.write(data) if not data: print('end') break '''
5.1 定義對象
import optparse parse=optparse.OptionParser()
5.2 添加參數
parse.add_option('-u','--user',dest='user',action='store',type=str,metavar='user',help='Enter User Name!!') parse.add_option('-p','--port',dest='port',type=int,metavar='xxxxx',default=3306,help='Enter Mysql Port!!')
#-u,--user 表示一個是短選項 一個是長選項
#dest='user' 將該用戶輸入的參數保存到變量user中,能夠經過options.user方式來獲取該值
#type=str,表示這個參數值的類型必須是str字符型,若是是其餘類型那麼將強制轉換爲str(可能會報錯)
#metavar='user',當用戶查看幫助信息,若是metavar沒有設值,那麼顯示的幫助信息的參數後面默認帶上dest所定義的變量名
#help='Enter..',顯示的幫助提示信息
#default=3306,表示若是參數後面沒有跟值,那麼將默認爲變量default的值
#parse.set_defaults(v=1.2) #也能夠這樣設置默認值
5.3 監聽
將監聽結果賦值給options和args,一個結果爲屬性一個結果爲列表。
options,args=parse.parse_args()
5.4 例子
import optparse class test(): def __init__(self): parse=optparse.OptionParser() parse.add_option('-s',dest='x',help='server binding host',metavar='HOST') parse.add_option('-p',dest='port',help='server binding port') (options,args)=parse.parse_args() print(options.x,options.port) for i in args: print(i) F:\ftp服務器_sockserver版\server\core>python main.py -s 0.0.0.0 -p xxx aaa bbb ccc 0.0.0.0 xxx aaa bbb ccc F:\ftp服務器_sockserver版\server\core>python main.py -h Usage: main.py [options] Options: -h, --help show this help message and exit -s HOST server binding host -p PORT server binding port
import paramiko #step1 實例化ssh ssh = paramiko.SSHClient() #step2 #加上這句話不用擔憂選yes的問題,會自動選上(用ssh鏈接遠程主機時,第一次鏈接時會提示是否繼續進行遠程鏈接,選擇yes) ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)#家目錄/.ssh/known_hosts #step3 鏈接 ssh.connect(hostname='xxx.cm',port=22,username='xxxxx',password='xxxxx') #step4 執行 stdin,stdout,stderr=ssh.exec_command('df -h;pwd')#能夠用;執行多個指令 x1=stdout.read() x2=stderr.read() result=x1 if x1 else x2#三元運算 print(result.decode()) #step5 關閉 ssh.close()
------------------------
Filesystem Size Used Avail Use% Mounted on
/dev/sda1 7.9G 3.8G 3.8G 51% /
none 3.9G 4.0K 3.9G 1% /dev/shm
/dev/sda2 8.7G 298M 7.9G 4% /xxx/conf
/dev/sda3 893G 88G 761G 11% /xxx/data
/xxx/data/home/xxxxxx
import paramiko #step1 寫鏈接信息 linkit=paramiko.Transport('xxx.cm',22) linkit.connect(username='xxxxx',password='xxxx') #step2 建立鏈接對象 sftp_object=paramiko.SFTPClient.from_transport(linkit) #step3 上傳下載文件 sftp_object.put('config.conf','tmpfromwin')#上傳 # config.conf 爲本地文件 tmpfromwin爲本地文件上傳到服務器上的文件名 sftp_object.get('xxx.zip',r'f:\x.zip')#下載 #xxx.zip爲服務器的文件名 r'f:\x.zip'本地保存的位置及文件名
3.1 免密登陸的思路
PC-A 生成公鑰和私鑰
PC-A 將公鑰發給PC B
PC-A 無密碼登陸PC-B
Python 調用pca的私鑰便可完成對pcb的無密碼登陸
3.2 免密登陸的key生成
pc-A
[root@localhost ~]# ssh-keygen Generating public/private rsa key pair. Enter file in which to save the key (/root/.ssh/id_rsa): Enter passphrase (empty for no passphrase): Enter same passphrase again: Your identification has been saved in /root/.ssh/id_rsa. Your public key has been saved in /root/.ssh/id_rsa.pub. The key fingerprint is: SHA256:cNTWlMyJXaOFTQMY3dFKCIywHfVzgmpXvjt0vVfuZao root@localhost.localdomain The key's randomart image is: +---[RSA 2048]----+ | ..o=+%o@Bo | | +..*o@o+o.| | o o.. *... | | o . o +. | | S . . . | | . . ... o| | ... .=| | .. ++| | Eo..o| +----[SHA256]-----+ [root@localhost ~]# [root@localhost ~]# cd .ssh/ [root@localhost .ssh]# ll 總用量 8 -rw-------. 1 root root 1675 4月 29 12:44 id_rsa -rw-r--r--. 1 root root 408 4月 29 12:44 id_rsa.pub [root@localhost .ssh]# ssh-copy-id -p 22 root@192.168.99.172 /usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/root/.ssh/id_rsa.pub" The authenticity of host '192.168.99.172 (192.168.99.172)' can't be established. ECDSA key fingerprint is SHA256:JRJkZRzFncdiupBqjji0LP6XNMQ9eSdFKm0wVoDp8RY. ECDSA key fingerprint is MD5:f3:c8:77:ef:15:36:b7:3f:b9:36:bd:1a:4e:1a:5d:33. Are you sure you want to continue connecting (yes/no)? /usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed The authenticity of host '192.168.99.172 (192.168.99.172)' can't be established. ECDSA key fingerprint is SHA256:JRJkZRzFncdiupBqjji0LP6XNMQ9eSdFKm0wVoDp8RY. ECDSA key fingerprint is MD5:f3:c8:77:ef:15:36:b7:3f:b9:36:bd:1a:4e:1a:5d:33. Are you sure you want to continue connecting (yes/no)? yes /usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys root@192.168.99.172's password: Number of key(s) added: 1 Now try logging into the machine, with: "ssh -p '22' 'root@192.168.99.172'" and check to make sure that only the key(s) you wanted were added. [root@localhost .ssh]# ssh 192.168.99.172 Last login: Sun Apr 29 12:39:28 2018 from 192.168.99.239 [root@localhost ~]# exit 登出 Connection to 192.168.99.172 closed. ----------------------------------------------------------
PC-B drwx------. 2 root root 29 4月 29 12:48 .ssh [root@localhost ~]# cd .ssh/ [root@localhost .ssh]# ll 總用量 4 -rw-------. 1 root root 408 4月 29 12:48 authorized_keys [root@localhost .ssh]# cat authorized_keys ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDMeTAz8cAtlZON+sUMSYpTXjhW32IkAdE+336BwW9LotWGznIOoS7d6cdvY...
3.3 python的實現
import paramiko #step 1 獲取私鑰 pk=paramiko.RSAKey.from_private_key_file('F:\id_rsa') #step 2 鏈接服務器,設置用戶名並關聯私鑰 link1=paramiko.Transport('xxx.cm',22) link1.connect(username='xxxx',pkey=pk) #step 3 建立transport對象 收發文件 myobject=paramiko.SFTPClient.from_transport(link1) myobject.get('xxx.sh',r'f:\xxx.sh')
3.4 一則網上的代碼,此處先不做分析
import paramiko import time def verification_ssh(host,username,password,port,root_pwd,cmd): s=paramiko.SSHClient() s.load_system_host_keys() s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) s.connect(hostname = host,port=int(port),username=username, password=password) if username != 'root': ssh = s.invoke_shell() time.sleep(0.1) ssh.send('su - \n') buff = '' while not buff.endswith('Password: '): resp = ssh.recv(9999) buff +=resp ssh.send(root_pwd) ssh.send('\n') buff = '' while not buff.endswith('# '): resp = ssh.recv(9999) buff +=resp ssh.send(cmd) ssh.send('\n') buff = '' while not buff.endswith('# '): resp = ssh.recv(9999) buff +=resp s.close() result = buff else: stdin, stdout, stderr = s.exec_command(cmd) result = stdout.read() s.close() return result
1.1 進程是系統進行資源分配和的基本單位
1.2 線程是CPU運算調度的最小單位
1.3 協程比線程還小的單位
2.1 進程就是資源的集合,這些資源包括對內存、對硬盤、對光驅等各類資源的調用的集合。
2.2 線程包含在進程中(進程是線程的容器)是進程的實際運算單位。
2.3 一個進程能夠包括一個或多個線程
2.4 進程須要操做cpu 必須經過建立線程來操做
2.5 一個線程,實際就是一個cpu的控制流,一個進程中能夠併發多個線程
2.6 程序是指令、數據及其組織形式的描述,進程是程序的實體。
3.1 一個進程是有一系列的相關的資源的集合一個進程的資源包括了memory page (內存頁,存儲頁,存儲頁面),文件句柄,socket鏈接,一些安全信息 如誰的id啓動了這個進程
3.2 進程上下文,意思是可執行程序代碼是進程的重要組成部分。進程上下文其實是進程執行活動全過程的靜態描述。
3.3 每一個進程執行過的、執行時的以及待執行的指令和數據;在指令寄存器、堆棧、狀態字寄存器等中的內容。此外, 還包括進程打開的文件描述符等.
3.4 多個線程共享進程上下文(即資源)
3.5 一個進程的全部線程 共享同一塊內存空間
3.6 線程快仍是進程快 沒有可比性 一個是資源的集合 一個是cpu的控制流,進程須要執行 也必須經過建立線程來執行
3.7 線程建立快 仍是進程建立快 線程建立快 它就是一段cpu的指令集,而進程須要去申請各類資源組成集合
3.8 線程共享內存空間 進程間資源互相獨立
3.8.1一個父進程建立子進程,至關於克隆了一份獨立的內容 多個子進程直接的資源是不能互相訪問的
3.8.2 一個線程,建立多個線程都共享同一個進程的資源
3.9同一個進程的多個線程直接能直接聯繫交流交互,兩個進程想通訊交流 要找一箇中間代理
3.10
3.10.1對於一個主線程的修改 可能或影響其餘線程的運行(由於共享資源)
3.10.2對於父進程的修改,不會影響到子進程(資源獨立)
兩步,一步關聯函數和變量,一步start
import threading import time def PointIt(x): print('---->',x) time.sleep(5) print(x,'--->ok') PointIt('n1') PointIt('n2') #對比n1/n2/n3/n4的輸出用時 #step 1 建立進程,target爲函數的函數名,args內輸入函數的參數 n3=threading.Thread(target=PointIt,args=('n3',))#逗號不能漏 n4=threading.Thread(target=PointIt,args=('n4',))#逗號不能漏
#step 2 start
n3.start() n4.start() print('done')
import threading import time #step 1 創建一個類,須要繼承線程類 class Mythread_class(threading.Thread): #step 2定義構造函數,接收須要輸入的變量 def __init__(self,n): super(Mythread_class, self).__init__() self.n=n #step 3 定義run ->這裏必須是run這個名字 def run(self): print(self.n) time.sleep(3) print(self.n,'done') #step 4 生成實例,每一個實例運行start方法都會啓動一個線程運行run方法 t1=Mythread_class('t1') t2=Mythread_class('t2') t1.run()#這樣不行 這樣仍是單線程 t2.run()#若是直接是run方法,仍是單線程 t1.start() t2.start()#多線程 for i in range(5): x=Mythread_class('x-%s'%i) x.start() print('主線程done')
程序運行的主線程在啓動子線程以後不會等子線程執行完畢 而是繼續運行主線程
一樣,在主線程中也沒法計算出子線程運行所花費的時間
若是run()運行結束則join()結束,若是run()沒有完成,則卡住等待完成。經過這個方法來判斷各個線程都運行結束所花費的時間
import threading import time class Mythread(threading.Thread): def __init__(self,n): super(Mythread, self).__init__() self.n=n def run(self): print('t-%s is running,當前線程爲%s,當前活躍線程數爲%s'%(self.n,threading.current_thread(),threading.active_count())) time.sleep(3) print('t-%s done'%self.n) time1=time.time() t_object=[]#使用這個列表來記錄線程對象 for i in range(10): x=Mythread(i) x.start() t_object.append(x) print('main',threading.current_thread()) for i in t_object: i.join() time2=time.time() print(time2-time1) ------------------------------------------ t-0 is running,當前線程爲<Mythread(Thread-1, started 2712)>,當前活躍線程數爲2 t-1 is running,當前線程爲<Mythread(Thread-2, started 9332)>,當前活躍線程數爲3 t-2 is running,當前線程爲<Mythread(Thread-3, started 2696)>,當前活躍線程數爲4 t-3 is running,當前線程爲<Mythread(Thread-4, started 9884)>,當前活躍線程數爲5 t-4 is running,當前線程爲<Mythread(Thread-5, started 3272)>,當前活躍線程數爲6 t-5 is running,當前線程爲<Mythread(Thread-6, started 9208)>,當前活躍線程數爲7 t-6 is running,當前線程爲<Mythread(Thread-7, started 1828)>,當前活躍線程數爲8 t-7 is running,當前線程爲<Mythread(Thread-8, started 9256)>,當前活躍線程數爲9 t-8 is running,當前線程爲<Mythread(Thread-9, started 1060)>,當前活躍線程數爲10 t-9 is running,當前線程爲<Mythread(Thread-10, started 9780)>,當前活躍線程數爲11 我是主線程main <_MainThread(MainThread, started 2056)> t-0 done t-4 done t-5 done t-2 done t-3 done t-1 done t-8 done t-9 done t-7 done t-6 done
3.003652811050415
當主線程執行完畢後,主線程結束,則全部守護線程所有結束,不管運行到什麼狀態 setDeamon
import threading import time class Myclass(threading.Thread): def __init__(self,n): super(Myclass, self).__init__() self.n=n def run(self): print(self.n,'begin') time.sleep(3) print(self.n,'------------->done','目前還活躍的線程數:',threading.active_count()) tmp_list=[] for i in range(20): j=Myclass(i) j.setDaemon(True)#setdaesmon來設置爲守護進程 j.start() tmp_list.append(j) time.sleep(3) print('我要結束主進程了') ----------------------------------------------- 0 begin 1 begin 2 begin 3 begin 4 begin 5 begin 6 begin 7 begin 8 begin 9 begin 10 begin 11 begin 12 begin 13 begin 14 begin 15 begin 16 begin 17 begin 18 begin 19 begin 1 ------------->done 目前還活躍的線程數: 21 0 ------------->done 目前還活躍的線程數: 20 4 ------------->done 目前還活躍的線程數: 19 3 ------------->done 目前還活躍的線程數: 18 6 ------------->done 目前還活躍的線程數: 17 5 ------------->done 目前還活躍的線程數: 16 2 ------------->done 目前還活躍的線程數: 15 11 ------------->done 目前還活躍的線程數: 14 12 ------------->done 目前還活躍的線程數: 14 10 ------------->done 目前還活躍的線程數: 14 7 ------------->done 目前還活躍的線程數: 12 9 ------------->done 目前還活躍的線程數: 12 14 ------------->done 目前還活躍的線程數: 11 8 ------------->done 目前還活躍的線程數: 9 我要結束主進程了
5.1 In CPython,This lock is necessary mainly because CPython’s memory management is not thread-safe.
5.2 多個線程都打到多個cpu的核上 可是同一時刻只能有一個線程在真正的工做
5.3 全局解釋器鎖(Global Interpreter Lock)是計算機程序設計語言解釋器用於同步線程的工具,使得任什麼時候刻僅有一個線程在執行
5.4 一個python解釋器進程內有一條主線程,以及多條用戶程序的執行線程。即便在多核CPU平臺上,因爲GIL的存在,因此禁止多線程的並行執行。
5.5 Python 3.2開始使用新的GIL。
5.6 能夠建立獨立的進程來實現並行化
一個全局變量,每一個線程都要對其進行操做,爲了防止一個線程還未對其操做完畢,即這個變量的值還未被原線程修改時,這個變量的值就被傳到下一個線程進行操做,這樣有可能照成最終的結果不許確。爲了防止這種狀況,使用互斥鎖解決
step1 實例一個鎖對象
step2 acqiure()方法->鎖變量
step3 release()方法釋放變量
import threading import time mylock=threading.Lock() num=0 class Myclass(threading.Thread): def __init__(self,n): super(Myclass, self).__init__() self.n=n def run(self): print(self.n,'begin') mylock.acquire() time.sleep(3) global num num += 1 mylock.release() time.sleep(1) #mylock.release()使用完後迅速釋放 tmp_list=[] for i in range(5000): x=Myclass(i) x.start() tmp_list.append(x) for i in tmp_list: i.join() print(num)
mylock=threading.RLock()#RLock 遞歸鎖
import threading mylock=threading.RLock()#RLock 遞歸鎖 number1=0 number2=1111 count=0 def run1(): mylock.acquire() global number1 number1+=1 mylock.release() def run2(): #mylock.acquire() 只能套兩層鎖,若是套三層鎖,即把這個註釋去掉,就卡死了 global number2 number2+=2 #mylock.acquire() def terminal_fun(): mylock.acquire() global count run1() print('between run1 with run2') run2() print('count:',count) count+=1 mylock.release() for i in range(5): x=threading.Thread(target=terminal_fun) x.start() while threading.active_count() != 1: print(threading.active_count()) else: print('ok')
---------------------------------
between run1 with run2
count: 0
between run1 with run2
count: 1
between run1 with run2
count: 2
between run1 with run2
count: 3
between run1 with run2
count: 4
ok
此代碼說明:注意主程序結束的寫法法,本程序使用的是:等待活躍線程數等於1結束的方法
不能單純理解爲線程併發數
threading.BoundedSemaphore(3) 同時訪問資源的線程數量,但不是活躍的線程數量
線程中,信號量主要是用來維持有限的資源,使得在必定時間使用該資源的線程只有指定的數量
是一個變量,控制着對公共資源或者臨界區的訪問。信號量維護着一個計數器,指定可同時訪問資源或者進入臨界區的線程數。
每次有一個線程得到信號量時,計數器-1。若計數器爲0,其餘線程就中止訪問信號量,直到另外一個線程釋放信號量。
如下代碼,io爲一個信號量,查看活躍的線程,實際全部的線程都跑起來了,可是,能輸出的只有三個線程
import threading import time limit_num = threading.BoundedSemaphore(3) # 控制併發,不是三個一組,釋放一個,就新增一個 def run(x): limit_num.acquire() print('i am %s' % x, threading.active_count()) time.sleep(3) print('i am %s' % x, threading.active_count()) limit_num.release() for i in range(20): x = threading.Thread(target=run, args=(i,)) x.start() while threading.active_count() != 1: pass else: print('done!') ------------------------------------------- i am 0 2 i am 1 3 i am 2 4 i am 1 21 i am 2 21 i am 4 20 i am 0 19 i am 3 18 i am 5 18 i am 3 18 i am 4 18 i am 6 16 i am 7 16 i am 5 16 i am 8 15 i am 7 15 i am 6 15 i am 9 13 i am 10 13 i am 8 13 i am 11 12 i am 10 12 i am 9 12 i am 11 11 i am 13 10 i am 14 9 i am 12 9 i am 14 9 i am 13 9 i am 12 9 i am 15 7 i am 17 7 i am 16 7 i am 15 6 i am 17 5 i am 18 5 i am 16 5 i am 19 3 i am 18 3 i am 19 2 done!
可用於多個線程間同步信息 如同一個線程控制紅綠燈,一個線程等待紅綠燈
四個知識點
9.1 生成事件 threading.Event()
9.2 set事件 x.set()
9.3 clear事件 x.clear()
9.4 wait事件 x.wait()
import threading import time my_green_red_light=threading.Event() def light():#負責對light event 進行set或clear time_count=0 my_green_red_light.set() while True: if time_count >8 and time_count < 16: my_green_red_light.clear() print('\033[41;1mred\033[0m') elif time_count > 16: my_green_red_light.set() time_count=0 else: print('\033[42;1mgreen\033[0m') time.sleep(0.3) time_count+=1 def car(x): while True: if my_green_red_light.is_set(): print('car [%s] is running'%x) else: print('is red now,wait for green....') my_green_red_light.wait() print('green now ,go go go !!!') time.sleep(0.8) light_thread=threading.Thread(target=light) car_thread=threading.Thread(target=car,args=('tesla',)) light_thread.start() car_thread.start() ------------------------------ green car [tesla] is running green green car [tesla] is running green green green car [tesla] is running green green car [tesla] is running green red red is red now,wait for green.... red red red red red green green now ,go go go !!! green green car [tesla] is running green green green car [tesla] is running green
使用隊列的意義
一、解耦 是程序之間實現雙耦合 經過生產者消費者模型
二、提升處理效率
10.1 q.qsize()獲取隊列長度
10.2 q.put()放元素
10.3 q.get()拿元素
10.4 q.nowait()是否等待(當即響應)
10.5 q.get(block=False)是否阻塞
10.6 q.get(timeout=1)響應超時
10.7 q=queue.Queue(maxsize=3)隊列長度
10.8 q=queue.LifoQueue()(last in first out 堆棧)
import queue >>> import queue >>> x=queue.Queue(maxsize=5) >>> x.put(1) >>> x.put(2) >>> x.put(3) >>> x.put(4) >>> x.put(5,timeout=1) >>> x.put(6,timeout=1) Traceback (most recent call last): File "<pyshell#8>", line 1, in <module> x.put(6,timeout=1) queue.Full >>> x.get() 1 >>> x.get() 2 >>> x.get() 3 >>> x.get() 4 >>> x.get(block=False) 5 >>> x.get(block=False) Traceback (most recent call last): File "<pyshell#14>", line 1, in <module> x.get(block=False) queue.Empty >>> >>> y=queue.LifoQueue() >>> y.put(1) >>> y.put(2) >>> y.put(3) >>> y.get() 3 >>> y.get() 2 >>> y.get() 1 >>> z=queue.PriorityQueue() >>> z.put((1,'123')) >>> z.put((-1,'456')) >>> z.put((5,'789')) >>> z.get <bound method Queue.get of <queue.PriorityQueue object at 0x000001A41A496550>> >>> z.get() (-1, '456') >>> z.get() (1, '123') >>> z.get() (5, '789') >>> >>> x.qsize() 0 >>>
import queue import time import threading q=queue.Queue(maxsize=5) def product_it(name,t): count=0 while True: data='骨頭%s'%count q.put(data) print('[%s]生成[%s]'%(name,data)) count+=1 time.sleep(t) def eat_it(name,t): while True: print('[%s]吃了骨頭[%s]'%(name,q.get())) time.sleep(t) t1=threading.Thread(target=product_it,args=('北京店',4,)) t2=threading.Thread(target=product_it,args=('上海店',0.3,)) t3=threading.Thread(target=eat_it,args=('小明',5,)) t4=threading.Thread(target=eat_it,args=('小王',2,)) t1.start() t2.start() t3.start() t4.start() ------------------------------ [北京店]生成[骨頭0] [上海店]生成[骨頭0] [小明]吃了骨頭[骨頭0] [小王]吃了骨頭[骨頭0] [上海店]生成[骨頭1] [上海店]生成[骨頭2] [上海店]生成[骨頭3] [上海店]生成[骨頭4] [上海店]生成[骨頭5] [小王]吃了骨頭[骨頭1] [上海店]生成[骨頭6] [小王]吃了骨頭[骨頭2] [上海店]生成[骨頭7] [小明]吃了骨頭[骨頭3] [北京店]生成[骨頭1]
cpu密集型,io密集型
線程佔用cpu資源 python的多線程實際是經過gil鎖進行上下文切換 某個時間點上 只有一個線程在跑
若是cpu只有一個核心 那麼 不管有多少個線程 同一時間,只可能有一個線程在被cpu處理
線程之間經過快速切換執行,使使用者感受同一時刻多線程並行的效果,但單核狀況實際是串行的
若是多核心 理論上來講 不一樣的核心同一時刻能夠運行不一樣的線程
可是在python中 因爲考慮到線程間數據共享的狀況 python內存在GLI鎖,使得同一時間內,只有一個線程運行 不管多少核 這個是python的侷限性
python語言誕生時 cpu只有單核 開發者沒有考慮多核的狀況,
cpu在執行指令時須要知道上下文關係 因此python在啓用線程的時候 調用的是c語言的thread接口 並將這個關係傳給cpu (和互斥鎖相似 有一個全局變量+1)
若是線程同時操做,拿到的上下文可能同樣 因此爲了不上下文雷同 使用gli鎖來確保同一時刻只有一個線程在執行
1.1 進程間是獨立的
1.2 python的進程是操做系統的原生進程,進程間管理是由操做系統來完成的,且進程間的數據相互獨立也不須要鎖這個概念。因此python的多進程 能夠解決python的多核問題
multiprocessing.Process(target=run,args=(i,))
import multiprocessing import time import threading def run(x): print('process %s is running'%x) t=threading.Thread(target=mythread,args=(x,)) t.start() print('process %s is done'%x) def mythread(x): print('進程%s啓用的線程的線程號是:%s'%(x,threading.get_ident()))#打印線程號 if __name__=='__main__': for i in range(10): x=multiprocessing.Process(target=run,args=(i,)) x.start() ------------------------------------------- process 4 is running process 4 is done 進程4啓用的線程的線程號是:2172 process 0 is running process 0 is done 進程0啓用的線程的線程號是:764 process 2 is running process 6 is running process 2 is done 進程2啓用的線程的線程號是:8236 process 9 is running process 3 is running 進程6啓用的線程的線程號是:2544 process 6 is done process 5 is running process 7 is running 進程9啓用的線程的線程號是:3536 process 9 is done process 3 is done 進程3啓用的線程的線程號是:484 process 5 is done process 7 is done 進程5啓用的線程的線程號是:9624 進程7啓用的線程的線程號是:1464 process 1 is running 進程1啓用的線程的線程號是:1764 process 1 is done process 8 is running 進程8啓用的線程的線程號是:3516 process 8 is done
此處能夠觀察cpu的使用狀況,能發現cpu的使用率快速提高
threading.get_ident()/os.getpid()/os.getppid()
import multiprocessing import os def info(): tmp_info ='''\tname:%s\n\tppid:%s\n\tpid :%s'''%(__name__,os.getppid(),os.getpid()) print(tmp_info) def run(): print('子進程信息'.center(50,'-')) x = multiprocessing.Process(target=run2) x.start() info() def run2(): print('子進程信息'.center(50,'-')) info() if __name__=="__main__": print('父進程信息'.center(50,'-')) info() x=multiprocessing.Process(target=run) x.start() --------------------------------------------------------------- ----------------------父進程信息----------------------- name:__main__ ppid:2592 pid :10720 ----------------------子進程信息----------------------- name:__mp_main__ ppid:10720 pid :7752 ----------------------子進程信息----------------------- name:__mp_main__ ppid:7752 pid :5696
5.1 進程隊列方式進行進程間通訊
此處須要對比進程隊列與線程隊列的區別
queue.Queue()普通的隊列,在一個進程中使用
multithreading.Queue 原理爲pickle序列號與反序列化,並非資源共享
import multiprocessing import threading import queue #進程queue與線程queue def main(my_queue): print(my_queue.get()) print(my_queue.get()) print(my_queue.get()) def main2(x): x.put(1) x.put(2) x.put(3) if __name__=='__main__': my_queue = queue.Queue() my_queue.put('[a]') my_queue.put('[b]') my_queue.put('[c]') t1 = threading.Thread(target=main,args=(my_queue,)) t1.start()#子線程訪問主線程資源 #t1 = threading.Thread(target=main)此處不將隊列傳入,線程依然能調用主線程的隊列 #t1.start() # 子線程訪問主線程資源 #p1=multiprocessing.Process(target=main,args=(my_queue,)) #p1.start()#子進程訪問主進程資源 #即便是父進程將進程queue作完參數傳給子進程,子進程依然沒法訪問這個queue Q = multiprocessing.Queue()#進程queue multiprocessing.Queue() p2=multiprocessing.Process(target=main2,args=(Q,))#將隊列傳入子進程 p2.start() p2.join()#等待子進程執行完畢,確保都入隊成功 print(Q.get()) print(Q.get()) print(Q.get()) ---------------------------------------- [a] [b] [c] 1 2 3
5.2 進程通道pip方式進行進程間通訊
multiprocessing.Pipe()結合send和recv來使用
import multiprocessing import threading import time def f(x1): print('f') x1.send('hello,i am p1') print(x1.recv()) def f2(x2): print('f2') time.sleep(3) print(x2.recv()) x2.send('ok , p2 ack') if __name__=='__main__': x1, x2 = multiprocessing.Pipe() p1=multiprocessing.Process(target=f,args=(x1,)) p2=multiprocessing.Process(target=f2,args=(x2,)) p1.start() p2.start() ------------------------ f f2 hello,i am p1 ok , p2 ack
5.3 多進程共同修改字典列表的方法manager().dict()、manager().list()
import multiprocessing import os def f(x,y): x[os.getpid()]=os.getpid()#x是一個字典,給x這字典添加一個key和value y.append(os.getpid())#y是一個列表,給y這個列表添加一個value print(x,y) if __name__=='__main__': with multiprocessing.Manager() as xxx: x=xxx.dict()#生成一個字典 一個多進程manage的字典 y=xxx.list(range(5))#列表生成 生成一個列表 一個多進程manage的列表 p_list=[]#主進程列表 for i in range(5): p=multiprocessing.Process(target=f,args=(x,y,))#給進程manage的列表字典添加元素 p.start() p_list.append(p)#爲了確保是否添加完成 for i in p_list: i.join() #配合上面的start 判斷是否子進程運行結束 ------------------------------------------------ {19080: 19080} [0, 1, 2, 3, 4, 19080] {19080: 19080, 20880: 20880} [0, 1, 2, 3, 4, 19080, 20880] {19080: 19080, 20880: 20880, 15908: 15908} [0, 1, 2, 3, 4, 19080, 20880, 15908] {19080: 19080, 20880: 20880, 15908: 15908, 8140: 8140} [0, 1, 2, 3, 4, 19080, 20880, 15908, 8140] {19080: 19080, 20880: 20880, 15908: 15908, 8140: 8140, 13536: 13536} [0, 1, 2, 3, 4, 19080, 20880, 15908, 8140, 13536]
以上能夠看出,多進程實時修改數據(字典,列表,隊列) 無需互斥鎖。
爲何要有進程鎖 ,不一樣進程之間不是不能訪問各自的內存空間
可是例如屏幕,打印機,投影儀這種資源在進程很是多的狀況下可能會出現例如輸出錯行的狀況,此時就要用到鎖
進程鎖比較少用到
import multiprocessing import os def f(x,i): x.acquire() print(i,'---',os.getpid()) x.release() if __name__=='__main__': lock=multiprocessing.Lock() for i in range(1000): p=multiprocessing.Process(target=f,args=(lock,i,)) p.start()
------------------------
2 --- 3784
4 --- 4544
3 --- 13556
5 --- 4864
0 --- 13100
6 --- 15272
1 --- 20104
8 --- 11800
7 --- 672
9 --- 19056
7.1
進程池 用於限制同時運行的進程數量
#運行進程過多致使進程間頻繁切換下降程序效率->根據cpu核心等實際狀況,優化同時運行的進程數量
#對同時運行的進程數進行限制 多進程同時存在,可是隻有固定數量的進程在執行
7.2
step1 實例化一個pool
step2 用實例化的pool去生成進程。須要注意兩種生成進程的方式apply與apply_async一種是單進程執行,一種是多進程執行
import multiprocessing import os import time def f(i): print('process',i,'is running...',os.getpid()) time.sleep(3) return i #傳遞給回調函數 def backtome(xxxx): print(xxxx,'is done',os.getpid()) if __name__=='__main__': print('主進程的pid爲:',os.getpid()) mypool=multiprocessing.Pool(processes=5)#設置同一時刻能實際運行的進程數量 for i in range(20): #p=mypool.apply(func=f,args=(i,))#單核串行 p=mypool.apply_async(func=f,args=(i,),callback=backtome) print('end') mypool.close()#必須添加close() mypool.join()#必須添加join ------------------------------------------- 主進程的pid爲: 12720 end process 0 is running... 10644 process 1 is running... 5056 process 2 is running... 20972 process 3 is running... 6148 process 4 is running... 19120 process 5 is running... 10644 0 is done 12720 process 6 is running... 5056 1 is done 12720 2 is done 12720 process 7 is running... 20972 3 is done 12720 process 8 is running... 6148 4 is done 12720 process 9 is running... 19120 process 10 is running... 10644 5 is done 12720 6 is done 12720 process 11 is running... 5056 7 is done 12720 process 12 is running... 20972 9 is done 12720 process 13 is running... 19120 8 is done 12720 process 14 is running... 6148 process 15 is running... 10644 10 is done 12720 process 16 is running... 5056 11 is done 12720 process 17 is running... 20972 12 is done 12720 14 is done 12720 process 18 is running... 6148 13 is done 12720 process 19 is running... 19120 15 is done 12720 16 is done 12720 17 is done 12720 18 is done 12720 19 is done 12720
apply無回調方法
import multiprocessing import os import time def f(i): print('process',i,'is running...',os.getpid()) time.sleep(3) print('end') return i #傳遞給回調函數 def backtome(xxxx): print(xxxx,'is done',os.getpid()) if __name__=='__main__': print('主進程的pid爲:',os.getpid()) mypool=multiprocessing.Pool(processes=5)#設置同一時刻能實際運行的進程數量 for i in range(20): p=mypool.apply(func=f,args=(i,),)#單核串行 #p=mypool.apply(func=f, args=(i,),callback=backtome)apply無callback函數 #p=mypool.apply_async(func=f,args=(i,),callback=backtome) print('end') mypool.close()#必須添加close() mypool.join()#必須添加join -------------------------------- 主進程的pid爲: 17912 process 0 is running... 4080 end process 1 is running... 18092 end process 2 is running... 5924 end process 3 is running... 14276 end process 4 is running... 1800 end process 5 is running... 4080 end process 6 is running... 18092 end process 7 is running... 5924 end process 8 is running... 14276 end process 9 is running... 1800 end process 10 is running... 4080 end process 11 is running... 18092 end process 12 is running... 5924 end process 13 is running... 14276 end process 14 is running... 1800 end process 15 is running... 4080 end process 16 is running... 18092 end process 17 is running... 5924 end process 18 is running... 14276 end process 19 is running... 1800 end end
協程 異步io
協程在單線程狀況下操做
協程是一種用戶態的輕量級線程 -> cpu根本不知道他的存在
線程在切換的時候有cpu的寄存器來保存每一個線程的狀態 每一個線程有本身的上下文
協程在切換時的狀態須要用戶本身將上下文保存到相應的地方。
yelid是協程的一種
在單線程下實現併發的效果,用戶須要本身保存運行狀態
2.1 無需線程切換 也就無需cpu在線程切換時上下文切換的開銷
2.2 無需原子操做鎖定及同步開銷 ->相對於多線程的互斥鎖 ->協程是單線程的串行操做
2.3 方便切換控制流 簡化模型
2.4 高併發 高擴展 低成本
協程實質上是一個單線程,沒法利用多核資源
協程須要和進程配合才能在多cpu上運行
nginx 就是單線程 就能支持上萬個併發
協程能處理上萬併發的思路,遇到io就進行協程切換 -> io完成就自動切換到原協程
#進程是資源分配的最小單位,線程是CPU調度的最小單位.這是計算機裏常常考的
#協程。比線程還小的單位
import time def produce_scq(n,x,y):#生產者實際就是個函數 ,實體化生成器並next他 x.__next__()#這裏很關鍵 y.__next__() print(n,'is producing') count=0 while count < 20: x.send('包子%s'%count) y.send('包子%s'%count) count +=1 return 'done' def custom_scq(n): #消費者是生成器 ,等待外部信息輸入 print('start eat') while True: baozi=yield print(n,'is eating',baozi) time.sleep(0.01) c1=custom_scq('小明') c2=custom_scq('小東') print(produce_scq('大明',c1,c2)) --------------------------------------------- start eat start eat 大明 is producing 小明 is eating 包子0 小東 is eating 包子0 小明 is eating 包子1 小東 is eating 包子1 小明 is eating 包子2 小東 is eating 包子2 小明 is eating 包子3 小東 is eating 包子3 小明 is eating 包子4 小東 is eating 包子4 小明 is eating 包子5 小東 is eating 包子5 小明 is eating 包子6 小東 is eating 包子6 小明 is eating 包子7 小東 is eating 包子7 小明 is eating 包子8 小東 is eating 包子8 小明 is eating 包子9 小東 is eating 包子9 小明 is eating 包子10 小東 is eating 包子10 小明 is eating 包子11 小東 is eating 包子11 小明 is eating 包子12 小東 is eating 包子12 小明 is eating 包子13 小東 is eating 包子13 小明 is eating 包子14 小東 is eating 包子14 小明 is eating 包子15 小東 is eating 包子15 小明 is eating 包子16 小東 is eating 包子16 小明 is eating 包子17 小東 is eating 包子17 小明 is eating 包子18 小東 is eating 包子18 小明 is eating 包子19 小東 is eating 包子19 done
step1 greenlet.greenlet(xxx)
step2 xxx.switch
import multiprocessing import greenlet def test1(): print(12) g2.switch() print(56) g2.switch() def test2(): print(34) g1.switch() print(78) g1=greenlet.greenlet(test1) g2=greenlet.greenlet(test2) #實例化兩個協程來運行函數,實際此時函數還未運行 g1.switch()#開始執行g1 --------------------------------------------- 12 34 56 78
切換思路,遇到io或非cpu操做時,自動切換,讓cpu繼續處理下一個指令
gevent.sleep與gevent.joinall()、gevent.spawn的搭配使用
import gevent def f1(): print('f1 run') gevent.sleep(2) print('f1 done') def f2(): print('f2 run') gevent.sleep(1) print('f2 done') def f3(): print('f3 run') gevent.sleep(0) print('f3 done') gevent.joinall([gevent.spawn(f1),gevent.spawn(f2),gevent.spawn(f3)]) #生成協程 ---------------------------------------- f1 run f2 run f3 run f3 done f2 done f1 done
#gevent.sleep 模擬io操做 -》 joinall 所有執行 gevent.spawn 生成協程
f1 -> sleep -> f2 -> sleep -> f3 -> f1還未執行完成 -> f2 還未完成 -> f3 ok -> f3 done -> f1 還未完成-> f2 還未完成->f1 還未完成->f2 ok -> f1 還未完成 -> f1 ok
import urllib import gevent,time #step 1 導入模塊 from urllib import request from gevent import monkey #step 2 監控IO monkey.patch_all() #step 3 def f(url): print('GET:',url) res=request.urlopen(url)#獲取數據 # step 4 用來展現get到的這個url有多少data data=res.read()#讀數據 print('bytes:',len(data),'url:',url) t1=time.time()#獲取程序開始時間 f('http://www.dangdang.com') f('http://d.cn') f('http://www.163.com') f('http://www.126.com')#單線程執行 t2=time.time() print('--------------------->cost:',t2-t1) #step 5 協程執行 gevent.joinall([gevent.spawn(f,'http://d.cn'),gevent.spawn(f,'http://www.dangdang.com'),gevent.spawn(f,'http://www.163.com'),gevent.spawn(f,'http://www.126.com')]) t3=time.time() print('--------------------->cost:',t3-t2) ------------------------------------------------------------------- GET: http://www.dangdang.com bytes: 720 url: http://www.dangdang.com GET: http://d.cn bytes: 80968 url: http://d.cn GET: http://www.163.com bytes: 688755 url: http://www.163.com GET: http://www.126.com bytes: 12921 url: http://www.126.com --------------------->cost: 1.0575993061065674 GET: http://d.cn GET: http://www.dangdang.com GET: http://www.163.com GET: http://www.126.com bytes: 720 url: http://www.dangdang.com bytes: 688755 url: http://www.163.com bytes: 80968 url: http://d.cn bytes: 12921 url: http://www.126.com --------------------->cost: 0.14879226684570312
此處的joinall是等協程都運行結束的意思
若是沒有joinall()
import urllib import gevent,time #step 1 導入模塊 from urllib import request from gevent import monkey #step 2 監控IO monkey.patch_all() #step 3 def f(url): print('GET:',url) res=request.urlopen(url)#獲取數據 # step 4 用來展現get到的這個url有多少data data=res.read()#讀數據 print('bytes:',len(data),'url:',url) t1=time.time()#獲取程序開始時間 f('http://www.dangdang.com') f('http://d.cn') f('http://www.163.com') f('http://www.126.com')#單線程執行 t2=time.time() print('--------------------->cost:',t2-t1) #step 5 協程執行 gevent.spawn(f,'http://d.cn') gevent.spawn(f,'http://www.dangdang.com') gevent.spawn(f,'http://www.163.com') gevent.spawn(f,'http://www.126.com') t3=time.time() print('--------------------->cost:',t3-t2) --------------------------------- GET: http://www.dangdang.com bytes: 168224 url: http://www.dangdang.com GET: http://d.cn bytes: 80968 url: http://d.cn GET: http://www.163.com bytes: 688790 url: http://www.163.com GET: http://www.126.com bytes: 12921 url: http://www.126.com --------------------->cost: 1.2731683254241943 --------------------->cost: 0.0
單線程遇到io輪訓的過程,須要注意註冊輪詢的過程。gevent.spawn
import socket import gevent from gevent import monkey monkey.patch_all() def server_port_and_listen(): sever1=socket.socket() sever1.bind(('localhost',9999)) sever1.listen() count=1 print('開始監聽9999端口') while True: conn,addr=sever1.accept() #精髓在這裏 每一個鏈接起一個協程 gevent.spawn(server_deal_conn,conn,count) #協程輪訓監聽 遇到io自動切換 #server_deal_conn(conn) count+=1 def server_deal_conn(x,i): print('鏈接%s創建'%i) try: while True: data=x.recv(1024) if not data: print('鏈接已經斷開') break tmp_data='i am %s\n'%i+data.decode() x.send(tmp_data.encode('utf-8')) #此處不考慮長度了,只作短消息傳遞 except Exception as ex: print(ex) finally: print('斷開鏈接',i,'詳細信息',x) x.close() server_port_and_listen()
------------------------------------------
開始監聽9999端口
鏈接1創建
鏈接2創建
鏈接3創建
鏈接4創建
[WinError 10054] 遠程主機強迫關閉了一個現有的鏈接。
斷開鏈接 1 詳細信息 <gevent._socket3.socket object, fd=616, family=2, type=1, proto=0>
[WinError 10054] 遠程主機強迫關閉了一個現有的鏈接。
斷開鏈接 2 詳細信息 <gevent._socket3.socket object, fd=624, family=2, type=1, proto=0>
[WinError 10054] 遠程主機強迫關閉了一個現有的鏈接。
斷開鏈接 3 詳細信息 <gevent._socket3.socket object, fd=628, family=2, type=1, proto=0>
[WinError 10054] 遠程主機強迫關閉了一個現有的鏈接。
斷開鏈接 4 詳細信息 <gevent._socket3.socket object, fd=620, family=2, type=1, proto=0>
鏈接5創建
鏈接6創建
即 :事件 -> 觸發 ->迴應
事件如我點擊鼠標、鍵盤、個人網卡收到數據包
迴應如點鼠標關機,ctrl+s保存文件。回覆對方的ping包。和對方三次握手。
2.1 單線程
2.2 多線程
2.3 異步
單線程和多線程隨着須要掃描設備的增長,更容易出現響應時間和遺漏響應的狀況。多線程須要屢次切換上下文。
針對不一樣的操做系統有不一樣的庫能夠選擇
4.一、select庫
select庫是各個版本的linux和windows平臺都支持的基本事件驅動模型庫,而且在接口的定義上也基本相同,只是部分參數的含義略有差別。
Select庫有三種事件:讀事件、寫事件、異常事件
4.二、poll庫
poll庫,做爲linux平臺上的基本事件驅動模型,Windows平臺不支持poll庫。
4.三、epoll庫
epoll庫是Nginx服務器支持的高性能事件之一,它是公認的很是優秀的事件驅動模型,和poll和select有很大的不一樣,屬於poll庫的一個變種,他們的處理方式都是建立一個待處理事件列表,而後把這個事件列表發送給內核,返回的時候,再去輪詢檢查這個列表,以判斷事件是否發生。
readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,那程序就會一直阻塞在這裏 # select中第1個參數表示inputs中發生變化的句柄放入readable。 # select中第2個參數表示outputs中的值原封不動的傳遞給writeable。
# select中第3個參數表示inputs中發生錯誤的句柄放入exeptional.
import socket import queue import select input_list=[] output_list=[] conn_list=[] my_dict={} server1=socket.socket() server1.setblocking(False)#默認爲阻塞IO server1.bind(('localhost',6666)) server1.listen() input_list.append(server1) while True: readable, writeable, exeptional = select.select(input_list, output_list, input_list) # io輪詢監控 多路複用 # print(readable) for i in readable: if i is server1: print('新建鏈接') conn, addr = i.accept() # 注意accept的位置 conn.setblocking(False) input_list.append(conn) my_dict[conn] = queue.Queue() # 生成一個字典key key對應value爲字典 else: print('in else') try: data = i.recv(1024) print('-----------------------------', len(data)) if data: print('接收到消息') print(data.decode()) my_dict[i].put(data) print(my_dict) output_list.append(i) print('outputlist', output_list) else: print('斷開') input_list.remove(i) except Exception as areyouok: print(areyouok, i) input_list.remove(i) if i in output_list: output_list.remove(i) del my_dict[i] for i in writeable: print('writeable----------') data = my_dict[i].get() print(data.decode()) i.send(data) output_list.remove(i) for i in exeptional: input_list.remove(i) if i in output_list: output_list.remove(i) del my_dict[i]
seletor在select、poll、epoll基礎上封裝,若是系統支持eqoll優先epoll
#step 1 生成一個selector對象,至關於監聽列表
still_listen=selectors.DefaultSelector()
#step 2 定義方法
#等待鏈接的方法 def wait_for_connect(server,mask): conn,addr=server1.accept() ... #等待消息的方法 def conn_wait_for_message(conn,mask): data=conn.recv(1024) ...
#Step 3 建立socket
socket.socket()、bind(('127.0.0.1',9999))、listen(10000)、.setblocking(False)
#step 4 註冊及卸載 selectors.EVENT_READ/ x.register /x.unregister(conn)(相似於gevent.spawn)
still_listen.register(server1,selectors.EVENT_READ,wait_for_connect)
still_listen.register(conn,selectors.EVENT_READ,conn_wait_for_message)
#step 5 開始監聽
infor_come=still_listen.select()#無返回時阻塞 for key,mask in infor_come: choice_func=key.data #根據註冊的方式獲取須要調用的函數 choice_func(key.fileobj,mask)#傳入對象(已創建的鏈接conn或新建鏈接server)
import selectors import socket #ulimit - n #ulimit -SHn #step 1 selectors.DefaultSelector()生成一個額select對象 still_listen=selectors.DefaultSelector() #step 2 定義方法 #等待鏈接的方法 def wait_for_connect(server,mask): conn,addr=server1.accept() print('accepted', conn, 'from', addr) conn.setblocking(False) still_listen.register(conn,selectors.EVENT_READ,conn_wait_for_message) #等待消息的方法 def conn_wait_for_message(conn,mask): try: data=conn.recv(1024) print(conn,'message coming') if data: conn.send(data) else: print('close...') still_listen.unregister(conn) conn.close() except Exception as ex: print(ex) still_listen.unregister(conn) conn.close() server1=socket.socket() server1.bind(('127.0.0.1',9999)) server1.listen(10000) server1.setblocking(False) #step 3 註冊方法 x.register 與卸載方法 x.unregister(conn) still_listen.register(server1,selectors.EVENT_READ,wait_for_connect) print('運行成功,開始監聽端口。。。') #step 4 開始監聽 while True: infor_come=still_listen.select()#無返回時阻塞 for key,mask in infor_come: choice_func=key.data #根據註冊的方式獲取須要調用的函數 choice_func(key.fileobj,mask)#傳入對象(已創建的鏈接conn或新建鏈接server)
實際就是循環創建鏈接
import socket socket_List=[]#用於存放鏈接對象 t1=time.time()#開始時間 for i in range(1000):#生成一千個鏈接對象 x=socket.socket() socket_List.append(x) print('準備開始鏈接服務器') t2=time.time() for x in socket_List: x.connect(('192.168.99.106', 8999))#對象開始正式鏈接服務器 print('鏈接創建完畢') messgae_list=['it is the first message','it is the second message','it is the third message'] t3=time.time() for mes in messgae_list: print(mes) for s in socket_List:#每一個鏈接開始發送內容 print('%s: sending "%s"' % (s.getsockname(), mes) ) s.send(mes.encode('utf-8')) for s in socket_List:#每一個鏈接開始接收內容 data=s.recv(1024) print('%s: received "%s"' % (s.getsockname(), data)) if not data: print(sys.stderr, 'closing socket', s.getsockname()) t4=time.time() print(t2-t1) print(t3-t2) print(t4-t3) #輸出時間
os.walk()的使用!!!