多進程與多線程(九)

1.1 基於UDP協議實現簡單的套接字通訊

udp是無連接的,先啓動哪一端都不會報錯html

udp套接字簡單示例python

1.1.1.1  客戶端:

from socket import *

 

client=socket(AF_INET,SOCK_DGRAM) #數據報協議,建立一個客戶的套接字

 

while True: # 通信循環

    msg=input('>>: ').strip()

    client.sendto(msg.encode('utf-8'),('127.0.0.1',8080)) #對話(發送)

    data,server_addr=client.recvfrom(1024)

    print(data.decode('utf-8'))

 

 

1.1.1.2  服務端:

from socket import *

 

server=socket(AF_INET,SOCK_DGRAM) #數據報協議,建立一個服務器的套接字

server.bind(('127.0.0.1',8080)) #綁定服務器套接字

 

 

data,client_addr=server.recvfrom(3) #對話(接收)

print('第一次:',data)

 

 

1.1.2 數據報協議

1.1.2.1  客戶端:

from socket import *

 

client=socket(AF_INET,SOCK_DGRAM) #數據報協議

 

 

client.sendto('hello'.encode('utf-8'),('127.0.0.1',8082))

client.sendto('world'.encode('utf-8'),('127.0.0.1',8082))

 

 

1.1.2.2  服務端:

from socket import *

 

server=socket(AF_INET,SOCK_DGRAM) #數據報協議

server.bind(('127.0.0.1',8080))

 

 

data,client_addr=server.recvfrom(3)

print('第一次:',data)

data,client_addr=server.recvfrom(3)

print('第二次: ',data)

 

 

1.2 多進程

顧名思義,進程即正在執行的一個過程。進程是對正在運行程序的一個抽象。算法

進程的概念起源於操做系統,是操做系統最核心的概念,也是操做系統提供的最古老也是最重要的抽象概念之一。操做系統的其餘全部內容都是圍繞進程的概念展開的。數據庫

即便能夠利用的cpu只有一個(早期的計算機確實如此),也能保證支持(僞)併發的能力。將一個單獨的cpu變成多個虛擬的cpu(多道技術:時間多路複用和空間多路複用+硬件上支持隔離),沒有進程的抽象,現代計算機將不復存在。編程

操做系統的做用:json

    1:隱藏醜陋複雜的硬件接口,提供良好的抽象接口windows

    2:管理、調度進程,而且將多個進程對硬件的競爭變得有序安全

 

多道技術:服務器

    1.產生背景:針對單核,實現併發多線程

    ps:

    如今的主機通常是多核,那麼每一個核都會利用多道技術

    有4個cpu,運行於cpu1的某個程序遇到io阻塞,會等到io結束再從新調度,會被調度到4個

    cpu中的任意一個,具體由操做系統調度算法決定。

   

    2.空間上的複用:如內存中同時有多道程序

    3.時間上的複用:複用一個cpu的時間片

強調:遇到io切,佔用cpu時間過長也切,核心在於切以前將進程的狀態保存下來,這樣才能保證下次切換回來時,能基於上次切走的位置繼續運行

1.2.1 開啓進程的兩種方式:

1.2.1.1  multiprocessing模塊介紹

python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了multiprocessing。

    multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。

  multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。

    須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。

 

1.2.1.2  方式一:

注意:在windows中Process()必須放到# if __name__ == '__main__':下

from multiprocessing import Process

import time

 

def task(name):

    print('%s is running' %name)

    time.sleep(5)

    print('%s is done' %name)

 

 

if __name__ == '__main__':

    p=Process(target=task,args=('alex',))

    p.start()

    print('')

 

 

1.2.1.3  方式二:

from multiprocessing import Process

import time

 

class MyProcess(Process):

    def __init__(self,name):

        super(MyProcess,self).__init__()

        self.name=name

 

    def run(self):

        print('%s is running' %self.name)

        time.sleep(3)

        print('%s is done' %self.name)

 

if __name__ == '__main__':

    p=MyProcess('進程1')

    p.start() #p.run()

    print('')

 

 

1.2.2 進程之間內存空間是隔離的

from multiprocessing import Process

import time

 

n=100

 

def task():

    global n

    time.sleep(5)

    n=0

 

if __name__ == '__main__':

    p=Process(target=task)

    p.start()

    # time.sleep(5)

    print(p.is_alive())

    p.join()

    print(p.is_alive())

    print('',n)

 

1.2.3 join方法

主進程等,等待子進程結束

