python筆記-10(socket提高、paramiko、線程、進程、協程、同步IO、異步IO)

1、socket提高

一、熟悉socket.socket()中的省略部分

socket.socket(AF.INET,socket.SOCK_STREAM)

二、send與recv發送大文件時對於黏包的處理。

2.1 sendall的發送方式與for i in f(局部)python

2.2 使用send、recv交替的方式作一個ack來解決黏包linux

三、socketserver多併發處理

3.1  5種不一樣的socket類nginx

3.1.1 baseserver用於繼承,不對外提供服務sql

3.1.2 tcpserver繼承baseserver類用於處理tcp鏈接shell

3.1.3 unixstreamserver繼承tcpserver使用UNIX域套接字實現面向數據流協議(sock_stream --->tcp)編程

3.1.4 udpserver繼承tcpsever用於處理udp鏈接windows

3.1.5 unixdatagramserver繼承tcpserver使用針對UNIX域套接字來處理數據報式協議(sock_dgram--->udp)安全

3.2 socketserver的實現服務器

3.2.1 step1 定義一個類,存放handler方法多線程

須要繼承socketserver.BaseRequestHandler,以後每生成一個新鏈接,都會實例化一個類,並調用這個handler方法,和客戶端全部的交互都是在這個handler裏面定義的

class myclass(socketserver.BaseRequestHandler):
    def handle(self):
        print('xxx')
        self.request.send(b'ok')

3.2.2 step2 定義一個實例,此實例關聯上一步的handler、並綁定ip和及端口,監聽端口處理新發起的鏈接,將新發起的鏈接實例化對像,並交給handler處理。

此處咱們要處理tcp,因此是tcpserver。考慮到併發,此處使用threading多線程實現,或者可使用ForkingServer來處理

s1=socketserver.ThreadingTCPServer(('localhost',9999),myclass)
s2=socketserver.FuckingTCPServer(('localhost',8888),myclass)

3.2.3 step3 server.forever()

s1.serve_forever()
s2.serve_forever()

3.2.4 step4 客戶端對接測試

import socketserver
class myclass(socketserver.BaseRequestHandler):
    def handle(self):
        print('xxx')
        self.request.send(b'ok')
s1=socketserver.ThreadingTCPServer(('localhost',9999),myclass)
s2=socketserver.FockingTCPServer(('localhost',8888),myclass)
s1.serve_forever()
s2.serve_forever()
-------------------------------
import socket
c=socket.socket()
c.connect(('localhost',8888))
print(c.recv(1024).decode())

 

四、斷點續傳的實現->seek

經過byte的方式讀取文件結合使用ab的方式續寫文件的方式來實現斷點續傳,其核心的思路就是,得到當前半成品文件的字節數,使用seek將被操做文件的句柄移動到此處再日後讀取,以此實現斷點續傳

import socket
c1=socket.socket()
c1.connect(('localhost',6666))
'''
f=open(r'E:\L.exe','rb')
count = 0
for i in f:
    c1.send(i)
    count+=1
    if count > 10:
        break

'''
x=input('seek到哪一個位置?')
f=open(r'E:\L.exe','rb')
f.seek(int(x))
print('開始傳輸')
count=0
for i in f:
    print('\r%s'%count)
    count+=1
    c1.send(i)
print('ok')
---------------------
import socket
import os
s1=socket.socket()
s1.bind(('localhost',6666))
s1.listen()
print('----begin----')
conn,addr=s1.accept()
print('鏈接創建')

print(os.path.getsize(r'E:\\xxxx.exe'))
f=open(r'E:\\xxxx.exe','ab')
while True:
data=conn.recv(1024)
f.write(data)
if not data:
print('end')
break
'''
f=open(r'E:\\xxxx.exe','wb')
while True:
data=conn.recv(1024)
f.write(data)
if not data:
print('end')
break
'''

五、optparse模塊的使用

5.1 定義對象

import optparse
parse=optparse.OptionParser()

5.2 添加參數

parse.add_option('-u','--user',dest='user',action='store',type=str,metavar='user',help='Enter User Name!!') 
parse.add_option('-p','--port',dest='port',type=int,metavar='xxxxx',default=3306,help='Enter Mysql Port!!') 

#-u,--user 表示一個是短選項 一個是長選項

#dest='user' 將該用戶輸入的參數保存到變量user中,能夠經過options.user方式來獲取該值

#type=str,表示這個參數值的類型必須是str字符型,若是是其餘類型那麼將強制轉換爲str(可能會報錯)

#metavar='user',當用戶查看幫助信息,若是metavar沒有設值,那麼顯示的幫助信息的參數後面默認帶上dest所定義的變量名

#help='Enter..',顯示的幫助提示信息

#default=3306,表示若是參數後面沒有跟值,那麼將默認爲變量default的值

#parse.set_defaults(v=1.2) #也能夠這樣設置默認值

5.3 監聽

將監聽結果賦值給options和args,一個結果爲屬性一個結果爲列表。

 options,args=parse.parse_args()

5.4 例子

import optparse
class test():
def __init__(self):
parse=optparse.OptionParser()
parse.add_option('-s',dest='x',help='server binding host',metavar='HOST')
parse.add_option('-p',dest='port',help='server binding port')
(options,args)=parse.parse_args()
print(options.x,options.port)
for i in args:
print(i)
F:\ftp服務器_sockserver版\server\core>python main.py -s 0.0.0.0 -p xxx aaa bbb ccc
0.0.0.0 xxx
aaa
bbb
ccc

F:\ftp服務器_sockserver版\server\core>python main.py -h
Usage: main.py [options]

Options:
-h, --help show this help message and exit
-s HOST server binding host
-p PORT server binding port

 

2、paramiko模塊的使用

一、遠程ssh並執行指令返回結果

import paramiko
#step1 實例化ssh
ssh = paramiko.SSHClient()
#step2 
#加上這句話不用擔憂選yes的問題,會自動選上(用ssh鏈接遠程主機時,第一次鏈接時會提示是否繼續進行遠程鏈接,選擇yes)
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)#家目錄/.ssh/known_hosts
#step3 鏈接
ssh.connect(hostname='xxx.cm',port=22,username='xxxxx',password='xxxxx')
#step4 執行
stdin,stdout,stderr=ssh.exec_command('df -h;pwd')#能夠用;執行多個指令

x1=stdout.read()
x2=stderr.read()

result=x1 if x1 else x2#三元運算
print(result.decode())

#step5 關閉
ssh.close()
------------------------

Filesystem Size Used Avail Use% Mounted on
/dev/sda1 7.9G 3.8G 3.8G 51% /
none 3.9G 4.0K 3.9G 1% /dev/shm
/dev/sda2 8.7G 298M 7.9G 4% /xxx/conf
/dev/sda3 893G 88G 761G 11% /xxx/data
/xxx/data/home/xxxxxx

二、transport文件遠程scp文件

import paramiko
#step1 寫鏈接信息
linkit=paramiko.Transport('xxx.cm',22)
linkit.connect(username='xxxxx',password='xxxx')
#step2 建立鏈接對象
sftp_object=paramiko.SFTPClient.from_transport(linkit)
#step3 上傳下載文件 
sftp_object.put('config.conf','tmpfromwin')#上傳 
# config.conf 爲本地文件  tmpfromwin爲本地文件上傳到服務器上的文件名 
sftp_object.get('xxx.zip',r'f:\x.zip')#下載
#xxx.zip爲服務器的文件名  r'f:\x.zip'本地保存的位置及文件名

