python 協程池和pool.map用法

1、問題描述

如今有一段代碼,須要掃描一個網段內的ip地址,是否能夠ping通。html

執行起來效率太慢,須要使用協程。python

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import os
import time
import signal
import subprocess
import gevent
import gevent.pool
from gevent import monkey;monkey.patch_all()


def custom_print(content,colour='white'):
    """
    寫入日誌文件
    :param content: 內容
    :param colour: 顏色
    :return: None
    """
    # 顏色代碼
    colour_dict = {
        'red': 31,  # 紅色
        'green': 32,  # 綠色
        'yellow': 33,  # 黃色
        'blue': 34,  # 藍色
        'purple_red': 35,  # 紫紅色
        'bluish_blue': 36, # 淺藍色
        'white': 37,  # 白色
    }
    choice = colour_dict.get(colour)  # 選擇顏色

    info = "\033[1;{};1m{}\033[0m".format(choice, content)
    print(info)


def execute_linux2(cmd, timeout=10, skip=False):
    """
    執行linux命令,返回list
    :param cmd: linux命令
    :param timeout: 超時時間,生產環境, 特別卡, 所以要3秒
    :param skip: 是否跳過超時限制
    :return: list
    """
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)

    t_beginning = time.time()  # 開始時間
    while True:
        if p.poll() is not None:
            break
        seconds_passed = time.time() - t_beginning
        if not skip:
            if seconds_passed > timeout:
                # p.terminate()
                # p.kill()
                # raise TimeoutError(cmd, timeout)
                custom_print('錯誤, 命令: {},本地執行超時!'.format(cmd),"red")
                # 當shell=True時,只有os.killpg才能kill子進程
                try:
                    # time.sleep(1)
                    os.killpg(p.pid, signal.SIGUSR1)
                except Exception as e:
                    pass
                return False

    result = p.stdout.readlines()  # 結果輸出列表
    return result


class NetworkTest(object):
    def __init__(self):
        self.flag_list = []

    def check_ping(self,ip):
        """
        檢查ping
        :param ip: ip地址
        :return: none
        """
        cmd = "ping %s -c 2" % ip
        # print(cmd)
        # 本機執行命令
        res = execute_linux2(cmd,2)
        # print(res)
        if not res:
            custom_print("錯誤, 執行命令: {} 失敗".format(cmd), "red")
            self.flag_list.append(False)
            return False

        res.pop()  # 刪除最後一個元素
        last_row = res.pop().decode('utf-8').strip()  # 再次獲取最後一行結果
        if not last_row:
            custom_print("錯誤,執行命令: {} 異常","red")
            self.flag_list.append(False)
            return False

        res = last_row.split()  # 切割結果
        # print(res,type(res),len(res))
        if len(res) <10:
            custom_print("錯誤,切割 ping 結果異常","red")
            self.flag_list.append(False)
            return False

        if res[5] == "0%":  # 判斷丟包率
            custom_print("正常, ip: {} ping正常 丟包率0%".format(ip), "green")
        else:
            self.flag_list.append(False)
            custom_print("錯誤, ip: {} ping異常 丟包率100%".format(ip), "red")

    def main(self):
        """
        主程序
        :return:
        """
        for num in range(1, 256):
            ip = '192.168.10.{}'.format(num)
            self.check_ping(ip)


if __name__ == '__main__':
    startime = time.time()  # 開始時間

    NetworkTest().main()

    endtime = time.time()
    take_time = endtime - startime

    if take_time < 1:  # 判斷不足1秒時
        take_time = 1  # 設置爲1秒
    # 計算花費時間
    m, s = divmod(take_time, 60)
    h, m = divmod(m, 60)

    custom_print("本次花費時間 %02d:%02d:%02d" % (h, m, s),"green")
View Code

 

改形成,協程執行。linux

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import os
import time
import signal
import subprocess
import gevent
import gevent.pool
from gevent import monkey;monkey.patch_all()


def custom_print(content,colour='white'):
    """
    寫入日誌文件
    :param content: 內容
    :param colour: 顏色
    :return: None
    """
    # 顏色代碼
    colour_dict = {
        'red': 31,  # 紅色
        'green': 32,  # 綠色
        'yellow': 33,  # 黃色
        'blue': 34,  # 藍色
        'purple_red': 35,  # 紫紅色
        'bluish_blue': 36, # 淺藍色
        'white': 37,  # 白色
    }
    choice = colour_dict.get(colour)  # 選擇顏色

    info = "\033[1;{};1m{}\033[0m".format(choice, content)
    print(info)