from multiprocessing import Process

import time

import os

 

def task(n):

    print('%s is runing' %os.getpid())

    time.sleep(n)

    print('%s is done' %os.getpid())

 

 

if __name__ == '__main__':

 

    start_time=time.time()

    p1=Process(target=task,args=(1,))

    p2=Process(target=task,args=(2,))

    p3=Process(target=task,args=(3,))

 

    p_l=[p1,p2,p3]

    # p1.start()

    # p2.start()

    # p3.start()

    for p in p_l:

        p.start()

 

    # p3.join()  #3

    # p1.join() #

    # p2.join() #

    for p in p_l:

        p.join()

    stop_time=time.time()

    print('',(stop_time-start_time))

 

既然join是等待進程結束,進程不就又變成串行的了嗎?固然不是了,必須明確:p.join()是讓誰等?很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p,
進程只要start就會在開始運行了,因此p1-p4.start()時,系統中已經有四個併發的進程了, 而咱們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵,join是讓主線程等,而p1-p4仍然是併發執行的,p1.join的時候,其他p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接經過檢測,無需等待, 因此4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間
 
from multiprocessing import Process

import time

import os

 

def task(n):

    print('%s is runing' %os.getpid())

    time.sleep(n)

    print('%s is done' %os.getpid())

 

 

if __name__ == '__main__':

 

    start_time=time.time()

    p1=Process(target=task,args=(1,))

    p2=Process(target=task,args=(2,))

    p3=Process(target=task,args=(3,))

 

    p_l=[p1,p2,p3]

    # p1.start()

    # p2.start()

    # p3.start()

    for p in p_l:

        p.start()

 

    # p3.join()  #3

    # p1.join() #

    # p2.join() #

    for p in p_l:

        p.join()

    stop_time=time.time()

    print('',(stop_time-start_time))

 

 
from multiprocessing import Process

import time

import os

 

def task(n):

    print('%s is runing' %os.getpid())

    time.sleep(n)

    print('%s is done' %os.getpid())

 

 

if __name__ == '__main__':

 

    start_time=time.time()

    p1=Process(target=task,args=(1,))

    p2=Process(target=task,args=(2,))

    p3=Process(target=task,args=(3,))

 

 

    p1.start()

    p1.join()

    p2.start()

    p2.join()

    p3.start()

    p3.join()

    stop_time=time.time()

    print('',(stop_time-start_time))

 

1.2.4 進程對象的其餘屬性或方法

進程對象的其餘方法一:terminate,is_alive

from multiprocessing import Process

import time

import os

 

def task(n):

    print('pid:%s ppid:%s' %(os.getpid(),os.getppid()))

    time.sleep(n)

 

 

if __name__ == '__main__':

    p=Process(target=task,args=(15,),name='進程1')

    p.start()

    p.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活

    # time.sleep(1)

    print(p.is_alive())#結果爲True

    print('主pid:%s ppid:%s' %(os.getpid(),os.getppid()))

    # print(p.pid)

    p.name='xxxx'

print(p.name)

 

 

 

1.2.5 守護進程

主進程建立守護進程

守護進程:當子進程執行的任務在父進程代碼運行完畢後就沒有存在的必要了,那

該子進程就應該被設置爲守護進程,

守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

from multiprocessing import Process

import time

 

def task(name):

    p=Process(target=time.sleep,args=(6,))

    p.start()

    print('%s is running' %name)

    time.sleep(5)

    print('%s is done' %name)

 

 

if __name__ == '__main__':

    p=Process(target=task,args=('alex',))

    p.daemon=True  #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行

    p.start()

    time.sleep(1)

    print('')

 

主進程代碼運行完畢,守護進程就會結束

from multiprocessing import Process

from threading import Thread

import time

def foo():

    print(123)

    time.sleep(1)

    print("end123")

 

def bar():

    print(456)

    time.sleep(3)

    print("end456")

 

if __name__ == '__main__':

 

    p1=Process(target=foo)

    p2=Process(target=bar)

 

    p1.daemon=True

    p1.start()

    p2.start()

    print("main-------") #打印該行則主進程代碼結束,則守護進程p1應該被終止,可能會有p1任務執行的打印信息123,由於主進程打印main----時,p1也執行了,可是隨即被終止

 

 

 

1.2.6 互斥鎖

進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理

多個進程共享同一打印終端, 併發運行,效率高,但競爭同一打印終端,帶來了打印錯亂, 加鎖:由併發變成了串行,犧牲了運行效率,但避免了競爭

