python--(十五步代碼學會進程)

python--(十五步代碼學會進程)python

一.進程的建立json

import time
import os

#os.getpid() 獲取本身進程的id號
#os.getppid() 獲取本身進程的父進程id號

from multiprocessing import Process
def func():
    print("aaa")
    time.sleep(1)
    print("子進程>>>",os.getpid())
    print("該子進程的父進程>>>",os.getppid())
    print(12345)

if __name__ == "__main__":
    p = Process(target=func,)
    p.start()
    print("*" * 10)
    print("主進程>>>",os.getpid())
    print("父進程>>>",os.getppid())

給要執行的函數傳參數
import time
from multiprocessing import Process
def func(x,y):
    print(x)
    time.sleep(1)
    print(y)

if __name__ == "__main__":
    p = Process(target=func,args=("姑娘","來玩啊"))#這是func須要接受的參數的傳輸方式
    p.start()
    print("父進程執行結束")

二.join方法安全

import time
from multiprocessing import Process

驗證join方法
global_num = 100
def func1():
    time.sleep(2)
    global global_num
    global_num = 0
    print("子進程全局變量>>>",global_num)
if __name__ == "__main__":
    p1 = Process(target=func1,)
    p1.start()
    print("子進程執行")
    time.sleep(3)
    p1.join()#阻塞住,等待你的p1子進程執行sing結束,主進程的程序才能從這裏繼續往下執行
    print("主進程的全局變量>>>",global_num)

驗證了一下併發的執行時間
import time
from multiprocessing import Process

def func1(n):
    time.sleep(n)
    print("func1",n)
def func2(n):
    time.sleep(n)
    print("func2",n)
def func3(n):
    time.sleep(n)
    print("func3",n)
if __name__ == "__main__":
    p1 = Process(target=func1,args=(1,))
    p2 = Process(target=func2,args=(2,))
    p3 = Process(target=func3,args=(3,))

    p1.start()
    p2.start()
    p3.start()

for循環在建立進程中的應用
import time
from multiprocessing import Process
def func1(n):
    time.sleep(1)
    print(n)

if __name__ == "__main__":
    pro_list = []
    for i in range(10):
        p1 = Process(target=func1,args=(i,))
        p1.start()
        pro_list.append(p1)
        # p1.join()

    # for p in pro_list:
    #     # p.join()
    p1.join()
    print("主進程結束")
View Code

 

  殭屍進程和孤兒進程服務器

import time
import os
from multiprocessing import Process

def func1():
    time.sleep(30)
    print(os.getpid())
    print('子進程')

if __name__ == '__main__':
    p1 = Process(target=func1,)
    p1.start()
    # p1.join()
    # time.sleep(2)
    # print(p1.pid)
    print('主進程的ID',os.getpid())
    print('主進程結束')
View Code


三.建立進程的兩種方式網絡

import time
from multiprocessing import Process
import os
# import test01
# def func1(n):
#     # time.sleep(1)
#     print(n)
#
# def func2(n):
#     # time.sleep(1)
#     print(n)
#
# def func3(n):
#     # time.sleep(1)
#     print(n)
#
# def func4(n):
#     # time.sleep(1)
#     print(n)
#
# if __name__ == '__main__':
#     p1 = Process(target=func1,args=(1,))
#     p2 = Process(target=func2,args=(2,))
#     p3= Process(target=func3,args=(3,))
#     p4 = Process(target=func4,args=(4,))
#     p1.start() # run()
#     p2.start()
#     p3.start()
#     p4.start()
#     # time.sleep(0.5)
#     print('主進程結束')

    # 以前同步執行的
    # func1(1)
    # func2(2)
    # func3(3)
    # func4(4)

建立進程的第一種方式:
    # p1 = Process(target=func1, args=(1,))
    # p1.start()
建立進行的第二種方式:
    #本身定義一個類,繼承Process類,必須寫一個run方法,想傳參數,自行寫init方法,而後執行super父類的init方法

# class MyProcess(Process):
#     def __init__(self,n,name):
#         super().__init__()
#         self.n = n
#         self.name = name
#
#     def run(self):
#         # print(1+1)
#         # print(123)
#         print('子進程的進程ID',os.getpid())
#         print('你看看n>>',self.n)
#
# if __name__ == '__main__':
#     p1 = MyProcess(100,name='子進程1')
#     p1.start() #給操做系統發送建立進程的指令,子進程建立好以後,要被執行,執行的時候就會執行run方法
#     print('p1.name',p1.name)
#     print('p1.pid',p1.pid)
#     print('主進程結束')
View Code

 