def execute_linux2(cmd, timeout=10, skip=False):
    """
    執行linux命令,返回list
    :param cmd: linux命令
    :param timeout: 超時時間,生產環境, 特別卡, 所以要3秒
    :param skip: 是否跳過超時限制
    :return: list
    """
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)

    t_beginning = time.time()  # 開始時間
    while True:
        if p.poll() is not None:
            break
        seconds_passed = time.time() - t_beginning
        if not skip:
            if seconds_passed > timeout:
                # p.terminate()
                # p.kill()
                # raise TimeoutError(cmd, timeout)
                custom_print('錯誤, 命令: {},本地執行超時!'.format(cmd),"red")
                # 當shell=True時,只有os.killpg才能kill子進程
                try:
                    # time.sleep(1)
                    os.killpg(p.pid, signal.SIGUSR1)
                except Exception as e:
                    pass
                return False

    result = p.stdout.readlines()  # 結果輸出列表
    return result


class NetworkTest(object):
    def __init__(self):
        self.flag_list = []

    def check_ping(self,ip):
        """
        檢查ping
        :param ip: ip地址
        :return: none
        """
        cmd = "ping %s -c 2" % ip
        # print(cmd)
        # 本機執行命令
        res = execute_linux2(cmd,2)
        # print(res)
        if not res:
            custom_print("錯誤, 執行命令: {} 失敗".format(cmd), "red")
            self.flag_list.append(False)
            return False

        res.pop()  # 刪除最後一個元素
        last_row = res.pop().decode('utf-8').strip()  # 再次獲取最後一行結果
        if not last_row:
            custom_print("錯誤,執行命令: {} 異常","red")
            self.flag_list.append(False)
            return False

        res = last_row.split()  # 切割結果
        # print(res,type(res),len(res))
        if len(res) <10:
            custom_print("錯誤,切割 ping 結果異常","red")
            self.flag_list.append(False)
            return False

        if res[5] == "0%":  # 判斷丟包率
            custom_print("正常, ip: {} ping正常 丟包率0%".format(ip), "green")
        else:
            self.flag_list.append(False)
            custom_print("錯誤, ip: {} ping異常 丟包率100%".format(ip), "red")

    def main(self):
        """
        主程序
        :return:
        """
        process_list = []
        for num in range(1, 256):
            ip = '192.168.10.{}'.format(num)
            # self.check_ping(ip)
            # 將任務加到列表中
            process_list.append(gevent.spawn(self.check_ping, ip))

        gevent.joinall(process_list)  # 等待全部協程結束


if __name__ == '__main__':
    startime = time.time()  # 開始時間

    NetworkTest().main()

    endtime = time.time()
    take_time = endtime - startime

    if take_time < 1:  # 判斷不足1秒時
        take_time = 1  # 設置爲1秒
    # 計算花費時間
    m, s = divmod(take_time, 60)
    h, m = divmod(m, 60)

    custom_print("本次花費時間 %02d:%02d:%02d" % (h, m, s),"green")
View Code

 

執行輸出:shell

...
錯誤, 命令: ping 192.168.10.250 -c 2,本地執行超時!
錯誤, 執行命令: ping 192.168.10.250 -c 2 失敗
錯誤, 命令: ping 192.168.10.255 -c 2,本地執行超時!
錯誤, 執行命令: ping 192.168.10.255 -c 2 失敗
本次花費時間 00:00:07

 

注意:切勿在windows系統中運行,不然會報錯windows

AttributeError: module 'os' has no attribute 'setsid'

 

2、使用協程池

上面直接將全部任務加到列表中,而後一次性,所有異步執行。那麼同一時刻,最多有多少任務執行呢?服務器

不知道,可能有256個吧?併發

注意:若是這個一個很耗CPU的程序,可能會致使服務器,直接卡死。app

 

那麼,咱們應該要限制它的併發數。這個時候,須要使用協程池,固定併發數。異步

好比:固定爲100個ide

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import os
import time
import signal
import subprocess
import gevent
import gevent.pool
from gevent import monkey;monkey.patch_all()


