多任務—進程

1、進程以及狀態服務器

1. 進程併發

程序:例如xxx.py這是程序,是一個靜態的app

進程:一個程序運行起來後,代碼+用到的資源 稱之爲進程,它是操做系統分配資源的基本單元。dom

 

2. 進程的狀態異步

工做中,任務數每每大於cpu的核數,即必定有一些任務正在執行,而另一些任務在等待cpu進行執行,所以致使了有了不一樣的狀態socket

  • 就緒態:當進程已分配到除CPU之外的全部必要的資源,只要得到處理機即可當即執行,這時的進程狀態稱爲就緒狀態。async

  • 執行態:當進程已得到處理機,其程序正在處理機上執行,此時的進程狀態稱爲執行狀態。tcp

  • 等待態:正在執行的進程,因爲等待某個事件發生而沒法執行時,便放棄處理機而處於阻塞狀態。引發進程阻塞的事件可有多種,例如,等待I/O完成、申請緩衝區不能知足、等待信件(信號)、一個程序sleep等ide

 

3.同步和異步函數

所謂同步就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列。要麼成功都成功,失敗都失敗,兩個任務的狀態能夠保持一致。

所謂異步是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了。至於被依賴的任務最終是否真正完成,依賴它的任務沒法肯定,因此它是不可靠的任務序列。

 

4.阻塞與非阻塞

阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來講的

  1. 同步阻塞形式

  效率最低。銀行排隊。

  1. 異步阻塞形式

  若是在銀行等待辦理業務的人採用的是異步的方式去等待消息被觸發(通知),也就是領了一張小紙條,假如在這段時間裏他不能離開銀行作其它的事情,那麼很顯然,這我的被阻塞在了這個等待的操做上面;

  異步操做是能夠被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知時被阻塞。

  1. 同步非阻塞形式

  其實是效率低下的。

  想象一下你一邊打着電話一邊還須要擡頭看到底隊伍排到你了沒有,若是把打電話和觀察排隊的位置當作是程序的兩個操做的話,這個程序須要在這兩種不一樣的行爲之間來回的切換,效率可想而知是低下的。

  1. 異步非阻塞形式

  效率更高,

  由於打電話是你(等待者)的事情,而通知你則是櫃檯(消息觸發機制)的事情,程序沒有在兩種不一樣的操做中來回切換。

  好比說,這我的忽然發覺本身煙癮犯了,須要出去抽根菸,因而他告訴大堂經理說,排到我這個號碼的時候麻煩到外面通知我一下,那麼他就沒有被阻塞在這個等待的操做上面,天然這個就是異步+非阻塞的方式了。

 

不少人會把同步和阻塞混淆,是由於不少時候同步操做會以阻塞的形式表現出來,一樣的,不少人也會把異步和非阻塞混淆,由於異步操做通常都不會在真正的IO操做處被阻塞。

 

2、進程的建立

1.進程的建立-multiprocessing

multiprocessing模塊就是跨平臺版本的多進程模塊,提供了一個Process類來表明一個進程對象,這個對象能夠理解爲是一個獨立的進程,能夠執行另外的事情

 from multiprocessing import Process
 import time
 
 
 def func1():
     while True:
         print("test1")
         time.sleep(1)
 
 def func2():
     while True:
         print("test2")
         time.sleep(1)
 
 
 if __name__ == '__main__':
     p1 = Process(target=func1)
     p2 = Process(target=func2)
     p1.start()
     p2.start()
建立進程

說明

  • 建立子進程時,只須要傳入一個執行函數和函數的參數,建立一個Process實例,用start()方法啓動

 

2. 進程pid

 from multiprocessing import Process
 import time
 import os
 
 
 def func():
     print("子進程的pid:%d" % os.getpid())   # os.getpid 獲取當前進程的進程號
     print("子進程的父進程pid:%d" % os.getppid())    # os.getppid 獲取當前進程的父進程的進程號
 
 
 if __name__ == "__main__":
     print("父進程pid:%d" % os.getpid())
     p = Process(target=func)
     p.start()
                                                                                                 
進程pid

 

3. Process語法結構以下:

Process([group [, target [, name [, args [, kwargs]]]]])

  • target:若是傳遞了函數的引用,能夠任務這個子進程就執行這裏的代碼
  • args:給target指定的函數傳遞的參數,以元組的方式傳遞
  • kwargs:給target指定的函數傳遞命名參數
  • name:給進程設定一個名字,能夠不設定
  • group:指定進程組,大多數狀況下用不到

