前段時間遇到了這樣一個需求html
某一客戶端向一個服務器提交任務python
服務器再將分發下去由對於的工做人員來完成linux
前輩告訴我用gearman搭建一個分佈式系統。gearman有三個部分,client、service和worker
服務器
client:提交任務多線程
service:分配任務分佈式
worker:執行任務函數
它能夠實現的效果就是一臺機器上搭建好了服務器測試
另外能夠多臺機器做爲客戶端,能夠一塊兒提交任務,而後服務器會用個隊列來存起來spa
而後就是起多臺機器做爲worker去服務器要任務線程
把每一個步驟都分佈到了不一樣的主機上,這就是典型的分佈式系統。
(感受這個這就是一個增強版的多線程機制,一個線程提交任務到隊列,起多個線程去隊列中去。就是一個網遊一個是單機)
詳細介紹能夠看官網Gearman,官網上還有各類語言的例子
配置過程的話跟着網上來就能夠,前輩說linux就是配置特別麻煩,等配置好了以後,用起來就方便了
gearman client
提交任務用法很簡單,如下是用例
gm_client = gearman.GearmanClient(['localhost:4730']) gm_client.submit_job(GRARMAN_TASK_NAME, data, priority=gearman.PRIORITY_HIGH, background=True)
記得先引包 import gearman
['localhost:4730'] 就是gearman服務器的位置,端口默認是4730
而後client有一個submit_job的方法,下面有該函數的源碼,有一堆參數意思就和名字同樣。如background 參數就是提交後臺任務False就是等待返回結果,適用於大量提交任務。
其中最重要的參數 就是 task 在測試代碼中我填寫是 GRARMAN_TASK_NAME 。這個是任務的惟一標識,就是能夠有多個client提交任務,但服務器怎麼識別任務呢,就是靠這參數,固然worker也是靠這個參數識別。(我第一次寫的時候把這個參數寫成了「echo」結果提交了一坨任務,但我本身的worker只收到幾個,我調試了很久才發現。。。)
gearman.client.submit_job 源碼
1 def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None): 2 """Submit a single job to any gearman server""" 3 job_info = dict(task=task, data=data, unique=unique, priority=priority) 4 completed_job_list = self.submit_multiple_jobs([job_info], background=background, wait_until_complete=wait_until_complete, max_retries=max_retries, poll_timeout=poll_timeout) 5 return gearman.util.unlist(completed_job_list)
gearman worker
worker裏面的話,除了GRARMAN_TASK_NAME 和 ['localhost:4730']值得注意意外,還有一個地方就是每一個worker都須要註冊一下本身要作的任務,就是下面代碼中的
gm_worker.register_task(GRARMAN_TASK_NAME, task_listener_reverse)
註冊的名字須要和對應client中的一致,後面一個參數是一個方法,寫的格式也是代碼中的那樣,至關於就是拿到了任務怎麼幹,這個怎麼幹的過程就寫到方法裏面
gm_worker = gearman.GearmanWorker(['localhost:4730']) def task_listener_reverse(gearman_worker, gearman_job): #gearman_job 就是client端傳過來的數據 print "這裏是想要乾的事" return gearman_job.data[::-1]#返回數據的逆序 #GRARMAN_TASK_NAME 這個名字須要是任務的惟一標識 gm_worker.register_task(GRARMAN_TASK_NAME, task_listener_reverse)
GearmanAdminClient
今天有一個需求
獲得gearman服務上有多少個job,又有多少worker正在工做
而後根據job和worker的數量進行一些相應的調整工做
忽然發現gearman中GearmanAdminClient有如下兩個方法,瞬間完成任務
def get_workers(self): """Retrieves a list of workers and reports what tasks they're operating on""" self.establish_admin_connection() self.current_handler.send_text_command (GEARMAN_SERVER_COMMAND_WORKERS) return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_WORKERS) def get_status(self): """Retrieves a list of all registered tasks and reports how many items/workers are in the queue""" self.establish_admin_connection() self.current_handler.send_text_command(GEARMAN_SERVER_COMMAND_STATUS) return self.wait_until_server_responds(GEARMAN_SERVER_COMMAND_STATUS)
測試代碼
ad_client=gearman.GearmanAdminClient(['localhost:4730']) list=ad_client.get_workers() for row in list: print row print "\n" list=ad_client.get_status() for row in list: print row
部分結果展現
{'file_descriptor': '34', 'tasks': (), 'client_id': '-', 'ip': '127.0.0.1'} {'file_descriptor': '50', 'tasks': ('resize', 'like', 'dislike'), 'client_id': '-', 'ip': '127.0.0.1'} {'file_descriptor': '46', 'tasks': ('resize', 'like', 'dislike'), 'client_id': '-', 'ip': '127.0.0.1'} {'file_descriptor': '59', 'tasks': ('add_phone_info',), 'client_id': '-', 'ip': '127.0.0.1'} {'file_descriptor': '55', 'tasks': ('add_phone_info',), 'client_id': '-', 'ip': '127.0.0.1'} {'workers': 0, 'running': 0, 'task': 'apkcrawler', 'queued': 22028} {'workers': 0, 'running': 0, 'task': 'reverse', 'queued': 0} {'workers': 1, 'running': 0, 'task': 'echo', 'queued': 0} {'workers': 10, 'running': 0, 'task': 'add_phone_info', 'queued': 0} {'workers': 0, 'running': 0, 'task': 'write_hbase', 'queued': 0} {'workers': 0, 'running': 0, 'task': 'write_amazon', 'queued': 0} {'workers': 10, 'running': 0, 'task': 'dislike', 'queued': 0}