四.進程的其餘方法terminate is_alive.py併發

import time
from multiprocessing import Process
def func1():
    time.sleep(2)
    print()
    print("子進程")
if __name__ == "__main__":
    p1 = Process(target=func1,)
    p1.start()
    p1.terminate() #給操做系統發了一個關閉p1子進程的信號,關閉進程
    time.sleep(1)
    print("進程是否還活着:",p1.is_alive())#是返回True,否返回False
    print(p1.pid)
    print("主進程結束")
View Code

 

五.守護進程app

#守護的子進程跟着主進程走

import time
import os
from multiprocessing import Process
def func():
    time.sleep(5)
    print('子進程', os.getpid())

if __name__ == '__main__':
    p1 = Process(target=func)
    p1.daemon = True   # 設置守護進程, 當主進程結束時所有子進程當即結束
    p1 .start()
    # time.sleep(5.5)
    print('主進程結束')
View Code

 

六.驗證進程之間是空間隔離的dom

import time
from multiprocessing import Process
#進程之間是空間隔離的,不共享資源
global_num = 100
def func1():
    global global_num
    global_num = 0
    print("子進程全局變量>>>",global_num)
if __name__ == "__main__":
    p1 = Process(target=func1,)
    p1.start()
    time.sleep(1)
    print("主進程的全局變量>>>",global_num)
View Code

 

七.子進程中不能使用input異步

from multiprocessing import Process

def func1():
    s = input('>>>')

if __name__ == '__main__':

    p1 = Process(target=func1,)
    p1.start()

    # a = input('>>>:')
    print('主進程結束')

##報錯
View Code


 

八.進程鎖socket

  ticket_lock = Lock()#建立鎖  .acquire()#加鎖,  .release()#解鎖

同步鎖的做用:#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 # 雖然能夠用文件共享數據實現進程間通訊,但問題是:
# 1.效率低(共享數據基於文件,而文件是硬盤上的數據) # 2.須要本身加鎖處理
import json
import time
import random
from multiprocessing import Process,Lock

def get_ticket(i,ticket_lock):
    print("咱們都到齊了,你們預備!!123")
    time.sleep(1)
    #全部代碼 異步執行,到這裏等待,同時再去搶下面的代碼執行
    ticket_lock.acquire()
    #這裏有個門,只有一我的可以搶到這個鑰匙,加鎖
    with open("ticket","r") as f:
        last_ticket_info = json.load(f)
        #將文件數據load爲字典類型的數據
    last_ticket = last_ticket_info["count"]
    print(last_ticket)
    #查看一下餘票的信息
    if last_ticket > 0:
        #若是看到餘票大於零,說明你能夠搶到票
        time.sleep(random.random())
        #模擬網絡延遲時間
        last_ticket = last_ticket - 1
        last_ticket_info["count"] = last_ticket
        with open("ticket","w") as f:
            #將修改後的參數寫回文件
            json.dump(last_ticket_info,f)
        print("%s號搶到了,丫nb!" % i)
    else:
        print("%s號傻逼,沒票了,明年再來" % i)
        ticket_lock.release()
if __name__ ==  "__main__":
    ticket_lock = Lock()
    #建立一個進程鎖
    for i in range(10):
        p = Process(target=get_ticket, args=(i, ticket_lock))
        p.start()
進程鎖模擬購票系統

 

九.信號量

Semaphore()
互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
假設商場裏有4個迷你唱吧,因此同時能夠進去4我的,若是來了第五我的就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器爲0時,acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念
import time
import random
from multiprocessing import Process,Semaphore

def dbj(i,s):
    s.acquire()
    print('%s號男主人公來洗腳'%i)
    print('-------------')
    time.sleep(random.randrange(3,6))
    # print(time.time())
    s.release()

if __name__ == '__main__':
    s = Semaphore(4) #建立一個計數器,每次acquire就減1,直到減到0,那麼上面的任務只有4個在同時異步的執行,後面的進程須要等待.
    for i in range(10):
        p1 = Process(target=dbj,args=(i,s,))
        p1.start()
View Code
 

十.事件

e = Event()#  e.set()#將e改成True  e.clear()   # 將e改成False
python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。

    事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