def custom_print(content,colour='white'):
    """
    寫入日誌文件
    :param content: 內容
    :param colour: 顏色
    :return: None
    """
    # 顏色代碼
    colour_dict = {
        'red': 31,  # 紅色
        'green': 32,  # 綠色
        'yellow': 33,  # 黃色
        'blue': 34,  # 藍色
        'purple_red': 35,  # 紫紅色
        'bluish_blue': 36, # 淺藍色
        'white': 37,  # 白色
    }
    choice = colour_dict.get(colour)  # 選擇顏色

    info = "\033[1;{};1m{}\033[0m".format(choice, content)
    print(info)


def execute_linux2(cmd, timeout=10, skip=False):
    """
    執行linux命令,返回list
    :param cmd: linux命令
    :param timeout: 超時時間,生產環境, 特別卡, 所以要3秒
    :param skip: 是否跳過超時限制
    :return: list
    """
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)

    t_beginning = time.time()  # 開始時間
    while True:
        if p.poll() is not None:
            break
        seconds_passed = time.time() - t_beginning
        if not skip:
            if seconds_passed > timeout:
                # p.terminate()
                # p.kill()
                # raise TimeoutError(cmd, timeout)
                custom_print('錯誤, 命令: {},本地執行超時!'.format(cmd),"red")
                # 當shell=True時,只有os.killpg才能kill子進程
                try:
                    # time.sleep(1)
                    os.killpg(p.pid, signal.SIGUSR1)
                except Exception as e:
                    pass
                return False

    result = p.stdout.readlines()  # 結果輸出列表
    return result


class NetworkTest(object):
    def __init__(self):
        self.flag_list = []

    def check_ping(self,ip):
        """
        檢查ping
        :param ip: ip地址
        :return: none
        """
        cmd = "ping %s -c 2" % ip
        # print(cmd)
        # 本機執行命令
        res = execute_linux2(cmd,2)
        # print(res)
        if not res:
            custom_print("錯誤, 執行命令: {} 失敗".format(cmd), "red")
            self.flag_list.append(False)
            return False

        res.pop()  # 刪除最後一個元素
        last_row = res.pop().decode('utf-8').strip()  # 再次獲取最後一行結果
        if not last_row:
            custom_print("錯誤,執行命令: {} 異常","red")
            self.flag_list.append(False)
            return False

        res = last_row.split()  # 切割結果
        # print(res,type(res),len(res))
        if len(res) <10:
            custom_print("錯誤,切割 ping 結果異常","red")
            self.flag_list.append(False)
            return False

        if res[5] == "0%":  # 判斷丟包率
            custom_print("正常, ip: {} ping正常 丟包率0%".format(ip), "green")
        else:
            self.flag_list.append(False)
            custom_print("錯誤, ip: {} ping異常 丟包率100%".format(ip), "red")

    def main(self):
        """
        主程序
        :return:
        """
        process_list = []
        pool= gevent.pool.Pool(100)  # 協程池固定爲100個
        for num in range(1, 256):
            ip = '192.168.10.{}'.format(num)
            # self.check_ping(ip)
            # 將任務加到列表中
            process_list.append(pool.spawn(self.check_ping, ip))

        gevent.joinall(process_list)  # 等待全部協程結束


if __name__ == '__main__':
    startime = time.time()  # 開始時間

    NetworkTest().main()

    endtime = time.time()
    take_time = endtime - startime

    if take_time < 1:  # 判斷不足1秒時
        take_time = 1  # 設置爲1秒
    # 計算花費時間
    m, s = divmod(take_time, 60)
    h, m = divmod(m, 60)

    custom_print("本次花費時間 %02d:%02d:%02d" % (h, m, s),"green")
View Code

 

再次執行,效果以下:

...
錯誤, 執行命令: ping 192.168.10.254 -c 2 失敗
錯誤, 命令: ping 192.168.10.255 -c 2,本地執行超時!
錯誤, 執行命令: ping 192.168.10.255 -c 2 失敗
本次花費時間 00:00:15

 

能夠,發現花費的時間,明顯要比上面慢了!

 

pool.map 單個參數

其實,還有一種寫法,使用pool.map,語法以下:

pool.map(func,iterator)

好比:

pool.map(self.get_kernel, NODE_LIST)

 

