Python RPC 遠程調用腳本之 RPyC 實踐

最近有個監控需求,須要遠程執行集羣每一個節點上的腳本,並獲取腳本執行結果,爲了安全起見不須要帳號密碼登錄節點主機,要求只須要調用遠程腳本模塊的方法就能實現。html

總結下python進行遠程調用腳本方法:python

  • 登錄主機執行腳本,python模塊支持如 pssh、pexpect、paramiko、ansiblenginx

  • 遠程方法調用(不須要登錄主機),python模塊 rpyc,支持分佈式shell

  • socket 方式,稍顯複雜,須要熟悉網絡協議,起點比較高編程

rpyc支持遠程調用、分佈式計算,以較少代碼量實現複雜socket編程,本文主要介紹 rpyc 並用它來實現一個 demo。安全

以代碼方式介紹:網絡

需求:分別執行集羣每一個節點上 server 端的腳本,並返回執行結果給 client 端多線程

Monitor_RPC_Client.py

#!/usr/bin/env python
# coding=utf-8
# 測試utf-8編碼
# python exec_cmd.py "ls -lrt /opt/data1/logs/nginx/pc/track/`date +'%Y%m%d'`|awk '{s+=\$5}END{print s}'"
# python exec_cmd.py "wc -l /opt/data1/logs/nginx/pc/track/`date +'%Y%m%d'`/*|awk '{s+=\$1}END{print s}'"
import sys

reload(sys)
sys.setdefaultencoding('utf-8')

import rpyc
from pyUtil import *
from multiprocessing.dummy import Pool as ThreadPool

hostDict = {
    '192.168.1.216': 12345,
    '192.168.1.217': 12345,
    '192.168.1.218': 12345
}

localResultDict = {}


def rpc_client(host_port_cmd):
    host = host_port_cmd[0]
    port = host_port_cmd[1]
    cmd = host_port_cmd[2]
    c = rpyc.connect(host, port)
    result = c.root.exposed_execCmd(cmd)
    localResultDict[host] = result
    c.close()


def exec_cmd(cmd_str):
    host_port_list = []
    for (host, port) in hostDict.items():
        host_port_list.append((host, port, cmd_str))

    pool = ThreadPool(len(hostDict))
    results = pool.map(rpc_client, host_port_list)
    pool.close()
    pool.join()
    for ip, result in sorted(localResultDict.iteritems(), key=lambda d: int(d[0].replace(".", ""))):
        print ip + ":\t" + result

if __name__ == "__main__":

    if len(sys.argv) == 2 and sys.argv[1] != "-h":
        print "======================"
        print "    Your command is:\t" + sys.argv[1]
        print "======================"
        cmd_str = sys.argv[1]
    else:
        print """
            該腳本能夠在集羣中批量執行任意命令並返回結果,但需注意如下幾點:
            一、命令請先單機測試經過,而後提交給腳本批量執行;
            二、不要執行 rm 等危險 || 極其耗時 || 影響機器性能的命令;
            三、命令請用雙引號引發來,另外命令中有 $ 符號須要轉義成 \$ 不然會被 Shell 當作變量解析掉,具體請參見下面的例子。
            Usage && for example:
            python exec_cmd.py "ls -lrt /opt/data1/logs/nginx/pc/track/{}|awk '{{s+=\$5}}END{{print s}}'"
            python exec_cmd.py "wc -l /opt/data1/logs/nginx/pc/track/{}/*|awk '{{s+=\$1}}END{{print s}}'"
        """.format(yesterday, yesterday)
        sys.exit(1)

    exec_cmd(cmd_str)


Monitor_RPC_Server.py

#!/usr/bin/env python
# coding=utf-8
# 測試utf-8編碼
# cd /opt/script/rpcMonitorFlume
# pkill -f flumeFileMonitor_RPC_Server.py
# nohup python -u flumeFileMonitor_RPC_Server.py >> logs/flumeFileMonitor_RPC_Server.log 2>&1 &
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

import os, commands, glob, re
import datetime

from rpyc import Service
from rpyc.utils.server import ThreadedServer

from pyUtil import getNowTime, get_ip_address

class remote_call_func(Service):

    def on_connect(self):
        print "[{0}]\t--------------<<< on_connect".format(getNowTime())

    def on_disconnect(self):
        print "[{0}]\t-------------->>> on_disconnect".format(getNowTime())

    def exposed_execCmd(self, cmd):
        exitCode, execResult = commands.getstatusoutput(cmd)
        nowTime = (datetime.datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
        print "[{0}] → {1} → {2}".format(nowTime, cmd, execResult)
        return execResult


rpycServer = ThreadedServer(remote_call_func, hostname=get_ip_address('eth0'), port=11111, auto_register=False)
rpycServer.start()

官方文檔中相似例子不少,就不詳細介紹了,需注意3點:併發

  • server端定義方法須要被client調用,必須定義以exposed 開頭的方法,否則會報錯AttributeError: ‘remote_call_script’ object has no attribute ‘exposed_iamshell’app

  • server端默認不設認證機制,若是須要認證有推薦兩種方法: ThreadedServer的authenticator參數與SSL模塊

  • pip install rpyc ,若是 import rpyc 報錯則 yum install openssl-devel,而後從新編譯、安裝 python

固然還須要考慮不少異常處理,如超時、驗證失敗等。

Refer:

[1] python遠程調用腳本(一)

http://www.dbunix.com/?p=3262

http://rpyc.readthedocs.org/en/latest/tutorial.html

[2] python學習——python中執行shell命令

http://zhou123.blog.51cto.com/4355617/1312791

[3] celery實現任務統一收集、分發執行 

http://blog.csdn.net/vintage_1/article/details/47664187

[4] Timeout function if it takes too long to finish [duplicate]

http://stackoverflow.com/questions/2281850/timeout-function-if-it-takes-too-long-to-finish

[5] 源碼之Queue

http://www.cnblogs.com/liqxd/p/5104051.html

[6] python多線程編程(9) Queue模塊

http://beginman.cn/python/2015/12/01/python-threading-queue/

[7] Python 並行任務技巧

http://my.oschina.net/leejun2005/blog/194270?fromerr=mNcoWQlp

[8] 利用 Python yield 建立協程將異步編程同步化

http://my.oschina.net/leejun2005/blog/501448?fromerr=ynpLsTXB

[9] Python 多線程教程:併發與並行

http://my.oschina.net/leejun2005/blog/398826

[10] 理解 Python 中的多線程

http://my.oschina.net/leejun2005/blog/179265

[11] paramiko小記

http://www.cnblogs.com/liqxd/p/5077803.html

相關文章
相關標籤/搜索