三、免密登陸

 3.1 免密登陸的思路

PC-A 生成公鑰和私鑰

PC-A 將公鑰發給PC B

PC-A 無密碼登陸PC-B

Python 調用pca的私鑰便可完成對pcb的無密碼登陸

3.2 免密登陸的key生成

pc-A
[root@localhost ~]# ssh-keygen Generating public/private rsa key pair. Enter file in which to save the key (/root/.ssh/id_rsa): Enter passphrase (empty for no passphrase): Enter same passphrase again: Your identification has been saved in /root/.ssh/id_rsa. Your public key has been saved in /root/.ssh/id_rsa.pub. The key fingerprint is: SHA256:cNTWlMyJXaOFTQMY3dFKCIywHfVzgmpXvjt0vVfuZao root@localhost.localdomain The key's randomart image is: +---[RSA 2048]----+ | ..o=+%o@Bo | | +..*o@o+o.| | o o.. *... | | o . o +. | | S . . . | | . . ... o| | ... .=| | .. ++| | Eo..o| +----[SHA256]-----+ [root@localhost ~]# [root@localhost ~]# cd .ssh/ [root@localhost .ssh]# ll 總用量 8 -rw-------. 1 root root 1675 4月 29 12:44 id_rsa -rw-r--r--. 1 root root 408 4月 29 12:44 id_rsa.pub [root@localhost .ssh]# ssh-copy-id -p 22 root@192.168.99.172 /usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/root/.ssh/id_rsa.pub" The authenticity of host '192.168.99.172 (192.168.99.172)' can't be established. ECDSA key fingerprint is SHA256:JRJkZRzFncdiupBqjji0LP6XNMQ9eSdFKm0wVoDp8RY. ECDSA key fingerprint is MD5:f3:c8:77:ef:15:36:b7:3f:b9:36:bd:1a:4e:1a:5d:33. Are you sure you want to continue connecting (yes/no)? /usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed The authenticity of host '192.168.99.172 (192.168.99.172)' can't be established. ECDSA key fingerprint is SHA256:JRJkZRzFncdiupBqjji0LP6XNMQ9eSdFKm0wVoDp8RY. ECDSA key fingerprint is MD5:f3:c8:77:ef:15:36:b7:3f:b9:36:bd:1a:4e:1a:5d:33. Are you sure you want to continue connecting (yes/no)? yes /usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys root@192.168.99.172's password: Number of key(s) added: 1 Now try logging into the machine, with: "ssh -p '22' 'root@192.168.99.172'" and check to make sure that only the key(s) you wanted were added. [root@localhost .ssh]# ssh 192.168.99.172 Last login: Sun Apr 29 12:39:28 2018 from 192.168.99.239 [root@localhost ~]# exit 登出 Connection to 192.168.99.172 closed. ----------------------------------------------------------

PC-B drwx
------. 2 root root 29 4月 29 12:48 .ssh [root@localhost ~]# cd .ssh/ [root@localhost .ssh]# ll 總用量 4 -rw-------. 1 root root 408 4月 29 12:48 authorized_keys [root@localhost .ssh]# cat authorized_keys ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDMeTAz8cAtlZON+sUMSYpTXjhW32IkAdE+336BwW9LotWGznIOoS7d6cdvY...

 3.3 python的實現

import paramiko
#step 1 獲取私鑰
pk=paramiko.RSAKey.from_private_key_file('F:\id_rsa')
#step 2 鏈接服務器,設置用戶名並關聯私鑰
link1=paramiko.Transport('xxx.cm',22)
link1.connect(username='xxxx',pkey=pk)
#step 3 建立transport對象 收發文件
myobject=paramiko.SFTPClient.from_transport(link1)
myobject.get('xxx.sh',r'f:\xxx.sh')

3.4 一則網上的代碼,此處先不做分析

import paramiko
import time
def verification_ssh(host,username,password,port,root_pwd,cmd):
   s=paramiko.SSHClient()
   s.load_system_host_keys()
   s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
   s.connect(hostname = host,port=int(port),username=username, password=password)
   if username != 'root':
       ssh = s.invoke_shell()
       time.sleep(0.1)
       ssh.send('su - \n')
       buff = ''
       while not buff.endswith('Password: '):
           resp = ssh.recv(9999)
           buff +=resp
       ssh.send(root_pwd)
       ssh.send('\n')
       buff = ''
       while not buff.endswith('# '):
           resp = ssh.recv(9999)
           buff +=resp
       ssh.send(cmd)
       ssh.send('\n')
       buff = ''
       while not buff.endswith('# '):
           resp = ssh.recv(9999)
           buff +=resp
       s.close()
       result = buff
   else:
       stdin, stdout, stderr = s.exec_command(cmd)
       result = stdout.read()
       s.close()
   return result

 

3、多線程 多進程的引入

 

一、進程、線程、協程的基本概念(兩個最小,須要分清楚)

1.1 進程是系統進行資源分配和的基本單位

1.2 線程是CPU運算調度的最小單位

1.3 協程比線程還小的單位

二、進程和線程的關係

2.1 進程就是資源的集合,這些資源包括對內存、對硬盤、對光驅等各類資源的調用的集合。

2.2 線程包含在進程中(進程是線程的容器)是進程的實際運算單位。

2.3 一個進程能夠包括一個或多個線程

2.4 進程須要操做cpu 必須經過建立線程來操做

2.5 一個線程,實際就是一個cpu的控制流,一個進程中能夠併發多個線程

2.6 程序是指令、數據及其組織形式的描述,進程是程序的實體。

三、多線程、多進程、進程上下文

3.1 一個進程是有一系列的相關的資源的集合一個進程的資源包括了memory page (內存頁,存儲頁,存儲頁面),文件句柄,socket鏈接,一些安全信息 如誰的id啓動了這個進程

3.2 進程上下文,意思是可執行程序代碼是進程的重要組成部分。進程上下文其實是進程執行活動全過程的靜態描述。

3.3 每一個進程執行過的、執行時的以及待執行的指令和數據;在指令寄存器、堆棧、狀態字寄存器等中的內容。此外, 還包括進程打開的文件描述符等.

3.4 多個線程共享進程上下文(即資源)

3.5 一個進程的全部線程 共享同一塊內存空間

3.6 線程快仍是進程快 沒有可比性 一個是資源的集合 一個是cpu的控制流,進程須要執行 也必須經過建立線程來執行

3.7 線程建立快 仍是進程建立快 線程建立快 它就是一段cpu的指令集,而進程須要去申請各類資源組成集合

3.8 線程共享內存空間 進程間資源互相獨立

3.8.1一個父進程建立子進程,至關於克隆了一份獨立的內容 多個子進程直接的資源是不能互相訪問的

3.8.2 一個線程,建立多個線程都共享同一個進程的資源

3.9同一個進程的多個線程直接能直接聯繫交流交互,兩個進程想通訊交流 要找一箇中間代理

3.10

3.10.1對於一個主線程的修改 可能或影響其餘線程的運行(由於共享資源)

3.10.2對於父進程的修改,不會影響到子進程(資源獨立)

4、線程語法詳解

一、建立多線程

兩步,一步關聯函數和變量,一步start

