day9-python-進程、線程和協程

1、進程python

程序的執行實例稱爲進程。linux

每一個進程提供執行程序所需的資源。進程有虛擬地址空間、可執行代碼、系統對象的打開句柄、安全上下文、唯一進程標識符、環境變量、優先級類、最小和最大工做集大小,以及至少一個執行線程。每一個進程由一個線程(一般稱爲主線程)啓動,可是能夠從它的任何線程建立額外的線程。程序員

程序並不能單獨運行,只有將程序裝載到內存中,系統爲它分配資源才能運行,而這種執行的程序就稱之爲進程。程序和進程的區別就在於:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬於動態概念。web

在多道編程中,咱們容許多個程序同時加載到內存中,在操做系統的調度下,能夠實現併發地執行。這是這樣的設計,大大提升了CPU的利用率。進程的出現讓每一個用戶感受到本身獨享CPU,所以,進程就是爲了在CPU上實現多道編程而提出的。數據庫

有了進程爲何還要線程?

進程有不少優勢,它提供了多道編程,讓咱們感受咱們每一個人都擁有本身的CPU和其餘資源,能夠提升計算機的利用率。不少人就不理解了,既然進程這麼優秀,爲何還要線程呢?其實,仔細觀察就會發現進程仍是有不少缺陷的,主要體如今兩點上:編程

  • 進程只能在一個時間幹一件事,若是想同時幹兩件事或多件事,進程就無能爲力了。數組

  • 進程在執行的過程當中若是阻塞,例如等待輸入,整個進程就會掛起,即便進程中有些工做不依賴於輸入的數據,也將沒法執行。安全

例如,咱們在使用qq聊天, qq作爲一個獨立進程若是同一時間只能幹一件事,那他如何實如今同一時刻 即能監聽鍵盤輸入、又能監聽其它人給你發的消息、同時還能把別人發的消息顯示在屏幕上呢?你會說,操做系統不是有分時麼?但個人親,分時是指在不一樣進程間的分時呀, 即操做系統處理一會你的qq任務,又切換到word文檔任務上了,每一個cpu時間片分給你的qq程序時,你的qq仍是隻能同時幹一件事呀。服務器

再直白一點, 一個操做系統就像是一個工廠,工廠裏面有不少個生產車間,不一樣的車間生產不一樣的產品,每一個車間就至關於一個進程,且你的工廠又窮,供電不足,同一時間只能給一個車間供電,爲了能讓全部車間都能同時生產,你的工廠的電工只能給不一樣的車間分時供電,可是輪到你的qq車間時,發現只有一個幹活的工人,結果生產效率極低,爲了解決這個問題,應該怎麼辦呢?。。。。沒錯,你確定想到了,就是多加幾個工人,讓幾我的工人並行工做,這每一個工人,就是線程!多線程

2、線程

線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務

線程是一個執行上下文,它是CPU執行指令流所需的全部信息。

假設你正在讀一本書,你如今想休息一下,可是你想從你中止的地方回來繼續閱讀。一種方法是記下頁碼、行號和字數。你閱讀一本書的執行環境是這三個數字。

若是你有一個室友,她用了一樣的方法,她能夠在你不用的時候拿起這本書,從她停下的地方開始讀。而後你能夠把它拿回去,從你原來的地方繼續。

線程以相同的方式工做。CPU給你的錯覺是它在同一時間作多個計算。它經過在每次計算上花費一點時間來實現這一點。它能夠這樣作,由於它對每一個計算都有一個執行上下文。就像你能夠和朋友共享一本書同樣,許多任務也能夠共享一個CPU。

在更技術的層面上,執行上下文(所以是線程)由CPU寄存器的值組成。

最後:線程與進程不一樣。線程是執行的上下文,而進程是一組與計算相關的資源。一個進程能夠有一個或多個線程。全部在同一個進程裏的線程是共享同一塊內存空間的。

說明:與進程相關的資源包括內存頁(進程中的全部線程都具備相同的內存視圖)、文件描述符(例如open sockets)和安全憑據(例如,啓動進程的用戶的ID)。

3、進程和線程的區別

一、線程共享內存空間,進程的內存空間是獨立的。

二、線程能夠直接訪問其進程的數據段;進程有本身的父進程數據段的副本。