from multiprocessing import Process, Event
e = Event() #False True

print(e.is_set())
e.set() #將e事件的狀態改成True
print("在這裏等待")
e.clar()   #將e事件的狀態改成False
print("111")
e.wait()
print("是真的嗎")
View Code
import time
from multiprocessing import Process,Event

#模擬紅綠燈執行狀態的函數
def traffic_lights(e):
    while 1:
        print("紅燈啦")
        time.sleep(5)
        e.set() #將e改成True
        print("綠燈了")
        time.sleep(3)
        e.clear()   #將e改成False
def car(i,e):
    if not e.is_set():  #新來的車看到的是紅燈
        print("咱們在等待....")
        e.wait()
        print("走你")
    else:
        print("能夠走了!!!")
if __name__ == "__main__":
    e = Event()
    hld = Process(target=traffic_lights, args=(e,))
    hld.start()
    while 1:
        time.sleep(0.5)
        #建立10個車
        for i in range(3):
            p1 = Process(target=car,args=(i,e,))
            p1.start()
Even模擬紅綠燈

 

十一.隊列

# 遵循先進先出的原則   q = Queue(3)    建立3個隊列 q.put()發送數據  q.get()接受數據
q = Queue([maxsize])  #建立共享的進程隊列

q.get( [ block [ ,timeout ] ] )  #返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。

q.get_nowait( ) #和q.get(False)方法,同樣

q.put(item [, block [,timeout ] ] ) #將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。

q.qsize() #返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。

q.empty() #若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

.full() #若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。

q.close() #關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。

q.cancel_join_thread()  #不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。

q.join_thread() #鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。
隊列的相關方法
 
from multiprocessing import Process,Queue
#先進先出
q = Queue(3)
q.put(1)
q.put(2)
# print(q.full()) #q.full()隊列滿了返回True,不滿返回False
q.put(3)
# print('>>>>',q.full())

q.get_nowait()= ()  #不會阻塞住,至關於空隊列

# try:
#     q.get(False)  # queue.Empty
#     q.get_nowait() #queue.Empty
# except:
#     print('隊列目前是空的')

# while 1:
#     try:
#         q.get(False)  #queue.Empty
#     except:
#         print('隊列目前是空的')
View Code

  隊列實現進程的通訊

import time
from multiprocessing import Process,Queue

def girl(q):
    print('來自boy的信息',q.get())
    print('來自校領導的凝視',q.get())
def boy(q):
    q.put('約嗎')

if __name__ == '__main__':
    q = Queue(5)
    boy_p = Process(target=boy,args=(q,))
    girl_p = Process(target=girl,args=(q,))
    boy_p.start()
    girl_p.start()
    time.sleep(1)
    q.put('好好工做,別亂搞')
隊列實現進程的通訊


十二.生產者消費者模式

#生產者消費者模型總結

    #程序中有兩類角色
        一類負責生產數據(生產者)
        一類負責處理數據(消費者)
        
    #引入生產者消費者模型爲了解決的問題是:
        平衡生產者與消費者之間的工做能力,從而提升程序總體處理數據的速度
        
    #如何實現:
        生產者<-->隊列<——>消費者
    #生產者消費者模型實現類程序的解耦和

 

import time
from multiprocessing import Process,Queue
def producer(q):
    for i in range(1,11):
        time.sleep(1)
        print('生產了包子%s號' % i)
        q.put(i)
    q.put(None)  #針對第三個版本的消費者,往隊列裏面加了一個結束信號
#版本1
# def consumer(q):
#     while 1:
#         time.sleep(2)
#         s = q.get()
#         print('消費者吃了%s包子' % s)

#版本2
# def consumer(q):
#     while 1:
#         time.sleep(0.5)
#         try:
#             s = q.get(False)
#             print('消費者吃了%s包子' % s)
#         except:
#             break

def consumer(q):
    while 1:
        time.sleep(2)
        s = q.get()
        if s == None:
            break
        else:
            print('消費者吃了%s包子' % s)
生產者消費者模型
生產者消費者模型
import time
from multiprocessing import Process,Queue

def producer(q):
    for i in range(1,11):
        time.sleep(1)
        print('生產了包子%s號' % i)
        q.put(i)

def consumer(q):
    while 1:
        time.sleep(2)
        s = q.get()
        if s == None:
            break
        else:
            print('消費者吃了%s包子' % s)