import threading
import time
def PointIt(x):
    print('---->',x)
    time.sleep(5)
    print(x,'--->ok')
PointIt('n1')
PointIt('n2')
#對比n1/n2/n3/n4的輸出用時
#step 1 建立進程,target爲函數的函數名,args內輸入函數的參數
n3=threading.Thread(target=PointIt,args=('n3',))#逗號不能漏
n4=threading.Thread(target=PointIt,args=('n4',))#逗號不能漏
#step 2 start
n3.start()
n4.start()
print('done')

二、面向對象的方式建立多線程

import threading
import time
#step 1 創建一個類,須要繼承線程類
class Mythread_class(threading.Thread):
    #step 2定義構造函數,接收須要輸入的變量
    def __init__(self,n):
        super(Mythread_class, self).__init__()
        self.n=n
    #step 3 定義run ->這裏必須是run這個名字
    def run(self):
        print(self.n)
        time.sleep(3)
        print(self.n,'done')
#step 4 生成實例,每一個實例運行start方法都會啓動一個線程運行run方法
t1=Mythread_class('t1')
t2=Mythread_class('t2')
t1.run()#這樣不行 這樣仍是單線程
t2.run()#若是直接是run方法,仍是單線程
t1.start()
t2.start()#多線程

for i in range(5):
    x=Mythread_class('x-%s'%i)
    x.start()
print('主線程done')

程序運行的主線程在啓動子線程以後不會等子線程執行完畢 而是繼續運行主線程

一樣,在主線程中也沒法計算出子線程運行所花費的時間

三、join()方法的使用

若是run()運行結束則join()結束,若是run()沒有完成,則卡住等待完成。經過這個方法來判斷各個線程都運行結束所花費的時間

import threading
import time

class Mythread(threading.Thread):
def __init__(self,n):
super(Mythread, self).__init__()
self.n=n
def run(self):
print('t-%s is running,當前線程爲%s,當前活躍線程數爲%s'%(self.n,threading.current_thread(),threading.active_count()))
time.sleep(3)
print('t-%s done'%self.n)
time1=time.time()
t_object=[]#使用這個列表來記錄線程對象
for i in range(10):
x=Mythread(i)
x.start()
t_object.append(x)
print('main',threading.current_thread())
for i in t_object:
i.join()
time2=time.time()
print(time2-time1)
------------------------------------------
t-0 is running,當前線程爲<Mythread(Thread-1, started 2712)>,當前活躍線程數爲2
t-1 is running,當前線程爲<Mythread(Thread-2, started 9332)>,當前活躍線程數爲3
t-2 is running,當前線程爲<Mythread(Thread-3, started 2696)>,當前活躍線程數爲4
t-3 is running,當前線程爲<Mythread(Thread-4, started 9884)>,當前活躍線程數爲5
t-4 is running,當前線程爲<Mythread(Thread-5, started 3272)>,當前活躍線程數爲6
t-5 is running,當前線程爲<Mythread(Thread-6, started 9208)>,當前活躍線程數爲7
t-6 is running,當前線程爲<Mythread(Thread-7, started 1828)>,當前活躍線程數爲8
t-7 is running,當前線程爲<Mythread(Thread-8, started 9256)>,當前活躍線程數爲9
t-8 is running,當前線程爲<Mythread(Thread-9, started 1060)>,當前活躍線程數爲10
t-9 is running,當前線程爲<Mythread(Thread-10, started 9780)>,當前活躍線程數爲11
我是主線程main <_MainThread(MainThread, started 2056)>
t-0 done
t-4 done
t-5 done
t-2 done
t-3 done
t-1 done
t-8 done
t-9 done
t-7 done
t-6 done
3.003652811050415

四、守護線程

當主線程執行完畢後,主線程結束,則全部守護線程所有結束,不管運行到什麼狀態  setDeamon

import threading
import time
class Myclass(threading.Thread):
    def __init__(self,n):
        super(Myclass, self).__init__()
        self.n=n
    def run(self):
       print(self.n,'begin')
       time.sleep(3)
       print(self.n,'------------->done','目前還活躍的線程數:',threading.active_count())

tmp_list=[]
for i in range(20):
    j=Myclass(i)
  j.setDaemon(True)#setdaesmon來設置爲守護進程
    j.start()
    tmp_list.append(j)
time.sleep(3)
print('我要結束主進程了')
-----------------------------------------------
0 begin
1 begin
2 begin
3 begin
4 begin
5 begin
6 begin
7 begin
8 begin
9 begin
10 begin
11 begin
12 begin
13 begin
14 begin
15 begin
16 begin
17 begin
18 begin
19 begin
1 ------------->done 目前還活躍的線程數: 21
0 ------------->done 目前還活躍的線程數: 20
4 ------------->done 目前還活躍的線程數: 19
3 ------------->done 目前還活躍的線程數: 18
6 ------------->done 目前還活躍的線程數: 17
5 ------------->done 目前還活躍的線程數: 16
2 ------------->done 目前還活躍的線程數: 15
11 ------------->done 目前還活躍的線程數: 14
12 ------------->done 目前還活躍的線程數: 14
10 ------------->done 目前還活躍的線程數: 14
7 ------------->done 目前還活躍的線程數: 12
9 ------------->done 目前還活躍的線程數: 12
14 ------------->done 目前還活躍的線程數: 11
8 ------------->done 目前還活躍的線程數: 9
我要結束主進程了

五、GIL 全局解釋器鎖

5.1 In CPython,This lock is necessary mainly because CPython’s memory management is not thread-safe. 

5.2 多個線程都打到多個cpu的核上 可是同一時刻只能有一個線程在真正的工做

5.3 全局解釋器鎖(Global Interpreter Lock)是計算機程序設計語言解釋器用於同步線程的工具,使得任什麼時候刻僅有一個線程在執行

5.4 一個python解釋器進程內有一條主線程,以及多條用戶程序的執行線程。即便在多核CPU平臺上,因爲GIL的存在,因此禁止多線程的並行執行。

5.5 Python 3.2開始使用新的GIL。

5.6 能夠建立獨立的進程來實現並行化

六、互斥鎖

一個全局變量,每一個線程都要對其進行操做,爲了防止一個線程還未對其操做完畢,即這個變量的值還未被原線程修改時,這個變量的值就被傳到下一個線程進行操做,這樣有可能照成最終的結果不許確。爲了防止這種狀況,使用互斥鎖解決

step1 實例一個鎖對象

step2 acqiure()方法->鎖變量

step3 release()方法釋放變量

import threading
import time
mylock=threading.Lock()
num=0
class Myclass(threading.Thread):
   def __init__(self,n):
      super(Myclass, self).__init__()
      self.n=n
   def run(self):
       print(self.n,'begin')
       mylock.acquire()
       time.sleep(3)
       global num
       num += 1
       mylock.release()
       time.sleep(1)
      #mylock.release()使用完後迅速釋放
tmp_list=[]
for i in range(5000):
    x=Myclass(i)
    x.start()
    tmp_list.append(x)
for i in tmp_list:
    i.join()
print(num)

七、遞歸鎖

mylock=threading.RLock()#RLock 遞歸鎖

import threading
mylock=threading.RLock()#RLock 遞歸鎖

number1=0
number2=1111
count=0
def run1():
  mylock.acquire()
  global number1
  number1+=1
  mylock.release()