三、同一個進程的線程之間能夠直接交流,兩個進程想通訊,必須經過一箇中間代理來實現。

四、建立新線程很簡單,建立新進程須要對其父進程進行一次克隆。

五、一個線程能夠控制和操做同一進程裏的其餘線程,可是進程只能操做子進程。

六、對主線程的修改可能影響到其餘線程,對父進程的修改不會影響子進程。

4、Python GIL(Global Interpreter Lock)

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

上面的核心意思就是,不管你啓多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只容許一個線程運行,擦。。。,那這還叫什麼多線程呀?莫如此早的下結結論,聽我現場講。

首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL

5、Python threading模塊

線程有兩種調用方式,以下:

直接調用

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

def run(n):
    print("task",n)
    time.sleep(2)

if __name__ == '__main__':
    t1 = threading.Thread(target=run,args=(1,)) #生成一個線程實例
    t2 = threading.Thread(target=run,args=(2,)) #生成另外一個線程實例

    t1.start()  #啓動線程
    t2.start()  #啓動另外一個線程
    
    print(t1.getName()) #獲取線程名
    print(t2.getName())

繼承式調用

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
import time

class MyThread(threading.Thread):
    def __init__(self,n):
        super(MyThread,self).__init__()
        self.n = n

    def run(self):  #定義每一個線程要運行的函數
        print("running task",self.n)

        time.sleep(3)

if __name__ == '__main__':
    
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

6、join & Daemon

有些線程執行後臺任務,好比發送keepalive數據包,或者執行週期性的垃圾收集,等等。這些只有在主程序運行時纔有用,而且當其餘非守護進程的線程退出時,能夠終止它們。

若是沒有守護進程線程,您必須跟蹤它們,並在程序徹底退出以前告訴它們退出。經過將它們設置爲守護線程,您可讓它們運行並忘記它們,當您的程序退出時,任何守護線程都會自動被殺死。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
import time

def run(n):
    print("task",n)
    time.sleep(2)
    print("task done",n,threading.current_thread())

start_time = time.time()
t_objs = [] #存線程實例
for i in range(20):
    t = threading.Thread(target=run,args=("t-%s" %i,))
    t.start()
    t_objs.append(t)    #爲了避免阻塞後面線程的啓動,不在這裏join,先放到一個列表

for t in t_objs:    #循環線程實例列表,等待全部線程執行完畢
    t.join()

print("-----------------all threads has finished...",threading.current_thread(),threading.active_count())
print("cost:",time.time() - start_time)
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
import time

def run(n):
    print("task",n)
    time.sleep(2)
    print("task done",n,threading.current_thread())

start_time = time.time()
t_objs = [] #存線程實例
for i in range(20):
    t = threading.Thread(target=run,args=("t-%s" %i,))
    t.setDaemon(True)   #把當前線程設置成守護線程
    t.start()
    t_objs.append(t)


print("-----------------all threads has finished...",threading.current_thread(),threading.active_count())
print("cost:",time.time() - start_time)

注意:守護進程線程在關閉時忽然中止。它們的資源(如打開的文件、數據庫事務等)可能沒法正確釋放。若是但願線程優雅地中止,請使它們非守護進程,並使用適當的信號機制(如事件)。

7、線程鎖(互斥鎖Mutex)

一個進程下能夠啓動多個線程,多個線程共享父進程的內存空間,也就意味着每一個線程能夠訪問同一份數據,此時,若是2個線程同時要修改同一份數據,會出現什麼情況?

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading
import time

def addNum():
global num # 在每一個線程中都獲取這個全局變量
print('--get num:', num)
time.sleep(1)
num -= 1 # 對此公共變量進行-1操做


num = 100 # 設定一個共享變量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)

for t in thread_list: # 等待全部線程執行完畢
t.join()

print('final num:', num)

正常來說,這個num結果應該是0, 但在python 2.7上多運行幾回,會發現,最後打印出來的num結果不老是0,爲何每次運行的結果不同呢? 哈,很簡單,假設你有A,B兩個線程,此時都 要對num 進行減1操做, 因爲2個線程是併發同時運行的,因此2個線程頗有可能同時拿走了num=100這個初始變量交給cpu去運算,當A線程去處完的結果是99,但此時B線程運算完的結果也是99,兩個線程同時CPU運算的結果再賦值給num變量後,結果就都是99。那怎麼辦呢? 很簡單,每一個線程在要修改公共數據時,爲了不本身在還沒改完的時候別人也來修改此數據,能夠給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢並把鎖釋放掉後才能再訪問此數據。 