Process建立的實例對象的經常使用方法:

  • start():啓動子進程實例(建立子進程)
  • is_alive():判斷進程子進程是否還在活着
  • join([timeout]):是否等待子進程執行結束,或等待多少秒
  • terminate():無論任務是否完成,當即終止子進程

Process建立的實例對象的經常使用屬性:

  • name:當前進程的別名,默認爲Process-N,N爲從1開始遞增的整數
  • pid:當前進程的pid(進程號)

 

4. 給子進程指定的函數傳遞參數

 from multiprocessing import Process
 import time
 import os
 
 
 def func(name, age, **kwargs):
     for i in range(10):
         print("子進程運行中:name=%s,age=%d,pid=%d" % (name, age, os.getpid()))
         print(kwargs)
         time.sleep(0.2)
 
 
 if __name__ == "__main__":
     p = Process(target=func, args=('test', 11), kwargs={"ab":'ab'})
     p.start()
     time.sleep(1)   # 1秒後結束子進程
     p.terminate()
     p.join()

# 運行結果
子進程運行中:name=test,age=11,pid=4260
{'ab': 'ab'}
子進程運行中:name=test,age=11,pid=4260
{'ab': 'ab'}
子進程運行中:name=test,age=11,pid=4260
{'ab': 'ab'}
子進程運行中:name=test,age=11,pid=4260
{'ab': 'ab'}
子進程運行中:name=test,age=11,pid=4260
{'ab': 'ab'}
View Code

 

5. 進程間不共享全局變量

 from multiprocessing import Process
 import time
 import os
 
 
 num_list = [11,22,33]
 
 def func1():
     print("in func1:pid=%d, num_list=%s" % (os.getpid(), num_list))
     for i in range(3):
         num_list.append(i)
         time.sleep(1)
         print("in func1:pid=%d, num_list=%s" % (os.getpid(), num_list))
 
 
 def func2():
     print("in func2:pid=%d, num_list=%s" % (os.getpid(), num_list))
 
 
 if __name__ == "__main__":
     p1 = Process(target=func1)
     p1.start()
     p1.join()
     p2 = Process(target=func2)
     p2.start()

# 執行結果:
in func1:pid=4344, num_list=[11, 22, 33]
in func1:pid=4344, num_list=[11, 22, 33, 0]
in func1:pid=4344, num_list=[11, 22, 33, 0, 1]
in func1:pid=4344, num_list=[11, 22, 33, 0, 1, 2]
in func2:pid=4345, num_list=[11, 22, 33]
View Code

 

6.守護進程

守護進程會隨着主進程的結束而結束。

主進程建立守護進程

  其一:守護進程會在主進程代碼執行結束後就終止

  其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

 from multiprocessing import Process
 import time
 import os
 
 
 class MyProcess(Process):
 
     def run(self):
         print(os.getpid(), self.name)
 
 
 p = MyProcess()
 p.daemon = True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p
 即終止運行
 p.start()
 time.sleep(10) # 在sleep時查看進程id對應的進程ps -ef|grep id
 print('')
守護進程開啓
 from multiprocessing import Process
 import time
 
 
 def func1():
     print("123")
     time.sleep(1)
     print("123end")
 
 
 def func2():
     print("456")
     time.sleep(3)
     print("456end")
 
 
 p1 = Process(target=func1)
 p2 = Process(target=func2)
 p1.daemon = True
 p1.start()
 p2.start()
 time.sleep(0.1)
 print("main-------")#打印該行則主進程代碼結束,則守護進程p1應該被終止.#可能會有p1任務執行的打印信息123,由於主進程打印main----時,p1也執行了,可是隨即被終止.


# 執行結果:
123
456
main-------
456end
主進程代碼執行結束守護進程當即結束

 

 7.使用多進程實現socket服務端的併發效果

 from multiprocessing import Process
 import socket
 
 
 def server(conn):
     recv_data = conn.recv(1024).decode('utf-8')
     print(recv_data)
     conn.send("抱着妹妹上花轎".encode('utf-8'))
     conn.close()
 
 if __name__ == "__main__":
     tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     tcp_socket.bind(('192.168.1.1',8010))
     tcp_socket.listen(128)
     while True:
         conn, addr = tcp_socket.accept()
         p = Process(target=server, args=(conn,))
         p.start()
server
 from socket import *
 
 
 # 建立套接字 
 tcp_client_socket = socket(AF_INET, SOCK_STREAM)
 
 # 目的地址
 server_ip = input("服務端ip:")
 server_port = input("服務端端口:")
 
 # 連接服務器
 tcp_client_socket.connect((server_ip, int(server_port)))
 
 # 客戶端發送信息
 send_data = input("輸入發送的信息:")
 tcp_client_socket.send(send_data.encode('utf-8'))
 
 # 接受服務端發來的信息
 recv_data = tcp_client_socket.recv(1024)
 print("收到的信息:%s" % recv_data.decode('utf-8'))
 
 # 關閉套接字
 tcp_client_socket.close()