if __name__ == '__main__':
    #經過隊列來模擬緩衝區,大小設置爲20
    q = Queue(20)
    #生產者進程
    pro_p = Process(target=producer,args=(q,))
    pro_p.start()
    #消費者進程
    con_p = Process(target=consumer,args=(q,))
    con_p.start()
    pro_p.join()

    q.put(None)
生產者消費者模型主進程發送結束信號
 1 #生產者消費者模型
 2 import time
 3 from multiprocessing import Process,Queue,JoinableQueue
 4 
 5 def producer(q):
 6     for i in range(1,11):
 7         time.sleep(0.5)
 8         print('生產了包子%s號' % i)
 9         q.put(i)
10     q.join()
11     print('在這裏等你')
12 def consumer(q):
13     while 1:
14         time.sleep(1)
15         s = q.get()
16         print('消費者吃了%s包子' % s)
17         q.task_done()  #給q對象發送一個任務結束的信號
18 
19 if __name__ == '__main__':
20     #經過隊列來模擬緩衝區,大小設置爲20
21     q = JoinableQueue(20)
22     #生產者進程
23     pro_p = Process(target=producer,args=(q,))
24     pro_p.start()
25     #消費者進程
26     con_p = Process(target=consumer,args=(q,))
27     con_p.daemon = True #
28     con_p.start()
29     pro_p.join()
30     print('主進程結束')
JoinableQueue的生產者消費者模型

 

 十三.管道

 from multiprocessing import Process,Pipe
conn1,conn2 = Pipe()
進程間通訊(IPC)方式二:管道(不推薦使用,瞭解便可),會致使數據不安全的狀況出現

 

# 管道
from multiprocessing import Process,Pipe
import time
# conn1,conn2 = Pipe()
# conn1.send("你好")
# print(">>>>>")
# msg = conn2.recv()
# print(msg)

# def func1(conn2):
#     try:
#         msg = conn2.recv()
#         print(">>>",msg)
#         #若是管道一端關閉了,那麼另一端在接收消息的時候回報錯
#     except EOFError:
#         print("對方管道一端已經關閉")
#         conn2.close()
# if __name__ == '__main__':
#     conn1,conn2 = Pipe()
#     p = Process(target=func1,args=(conn2,))
#     p.start()
#     conn1.send("收到了嗎")

def func1(conn1,conn2):
    msg = conn2.recv()  #阻塞
    print(">>>>",msg)
if __name__ == '__main__':
    conn1,conn2 = Pipe()
    p = Process(target=func1, args=(conn1, conn2,))
    p.start()
    conn1.send("收到了嗎")
    conn1.close()
    #conn1.recv()        #OSError: handle is closed
View Code

 

十四.數據共享(不安全)

 

# 數據共享
# from multiprocessing import Process,Manager
#
# def func(m_dic):
#     m_dic["輝哥"] = "輝哥大帥比"
# if __name__ == '__main__':
#     m = Manager()
#     m_dic = m.dict({"輝哥":"輝哥帥不帥"})
#     print("主進程",m_dic)
#     p = Process(target=func, args=(m_dic,))
#     p.start()
#     p.join()
#     print("主進程2",m_dic)

# 數據共享manager不安全
# from multiprocessing import Process,Manager,Lock
# def func(m_dic, ml):
#     """不加鎖的狀況會出現數據錯亂
#         m_dic["count"] -= 1
#         下面是加鎖的另外一種形式
#         等同 : ml.acquire()
#                m_dic["count"] -= 1
#                ml.release()"""
#     with ml:
#         m_dic["count"] -= 1
# if __name__ == '__main__':
#     m = Manager()
#     ml = Lock()
#     m_dic = m.dict({"count":100})
#     p_list = []
#     for i in range(20):
#         p1 = Process(target=func,args=(m_dic, ml,))
#         p1.start()
#         p_list.append(p1)
#     [pp.join() for pp in p_list]
#     print("主進程",m_dic)
View Code

 

十五.進程池

  multiprocess.Poll模塊

建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務(高級一些的進程池能夠根據你的併發量,搞成動態增長或減小進程池中的進程數量的操做),不會開啓其餘進程,提升操做系統效率,減小空間的佔用等。

進程池相關方法:

p.apply(func [, args [, kwargs]]):
在一個池工做進程中執行func(*args,**kwargs),而後返回結果。 '''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):
在一個池工做進程中執行func(
*args,**kwargs),而後返回結果。 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。''' p.close():
關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 P.jion():
等待全部工做進程退出。此方法只能在close()或teminate()以後調用 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法 obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。 obj.ready():若是調用完成,返回True obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常 obj.wait([timeout]):等待結果變爲可用。 obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數

 

import time
from multiprocessing import Process,Pool

def func(n):
    print(n)


if __name__ == '__main__':
    pool = Pool(4)
    # pool.map(func,range(100)) #參數是可迭代的
    pool.map(func,['sb',(1,2)]) #參數是可迭代的
進程池的map傳參

 

# 進程池
import time
from multiprocessing import Process,Pool
def func(n):
    for i in range(5):
        time.sleep(1)
        n= n + i
    print(n)
if __name__ == '__main__':
    #用時間驗證一下傳參
    pool_start_time = time.time()
    pool = Pool(4) #4個進程
    pool.map(func,range(100))
    #map(方法,可迭代對象) 映射 自帶join功能,異步執行任務
    pool_end_time = time.time()
    pool_dif_time = pool_end_time - pool_start_time

    #多進程的執行時間
    # p_s_time = time.time()
    # p_list = []
    # for i in range(200):
    #     p1 = Process(target=func, args=(i,))
    #     p1.start()
    #     p_list.append(p1)
    # [p.join() for p in p_list]
    # p_e_time = time.time()
    # p_dif_time = p_e_time - p_s_time

    print('進程池的執行時間', pool_dif_time)
    print('多進程的執行時間', p_dif_time)
進程池和多進程時間對比

 

import time
from multiprocessing import Process,Pool
def fun(i):
    time.sleep(0.5)
    return i**2
if __name__ == '__main__':
    p = Pool(4)
    for i in range(10):
        res = p.apply(fun,args=(i,))
        # apply 同步執行的進程方法,他會等待你的任務的返回結果
        print(res)
進程池的同步方法 apply

 

import time
from multiprocessing import Process,Pool
def fun(i):
    time.sleep(1)
    print(i)
    return i**2
if __name__ == '__main__':
    p = Pool(4)
    res_list = []
    for i in range(10):
        res = p.apply_async(fun,args=(i,))
        # #同步執行的方法,他會等待你的任務的返回結果,
        res_list.append(res)

    p.close() # 不是關閉進程池,而是不容許再有其餘任務來使用進程池
    p.join()  # 這是感知進程池中任務的方法,進程池中全部的進程隨着主進程的結束而結束了,等待進程池的任務所有執行完
    for e_res in res_list:
        print("結果", e_res.get())
進程池的異步方法apply_async

 

回調函數:

須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數,這是進程池特有的,普通進程沒有這個機制,可是咱們也能夠經過進程通訊來拿到返回值,進程池的這個回調也是進程通訊的機制完成的。

咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果

import os
from multiprocessing import Pool

def func1(n):
    print('func1>>',os.getpid())
    # print('func1')
    return n*n


def func2(nn):
    print('func2>>',os.getpid())
    # print('func2')
    print(nn)
    # import time
    # time.sleep(0.5)
if __name__ == '__main__':
    print('主進程:',os.getpid())
    p = Pool(4)
    p.apply_async(func1,args=(10,),callback=func2)
    p.close()
    p.join()
進程池的回調函數

 進程池版的socket併發聊天代碼示例:

#Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count())
#開啓6個客戶端,會發現2個客戶端處於等待狀態
#在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn):
    print('進程pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool(4)
    while True:
        conn,*_=server.accept()
        p.apply_async(talk,args=(conn,))
        # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問

複製代碼
sserver 服務端:ftp

 

from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
client 用戶端:ftp

 

進程池中爬蟲示例:

 

from multiprocessing import Pool
import time,random
import requests
import re

def get_page(url,pattern):
    response=requests.get(url)
    if response.status_code == 200:
        return (response.text,pattern)

def parse_page(info):
    page_content,pattern=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0],
            'title':item[1],
            'actor':item[2].strip()[3:],
            'time':item[3][5:],
            'score':item[4]+item[5]

        }
        print(dic)
if __name__ == '__main__':
    pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

    # res=requests.get('http://maoyan.com/board/7')
    # print(re.findall(pattern,res.text))
爬蟲示例
相關文章
相關標籤/搜索