*注:不要在3.x上運行,不知爲何,3.x上的結果老是正確的,多是自動加了鎖

加鎖版本

import time
import threading


def addNum():
    global num  # 在每一個線程中都獲取這個全局變量
    print('--get num:', num)
    time.sleep(1)
    lock.acquire()  # 修改數據前加鎖
    num -= 1  # 對此公共變量進行-1操做
    lock.release()  # 修改後釋放


num = 100  # 設定一個共享變量
thread_list = []
lock = threading.Lock()  # 生成全局鎖
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # 等待全部線程執行完畢
    t.join()

print('final num:', num)

8、GIL VS Lock

機智的同窗可能會問到這個問題,就是既然你以前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock? 注意啦,這裏的lock是用戶級的lock,跟那個GIL不要緊 ,具體咱們經過下圖來看一下+配合我現場講給你們,就明白了。

那你又問了, 既然用戶程序已經本身有鎖了,那爲何C python還須要GIL呢?加入GIL主要的緣由是爲了下降程序的開發的複雜度,好比如今的你寫python不須要關心內存回收的問題,由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序 裏的線程和 py解釋器本身的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題,  這能夠說是Python早期版本的遺留問題。

9、RLock(遞歸鎖)

說白了就是在一個大鎖中還要再包含子鎖

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time


def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num


def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)




num, num2 = 0, 0
lock = threading.RLock()
for i in range(1):
    t = threading.Thread(target=run3)
    t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

10、Semaphore(信號量)

互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time


def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s\n" % n)
    semaphore.release()

if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(5)  # 最多容許5個線程同時運行
    for i in range(22):
        t = threading.Thread(target=run, args=(i,))
        t.start()
while threading.active_count() != 1:
    pass  # print threading.active_count()
else:
    print('----all threads done---')
    #print(num)

11、Timer

該類表示僅在通過必定時間後才應運行的操做

與線程同樣,經過調用它們的start()方法來啓動計時器。能夠經過調用thecancel()方法來中止計時器(在其操做開始以前)。計時器在執行其操做以前將等待的時間間隔可能與用戶指定的時間間隔不徹底相同。

def hello():
    print("hello, world")


t = Timer(30.0, hello)
t.start()  # after 30 seconds, "hello, world" will be printed

12、Events

事件是一個簡單的同步對象;

事件表示內部標誌和線程

能夠等待旗子被設置,或者本身設置或清除旗子。

event = threading.Event()

客戶端線程能夠等待設置標記

event.wait()

服務器線程能夠設置或重置它

event.set()
event.clear()

若是設置了該標誌,那麼wait方法將不執行任何操做。

若是標誌被清除,wait將被阻塞,直到它再次被設置。

任意數量的線程能夠等待同一個事件。

經過Event來實現兩個或多個線程間的交互,下面是一個紅綠燈的例子,即起動一個線程作交通指揮燈,生成幾個線程作車輛,車輛行駛按紅燈停,綠燈行的規則。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

event = threading.Event()

def lighter():
    count = 0
    event.set() #先設爲綠燈
    while True:
        if count > 5 and count <10:  #改爲紅燈
            event.clear()   #把標誌位請了
            print("\033[41;1mred light is on ...\033[0m")
        elif count > 10:
            event.set() #變綠燈
            count = 0
        else:
            print("\033[42;1mgreen light is on ...\033[0m")
        time.sleep(1)
        count += 1
def car(name):
    while True:
        if event.is_set():  #表明綠燈
            print("[%s] running..."% name)
            time.sleep(1)
        else:
            print("[%s] sees red light,waiting..."% name)
            event.wait()
            print("\033[33;1m[%s] green light is on,start going...\033[0m"% name)


light = threading.Thread(target=lighter,)
light.start()

car1 = threading.Thread(target=car,args=("Tesla",))
car1.start()

這裏還有一個event使用的例子,員工進公司門要刷卡, 咱們這裏設置一個線程是「門」, 再設置幾個線程爲「員工」,員工看到門沒打開,就刷卡,刷完卡,門開了,員工就能夠經過。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
import random