client

 

 3、進程間通訊-Queue

1. Queue的使用

可使用multiprocessing模塊的Queue實現多進程之間的數據傳遞,Queue自己是一個消息列隊程序,首先用一個小實例來演示一下Queue的工做原理:

 from multiprocessing import Queue
 
 
 q = Queue(3)    # 初始哈Queue對象,最多可接手3條put
 q.put(1)
 q.put(2)
 print(q.full())   # False
 q.put(3)
 print(q.full())   # True
 
 
 #由於消息列隊已滿下面的try都會拋出異常,第一個try會等待2秒後再拋出異常,第二個Try會馬上拋出異常
 try:
     q.put(4, True, 2)
 except:
     print("消息隊列已滿:現有消息數量%s" % q.qsize())
 
 try:
     q.put_nowait(4)
 except:
     print("消息隊列已滿:現有消息數量%s" % q.qsize())
 
 #推薦的方式,先判斷消息列隊是否已滿,再寫入
 if not q.full():
     q.put_nowait(4)
 
 #讀取消息時,先判斷消息列隊是否爲空,再讀取
 if not q.empty():
     for i in range(q.qsize()):
         print(q.get_nowait())
                                              
View Code

說明

初始化Queue()對象時(例如:q=Queue()),若括號中沒有指定最大可接收的消息數量,或數量爲負值,那麼就表明可接受的消息數量沒有上限(直到內存的盡頭);

  • Queue.qsize():返回當前隊列包含的消息數量;

  • Queue.empty():若是隊列爲空,返回True,反之False ;

  • Queue.full():若是隊列滿了,返回True,反之False;

  • Queue.get([block[, timeout]]):獲取隊列中的一條消息,而後將其從列隊中移除,block默認值爲True;

1)若是block使用默認值,且沒有設置timeout(單位秒),消息列隊若是爲空,此時程序將被阻塞(停在讀取狀態),直到從消息列隊讀到消息爲止,若是設置了timeout,則會等待timeout秒,若還沒讀取到任何消息,則拋出"Queue.Empty"異常;

2)若是block值爲False,消息列隊若是爲空,則會馬上拋出"Queue.Empty"異常;

  • Queue.get_nowait():至關Queue.get(False);

  • Queue.put(item,[block[, timeout]]):將item消息寫入隊列,block默認值爲True;

1)若是block使用默認值,且沒有設置timeout(單位秒),消息列隊若是已經沒有空間可寫入,此時程序將被阻塞(停在寫入狀態),直到從消息列隊騰出空間爲止,若是設置了timeout,則會等待timeout秒,若還沒空間,則拋出"Queue.Full"異常;

2)若是block值爲False,消息列隊若是沒有空間可寫入,則會馬上拋出"Queue.Full"異常;

  • Queue.put_nowait(item):至關Queue.put(item, False);

 

2. Queue實例

咱們以Queue爲例,在父進程中建立兩個子進程,一個往Queue裏寫數據,一個從Queue裏讀數據:

 from multiprocessing import Process, Queue
 import time,random
 
 
 # 寫數據進程執行的代碼:
 def write(q):
     for i in range(3):
         print("puting %d in queue" % i)
         q.put(i)
         time.sleep(random.random())
 
 
 def read(q):
     while True:
         if not q.empty():
             print("%i reading in queue" % q.get(True))
             time.sleep(random.random())
         else:
             break
 
 if __name__ == "__main__":
     q = Queue()
     wp = Process(target=write, args=(q,))
     rp = Process(target=read, args=(q,))
 
     wp.start()   # 啓動子進程wq,寫入:
     wp.join()    # 等待pw結束:
 
     rp.start()
     rp.join()
     print('全部數據都寫入而且讀完')
View Code

 

4、進程池Pool

當須要建立的子進程數量很少時,能夠直接利用multiprocessing中的Process動態成生多個進程,但若是是上百甚至上千個目標,手動的去建立進程的工做量巨大,此時就能夠用到multiprocessing模塊提供的Pool方法。

