由於項目緣由選擇了gearman做爲任務委派的中間件,但原生的python拓展包只支持單進程,期間爲了將gearman改形成自適應多進程的方式在實現方式上走進了些誤區,故在此記錄這些誤區的坑以及目前的最優解決方案。
實現方式
先說說第一種實現方式的優缺點
優勢:
缺點:
再來講說第二種實現方式的優缺點
優勢:
缺點:
Show me the code
# -*- coding: utf-8 -*-
import os
import signal
import threading
import multiprocessing

import redis
from gearman.worker import GearmanWorker, POLL_TIMEOUT_IN_SECONDS

# File that records the PIDs of worker child processes, so that a master
# restarted by supervisor can kill children left over from the previous run.
WORKER_PROCESS_PID = '/tmp/multi_gearman_worker.pid'


class MultiGearmanWorker(GearmanWorker):
    """Multi-process gearman worker.

    Spawns one worker child process per CPU by default.  Each child also
    starts a monitor thread subscribed to a redis pub/sub channel, so all
    workers can be told to exit (and then be restarted by a supervisor)
    while running.
    """

    def __init__(self, host_list=None, redis_host=None, redis_port=None,
                 pid=WORKER_PROCESS_PID):
        super(MultiGearmanWorker, self).__init__(host_list=host_list)
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.pid = pid  # path of the child-PID bookkeeping file

    def work(self, poll_timeout=POLL_TIMEOUT_IN_SECONDS, process=None):
        """Start working; blocks the calling process until all children exit.

        :param poll_timeout: int - gearman poll timeout; the shorter it is,
            the faster children notice a shutdown request, at the cost of
            more frequent polling.
        :param process: int - number of worker processes; defaults to the
            CPU count.  (Resolved at call time instead of a def-time
            ``multiprocessing.cpu_count()`` default, so the value reflects
            the environment at the moment of the call.)
        """
        if process is None:
            process = multiprocessing.cpu_count()
        print('Clear last process.')
        self.gearman_worker_exit()
        print('Ready to start %d process for work.' % process)
        gm_pool = multiprocessing.Pool(process)
        for _ in range(process):
            gm_pool.apply_async(gearman_work, (self, poll_timeout, self.pid))
        gm_pool.close()
        gm_pool.join()
        # Normal exit: remove the child-PID bookkeeping file.
        if os.path.isfile(self.pid):
            os.remove(self.pid)
        print('Multi gearman worker exit.')

    def gearman_worker_exit(self):
        """Kill the child processes recorded in the PID file.

        Used on startup to clean up children orphaned by a previous run.
        Always returns True.
        """
        if not os.path.isfile(self.pid):
            return True
        with open(self.pid, 'r+') as f:
            for pid in f.readlines():
                pid = int(pid)
                try:
                    os.kill(pid, signal.SIGKILL)
                    print('Kill process %d.' % pid)
                except OSError:
                    # Process already gone; keep cleaning the rest.
                    print('Process %d not exists' % pid)
                    continue
        os.remove(self.pid)
        print('Remove process pid file.')
        return True


# Per-process work switch, shared between the worker loop and the monitor
# thread of each child process (threads share module globals).
GEARMAN_CONTINUE_WORK = True


def gearman_work(gm_worker, poll_timeout=POLL_TIMEOUT_IN_SECONDS,
                 pid=WORKER_PROCESS_PID):
    """Run one gearman worker loop; executed inside a pool child process.

    :param gm_worker: MultiGearmanWorker - carries gearman connections and
        the redis host/port for the monitor thread.
    :param poll_timeout: int - gearman connection poll timeout.
    :param pid: str - path of the child-PID bookkeeping file.
    """
    try:
        # Record this child's PID so a restarted master can clean up
        # children that did not exit with the previous run.
        with open(pid, 'a+') as f:
            f.write("%d%s" % (os.getpid(), os.linesep))
        print('Child process start for work.')  # typo fix: was 'Chile'
        continue_working = True
        worker_connections = []
        d = threading.Thread(name='monitor', target=gearman_monitor,
                             args=(gm_worker.redis_host, gm_worker.redis_port))
        d.start()

        def continue_while_connections_alive(any_activity):
            return gm_worker.after_poll(any_activity)

        # Re-establish (shuffle) our connections after each poll timeout;
        # GEARMAN_CONTINUE_WORK is flipped by the monitor thread.
        while continue_working and GEARMAN_CONTINUE_WORK:
            worker_connections = gm_worker.establish_worker_connections()
            continue_working = gm_worker.poll_connections_until_stopped(
                worker_connections, continue_while_connections_alive,
                timeout=poll_timeout)

        # Kicked out of the worker loop: shut down all our connections.
        for current_connection in worker_connections:
            current_connection.close()

        print('Gearman worker closed')
        return None
    except Exception as e:
        # Last-resort guard: a child process should log, not crash the pool.
        print(e)


def gearman_monitor(redis_host, redis_port):
    """Listen on a redis pub/sub channel for runtime control commands."""
    global GEARMAN_CONTINUE_WORK
    print('Start gearman monitor.')
    while GEARMAN_CONTINUE_WORK:
        # The try wraps the whole subscribe loop so a transient redis error
        # cannot leave the thread dead; on failure we re-subscribe.
        try:
            sub = redis.StrictRedis(redis_host, redis_port).pubsub()
            sub.subscribe('hot')
            for i in sub.listen():
                data = i.get('data')
                # redis-py delivers str on Py2 but bytes on Py3 (unless the
                # client uses decode_responses=True); accept both spellings
                # so the exit command is not silently ignored on Py3.
                if data in ('exit', b'exit'):
                    # While the workers shut down, further commands on this
                    # channel will not be handled.
                    print('Gearman monitor receive restart signal.')
                    GEARMAN_CONTINUE_WORK = False
                    sub.unsubscribe('hot')
                    break
                # Because the flag is shared between threads, this spot can
                # be used to mutate per-process worker state at runtime.
        except Exception as e:
            print(e)
            try:
                sub.unsubscribe('hot')
            except Exception:
                pass
    print('Gearman monitor closed')


if __name__ == '__main__':
    def test_multi_gearman_worker(worker, job):
        print(worker)
        print(job)

    gearman_worker = MultiGearmanWorker(('127.0.0.1:4730', ), '127.0.0.1', 6379)
    gearman_worker.register_task('test_multi_gearman_worker',
                                 test_multi_gearman_worker)
    gearman_worker.work()
Gearman - 維基百科
源地址 By佐柱
轉載請註明出處,也歡迎偶爾逛逛個人小站,謝謝 :)