多個進程共享同一文件, 併發運行,效率高,但競爭寫同一文件,數據寫入錯亂, 加鎖:購票行爲由併發變成了串行,犧牲了運行效率,但保證了數據安全

文件當數據庫,模擬搶票

from multiprocessing import Process,Lock

import json

import time

import random

import os

 

def search():

    time.sleep(random.randint(1,3))

    dic=json.load(open('db.txt','r',encoding='utf-8'))

    print('%s 查看到剩餘票數%s' %(os.getpid(),dic['count']))

 

def get():

    dic=json.load(open('db.txt','r',encoding='utf-8'))

    if dic['count'] > 0:

        dic['count']-=1

        time.sleep(random.randint(1,3))

        json.dump(dic,open('db.txt','w',encoding='utf-8'))

        print('%s 購票成功' %os.getpid())

 

def task(mutex):

    search()

    mutex.acquire()

    get()

    mutex.release()

 

if __name__ == '__main__':

    mutex=Lock()

    for i in range(10):

        p=Process(target=task,args=(mutex,))

        p.start()

        # p.join()

 

db.txt

{"count": 1}

 

加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。

雖然能夠用文件共享數據實現進程間通訊,但問題是:

1.效率低(共享數據基於文件,而文件是硬盤上的數據)

2.須要本身加鎖處理

所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。

1 隊列和管道都是將數據存放於內存中

2 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,

咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

 

1.2.7 隊列

進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

建立隊列的類(底層就是以管道和鎖定的方式實現)

Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。

maxsize是隊列中容許最大項數,省略則無大小限制。

q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。

q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.

 

q.get_nowait():同q.get(False)

q.put_nowait():同q.put(False)

 

q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。

q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。

q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣

from multiprocessing import Queue

 

q=Queue(3)

 

q.put('first')

q.put(2)

q.put({'count':3})

# q.put('fourth',block=False) #q.put_nowait('fourth')

# q.put('fourth',block=True,timeout=3)

 

print(q.get())

print(q.get())

print(q.get())

# print(q.get(block=False)) #q.get_nowait()

print(q.get(block=True,timeout=3))

 

其餘方法(瞭解):

1 q.cancel_join_thread():不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞

2 q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。

3 q.join_thread():鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲

1.2.8 生產者消費者模型

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型

from multiprocessing import Process,Queue
import time
import random

def producer(name,food,q):
    for i in range(3):
        res='%s%s' %(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print('廚師[%s]生產了<%s>' %(name,res))


def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print('吃貨[%s]吃了<%s>' % (name, res))

if __name__ == '__main__':
    #隊列
    q=Queue()
    #生產者們
    p1=Process(target=producer,args=('egon1','泔水',q))
    p2=Process(target=producer,args=('egon2','骨頭',q))
    #消費者們
    c1=Process(target=consumer,args=('管廷威',q))
    c2=Process(target=consumer,args=('oldboy',q))
    c3=Process(target=consumer,args=('oldgirl',q))


    p1.start()
    p2.start()
    c1.start()
    c2.start()
    c3.start()

    p1.join()
    p2.join()
    q.put(None)
    q.put(None)
    q.put(None)
    print('')

 

此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環

JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

 

   #參數介紹:

    maxsize是隊列中容許最大項數,省略則無大小限制。   

  #方法介紹:

    JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:

    q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常

    q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止

from multiprocessing import Process,JoinableQueue

import time

import random

 

def producer(name,food,q):

    for i in range(3):

        res='%s%s' %(food,i)

        time.sleep(random.randint(1,3))

        q.put(res)

        print('廚師[%s]生產了<%s>' %(name,res))

 

 

def consumer(name,q):

    while True:

        res=q.get()

        if res is None:break  #生產者在生產完畢後發送結束信號None

        time.sleep(random.randint(1,3))

        print('吃貨[%s]吃了<%s>' % (name, res))

        q.task_done()

 

if __name__ == '__main__':

    #隊列

    q=JoinableQueue()

    #生產者們

    p1=Process(target=producer,args=('egon1','泔水',q))

    p2=Process(target=producer,args=('egon2','骨頭',q))

    #消費者們

    c1=Process(target=consumer,args=('管廷威',q))

    c2=Process(target=consumer,args=('oldboy',q))

    c3=Process(target=consumer,args=('oldgirl',q))

    c1.daemon=True

    c2.daemon=True

    c3.daemon=True

 

    p1.start()

    p2.start()

    c1.start()

    c2.start()

    c3.start()

 

    p1.join()

    p2.join()

    q.join()

    print('')

 

 

1.3 多線程

threading模塊介紹

multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹

官網連接:https://docs.python.org/3/library/threading.html?highlight=threading#

1.3.1 開啓線程的兩種方式

方式一:

from threading import Thread

import time

import random

 

def piao(name):

    print('%s is piaoing' %name)

    time.sleep(random.randint(1,3))

    print('%s is piao end' %name)

 

 

if __name__ == '__main__':

    t1=Thread(target=piao,args=('alex',))

    t1.start()

    print('')

 

 

方式二:

from threading import Thread
import time
import random

class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randint(1,3))
        print('%s is piao end' %self.name)