初始化Pool時,能夠指定一個最大進程數,當有新的請求提交到Pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到指定的最大值,那麼該請求就會等待,直到池中有進程結束,纔會用以前的進程來執行新的任務,請看下面的實例:

 from multiprocessing import Process, Pool
 import time, os, random
 
 
 def func(num):
     start_time = time.time()
     print("%d開始執行,進程號%s" %(num,os.getpid()))
     time.sleep(random.random())
     end_time = time.time()
     print("%d執行完畢,耗時%0.2f" % (num,end_time-start_time))
 
 
 po = Pool(3)
 for i in range(10):
     # Pool().apply_async(要調用的目標,(傳遞給目標的參數元祖,))
     # 每次循環將會用空閒出來的子進程去調用目標
     po.apply_async(func, (i,))
 
 print("--start--")
 po.close()   # 關閉進程池,關閉後po再也不接收新的請求
 po.join()   # 等待po中全部子進程執行完成,必須放在close語句以後
 print("--end--")


# 執行結果:
--start--
0開始執行,進程號4333
1開始執行,進程號4334
2開始執行,進程號4332
0執行完畢,耗時0.65
3開始執行,進程號4333
1執行完畢,耗時0.85
4開始執行,進程號4334
2執行完畢,耗時0.95
5開始執行,進程號4332
4執行完畢,耗時0.43
6開始執行,進程號4334
5執行完畢,耗時0.41
7開始執行,進程號4332
3執行完畢,耗時0.85
8開始執行,進程號4333
6執行完畢,耗時0.51
9開始執行,進程號4334
9執行完畢,耗時0.24
8執行完畢,耗時0.54
7執行完畢,耗時0.75
--end--
View Code

multiprocessing.Pool經常使用函數解析:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args爲傳遞給func的參數列表,kwds爲傳遞給func的關鍵字參數列表;
  • close():關閉Pool,使其再也不接受新的任務;
  • terminate():無論任務是否完成,當即終止;
  • join():主進程阻塞,等待子進程的退出, 必須在close或terminate以後使用;

 

進程池中的Queue

若是要使用Pool建立進程,就須要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),不然會獲得一條以下的錯誤信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

下面的實例演示了進程池中的進程如何通訊:

 from multiprocessing import Manager, Pool
 import os, time, random
 
 
 def write(q):
     print("write啓動(%s),父進程(%s)" % (os.getpid(), os.getppid()))
     for i in range(3):
         q.put(i)
 
 def read(q):
     print("read啓動(%s), 父進程(%s)" % (os.getpid(), os.getppid()))
     for i in range(q.qsize()):
         print("正在從隊列讀取消息%d" % q.get())
 
 if __name__ == "__main__":
     print("start:%s" % os.getpid())
     q = Manager().Queue()
     po = Pool()
     po.apply_async(write, (q,))
     time.sleep(1)   # 先讓上面的任務向Queue存入數據,而後再讓下面的任務開始從中取數據
 
     po.apply_async(read, (q,))
 
     po.close()
     po.join()
     print("end:%s" % os.getpid())

# 執行結果:
start:4586
write啓動(4592),父進程(4586)
read啓動(4592), 父進程(4586)
正在從隊列讀取消息0
正在從隊列讀取消息1
正在從隊列讀取消息2
end:4586
View Code

 

應用:文件夾copy器(多進程版)

 import multiprocessing, time, os, random
 
 
 def copy_file(queue, file_name, need_copy_file, new_file):
     f_read = open(need_copy_file + '/' + file_name, 'rb')
     f_write = open(new_file + '/' + file_name, 'wb')
     while True:
         time.sleep(random.random())
         content = f_read.read()
         if content:
             f_write.write(content)
         else:
             break
     f_read.close()
     f_write.close()
     # 發送已經拷貝完畢的文件名字
     queue.put(file_name)
 
 
 if __name__ == "__main__":
 
     # 獲取想要copy的文件夾
     need_copy_file = input("輸入文件名:")
     # 新的文件夾名稱
     new_file = need_copy_file + "[副本]"
     # 建立目標文件夾
     try:
         os.mkdir(new_file)
     except:
         print("建立文件失敗")
     # 獲取這個文件夾中全部的普通文件名
     file_names = os.listdir(need_copy_file)
 
     # 建立隊列
     queue = multiprocessing.Manager().Queue()
     # 建立進程池
     pool = multiprocessing.Pool(3)
     # 向進程池中添加任務
     for file_name in file_names:
         pool.apply_async(copy_file, (queue, file_name, need_copy_file, new_file))
     pool.close()
     # 主進程顯示進度
     all_file_nums = len(file_names)
     while True:
         file_name = queue.get()
         if file_name in file_names:
             file_names.remove(file_name)
         copy_rate = (all_file_nums - len(file_names))*100/all_file_nums
         print("\r%.2f...(%s)" % (copy_rate, file_name) + " "*50, end="")
         if copy_rate >= 100:
             print("\ncopy完成")
             break
View Code
相關文章
相關標籤/搜索