def run2():
  #mylock.acquire() 只能套兩層鎖,若是套三層鎖,即把這個註釋去掉,就卡死了
  global number2
  number2+=2
  #mylock.acquire()

def terminal_fun():
  mylock.acquire()
  global count
  run1()
  print('between run1 with run2')
  run2()
  print('count:',count)
  count+=1
  mylock.release()

for i in range(5):
  x=threading.Thread(target=terminal_fun)
  x.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('ok')
---------------------------------


between run1 with run2
count: 0
between run1 with run2
count: 1
between run1 with run2
count: 2
between run1 with run2
count: 3
between run1 with run2
count: 4
ok

此代碼說明:注意主程序結束的寫法法,本程序使用的是:等待活躍線程數等於1結束的方法

八、信號量,

不能單純理解爲線程併發數

threading.BoundedSemaphore(3) 同時訪問資源的線程數量,但不是活躍的線程數量

線程中,信號量主要是用來維持有限的資源,使得在必定時間使用該資源的線程只有指定的數量

是一個變量,控制着對公共資源或者臨界區的訪問。信號量維護着一個計數器,指定可同時訪問資源或者進入臨界區的線程數。 

每次有一個線程得到信號量時,計數器-1。若計數器爲0,其餘線程就中止訪問信號量,直到另外一個線程釋放信號量。 

如下代碼,io爲一個信號量,查看活躍的線程,實際全部的線程都跑起來了,可是,能輸出的只有三個線程

import threading
import time

limit_num = threading.BoundedSemaphore(3)  # 控制併發,不是三個一組,釋放一個,就新增一個


def run(x):
    limit_num.acquire()
    print('i am %s' % x, threading.active_count())
    time.sleep(3)
    print('i am %s' % x, threading.active_count())
    limit_num.release()

for i in range(20):
    x = threading.Thread(target=run, args=(i,))
    x.start()

while threading.active_count() != 1:
    pass
else:
    print('done!')
-------------------------------------------
i am 0 2
i am 1 3
i am 2 4
i am 1 21
i am 2 21
i am 4 20
i am 0 19
i am 3 18
i am 5 18
i am 3 18
i am 4 18
i am 6 16
i am 7 16
i am 5 16
i am 8 15
i am 7 15
i am 6 15
i am 9 13
i am 10 13
i am 8 13
i am 11 12
i am 10 12
i am 9 12
i am 11 11
i am 13 10
i am 14 9
i am 12 9
i am 14 9
i am 13 9
i am 12 9
i am 15 7
i am 17 7
i am 16 7
i am 15 6
i am 17 5
i am 18 5
i am 16 5
i am 19 3
i am 18 3
i am 19 2
done!

九、事件 event

可用於多個線程間同步信息 如同一個線程控制紅綠燈,一個線程等待紅綠燈

四個知識點

9.1 生成事件 threading.Event()

9.2 set事件 x.set()

9.3 clear事件 x.clear()

9.4 wait事件 x.wait()

import threading
import time
my_green_red_light=threading.Event() def light():#負責對light event 進行set或clear
  time_count=0
  my_green_red_light.set()   while True:
     if time_count >8 and time_count < 16:
         my_green_red_light.clear()          print('\033[41;1mred\033[0m')
     elif time_count > 16:
         my_green_red_light.set()
         time_count=0
     else:
         print('\033[42;1mgreen\033[0m')
     time.sleep(0.3)
     time_count+=1
def car(x):
   while True:
       if my_green_red_light.is_set():
           print('car [%s] is running'%x)
       else:
           print('is red now,wait for green....')
           my_green_red_light.wait()            print('green now ,go go go !!!')
       time.sleep(0.8)

light_thread=threading.Thread(target=light)
car_thread=threading.Thread(target=car,args=('tesla',))
light_thread.start()
car_thread.start()
------------------------------
green
car [tesla] is running
green
green
car [tesla] is running
green
green
green
car [tesla] is running
green
green
car [tesla] is running
green
red
red
is red now,wait for green....
red
red
red
red
red
green
green now ,go go go !!!
green
green
car [tesla] is running
green
green
green
car [tesla] is running
green

十、隊列與堆棧 queue模塊

使用隊列的意義

一、解耦 是程序之間實現雙耦合 經過生產者消費者模型

二、提升處理效率

10.1 q.qsize()獲取隊列長度

10.2 q.put()放元素

10.3 q.get()拿元素

10.4 q.nowait()是否等待(當即響應)

10.5 q.get(block=False)是否阻塞

10.6 q.get(timeout=1)響應超時

10.7 q=queue.Queue(maxsize=3)隊列長度

10.8 q=queue.LifoQueue()(last in first out 堆棧)

import queue

>>> import queue
>>> x=queue.Queue(maxsize=5)
>>> x.put(1)
>>> x.put(2)
>>> x.put(3)
>>> x.put(4)
>>> x.put(5,timeout=1)
>>> x.put(6,timeout=1)
Traceback (most recent call last):
File "<pyshell#8>", line 1, in <module>
x.put(6,timeout=1)
queue.Full
>>> x.get()
1
>>> x.get()
2
>>> x.get()
3
>>> x.get()
4
>>> x.get(block=False)
5
>>> x.get(block=False)
Traceback (most recent call last):
File "<pyshell#14>", line 1, in <module>
x.get(block=False)
queue.Empty
>>> 


>>> y=queue.LifoQueue()
>>> y.put(1)
>>> y.put(2)
>>> y.put(3)
>>> y.get()
3
>>> y.get()
2
>>> y.get()
1


>>> z=queue.PriorityQueue()
>>> z.put((1,'123'))
>>> z.put((-1,'456'))
>>> z.put((5,'789'))
>>> z.get
<bound method Queue.get of <queue.PriorityQueue object at 0x000001A41A496550>>
>>> z.get()
(-1, '456')
>>> z.get()
(1, '123')
>>> z.get()
(5, '789')
>>> 
>>> x.qsize()
0
>>> 

十一、生產者消費者模型(同步隊列)

import queue
import time
import threading

q=queue.Queue(maxsize=5)

def product_it(name,t):
    count=0
    while True:
       data='骨頭%s'%count
       q.put(data)
       print('[%s]生成[%s]'%(name,data))
       count+=1
       time.sleep(t)
def eat_it(name,t):
     while True:
          print('[%s]吃了骨頭[%s]'%(name,q.get()))
          time.sleep(t)

t1=threading.Thread(target=product_it,args=('北京店',4,))
t2=threading.Thread(target=product_it,args=('上海店',0.3,))
t3=threading.Thread(target=eat_it,args=('小明',5,))
t4=threading.Thread(target=eat_it,args=('小王',2,))
t1.start()
t2.start()
t3.start()
t4.start()
------------------------------
[北京店]生成[骨頭0]
[上海店]生成[骨頭0]
[小明]吃了骨頭[骨頭0]
[小王]吃了骨頭[骨頭0]
[上海店]生成[骨頭1]
[上海店]生成[骨頭2]
[上海店]生成[骨頭3]
[上海店]生成[骨頭4]
[上海店]生成[骨頭5]
[小王]吃了骨頭[骨頭1]
[上海店]生成[骨頭6]
[小王]吃了骨頭[骨頭2]
[上海店]生成[骨頭7]
[小明]吃了骨頭[骨頭3]
[北京店]生成[骨頭1]

 十二、總結:多線程實現同步的四種方式(鎖機制(互斥,迭代,gli)、條件變量(event)、信號量和同步隊列)

