GearmanWorker的多進程實現

前言

由於項目緣由選擇了gearman做爲任務委派的中間件,但原生的python拓展包只支持單進程,期間爲了將gearman改形成自適應多進程的方式在實現方式上走進了些誤區,故在此記錄這些誤區的坑以及目前的最優解決方案。python

實現思路

實現方式redis

  1. 主進程接收任務,子進程處理任務。以一個主進程做爲任務委派的接收進程,接收到任務後將任務分派給子進程進行處理,處理完成後由該子進程直接返回任務結果給gearman。
  2. 多進程接收並處理任務。批量fork多個子進程註冊任務,子進程間互不影響,各自完成接收、處理任務的過程。

先說說第一種實現方式的優缺點app

優勢:socket

  • 因爲worker較多的時間是消耗在等待接收請求上,所以主進程只單一的進行輪訓任務接收能夠提升單條gearman請求通道的利用率。
  • 由子進程直接返回任務結果可分離主進程與子進程的做業,主進程無需關心任務的結果而只專一於接收任務。

缺點:async

  • 主進程接收到任務請求後將請求轉發給子進程處理任務,因爲子進程處理任務完成後須要將任務結果返回給gearman,所以子進程須要將該任務請求對應的gearman socket傳遞給子進程,而該過程實現起來過於複雜。(一般具備socket的實例沒法經過pickle傳遞給子進程,雖然Unix的sendmsg能夠用來傳遞socket,但將傳遞的socket構形成一個GearmanWorker又是另一件痛苦的事情)
  • 子進程經過傳遞的socket構造出GearmanWorker後,因爲原socket的句柄仍被父進程持有,因此在等待結果的任務請求方沒法收到子進程所返回的處理結果。

再來講說第二種實現方式的優缺點oop

優勢:spa

  • 等價於fork多個原進程,邏輯、做業方式均無改變。
  • 可在fork子進程以前完成公有資源的加載而無需每一個GearmanWorker勻加載一次。

缺點:線程

  • 子進程異常退出後主進程沒法正確感知,雖然主進程會維持相同的子進程數,可是異常退出所重啓的子進程沒有正確註冊到gearman接收任務。
  • 主進程異常退出後子進程沒法感知,將致使出現殭屍進程。

解決方案

  1. 利用PID文件記錄每一個子進程的pid,確保主進程退出後仍能經過PID文件退出子進程。
  2. 利用Redis的發佈訂閱模式實現GearmanWorker的正常退出。

Show me the coderest

# -*- coding: utf-8 -*-

import os
import signal
import threading
import multiprocessing

import redis
from gearman.worker import GearmanWorker, POLL_TIMEOUT_IN_SECONDS

WORKER_PROCESS_PID = '/tmp/multi_gearman_worker.pid'


class MultiGearmanWorker(GearmanWorker):
    """ 多進程gearman worker"""
    def __init__(self, host_list=None, redis_host=None, redis_port=None, pid=WORKER_PROCESS_PID):
        super(MultiGearmanWorker, self).__init__(host_list=host_list)
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.pid = pid

    def work(self, poll_timeout=POLL_TIMEOUT_IN_SECONDS, process=multiprocessing.cpu_count()):
        """
        開始做業,進程阻塞
        :param poll_timeout: int gearman的鏈接時間,時間越短子進程worker召回越快但請求越頻繁
        :param process: int 工做進程數,默認爲CPU個數
        :return:
        """
        print('Clear last process.')
        self.gearman_worker_exit()
        print('Ready to start %d process for work.' % process)
        gm_poll = multiprocessing.Pool(process)
        for x in range(0, process):
            gm_poll.apply_async(gearman_work, (self, poll_timeout, self.pid))
        gm_poll.close()
        gm_poll.join()
        # 正常退出則刪除子進程PID文件
        if os.path.isfile(self.pid):
            os.remove(self.pid)

        print('Multi gearman worker exit.')

    def gearman_worker_exit(self):
        """ 結束子進程 """
        if not os.path.isfile(self.pid):
            return True

        with open(self.pid, 'r+') as f:
            for pid in f.readlines():
                pid = int(pid)
                try:
                    os.kill(pid, signal.SIGKILL)
                    print('Kill process %d.' % pid)
                except OSError:
                    print('Process %d not exists' % pid)
                    continue
        os.remove(self.pid)
        print('Remove process pid file.')
        return True

# 子進程使用的gearman工做開關標識
GEARMAN_CONTINUE_WORK = True


def gearman_work(gm_worker, poll_timeout=POLL_TIMEOUT_IN_SECONDS, pid=WORKER_PROCESS_PID):
    """ 以多進程的方式開啓gearman的worker """
    try:
        # 記錄子進程pid以便主進程被supervisor重啓後清除上次未退出的子進程
        with open(pid, 'a+') as f:
            f.write("%d%s" % (os.getpid(), os.linesep))

        print('Chile process start for work.')
        continue_working = True
        worker_connections = []
        d = threading.Thread(name='monitor', target=gearman_monitor,
                             args=(gm_worker.redis_host, gm_worker.redis_port))
        d.start()

        def continue_while_connections_alive(any_activity):
            return gm_worker.after_poll(any_activity)

        # Shuffle our connections after the poll timeout
        while continue_working and GEARMAN_CONTINUE_WORK:
            worker_connections = gm_worker.establish_worker_connections()
            continue_working = gm_worker.poll_connections_until_stopped(
                worker_connections, continue_while_connections_alive, timeout=poll_timeout)

        # If we were kicked out of the worker loop, we should shutdown all our connections
        for current_connection in worker_connections:
            current_connection.close()

        print('Gearman worker closed')
        return None
    except Exception as e:
        print(e)


def gearman_monitor(redis_host, redis_port):
    """ 監聽動態更新指令 """
    global GEARMAN_CONTINUE_WORK
    print('Start gearman monitor.')
    while GEARMAN_CONTINUE_WORK:
        # 防止運行異常致使線程掛死後沒法監聽redis響應,異常處理放在此處,發生異常後從新監聽
        try:
            sub = redis.StrictRedis(redis_host, redis_port).pubsub()
            sub.subscribe('hot')
            for i in sub.listen():
                if isinstance(i.get('data'), str):
                    if i.get('data') == 'exit':
                        # worker退出的過程當中將沒法響應其餘數據修改請求
                        print('Gearman monitor receive restart signal.')
                        GEARMAN_CONTINUE_WORK = False
                        sub.unsubscribe('hot')
                        break
                # 因線程間變量共享,故此處可用於多進程gearman worker運行中數據的更改

        except Exception as e:
            print(e)
            try:
                sub.unsubscribe('hot')
            except Exception:
                pass

    print('Gearman monitor closed')


if __name__ == '__main__':
    def test_multi_gearman_worker(worker, job):
        print(worker)
        print(job)

    gearman_worker = MultiGearmanWorker(('127.0.0.1:4730', ), '127.0.0.1', 6379)
    gearman_worker.register_task('test_multi_gearman_worker', test_multi_gearman_worker)
    gearman_worker.work()

附錄

Gearman - 維基百科code

Pub/Sub – Redis

源地址 By佐柱

轉載請註明出處,也歡迎偶爾逛逛個人小站,謝謝 :)

相關文章
相關標籤/搜索