def door():
    door_open_time_counter = 0
    while True:
        if door_swiping_event.is_set():
            print("\033[32;1mdoor opening....\033[0m")
            door_open_time_counter +=1

        else:
            print("\033[31;1mdoor closed...., swipe to open.\033[0m")
            door_open_time_counter = 0 #清空計時器
            door_swiping_event.wait()


        if door_open_time_counter > 3:#門開了已經3s了,該關了
            door_swiping_event.clear()

        time.sleep(0.5)


def staff(n):

    print("staff [%s] is comming..." % n )
    while True:
        if door_swiping_event.is_set():
            print("\033[34;1mdoor is opened, passing.....\033[0m")
            break
        else:
            print("staff [%s] sees door got closed, swipping the card....." % n)
            print(door_swiping_event.set())
            door_swiping_event.set()
            print("after set ",door_swiping_event.set())
        time.sleep(0.5)
door_swiping_event  = threading.Event() #設置事件


door_thread = threading.Thread(target=door)
door_thread.start()



for i in range(5):
    p = threading.Thread(target=staff,args=(i,))
    time.sleep(random.randrange(3))
    p.start()

十3、queque隊列

當必須在多個線程之間安全地交換信息時,隊列在線程編程中特別有用。

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out 
class queue.PriorityQueue(maxsize=0) #存儲數據時可設置優先級的隊列

優先隊列的構造函數。maxsize是一個整數,它設置能夠放置在隊列中的項數的上限。一旦達到此大小,插入將阻塞,直到隊列項被使用。若是maxsize小於或等於0,則隊列大小爲無窮大。

首先檢索值最低的條目(值最低的條目是由已排序的sorted(list(entries))[0])。條目的典型模式是表單中的元組:(priority_number, data)。

exception queue.Empty

當對空的隊列對象調用非阻塞get()(或get_nowait())時引起異常。

exception queue.Full

在隊列對象滿時調用非阻塞put()(或put_nowait())時引起異常。

Queue. qsize ()
Queue. empty () #return True if empty  
Queue. full () # return True if full 
Queue. put (itemblock=Truetimeout=None)
將項目放入隊列。若是可選的args塊爲true, timeout爲None(缺省值),則在空閒槽可用以前,必要時進行塊處理。若是timeout是一個正數,那麼它將阻塞最多的超時秒數,若是在這段時間內沒有可用的空閒插槽,則引起完整的異常。不然(block爲false),若是空閒插槽當即可用,則將一個項放到隊列中,不然將引起徹底異常(在這種狀況下忽略timeout)。
Queue.put_nowait(item)

等效於put(item, False).

Queue.get(block=Truetimeout=None)

從隊列中刪除並返回一個項。若是可選的args塊爲true, timeout爲None(缺省值),則在須要時阻塞,直到有可用的項爲止。若是timeout是一個正數,那麼它將在大多數超時秒內阻塞,若是在這段時間內沒有可用的項,則引起空異常。不然(block爲false),返回一個可當即使用的項,不然拋出空異常(超時在這種狀況下被忽略)。

Queue.get_nowait()

 

等效於get(False).

提供了兩種方法來支持跟蹤入隊任務是否已被守護進程使用者線程徹底處理。

Queue.task_done()

指示之前加入隊列的任務已經完成。由隊列使用者線程使用。對於用於獲取任務的每一個get(),對task_done()的後續調用告訴隊列任務的處理已經完成。

若是join()當前處於阻塞狀態,那麼在處理完全部項以後它將繼續運行(這意味着對於已將()放入隊列的每一個項,都將收到task_done()調用)。

若是調用次數超過放置在隊列中的項的次數,則引起ValueError錯誤。

Queue.join() block直到queue被消費完畢

十4、生產者消費模型

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

下面來學習一個最基本的生產者消費者模型的例子

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import threading,time

import queue

q = queue.Queue(maxsize=10)

def Producer(name):
    count = 1
    while True:
        q.put("骨頭%s"%count)
        print("生成了骨頭",count)
        count += 1
        time.sleep(0.1)

def Consumer(name):
    # while q.qsize()>0:
    while True:
        print("[%s] 取到[%s] 而且吃了它..."%(name,q.get()))
        time.sleep(1)