1三、python中多線程的使用場景

cpu密集型,io密集型

線程佔用cpu資源 python的多線程實際是經過gil鎖進行上下文切換 某個時間點上 只有一個線程在跑

若是cpu只有一個核心 那麼 不管有多少個線程 同一時間,只可能有一個線程在被cpu處理

線程之間經過快速切換執行,使使用者感受同一時刻多線程並行的效果,但單核狀況實際是串行的

若是多核心 理論上來講 不一樣的核心同一時刻能夠運行不一樣的線程

可是在python中 因爲考慮到線程間數據共享的狀況 python內存在GLI鎖,使得同一時間內,只有一個線程運行 不管多少核 這個是python的侷限性

python語言誕生時 cpu只有單核 開發者沒有考慮多核的狀況,

cpu在執行指令時須要知道上下文關係 因此python在啓用線程的時候 調用的是c語言的thread接口 並將這個關係傳給cpu (和互斥鎖相似 有一個全局變量+1)

若是線程同時操做,拿到的上下文可能同樣 因此爲了不上下文雷同 使用gli鎖來確保同一時刻只有一個線程在執行

 5、進程詳解

一、進程的引入

1.1 進程間是獨立的

1.2  python的進程是操做系統的原生進程,進程間管理是由操做系統來完成的,且進程間的數據相互獨立也不須要鎖這個概念。因此python的多進程 能夠解決python的多核問題

二、建立多進程的方法

multiprocessing.Process(target=run,args=(i,))

三、經過建立多進程,進程中使用多線程來充分利用cpu資源

import multiprocessing
import time
import threading

def run(x):
    print('process %s is running'%x)
    t=threading.Thread(target=mythread,args=(x,))
    t.start()
    print('process %s is done'%x)

def mythread(x):
   print('進程%s啓用的線程的線程號是:%s'%(x,threading.get_ident()))#打印線程號


if __name__=='__main__':
   for i in range(10):
       x=multiprocessing.Process(target=run,args=(i,))
       x.start()
-------------------------------------------
process 4 is running
process 4 is done
進程4啓用的線程的線程號是:2172
process 0 is running
process 0 is done
進程0啓用的線程的線程號是:764
process 2 is running
process 6 is running
process 2 is done
進程2啓用的線程的線程號是:8236
process 9 is running
process 3 is running
進程6啓用的線程的線程號是:2544
process 6 is done
process 5 is running
process 7 is running
進程9啓用的線程的線程號是:3536
process 9 is done
process 3 is done
進程3啓用的線程的線程號是:484
process 5 is done
process 7 is done
進程5啓用的線程的線程號是:9624
進程7啓用的線程的線程號是:1464
process 1 is running
進程1啓用的線程的線程號是:1764
process 1 is done
process 8 is running
進程8啓用的線程的線程號是:3516
process 8 is done

此處能夠觀察cpu的使用狀況,能發現cpu的使用率快速提高

四、線程號、進程號、父進程號的查看

threading.get_ident()/os.getpid()/os.getppid()
import multiprocessing
import os

def info():
   tmp_info ='''\tname:%s\n\tppid:%s\n\tpid :%s'''%(__name__,os.getppid(),os.getpid())
   print(tmp_info)

def run():
    print('子進程信息'.center(50,'-'))
    x = multiprocessing.Process(target=run2)
    x.start()
    info()
def run2():
    print('子進程信息'.center(50,'-'))
    info()

if __name__=="__main__":
    print('父進程信息'.center(50,'-'))
    info()
    x=multiprocessing.Process(target=run)
    x.start()    
---------------------------------------------------------------
----------------------父進程信息-----------------------
name:__main__
ppid:2592
pid :10720
----------------------子進程信息-----------------------
name:__mp_main__
ppid:10720
pid :7752
----------------------子進程信息-----------------------
name:__mp_main__
ppid:7752
pid :5696

五、不一樣進程間通訊的方法

5.1 進程隊列方式進行進程間通訊

此處須要對比進程隊列與線程隊列的區別

queue.Queue()普通的隊列,在一個進程中使用

multithreading.Queue 原理爲pickle序列號與反序列化,並非資源共享

import multiprocessing
import threading
import queue
#進程queue與線程queue

def main(my_queue):
   print(my_queue.get())
   print(my_queue.get())
   print(my_queue.get())

def main2(x):
   x.put(1)
   x.put(2)
   x.put(3)

if __name__=='__main__':
     my_queue = queue.Queue()
     my_queue.put('[a]')
     my_queue.put('[b]')
     my_queue.put('[c]')
     t1 = threading.Thread(target=main,args=(my_queue,))
     t1.start()#子線程訪問主線程資源
     #t1 = threading.Thread(target=main)此處不將隊列傳入,線程依然能調用主線程的隊列
     #t1.start()  # 子線程訪問主線程資源

     #p1=multiprocessing.Process(target=main,args=(my_queue,))
     #p1.start()#子進程訪問主進程資源
     #即便是父進程將進程queue作完參數傳給子進程,子進程依然沒法訪問這個queue

     Q = multiprocessing.Queue()#進程queue multiprocessing.Queue()
     p2=multiprocessing.Process(target=main2,args=(Q,))#將隊列傳入子進程
     p2.start()
     p2.join()#等待子進程執行完畢,確保都入隊成功
     print(Q.get())
     print(Q.get())
     print(Q.get())
----------------------------------------
[a]
[b]
[c]
1
2
3

5.2 進程通道pip方式進行進程間通訊

multiprocessing.Pipe()結合send和recv來使用
import multiprocessing
import threading
import time

def f(x1):
   print('f')
   x1.send('hello,i am p1')
   print(x1.recv())
def f2(x2):
    print('f2')
    time.sleep(3)
    print(x2.recv())
    x2.send('ok , p2 ack')

if __name__=='__main__':
    x1, x2 = multiprocessing.Pipe()
    p1=multiprocessing.Process(target=f,args=(x1,))
    p2=multiprocessing.Process(target=f2,args=(x2,))
    p1.start()
    p2.start()
------------------------
f
f2
hello,i am p1
ok , p2 ack

5.3 多進程共同修改字典列表的方法manager().dict()、manager().list()

import multiprocessing
import os

def f(x,y):
    x[os.getpid()]=os.getpid()#x是一個字典,給x這字典添加一個key和value
    y.append(os.getpid())#y是一個列表,給y這個列表添加一個value
    print(x,y)
if __name__=='__main__':
     with multiprocessing.Manager() as xxx:
         x=xxx.dict()#生成一個字典 一個多進程manage的字典
         y=xxx.list(range(5))#列表生成 生成一個列表 一個多進程manage的列表
         p_list=[]#主進程列表
         for i in range(5):
            p=multiprocessing.Process(target=f,args=(x,y,))#給進程manage的列表字典添加元素
            p.start()
            p_list.append(p)#爲了確保是否添加完成
         for i in p_list:
             i.join() #配合上面的start 判斷是否子進程運行結束