if __name__ == '__main__':
    t1=MyThread('alex')
    t1.start()
    print('')

 

 

1.3.2 練習

多線程併發

客戶端:

from socket import *

 

client=socket(AF_INET,SOCK_STREAM)

client.connect(('127.0.0.1',8081))

 

while True:

    msg=input('>>: ').strip()

    if not msg:continue

    client.send(msg.encode('utf-8'))

    data=client.recv(1024)

    print(data.decode('utf-8'))

 

client.close()

 

服務端:

from threading import Thread,current_thread

from socket import *

 

def comunicate(conn):

    print('子線程:%s' %current_thread().getName())

    while True:

        try:

            data=conn.recv(1024)

            if not data:break

            conn.send(data.upper())

        except ConnectionResetError:

            break

    conn.close()

 

def server(ip,port):

    print('主線程:%s' %current_thread().getName())

    server = socket(AF_INET, SOCK_STREAM)

    server.bind((ip,port))

    server.listen(5)

 

    while True:

        conn, addr = server.accept()

        print(addr)

        # comunicate(conn)

        t=Thread(target=comunicate,args=(conn,))

        t.start()

 

    server.close()

 

if __name__ == '__main__':

    server('127.0.0.1', 8081)

 

 

 

1.3.3 進程與線程的區別

from threading import Thread
import time
import random
import os

def piao():
    print('%s is piaoing' %os.getpid())
    time.sleep(random.randint(1,3))


if __name__ == '__main__':
    t1=Thread(target=piao,)
    t2=Thread(target=piao,)
    t3=Thread(target=piao,)
    t1.start()a
    t2.start()
    t3.start()
    print('',os.getpid())

 

 

from threading import Thread

import time

import random

import os

 

n=100

def piao():

    global n

    n=0

 

if __name__ == '__main__':

    t1=Thread(target=piao,)

    t1.start()

    t1.join()

    print('',n)

 

 

1.3.4 守護線程

不管是進程仍是線程,都遵循:守護xxx會等待主xxx運行完畢後被銷燬

須要強調的是:運行完畢並不是終止運行

1.對主進程來講,運行完畢指的是主進程代碼運行完畢

2.對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢

主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束

主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。

from threading import Thread
import time

def sayhi(name):
    print('====>')
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    # t.setDaemon(True)
    t.daemon=True
    t.start()

    print('主線程')

 

from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == '__main__':
    t1=Thread(target=foo)
    t2=Thread(target=bar)

    t1.daemon=True
    t1.start()
    t2.start()
    print("main-------")

 

 

 

1.3.5 線程的互斥鎖

from threading import Thread,Lock

import time

 

n=100

 

def task():

    global n

    with mutex:

        temp=n

        time.sleep(0.1)

        n=temp-1

 

if __name__ == '__main__':

    start_time=time.time()

    mutex=Lock()

    t_l=[]

    for i in range(100):

        t=Thread(target=task)

        t_l.append(t)

        t.start()

 

    for t in t_l:

        t.join()

    stop_time=time.time()

    print('',n)

    print('run time is %s' %(stop_time-start_time))

 

1.3.6 測試文件

# import time

# import os

#

# print(os.getpid())

# time.sleep(1000)

 

import os

print(os.cpu_count())  # 本機爲4核

 

 

1.3.7 GIL測試

首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL

GIL本質就是一把互斥鎖,既然是互斥鎖,全部互斥鎖的本質都同樣,都是將併發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。

能夠確定的一點是:保護不一樣的數據的安全,就應該加不一樣的鎖

GIL保護的是解釋器級的數據,保護用戶本身的數據則須要本身加鎖處理,以下圖

 

 

圖1-1  

有了GIL的存在,同一時刻同一進程中只有一個線程被執行