p = threading.Thread(target=Producer,args=("Alex",))
c = threading.Thread(target=Consumer,args=("Chengronghua",))
c1 = threading.Thread(target=Consumer,args=("Wangsen",))

p.start()
c.start()
c1.start()

 

import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
  count = 0
  while count <20:
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
def Consumer(name):
  count = 0
  while count <20:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()

十5、多進程multiprocessing

multiprocessing是一個包,它支持使用相似於線程模塊的API來生成進程。多處理包提供了本地和遠程併發性,經過使用子進程而不是線程有效地避開了全局解釋器鎖。所以,多處理模塊容許程序員充分利用給定機器上的多個處理器。它能夠在Unix和Windows上運行。

from multiprocessing import Process
import time
def f(name):
    time.sleep(2)
    print('hello', name)
 
if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join(

爲了顯示所涉及的單個進程id,下面是一個擴展現例:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import multiprocessing
import time,threading

def thread_run():
    print(threading.get_ident())
def run(name):
    time.sleep(2)
    print('hello',name)
    t = threading.Thread(target=thread_run,)
    t.start()

if __name__=='__main__':

    for i in range(10):
        p = multiprocessing.Process(target=run, args=('bob %s'%i,))
        p.start()
    # p.join()

十6、進程間的通信

不一樣進程間內存是不共享的,要想實現兩個進程間的數據交換,能夠用如下方法:

Queues

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process,Queue

def f(q):
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()

Pipes

Pipe()函數返回一對由管道鏈接的鏈接對象,管道默認狀況下是雙工(雙向)的。例如:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Process,Pipe

def f(conn):
    conn.send([42, None, 'hello from child1'])
    conn.send([42, None, 'hello from child2'])
    print("from parent:",conn.recv())
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())
    parent_conn.send("hhh可好")
    p.join()

Pipe()返回的兩個鏈接對象表示管道的兩端。每一個鏈接對象都有send()和recv()方法。請注意,若是兩個進程(或線程)試圖同時從管道的同一端讀寫數據,管道中的數據可能會損壞。固然,同時使用管道的不一樣端部不會有腐敗的風險。

Managers

manager()返回的manager對象控制一個服務器進程,該進程持有Python對象並容許其餘進程使用代理操做它們。

manager()返回的管理器將支持類型列表、字典、名稱空間、鎖、遞歸鎖、信號量、有界信號量、條件、事件、屏障、隊列、值和數組。例如,

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Process,Manager
import os
def f(d, l):
    d[os.getpid()] = os.getpid()

    l.append(os.getpid())
    print(l)
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()  #{} #生成一個字典,可在多個進程間共享和傳遞
        l = manager.list(range(5)) #生成一個列表,可在多個進程間共享和傳遞
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:  #等待結果
            res.join()

        print(d)
        print(l)

進程同步

若是不使用不一樣進程的鎖輸出,則極可能會混淆。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process, Lock


def f(l, i):
    l.acquire()
    print('hello world', i)
    l.release()


if __name__ == '__main__':
    lock = Lock()

    for num in range(100):
        Process(target=f, args=(lock, num)).start()

十7、進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。

進程池中有兩個方法:

  • apply
  • apply_async
#!/usr/bin/env python
# -*- coding:utf-8 -*-

from multiprocessing import Process, Pool, freeze_support
import time
import os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:',arg,os.getpid())

if __name__ == '__main__':
    # freeze_support()
    pool = Pool(processes=5)    #容許進程池同時放入5個進程
    print("主進程",os.getpid())
    for i in range(10):
        pool.apply_async(func=Foo,args=(i,),callback=Bar) #並行 callback=回調
        # pool.apply(func=Foo, args=(i,)) #串行

    print('end')
    pool.close()
    pool.join() #進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉...

 

 

做業:

題目:簡單主機批量管理工具

需求:

  1. 主機分組
  2. 主機信息配置文件用configparser解析
  3. 可批量執行命令、發送文件,結果實時返回,執行格式以下 
    1. batch_run  -h h1,h2,h3   -g web_clusters,db_servers    -cmd  "df -h" 
    2. batch_scp   -h h1,h2,h3   -g web_clusters,db_servers  -action put  -local test.py  -remote /tmp/ 
  4. 主機用戶名密碼、端口能夠不一樣
  5. 執行遠程命令使用paramiko模塊
  6. 批量命令需使用multiprocessing併發

 

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import configparser
import paramiko
import os
import sys

BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASEDIR)
from conf import settings


class Mainprogram(object):
    def __init__(self):
        pass
    def interactive(self):
        menu = u'''-----歡迎來到主機批量管理-----
                1. 查看主機信息
                2. 批量執行命令
                3. 批量發送文件
                4. 退出
                '''
        menu_dic = {
            '1': "account_info",
            '2': "batch_run",
            '3': "batch_scp",
            '4': "logout",
        }
        exit_flag = False
        while not exit_flag:
            print(menu)
            user_option = input(">>:").strip()
            if user_option in menu_dic:
                method = menu_dic[user_option]
                if hasattr(self,method):
                    func = getattr(self,method)
                    func()
            else:
                print("Option does not exist!")

    def account_info(self):
        group_info = os.listdir(settings.DB_DIR)
        for group_ini in group_info:
            group = group_ini.split('.')[0]
            print(group)
        select_group = input("你要選擇的組>>:")
        config = configparser.ConfigParser()
        config.read(settings.DB_DIR + ("/%s.ini" % select_group), encoding="utf-8")
        print("組內的服務器有:", config.sections())
        select_host = input("你要選擇的主機>>:")
        print("主機信息爲:", config.items(select_host))

    def batch_run(self):
        pl_cmd = input("請輸入批量執行的命令>>:")
        pl_groups = pl_cmd.split("-g")[1].strip().split(" ")[0].split(',')
        pl_hosts = pl_cmd.split("-h")[1].strip().split(" ")[0].split(',')
        cmd = pl_cmd.split("-cmd")[1].strip()
        def ssh(pl_host):
            for pl_group in pl_groups:
                config = configparser.ConfigParser()
                config.read(settings.DB_DIR + ("/%s.ini" % pl_group), encoding="utf-8")
                if pl_host in config.sections():
                    hostname = config.get(pl_host, "hostname")
                    port = config.get(pl_host, "port")
                    username = config.get(pl_host, "username")
                    password = config.get(pl_host, "password")
                    ssh = paramiko.SSHClient()
                    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                    ssh.connect(hostname=hostname, port=port, username=username, password=password)
                    stdin, stdout, stderr = ssh.exec_command(cmd)
                    res, err = stdout.read(), stderr.read()
                    result = res if res else err
                    print(hostname)
                    print(result.decode())
                    ssh.close()

        t_objs = []  # 存線程實例
        for i in range(len(pl_hosts)):
            t = threading.Thread(target=ssh, args=("%s" % pl_hosts[i - 1],))
            t.start()
            t_objs.append(t)

        for t in t_objs:
            t.join()
    def batch_scp(self):
        put_cmd = input("請輸入批量傳輸文件的命令>>:")
        put_groups = put_cmd.split("-g")[1].strip().split(" ")[0].split(',')
        put_hosts = put_cmd.split("-h")[1].strip().split(" ")[0].split(',')
        put_action = put_cmd.split("-h")[1].strip().split(" ")[0]
        put_local = put_cmd.split("-local")[1].strip().split(" ")[0]
        put_remote = put_cmd.split("-remote")[1].strip().split(" ")[0]
        def scp(put_host):
            for put_group in put_groups:
                config = configparser.ConfigParser()
                config.read(settings.DB_DIR + ("/%s.ini" % put_group), encoding="utf-8")
                if put_host in config.sections():
                    hostname = config.get(put_host, "hostname")
                    port = config.get(put_host, "port")
                    username = config.get(put_host, "username")
                    password = config.get(put_host, "password")
                    transport = paramiko.Transport((hostname, int(port)))
                    transport.connect(username=username, password=password)
                    sftp = paramiko.SFTPClient.from_transport(transport)
                    sftp.put(put_local, put_remote + "%s" % put_local)
                    print(hostname, "文件發送成功!")
                    # sftp.get('/root/oldgirl.txt', 'fromlinux.txt')
                    transport.close()

        t_objs = []  # 存線程實例
        for i in range(len(put_hosts)):
            t = threading.Thread(target=scp, args=("%s" % put_hosts[i - 1],))
            t.start()
            t_objs.append(t)

        for t in t_objs:
            t.join()

    def logout(self):
        sys.exit()
相關文章
相關標籤/搜索