------------------------------------------------
{19080: 19080} [0, 1, 2, 3, 4, 19080]
{19080: 19080, 20880: 20880} [0, 1, 2, 3, 4, 19080, 20880]
{19080: 19080, 20880: 20880, 15908: 15908} [0, 1, 2, 3, 4, 19080, 20880, 15908]
{19080: 19080, 20880: 20880, 15908: 15908, 8140: 8140} [0, 1, 2, 3, 4, 19080, 20880, 15908, 8140]
{19080: 19080, 20880: 20880, 15908: 15908, 8140: 8140, 13536: 13536} [0, 1, 2, 3, 4, 19080, 20880, 15908, 8140, 13536]

以上能夠看出,多進程實時修改數據(字典,列表,隊列) 無需互斥鎖。

六、進程鎖

爲何要有進程鎖 ,不一樣進程之間不是不能訪問各自的內存空間

 可是例如屏幕,打印機,投影儀這種資源在進程很是多的狀況下可能會出現例如輸出錯行的狀況,此時就要用到鎖

進程鎖比較少用到

import multiprocessing
import os

def f(x,i):
    x.acquire()
    print(i,'---',os.getpid())
    x.release() if __name__=='__main__':
   lock=multiprocessing.Lock() for i in range(1000):
       p=multiprocessing.Process(target=f,args=(lock,i,))
       p.start()
------------------------

2 --- 3784
4 --- 4544
3 --- 13556
5 --- 4864
0 --- 13100
6 --- 15272
1 --- 20104
8 --- 11800
7 --- 672
9 --- 19056

 

七、進程池

7.1

進程池 用於限制同時運行的進程數量

#運行進程過多致使進程間頻繁切換下降程序效率->根據cpu核心等實際狀況,優化同時運行的進程數量

#對同時運行的進程數進行限制 多進程同時存在,可是隻有固定數量的進程在執行

7.2

step1 實例化一個pool

step2 用實例化的pool去生成進程。須要注意兩種生成進程的方式apply與apply_async一種是單進程執行,一種是多進程執行

import multiprocessing
import os
import time
def f(i):
    print('process',i,'is running...',os.getpid())
    time.sleep(3)
    return i  #傳遞給回調函數

def backtome(xxxx):
    print(xxxx,'is done',os.getpid())


if __name__=='__main__':
     print('主進程的pid爲:',os.getpid())
     mypool=multiprocessing.Pool(processes=5)#設置同一時刻能實際運行的進程數量

     for i in range(20):
         #p=mypool.apply(func=f,args=(i,))#單核串行
         p=mypool.apply_async(func=f,args=(i,),callback=backtome)
     print('end')
     mypool.close()#必須添加close()
     mypool.join()#必須添加join
-------------------------------------------
主進程的pid爲: 12720
end
process 0 is running... 10644
process 1 is running... 5056
process 2 is running... 20972
process 3 is running... 6148
process 4 is running... 19120
process 5 is running... 10644
0 is done 12720
process 6 is running... 5056
1 is done 12720
2 is done 12720
process 7 is running... 20972
3 is done 12720
process 8 is running... 6148
4 is done 12720
process 9 is running... 19120
process 10 is running... 10644
5 is done 12720
6 is done 12720
process 11 is running... 5056
7 is done 12720
process 12 is running... 20972
9 is done 12720
process 13 is running... 19120
8 is done 12720
process 14 is running... 6148
process 15 is running... 10644
10 is done 12720
process 16 is running... 5056
11 is done 12720
process 17 is running... 20972
12 is done 12720
14 is done 12720
process 18 is running... 6148
13 is done 12720
process 19 is running... 19120
15 is done 12720
16 is done 12720
17 is done 12720
18 is done 12720
19 is done 12720

apply無回調方法

import multiprocessing
import os
import time
def f(i):
    print('process',i,'is running...',os.getpid())
    time.sleep(3)
    print('end')
    return i  #傳遞給回調函數

def backtome(xxxx):
    print(xxxx,'is done',os.getpid())


if __name__=='__main__':
     print('主進程的pid爲:',os.getpid())
     mypool=multiprocessing.Pool(processes=5)#設置同一時刻能實際運行的進程數量

     for i in range(20):
         p=mypool.apply(func=f,args=(i,),)#單核串行
         #p=mypool.apply(func=f, args=(i,),callback=backtome)apply無callback函數
         #p=mypool.apply_async(func=f,args=(i,),callback=backtome)
     print('end')
     mypool.close()#必須添加close()
     mypool.join()#必須添加join
--------------------------------
主進程的pid爲: 17912
process 0 is running... 4080
end
process 1 is running... 18092
end
process 2 is running... 5924
end
process 3 is running... 14276
end
process 4 is running... 1800
end
process 5 is running... 4080
end
process 6 is running... 18092
end
process 7 is running... 5924
end
process 8 is running... 14276
end
process 9 is running... 1800
end
process 10 is running... 4080
end
process 11 is running... 18092
end
process 12 is running... 5924
end
process 13 is running... 14276
end
process 14 is running... 1800
end
process 15 is running... 4080
end
process 16 is running... 18092
end
process 17 is running... 5924
end
process 18 is running... 14276
end
process 19 is running... 1800
end
end

6、協程的概念

一、協程概念的理解  單線程 觸發切換來完成並行 不依賴cpu切換上下文。

協程 異步io

協程在單線程狀況下操做

協程是一種用戶態的輕量級線程 -> cpu根本不知道他的存在

線程在切換的時候有cpu的寄存器來保存每一個線程的狀態 每一個線程有本身的上下文

協程在切換時的狀態須要用戶本身將上下文保存到相應的地方。

yelid是協程的一種

在單線程下實現併發的效果,用戶須要本身保存運行狀態

二、協程的好處

2.1 無需線程切換 也就無需cpu在線程切換時上下文切換的開銷

2.2 無需原子操做鎖定及同步開銷 ->相對於多線程的互斥鎖 ->協程是單線程的串行操做

2.3 方便切換控制流 簡化模型

2.4 高併發 高擴展 低成本

三、協程的缺點

協程實質上是一個單線程,沒法利用多核資源

協程須要和進程配合才能在多cpu上運行

四、一些補充

nginx 就是單線程 就能支持上萬個併發

協程能處理上萬併發的思路,遇到io就進行協程切換 -> io完成就自動切換到原協程

#進程是資源分配的最小單位,線程是CPU調度的最小單位.這是計算機裏常常考的

#協程。比線程還小的單位

7、協程的使用

一、yield完成協助的一種形式

import time

def produce_scq(n,x,y):#生產者實際就是個函數 ,實體化生成器並next他
    x.__next__()#這裏很關鍵
    y.__next__()
    print(n,'is producing')
    count=0
    while count < 20:
        x.send('包子%s'%count)
        y.send('包子%s'%count)
        count +=1
    return 'done'

def custom_scq(n): #消費者是生成器 ,等待外部信息輸入
     print('start eat')
     while True:
         baozi=yield
         print(n,'is eating',baozi)
         time.sleep(0.01)


