30、進程的基礎理論,併發(multiprocessing模塊)

咱們以前基於tcp所作的通訊都只能一個一個連接只有關閉當前連接時才能去連接下一個通訊,這顯然與現實狀況不合。今天咱們未來學一個新的概念進程,來作一個python多進程的併發編程。還會貼一個用json序列化將上個隨筆中的ssh例子優化的代碼。html

 

本篇導航:python

 

1、粘包優化方案算法

以前咱們解決粘包的方式是用struct模塊來製做一個報頭,可是這個解決的方案是有漏洞的,當咱們須要傳送的文件大於2g時將會報錯。因此咱們今天將用json來製做報頭。shell

from socket import *
import subprocess
import struct
import json
ss = socket(AF_INET,SOCK_STREAM)
ss.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
ss.bind(('127.0.0.1',8082))
ss.listen(5)

print('starting...')
while True : #連接循環
    conn,addr = ss.accept() #連接,客戶的的ip和端口組成的元組
    print('-------->',conn,addr)

    #收,發消息
    while True :#通訊循環
        try :
            cmd = conn.recv(1024)
            res = subprocess.Popen(cmd.decode('utf-8'), shell = True,
                                   stdout = subprocess.PIPE,
                                   stderr = subprocess.PIPE)
            stdout = res.stdout.read()
            stderr = res.stderr.read()
            #製做報頭
            h_dic = {
                'total_size': len(stdout) + len(stderr),
                'filename': None,
                'md5': None}

            h_json = json.dumps(h_dic)
            h_bytes = h_json.encode('utf-8')
            #發送階段
            #先發報頭長度
            conn.send(struct.pack('i',len(h_bytes)))
            #再發報頭
            conn.send(h_bytes)

            #最後發送命令的結果
            conn.send(stdout)
            conn.send(stderr)
        except Exception :
            break
    conn.close()
ss.close()
服務端
from socket import *
import struct
import json
cs = socket(AF_INET,SOCK_STREAM) #買手機
cs.connect(('127.0.0.1',8082)) #綁定手機卡

#發,收消息
while True :
    cmd = input('>>: ').strip()
    if not cmd : continue

    cs.send(cmd.encode('utf-8'))
    #先收報頭的長度
    h_len = struct.unpack('i',cs.recv(4))[0]

    #再收報頭
    h_bytes = cs.recv(h_len)
    h_json = h_bytes.decode('utf-8')
    h_dic = json.loads(h_json)
    total_size = h_dic['total_size']

    #最後收數據
    recv_size = 0
    total_data = b''
    while recv_size < total_size :
        recv_data = cs.recv(1024)
        recv_size += len(recv_data)
        total_data += recv_data
    print(total_data.decode('gbk'))
cs.close()
客戶端

 

2、知識儲備編程

進程的概念起源於操做系統,是操做系統最核心的概念,也是操做系統提供的最古老也是最重要的抽象概念之一。操做系統的其餘全部內容都是圍繞進程的概念展開的。json

對操做系統基礎知識比較模糊的能夠在閱讀本文前預習計算機基礎二、操做系統http://www.cnblogs.com/liluning/p/7162317.htmlwindows

即便能夠利用的cpu只有一個(早期的計算機確實如此),也能保證支持(僞)併發的能力。將一個單獨的cpu變成多個虛擬的cpu(多道技術:時間多路複用和空間多路複用+硬件上支持隔離),沒有進程的抽象,現代計算機將不復存在。安全

#一 操做系統的做用:
    1:隱藏醜陋複雜的硬件接口,提供良好的抽象接口
    2:管理、調度進程,而且將多個進程對硬件的競爭變得有序

#二 多道技術:
    1.產生背景:針對單核,實現併發
    ps:
    如今的主機通常是多核,那麼每一個核都會利用多道技術,可是核與核之間沒有使用多道技術切換這麼一說;
    有4個cpu,運行於cpu1的某個程序遇到io阻塞,會等到io結束再從新調度,會被調度到4個cpu中的任意一個,具體由操做系統調度算法決定。
    
    2.時間上的複用(複用一個cpu的時間片)+空間上的複用(如內存中同時有多道程序)

 

3、進程網絡

一、進程概念併發

進程即正在執行的一個過程。進程是對正在運行程序的一個抽象。正在進行的一個過程或者說一個任務。而負責執行任務則是cpu

二、進程和程序