注意:func是一個方法,iterator是一個迭代器。好比:list就是一個迭代器

使用map時,func只能接收一個參數。這個參數就是,遍歷迭代器的每個值。

 

使用map,完整代碼以下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import os
import time
import signal
import subprocess
import gevent
import gevent.pool
from gevent import monkey;monkey.patch_all()


def custom_print(content,colour='white'):
    """
    寫入日誌文件
    :param content: 內容
    :param colour: 顏色
    :return: None
    """
    # 顏色代碼
    colour_dict = {
        'red': 31,  # 紅色
        'green': 32,  # 綠色
        'yellow': 33,  # 黃色
        'blue': 34,  # 藍色
        'purple_red': 35,  # 紫紅色
        'bluish_blue': 36, # 淺藍色
        'white': 37,  # 白色
    }
    choice = colour_dict.get(colour)  # 選擇顏色

    info = "\033[1;{};1m{}\033[0m".format(choice, content)
    print(info)


def execute_linux2(cmd, timeout=10, skip=False):
    """
    執行linux命令,返回list
    :param cmd: linux命令
    :param timeout: 超時時間,生產環境, 特別卡, 所以要3秒
    :param skip: 是否跳過超時限制
    :return: list
    """
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)

    t_beginning = time.time()  # 開始時間
    while True:
        if p.poll() is not None:
            break
        seconds_passed = time.time() - t_beginning
        if not skip:
            if seconds_passed > timeout:
                # p.terminate()
                # p.kill()
                # raise TimeoutError(cmd, timeout)
                custom_print('錯誤, 命令: {},本地執行超時!'.format(cmd),"red")
                # 當shell=True時,只有os.killpg才能kill子進程
                try:
                    # time.sleep(1)
                    os.killpg(p.pid, signal.SIGUSR1)
                except Exception as e:
                    pass
                return False

    result = p.stdout.readlines()  # 結果輸出列表
    return result


class NetworkTest(object):
    def __init__(self):
        self.flag_list = []

    def check_ping(self,ip):
        """
        檢查ping
        :param ip: ip地址
        :return: none
        """
        cmd = "ping %s -c 2" % ip
        # print(cmd)
        # 本機執行命令
        res = execute_linux2(cmd,2)
        # print(res)
        if not res:
            custom_print("錯誤, 執行命令: {} 失敗".format(cmd), "red")
            self.flag_list.append(False)
            return False

        res.pop()  # 刪除最後一個元素
        last_row = res.pop().decode('utf-8').strip()  # 再次獲取最後一行結果
        if not last_row:
            custom_print("錯誤,執行命令: {} 異常","red")
            self.flag_list.append(False)
            return False

        res = last_row.split()  # 切割結果
        # print(res,type(res),len(res))
        if len(res) <10:
            custom_print("錯誤,切割 ping 結果異常","red")
            self.flag_list.append(False)
            return False

        if res[5] == "0%":  # 判斷丟包率
            custom_print("正常, ip: {} ping正常 丟包率0%".format(ip), "green")
        else:
            self.flag_list.append(False)
            custom_print("錯誤, ip: {} ping異常 丟包率100%".format(ip), "red")

    def main(self):
        """
        主程序
        :return:
        """
        pool= gevent.pool.Pool(100)  # 協程池固定爲100個
        ip_list = ["192.168.10.{}".format(i) for i in range(1, 256)]
        # 使用pool.map,語法:pool.map(func,iterator)
        pool.map(self.check_ping, ip_list)


if __name__ == '__main__':
    startime = time.time()  # 開始時間

    NetworkTest().main()

    endtime = time.time()
    take_time = endtime - startime

    if take_time < 1:  # 判斷不足1秒時
        take_time = 1  # 設置爲1秒
    # 計算花費時間
    m, s = divmod(take_time, 60)
    h, m = divmod(m, 60)

    custom_print("本次花費時間 %02d:%02d:%02d" % (h, m, s),"green")
View Code

 

注意:方法只有一個參數的狀況下,使用pool.map,一行就能夠搞定。這樣看起來,比較精簡!

 

pool.map 多參數

若是方法,有多個參數,須要借用偏函數實現。

完整代碼以下:

#!/usr/bin/env python3
# coding: utf-8

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import os
import time
import signal
import subprocess
import gevent
import gevent.pool
from gevent import monkey;monkey.patch_all()
from functools import partial