c1=custom_scq('小明')
c2=custom_scq('小東')
print(produce_scq('大明',c1,c2))
---------------------------------------------
start eat
start eat
大明 is producing
小明 is eating 包子0
小東 is eating 包子0
小明 is eating 包子1
小東 is eating 包子1
小明 is eating 包子2
小東 is eating 包子2
小明 is eating 包子3
小東 is eating 包子3
小明 is eating 包子4
小東 is eating 包子4
小明 is eating 包子5
小東 is eating 包子5
小明 is eating 包子6
小東 is eating 包子6
小明 is eating 包子7
小東 is eating 包子7
小明 is eating 包子8
小東 is eating 包子8
小明 is eating 包子9
小東 is eating 包子9
小明 is eating 包子10
小東 is eating 包子10
小明 is eating 包子11
小東 is eating 包子11
小明 is eating 包子12
小東 is eating 包子12
小明 is eating 包子13
小東 is eating 包子13
小明 is eating 包子14
小東 is eating 包子14
小明 is eating 包子15
小東 is eating 包子15
小明 is eating 包子16
小東 is eating 包子16
小明 is eating 包子17
小東 is eating 包子17
小明 is eating 包子18
小東 is eating 包子18
小明 is eating 包子19
小東 is eating 包子19
done

二、協程手動切換 greenlet的使用

step1 greenlet.greenlet(xxx)

step2 xxx.switch

import multiprocessing
import greenlet

def test1():
   print(12)
   g2.switch()
   print(56)
   g2.switch()
def test2():
   print(34)
   g1.switch()
   print(78)

g1=greenlet.greenlet(test1)
g2=greenlet.greenlet(test2)
#實例化兩個協程來運行函數,實際此時函數還未運行
g1.switch()#開始執行g1
---------------------------------------------
12
34
56
78

三、gevet的使用 gevent是greenlet的第三方庫,能夠自動切換協程

切換思路,遇到io或非cpu操做時,自動切換,讓cpu繼續處理下一個指令

gevent.sleep與gevent.joinall()、gevent.spawn的搭配使用

import gevent

def f1():
      print('f1 run')
      gevent.sleep(2)
      print('f1 done')
def f2():
      print('f2 run')
      gevent.sleep(1)
      print('f2 done')
def f3():
      print('f3 run')
      gevent.sleep(0)
      print('f3 done')
gevent.joinall([gevent.spawn(f1),gevent.spawn(f2),gevent.spawn(f3)])
#生成協程
----------------------------------------
f1 run
f2 run
f3 run
f3 done
f2 done
f1 done

#gevent.sleep 模擬io操做 -》 joinall 所有執行 gevent.spawn 生成協程

f1 -> sleep -> f2 -> sleep -> f3 -> f1還未執行完成 -> f2 還未完成 -> f3 ok -> f3 done -> f1 還未完成-> f2 還未完成->f1 還未完成->f2 ok -> f1 還未完成 -> f1 ok

四、使用協程gevent,搭配monkey.patch_all()來對比urllib.request模塊url.openget多個網頁的時間

import urllib
import gevent,time
#step 1  導入模塊
from urllib import request
from gevent import monkey
#step 2 監控IO
monkey.patch_all()


#step 3
def f(url):
    print('GET:',url)
    res=request.urlopen(url)#獲取數據

# step 4 用來展現get到的這個url有多少data
    data=res.read()#讀數據
    print('bytes:',len(data),'url:',url)

t1=time.time()#獲取程序開始時間
f('http://www.dangdang.com')
f('http://d.cn')
f('http://www.163.com')
f('http://www.126.com')#單線程執行
t2=time.time()
print('--------------------->cost:',t2-t1)
#step 5 協程執行

gevent.joinall([gevent.spawn(f,'http://d.cn'),gevent.spawn(f,'http://www.dangdang.com'),gevent.spawn(f,'http://www.163.com'),gevent.spawn(f,'http://www.126.com')])
t3=time.time()

print('--------------------->cost:',t3-t2)
-------------------------------------------------------------------
GET: http://www.dangdang.com
bytes: 720 url: http://www.dangdang.com
GET: http://d.cn
bytes: 80968 url: http://d.cn
GET: http://www.163.com
bytes: 688755 url: http://www.163.com
GET: http://www.126.com
bytes: 12921 url: http://www.126.com
--------------------->cost: 1.0575993061065674
GET: http://d.cn
GET: http://www.dangdang.com
GET: http://www.163.com
GET: http://www.126.com
bytes: 720 url: http://www.dangdang.com
bytes: 688755 url: http://www.163.com
bytes: 80968 url: http://d.cn
bytes: 12921 url: http://www.126.com
--------------------->cost: 0.14879226684570312

此處的joinall是等協程都運行結束的意思

 若是沒有joinall()

import urllib
import gevent,time
#step 1  導入模塊
from urllib import request
from gevent import monkey
#step 2 監控IO
monkey.patch_all()


#step 3
def f(url):
    print('GET:',url)
    res=request.urlopen(url)#獲取數據

# step 4 用來展現get到的這個url有多少data
    data=res.read()#讀數據
    print('bytes:',len(data),'url:',url)

t1=time.time()#獲取程序開始時間
f('http://www.dangdang.com')
f('http://d.cn')
f('http://www.163.com')
f('http://www.126.com')#單線程執行
t2=time.time()
print('--------------------->cost:',t2-t1)
#step 5 協程執行

gevent.spawn(f,'http://d.cn')
gevent.spawn(f,'http://www.dangdang.com')
gevent.spawn(f,'http://www.163.com')
gevent.spawn(f,'http://www.126.com')
t3=time.time()

print('--------------------->cost:',t3-t2)
---------------------------------
GET: http://www.dangdang.com
bytes: 168224 url: http://www.dangdang.com
GET: http://d.cn
bytes: 80968 url: http://d.cn
GET: http://www.163.com
bytes: 688790 url: http://www.163.com
GET: http://www.126.com
bytes: 12921 url: http://www.126.com
--------------------->cost: 1.2731683254241943
--------------------->cost: 0.0

五、gevent完成socket併發

單線程遇到io輪訓的過程,須要注意註冊輪詢的過程。gevent.spawn

import socket
import gevent
from gevent import monkey
monkey.patch_all()

def server_port_and_listen():
      sever1=socket.socket()
      sever1.bind(('localhost',9999))
      sever1.listen()
      count=1
      print('開始監聽9999端口')
      while True:
            conn,addr=sever1.accept()
#精髓在這裏 每一個鏈接起一個協程
            gevent.spawn(server_deal_conn,conn,count) #協程輪訓監聽 遇到io自動切換
#server_deal_conn(conn)
            count+=1


def server_deal_conn(x,i):
    print('鏈接%s創建'%i)
    try:
        while True:
           data=x.recv(1024)
           if not data:
                print('鏈接已經斷開')
                break
           tmp_data='i am %s\n'%i+data.decode()
           x.send(tmp_data.encode('utf-8'))
           #此處不考慮長度了,只作短消息傳遞
    except Exception as ex:
        print(ex)
    finally:
        print('斷開鏈接',i,'詳細信息',x)
        x.close()

server_port_and_listen()

------------------------------------------

開始監聽9999端口
鏈接1創建
鏈接2創建
鏈接3創建
鏈接4創建
[WinError 10054] 遠程主機強迫關閉了一個現有的鏈接。
斷開鏈接 1 詳細信息 <gevent._socket3.socket object, fd=616, family=2, type=1, proto=0>
[WinError 10054] 遠程主機強迫關閉了一個現有的鏈接。
斷開鏈接 2 詳細信息 <gevent._socket3.socket object, fd=624, family=2, type=1, proto=0>
[WinError 10054] 遠程主機強迫關閉了一個現有的鏈接。
斷開鏈接 3 詳細信息 <gevent._socket3.socket object, fd=628, family=2, type=1, proto=0>
[WinError 10054] 遠程主機強迫關閉了一個現有的鏈接。
斷開鏈接 4 詳細信息 <gevent._socket3.socket object, fd=620, family=2, type=1, proto=0>
鏈接5創建
鏈接6創建

 

 8、事件驅動模型的簡述