多cpu,意味着能夠有多個核並行完成計算,因此多核提高的是計算性能, 每一個cpu一旦遇到I/O阻塞,仍然須要等待,因此多核對I/O操做沒什麼用處

一個工人至關於cpu,此時計算至關於工人在幹活,I/O阻塞至關於爲工人幹活提供所需原材料的過程,工人幹活的過程當中若是沒有原材料了,則工人幹活的過程須要中止,直到等待原材料的到來。

若是你的工廠乾的大多數任務都要有準備原材料的過程(I/O密集型),那麼你有再多的工人,意義也不大,還不如一我的,在等材料的過程當中讓工人去幹別的活,

反過來說,若是你的工廠原材料都齊全,那固然是工人越多,效率越高

 對計算來講,cpu越多越好,可是對於I/O來講,再多的cpu也沒用

固然對運行一個程序來講,隨着cpu的增多執行效率確定會有所提升(無論提升幅度多大,總會有所提升),這是由於一個程序基本上不會是純計算或者純I/O,因此咱們只能相對的去看一個程序究竟是計算密集型仍是I/O密集型,從而進一步分析python的多線程到底有無用武之地

如今的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提高,甚至不如串行(沒有大量切換),可是,對於IO密集型的任務效率仍是有顯著提高的。

多線程性能測試

1.3.7.1  計算密集型:開多進程

from multiprocessing import Process

from threading import Thread

import os,time

def work():

    res=0

    for i in range(100000000):

        res*=i

 

 

if __name__ == '__main__':

    l=[]

    start=time.time()

    for i in range(4):

        # p=Process(target=work) #5.826333284378052

        p=Thread(target=work) #run time is 19.91913938522339

        l.append(p)

        p.start()

    for p in l:

        p.join()

    stop=time.time()

    print('run time is %s' %(stop-start))

 

 

1.3.7.2  I/O密集型:多線程效率高

from multiprocessing import Process

from threading import Thread

import threading

import os,time

def work():

    time.sleep(2)

 

if __name__ == '__main__':

    l=[]

    start=time.time()

    for i in range(400):

        # p=Process(target=work) # 12.465712785720825

        p=Thread(target=work) #2.037116765975952

        l.append(p)

        p.start()

    for p in l:

        p.join()

    stop=time.time()

    print('run time is %s' %(stop-start))

 

from threading import Thread,Lock

import time

 

n=100

 

def task():

    global n

    mutex.acquire()

    temp=n

    time.sleep(0.1)

    n=temp-1

    mutex.release()

 

if __name__ == '__main__':

    mutex=Lock()

    for i in range(3):

        t=Thread(target=task)

        t.start()

 

 

 

1.3.8 paramiko模塊

paramiko是一個用於作遠程控制的模塊,使用該模塊能夠對遠程服務器進行命令或文件操做,值得一說的是,fabric和ansible內部的遠程管理就是使用的paramiko來現實。

下載安裝

pip3 install paramiko #在python3中

 

在python2中

pycrypto,因爲 paramiko 模塊內部依賴pycrypto,因此先下載安裝pycrypto #在python2中

pip3 install pycrypto

pip3 install paramiko

 

注:若是在安裝pycrypto2.0.1時發生以下錯誤

        command 'gcc' failed with exit status 1...

多是缺乏python-dev安裝包致使

若是gcc沒有安裝,請事先安裝gcc

1.3.8.1  用於鏈接遠程服務器並執行基本命令

import paramiko

 

# 建立SSH對象

ssh = paramiko.SSHClient()

# 容許鏈接不在know_hosts文件中的主機

ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# 鏈接服務器

ssh.connect(hostname='106.74.230.135', port=22101, username='root', password='123456')

 

# 執行命令

stdin, stdout, stderr = ssh.exec_command('df')

# 獲取命令結果

result = stdout.read()

print(result.decode('utf-8'))

# 關閉鏈接

ssh.close()

 

 

import paramiko

 

transport = paramiko.Transport(('106.74.230.135', 22101))

transport.connect(username='root', password='123456')

 

sftp = paramiko.SFTPClient.from_transport(transport)

# 將location.py 上傳至服務器 /tmp/test.py

sftp.put(r'D:\video\python20期\day9\02 多線程\7 GIL測試.py', '/root/test.txt')

# 將remove_path 下載到本地 local_path

# sftp.get('remove_path', 'local_path')

 

transport.close()
相關文章
相關標籤/搜索