程序僅僅只是一堆代碼而已,而進程指的是程序的運行過程。

須要強調的是:同一個程序執行兩次,那也是兩個進程,好比打開暴風影音,雖然都是同一個軟件,可是一個能夠播放蒼井井,一個能夠播放小澤澤。

三、併發與並行

不管是並行仍是併發,在用戶看來都是'同時'運行的,不論是進程仍是線程,都只是一個任務而已,真實幹活的是cpu,cpu來作這些任務,而一個cpu同一時刻只能執行一個任務

1) 併發:是僞並行,即看起來是同時運行。單個cpu+多道技術就能夠實現併發,(並行也屬於併發)

你是一個cpu,你同時談了三個女友,每個均可以是一個戀愛任務,你被這三個任務共享
要玩出併發戀愛的效果,
應該是你先跟女朋友1去看電影,看了一會說:很差,我要拉肚子,而後跑去跟第二個女朋友吃飯,吃了一會說:那啥,我
去趟洗手間,而後跑去跟女朋友3開了個房

2)同時運行,只有具有多個cpu才能實現並行

單核下,能夠利用多道技術,多個核,每一個核也均可以利用多道技術(多道技術是針對單核而言的)

有四個核,六個任務,這樣同一時間有四個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4,

一旦任務1遇到I/O就被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術

而一旦任務1的I/O結束了,操做系統會從新調用它(需知進程的調度、分配給哪一個cpu運行,由操做系統說了算),可能被分配給四個cpu中的任意一個去執行

四、同步與異步

同步執行:一個進程在執行某個任務時,另一個進程必須等待其執行完畢,才能繼續執行
異步執行:一個進程在執行某個任務時,另一個進程無需等待其執行完畢,就能夠繼續執行,當有消息返回時,系統會通知後者進行處理,這樣能夠提升執行效率

打電話時就是同步通訊,發短息時就是異步通訊。

(我僅是對進程作簡單介紹足夠咱們編程使用,對進程具體想了解的能夠本身查閱相關資料)


 

4、python的併發編程

一、multiprocessing模塊

multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數)

multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件

進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。

二、Process類的介紹

1)進程的建立

Process(target = talk,args = (conn,addr))
#由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)

2)參數介紹

group參數未使用,值始終爲None

target表示調用對象,即子進程要執行的任務

args表示調用對象的位置參數元組,args=(1,2,'egon',)

kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}

name爲子進程的名稱

3)方法介紹

p.start():啓動進程,並調用該子進程中的p.run() 
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  

p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True

p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程

4)屬性介紹

p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置

p.name:進程的名稱

p.pid:進程的pid

p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)

p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)

三、process類的使用

注意:在windows中Process()必須放到# if __name__ == '__main__':下

1)開啓進程的方式一:

from multiprocessing import Process
import time,random
import os
def piao(name):
    print(os.getppid(),os.getpid())
    print('%s is piaoing' %name)
    time.sleep(random.randint(1,3))
    print('%s is piao end' %name)
if __name__ == '__main__':
    p1=Process(target=piao,kwargs={'name':'alex',})
    p2=Process(target=piao,args=('wupeiqi',))
    p3=Process(target=piao,kwargs={'name':'yuanhao',})
    p1.start()
    p2.start()
    p3.start()
    print('主進程',os.getpid())
#os.getppid(),os.getpid()
#父進程id,當前進程id

2)開啓進程的方式二:

from multiprocessing import Process
import time,random
import os
class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(os.getppid(),os.getpid())
        print('%s is piaoing' %self.name)
        # time.sleep(random.randint(1,3))
        print('%s is piao end' %self.name)
if __name__ == '__main__':
    p1=Piao('alex')
    p2=Piao('wupeiqi')
    p3=Piao('yuanhao')

    p1.start()
    p2.start()
    p3.start()
    print('主進程',os.getpid(),os.getppid())

四、將基於tcp協議的socket通訊變成併發的形式

服務端:

from socket import *
from multiprocessing import Process
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加
s.bind(('127.0.0.1',8088))
s.listen(5)
def talk(conn,addr):
    while True: #通訊循環
        try:
            data=conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()
if __name__ == '__main__':
    while True:#連接循環
        conn,addr=s.accept()
        p=Process(target=talk,args=(conn,addr))
        p.start()
    s.close()

客戶端:

from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))
c.close()
相關文章
相關標籤/搜索