一、如何簡單的理解事件

即 :事件  -> 觸發 ->迴應

事件如我點擊鼠標、鍵盤、個人網卡收到數據包

迴應如點鼠標關機,ctrl+s保存文件。回覆對方的ping包。和對方三次握手。

二、三種編程思路(範式)的對比

2.1 單線程

2.2 多線程

2.3 異步

 

單線程和多線程隨着須要掃描設備的增長,更容易出現響應時間和遺漏響應的狀況。多線程須要屢次切換上下文。

三、事件驅動模型圖解,在上文的協程事件驅動模型。

 

 

 

 

 

四、事件驅動模型通常是由事件收集器、事件發送器和事件處理器三部分組成基本單元組成。

針對不一樣的操做系統有不一樣的庫能夠選擇

4.一、select庫 

select庫是各個版本的linux和windows平臺都支持的基本事件驅動模型庫,而且在接口的定義上也基本相同,只是部分參數的含義略有差別。

Select庫有三種事件:讀事件、寫事件、異常事件

4.二、poll庫

poll庫,做爲linux平臺上的基本事件驅動模型,Windows平臺不支持poll庫。

4.三、epoll庫

  epoll庫是Nginx服務器支持的高性能事件之一,它是公認的很是優秀的事件驅動模型,和poll和select有很大的不一樣,屬於poll庫的一個變種,他們的處理方式都是建立一個待處理事件列表,而後把這個事件列表發送給內核,返回的時候,再去輪詢檢查這個列表,以判斷事件是否發生。

 

9、select實現IO多路複用

readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,那程序就會一直阻塞在這裏 # select中第1個參數表示inputs中發生變化的句柄放入readable。 # select中第2個參數表示outputs中的值原封不動的傳遞給writeable。

 # select中第3個參數表示inputs中發生錯誤的句柄放入exeptional.

import socket
import queue
import select
input_list=[]
output_list=[]
conn_list=[]
my_dict={}
server1=socket.socket()
server1.setblocking(False)#默認爲阻塞IO
server1.bind(('localhost',6666))
server1.listen()
input_list.append(server1)

while True:
    readable, writeable, exeptional = select.select(input_list, output_list, input_list)
    # io輪詢監控 多路複用
    # print(readable)
    for i in readable:
        if i is server1:
            print('新建鏈接')
            conn, addr = i.accept()  # 注意accept的位置
            conn.setblocking(False)
            input_list.append(conn)
            my_dict[conn] = queue.Queue()  # 生成一個字典key key對應value爲字典
        else:
            print('in else')
            try:
                data = i.recv(1024)
                print('-----------------------------', len(data))
                if data:
                    print('接收到消息')
                    print(data.decode())
                    my_dict[i].put(data)
                    print(my_dict)
                    output_list.append(i)
                    print('outputlist', output_list)
                else:
                    print('斷開')
                    input_list.remove(i)
            except Exception as areyouok:
                print(areyouok, i)
                input_list.remove(i)
                if i in output_list:
                    output_list.remove(i)
                    del my_dict[i]
    for i in writeable:
        print('writeable----------')
        data = my_dict[i].get()
        print(data.decode())
        i.send(data)
        output_list.remove(i)
    for i in exeptional:
        input_list.remove(i)
        if i in output_list:
            output_list.remove(i)
            del my_dict[i]

 

10、selector與百萬鏈接

 seletor在select、poll、epoll基礎上封裝,若是系統支持eqoll優先epoll

一、selector的思路

#step 1 生成一個selector對象,至關於監聽列表

still_listen=selectors.DefaultSelector()

#step 2 定義方法

#等待鏈接的方法  
def wait_for_connect(server,mask):
    conn,addr=server1.accept()
    ...
#等待消息的方法
def conn_wait_for_message(conn,mask):
       data=conn.recv(1024)
       ...

#Step 3 建立socket

socket.socket()、bind(('127.0.0.1',9999))、listen(10000)、.setblocking(False)

 #step 4 註冊及卸載 selectors.EVENT_READ/ x.register /x.unregister(conn)(相似於gevent.spawn)

still_listen.register(server1,selectors.EVENT_READ,wait_for_connect)
still_listen.register(conn,selectors.EVENT_READ,conn_wait_for_message)

#step 5 開始監聽

   infor_come=still_listen.select()#無返回時阻塞
   for key,mask in infor_come:
       choice_func=key.data #根據註冊的方式獲取須要調用的函數
       choice_func(key.fileobj,mask)#傳入對象(已創建的鏈接conn或新建鏈接server)

二、selector 實例

import selectors
import socket

#ulimit - n
#ulimit -SHn

#step 1 selectors.DefaultSelector()生成一個額select對象
still_listen=selectors.DefaultSelector()

#step 2 定義方法 
#等待鏈接的方法  
def wait_for_connect(server,mask):
    conn,addr=server1.accept()
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    still_listen.register(conn,selectors.EVENT_READ,conn_wait_for_message)

#等待消息的方法
def conn_wait_for_message(conn,mask):
    try:
       data=conn.recv(1024)
       print(conn,'message coming')
       if data:
         conn.send(data)
       else:
           print('close...')
           still_listen.unregister(conn)
           conn.close()
    except Exception as ex:
        print(ex)
        still_listen.unregister(conn)
        conn.close()

server1=socket.socket()
server1.bind(('127.0.0.1',9999))
server1.listen(10000)
server1.setblocking(False)
#step 3 註冊方法 x.register 與卸載方法 x.unregister(conn)
still_listen.register(server1,selectors.EVENT_READ,wait_for_connect)
print('運行成功,開始監聽端口。。。')

#step 4 開始監聽
while True:
   infor_come=still_listen.select()#無返回時阻塞
   for key,mask in infor_come:
       choice_func=key.data #根據註冊的方式獲取須要調用的函數
       choice_func(key.fileobj,mask)#傳入對象(已創建的鏈接conn或新建鏈接server)

三、觸發大量鏈接的方法

實際就是循環創建鏈接

import socket

socket_List=[]#用於存放鏈接對象
t1=time.time()#開始時間
for i in range(1000):#生成一千個鏈接對象
    x=socket.socket()
    socket_List.append(x)
print('準備開始鏈接服務器')
t2=time.time()
for x in socket_List:
    x.connect(('192.168.99.106', 8999))#對象開始正式鏈接服務器
print('鏈接創建完畢')
messgae_list=['it is the first message','it is the second message','it is the third message']
t3=time.time()

for mes in messgae_list:
    print(mes)
    for s in socket_List:#每一個鏈接開始發送內容
        print('%s: sending "%s"' % (s.getsockname(), mes) )
        s.send(mes.encode('utf-8'))
    for s in socket_List:#每一個鏈接開始接收內容
        data=s.recv(1024)
        print('%s: received "%s"' % (s.getsockname(), data))
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname())
t4=time.time()
print(t2-t1)
print(t3-t2)
print(t4-t3)
#輸出時間

 

 

os.walk()的使用!!!

相關文章
相關標籤/搜索