linux 環境下監控thriftserver 運行內存(python)

最近發現thriftserver 運行時,運行內存有時超過配置文件 thriftserver.conf 中SPARK_EXCUTOR_MEM配置的內存,致使thriftserver執行查詢異常。因此寫了小程序,定時監控thriftserver的運行狀況,當運行內存大於配置文件的內存時,將thriftserver重啓。
 
一、配置遠程ssh 命令執行接口。
由於spark 一般是集羣部署,因此thriftserver的配置文件 或者 部署不是在咱們所監控的機器上,因此不可避免須要遠程執行命令。ssh 接口以下:
def ssh_execmd(hostname, port, username, password):
    client = paramiko.SSHClient()   # 須要 import paramiko 包
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 
    try:
        client.connect(hostname=hostname, port=port, username=username, password=password)
        return client
    except Exception as e:
        return None
 
後面執行遠程代碼,只須要調用鏈接接口,用 stdin, stdout, stderr = conn.exec_command(cmd) 執行對應的cmd命令就能夠了,三個變量分別記錄 命令的輸入、輸出、錯誤信息。調用完以後,記得調用conn.close()方法,關閉已經創建的鏈接,避免重複創建白白消耗網絡資源。
 
二、獲取進程pid
根據應用名稱獲取進程的id 代號,有時間一個名稱可能對應多個進程的id,這種狀況在grep 後就須要經過awk 命令對結果進行過濾,按指定的規則輸出。獲取進程pid 代碼以下:
 
def get_thrift_pid():
    cmd = "ps -ef |grep thriftserver|awk '$0 ~/memory/ {print $2}' | tr -s '\n' ' ' "
    (status, pid_list) = commands.getstatusoutput(" %s " %cmd)  #須要import commands 包
    result = commands.getoutput(cmd)
 
commands 包中的getstatusoutput 方法,執行linux 命令的時候返回兩個變量,status表示命令執行狀態,0表示成功,其餘狀態表示執行不成功。pid_list表示執行結果,可能有多個。
awk 的字段解釋網上不少,也很詳細,基本上就是對查找結果的一個過濾,這裏的$0表示整個當前行,$2每行的第二個字段,~表示匹配後面的字符;同理,!~表示不匹配,這裏的不是精確的比較。後面的tr -s 表示將輸出的內容中包含的換行符替換成空格。
關於執行命令有多個pid返回,因爲執行的python 代碼中可能包含pid,因此執行時返回了一個當前python進程的pid,而這個pid不是咱們真正須要的,怎麼去除掉呢?個人作法是將這個命令執行兩次,兩次返回的結果中相同的那個pid就是咱們真正把須要的進行pid,完整代碼以下,最後返回的是咱們須要的進程的pid:
 
def get_thrift_pid():
    cmd = "ps -ef |grep thriftserver|awk '$0 ~/memory/ {print $2}' | tr -s '\n' ' ' "
    (status, pid_list) = commands.getstatusoutput(" %s " %cmd)
    result = commands.getoutput(cmd)
    pid_list = pid_list.split(" ")[:-1]
    result_list = result.split(" ")[:-1]
    for pid in pid_list:
        if pid in result_list:
            return int(pid)
    return None
 
三、獲取程序運行時的佔用內存
根據前面獲取的進程pid 獲取當前進程執行時的資源佔用狀況,獲取資源佔用,能夠根據top -p 進程號 ,在線查看當前pid的資源佔用狀況,固然此種方式不太適宜程序中直接讀取。事實上,查看 /pro/進程號/statm 能夠直接讀取,這裏的進程號就是獲取的pid 號。查詢佔用內存的程序代碼以下:
 
def  get_running_memory(pid):
    file_name = "/proc/%d/statm" %pid
    (status, output) = commands.getstatusoutput("cat %s" %file_name)
    use_mem = None
    if status == 0:
        [size, resident, shared, trs, lrs, drs, dt]  = output.split(" ")
        resident = int(resident)  # 字符串轉換爲整形
        use_mem  = resident * 4   # 使用內存, 每頁佔用4kb 內存
    return use_mem
 
讀取statm獲取的7個變量分別表示的含義以下:
size:任務虛擬地址空間大小
Resident:正在使用的物理內存大小
Shared:共享頁數
Trs:程序所擁有的可執行虛擬內存大小
Lrs:被映像倒任務的虛擬內存空間的庫的大小
Drs:程序數據段和用戶態的棧的大小
dt:髒頁數量
 
四、讀取thriftserver配置文件
獲取了thriftserver運行時的內存,還須要讀取出thriftserver配置文件的內存,將二者進行比較。可能實際環境和我代碼中的環境不一致,只須要修改ssh鏈接的信息便可,代碼以下:
 
def get_thrift_conf():
    conf = ConfUtil().getThriftServerConf()   #這裏是我機器封裝的ConfUtil 類,別的機器可能不能運行!
    hostname = conf['ip']    #獲取thriftserver 安裝主機
    file_name = "cat /home/bsaworker/spark/conf/thriftserver.conf"
    conn = ssh_execmd(hostname, port, username, password)
    stdin, stdout, stderr = conn.exec_command(file_name)
    result = stdout.readlines()
    conn.close()
    conf_memroy= result[5].split('=')[1]
    conf_memroy = conf_memroy.replace('\n','')
    conf_memroy = conf_memroy.lower()
    if conf_memroy.find('t') !=-1:
        conf_memroy = int(conf_memroy.split('t')[0])*1024*1024*1024
    elif conf_memroy.find('g') !=-1:
        conf_memroy = int(conf_memroy.split('g')[0])*1024*1024
    elif conf_memroy.find('m') !=-1:
        conf_memroy = int(conf_memroy.split('m')[0])*1024 
    else:
        return 0
    return conf_memroy
 
這裏單位統一轉換成kb級別的,通常配置文件中不大可能給thriftserver配置kb級別的內存,因此在這裏我直接過濾掉了。另外python能夠ConfigParser類來讀取或者操做配置文件,可是thriftserver的配置文件沒有section 字段,因此沒法直接進行讀取了,我這裏是本身寫的一個方法,固然確定還有其餘方式。
 
五、比較完以後相關操做
運行時的內存和配置的內存都已經獲取到了,接下來咱們就要對獲取的結果進行操做了,因爲咱們的需求是一直要監聽,因此我這裏寫了個死循環,一直運行。固然也能夠配置到cron中去,讓程序定時運行。
 
def process_thrift():
    try:
        while True:
            pid = get_thrift_pid()
            running_memory = get_running_memory(pid)
            conf_memroy = get_thrift_conf()
            if running_memory > conf_memroy:
                commands.getoutput("kill -9 %d" %pid)
                print "running memory is exceed conf memory, the thriftserver will restart"
                time.sleep(100)   #100 秒後再執行
     # 執行restart 操做,由於咱們這裏只須要kill 就完了,因此就沒有restart 命令,實際狀況中可能restart 比較好
            else:
                time.sleep(30)    # 30秒以後再運行
                print "the thriftserver process is normal "
    except Exception as e:
        print "檢測程序運行異常,exit! "
        return 
 
這裏須要捕獲異常信息了,處理方式還能夠有其餘方式,根據具體的需求能夠再修改。
 
花了半天時間研究這個其實以爲仍是蠻有趣的,先記錄下來~
相關文章
相關標籤/搜索