def custom_print(content,colour='white'):
    """
    寫入日誌文件
    :param content: 內容
    :param colour: 顏色
    :return: None
    """
    # 顏色代碼
    colour_dict = {
        'red': 31,  # 紅色
        'green': 32,  # 綠色
        'yellow': 33,  # 黃色
        'blue': 34,  # 藍色
        'purple_red': 35,  # 紫紅色
        'bluish_blue': 36, # 淺藍色
        'white': 37,  # 白色
    }
    choice = colour_dict.get(colour)  # 選擇顏色

    info = "\033[1;{};1m{}\033[0m".format(choice, content)
    print(info)


def execute_linux2(cmd, timeout=10, skip=False):
    """
    執行linux命令,返回list
    :param cmd: linux命令
    :param timeout: 超時時間,生產環境, 特別卡, 所以要3秒
    :param skip: 是否跳過超時限制
    :return: list
    """
    p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,shell=True,close_fds=True,preexec_fn=os.setsid)

    t_beginning = time.time()  # 開始時間
    while True:
        if p.poll() is not None:
            break
        seconds_passed = time.time() - t_beginning
        if not skip:
            if seconds_passed > timeout:
                # p.terminate()
                # p.kill()
                # raise TimeoutError(cmd, timeout)
                custom_print('錯誤, 命令: {},本地執行超時!'.format(cmd),"red")
                # 當shell=True時,只有os.killpg才能kill子進程
                try:
                    # time.sleep(1)
                    os.killpg(p.pid, signal.SIGUSR1)
                except Exception as e:
                    pass
                return False

    result = p.stdout.readlines()  # 結果輸出列表
    return result


class NetworkTest(object):
    def __init__(self):
        self.flag_list = []

    def check_ping(self,ip,timeout):
        """
        檢查ping
        :param ip: ip地址
        :param ip: 超時時間
        :return: none
        """
        cmd = "ping %s -c 2 -W %s" %(ip,timeout)
        # print(cmd)
        # 本機執行命令
        res = execute_linux2(cmd,2)
        # print("res",res,"ip",ip,"len",len(res))
        if not res:
            custom_print("錯誤, 執行命令: {} 失敗".format(cmd), "red")
            self.flag_list.append(False)
            return False

        if len(res) != 7:
            custom_print("錯誤,執行命令: {} 異常".format(cmd), "red")
            self.flag_list.append(False)
            return False

        res.pop()  # 刪除最後一個元素
        last_row = res.pop().decode('utf-8').strip()  # 再次獲取最後一行結果
        if not last_row:
            custom_print("錯誤,執行命令: {} 獲取結果異常","red")
            self.flag_list.append(False)
            return False

        res = last_row.split()  # 切割結果
        # print(res,type(res),len(res))
        if len(res) <10:
            custom_print("錯誤,切割 ping 結果異常","red")
            self.flag_list.append(False)
            return False

        if res[5] == "0%":  # 判斷丟包率
            custom_print("正常, ip: {} ping正常 丟包率0%".format(ip), "green")
        else:
            self.flag_list.append(False)
            custom_print("錯誤, ip: {} ping異常 丟包率100%".format(ip), "red")

    def main(self):
        """
        主程序
        :return:
        """
        pool= gevent.pool.Pool(100)  # 協程池固定爲100個
        ip_list = ["192.168.0.{}".format(i) for i in range(1, 256)]
        # 使用協程池,執行任務。語法: pool.map(func,iterator)
        # partial使用偏函數傳遞參數
        # 注意:has_null第一個參數,必須是迭代器遍歷的值
        pool.map(partial(self.check_ping, timeout=1), ip_list)


if __name__ == '__main__':
    startime = time.time()  # 開始時間

    NetworkTest().main()

    endtime = time.time()
    take_time = endtime - startime

    if take_time < 1:  # 判斷不足1秒時
        take_time = 1  # 設置爲1秒
    # 計算花費時間
    m, s = divmod(take_time, 60)
    h, m = divmod(m, 60)

    custom_print("本次花費時間 %02d:%02d:%02d" % (h, m, s),"green")
View Code

 

執行腳本,效果同上

 

 

本文參考連接:

https://www.cnblogs.com/c-x-a/p/9049651.html

相關文章
相關標籤/搜索