分佈式任務系統gearman的python實戰

 

Gearman是一個用來把工做委派給其餘機器、分佈式的調用更適合作某項工做的機器、併發的作某項工做在多個調用間作負載均衡、或用來在調用其它語言的函數的系統。Gearman是一個分發任務的程序框架,能夠用在各類場合,開源、多語言支持、靈活、快速、可嵌入、可擴展、無消息大小限制、可容錯,與Hadoop相比,Gearman更偏向於任務分發功能。它的任務分佈很是簡單,簡單得能夠只須要用腳本便可完成。Gearman最初用於LiveJournal的圖片resize功能,因爲圖片resize須要消耗大量計算資 源,所以須要調度到後端多臺服務器執行,完成任務以後返回前端再呈現到界面。html

gearman的任務傳遞模式是一對一的,不能實現一對多,一個client經過job server最後只可以到達一個worker上。若是須要一對多,須要定義多個worker的function,依次向這些worker進行發送,很是的不方便。這一點就不如ZeroMQ,ZeroMQ支持的模式不少,可以知足各類消息隊列需求。他們用在不一樣的場合,Gearman是分佈式任務系統,而ZeroMQ是分佈式消息系統,任務只須要作一次就行。前端

1. Server

1.1 Gearman工做原理

Gearman 服務有不少要素使得它不只僅是一種提交和共享工做的方式,可是主要的系統只由三個組件組成: gearmand 守護進程(server),用於向 Gearman 服務提交請求的 client ,執行實際工做的 worker。其關係以下圖所示:python

Gearmand server執行一個簡單的功能,即從client收集job請求並充當一個註冊器,而worker能夠在此提交關於它們支持的job和操做類型的信息,這樣server實際上就充當了Client和Worker的中間角色。Client將job直接丟給server,而server根據worker提交的信息,將這些job分發給worker來作,worker完成後也可返回結果,server將結果傳回client。舉個例子,在一個公司裏面,有老闆一、老闆二、老闆3(client),他們的任務就是出去喝酒唱歌拉項目(job),將拉來的項目直接交給公司的主管(server),而主管並不親自來作這些項目,他將這些項目分給收手下的員工(worker)來作,員工作完工做後,將結果交給主管,主管將結果報告給老闆們便可。mysql

要使用gearman,首先得安裝server,下載地址:https://launchpad.net/gearmand。當下載安裝完成後,能夠啓動gearmand,啓動有不少參數選項,能夠man gearmand來查看,主要的 選項有:sql

 

  • -b, --backlog=BACKLOG       Number of backlog connections for listen. 
  • -d, --daemon                Daemon, detach and run in the background. 
  • -f, --file-descriptors=FDS  Number of file descriptors to allow for the process                             
  • (total connections will be slightly less). Default     is max allowed for user. 
  • -h, --help                  Print this help menu. 
  • -j, --job-retries=RETRIES   Number of attempts to run the job before the job  server removes it. Thisis helpful to ensure a bad  job does not crash all available workers. Default  is no limit. 
  • -l, --log-file=FILE         Log file to write errors and information to. Turning this option on also forces the first  verbose level to be enabled. 
  • -L, --listen=ADDRESS        Address the server should listen on. Default is  INADDR_ANY. 
  • -p, --port=PORT             Port the server should listen on. 
  • -P, --pid-file=FILE         File to write process ID out to. 
  • -r, --protocol=PROTOCOL     Load protocol module. 
  • -R, --round-robin           Assign work in round-robin order per  workerconnection. The default is to assign work in  the order of functions added by the worker. 
  • -q, --queue-type=QUEUE      Persistent queue type to use. 
  • -t, --threads=THREADS       Number of I/O threads to use. Default=0. 
  • -u, --user=USER             Switch to given user after startup. 
  • -v, --verbose               Increase verbosity level by one. 
  • -V, --version               Display the version of gearmand and exit. 
  • -w, --worker-wakeup=WORKERS Number of workers to wakeup for each job received.   The default is to wakeup all available workers.

 

啓動gearmand:數據庫

 

[plain]  view plain  copy
 
  1. sudo gearmand --pid-file=/var/run/gearmand/gearmand.pid --daemon --log-file=/var/log/gearman.log  
若提示沒有/var/log/gearman.log這個文件的話,本身新建一個就能夠了。

 

1.2 實例化queue與容錯

Gearman默認是將queue保存在內存中的,這樣可以保障速速,可是遇到宕機或者server出現故障時,在內存中緩存在queue中的任務將會丟失。Gearman提供了了queue實例化的選項,可以將queue保存在數據庫中,好比:SQLite三、Drizzle、MySQL、PostgresSQL、Redis(in dev)、MongoDB(in dev).在執行任務前,先將任務存入持久化隊列中,當執行完成後再將該任務從持久化隊列中刪除。要使用db來實例化queue,除了在啓動時加入-q參數和對應的數據庫以外,還須要根據具體的數據庫使用相應的選項,例如使用sqlit3來實例化queue,並指明使用用來存儲queue的文件:json

 

[plain]  view plain  copy
 
  1. gearmand -d -q libsqlite3 --libsqlite3-db=/tmp/demon/gearman.db --listen=localhost --port=4370  

 

再如使用mysql來實例化queue,選項爲:後端

 

[plain]  view plain  copy
 
  1. <pre name="code" class="plain">/usr/local/gearmand/sbin/gearmand  -d  -u root \  
  2. –queue-type=MySQL \  
  3. –mysql-host=localhost \  
  4. –mysql-port=3306 \  
  5. –mysql-user=gearman \  
  6. –mysql-password=123456 \  
  7. –mysql-db=gearman \  
  8. –mysql-table=gearman_queue  
 
   

還要建立相應的數據庫和表,並建立gearman用戶,分配相應的權限:緩存

 

[sql]  view plain  copy
 
  1. CREATE DATABASE gearman;  
  2. CREATE TABLE `gearman_queue` (  
  3. `id` int(10) unsigned NOT NULL AUTO_INCREMENT,  
  4. `unique_key` varchar(64) NOT NULL,  
  5. `function_name` varchar(255) NOT NULL,  
  6. `when_to_run` int(10) NOT NULL,  
  7. `priority` int(10) NOT NULL,  
  8. `data` longblob NOT NULL,  
  9. PRIMARY KEY (`id`),  
  10. UNIQUE KEY `unique_key_index` (`unique_key`,`function_name`)  
  11. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;  
  12.   
  13. create USER gearman@localhost identified by ’123456′;  
  14. GRANT ALL on gearman.* to gearman@localhost;  

 

能夠在gearman的配置文件中加入相關配置,以避免每次啓動都須要寫一堆東西:

 

 

[plain]  view plain  copy
 
  1. # /etc/conf.d/gearmand: config file for /etc/init.d/gearmand  
  2.   
  3. # Persistent queue store  
  4. # The following queue stores are available:  
  5. # drizzle|memcache|mysql|postgre|sqlite|tokyocabinet|none  
  6. # If you do not wish to use persistent queues, leave this option commented out.  
  7. # Note that persistent queue mechanisms are mutally exclusive.  
  8. PERSISTENT="mysql"  
  9.   
  10. # Persistent queue settings for drizzle, mysql and postgre  
  11. #PERSISTENT_SOCKET=""  
  12. PERSISTENT_HOST="localhost"  
  13. PERSISTENT_PORT="3306"  
  14. PERSISTENT_USER="gearman"  
  15. PERSISTENT_PASS="your-pass-word-here"  
  16. PERSISTENT_DB="gearman"  
  17. PERSISTENT_TABLE="gearman_queue"  
  18.   
  19. # Persistent queue settings for sqlite  
  20. #PERSISTENT_FILE=""  
  21.   
  22. # Persistent queue settings for memcache  
  23. #PERSISTENT_SERVERLIST=""  
  24.   
  25. # General settings  
  26. #  
  27. # -j, --job-retries=RETRIES   Number of attempts to run the job before the job  
  28. #                             server removes it. Thisis helpful to ensure a bad  
  29. #                             job does not crash all available workers. Default  
  30. #                             is no limit.  
  31. # -L, --listen=ADDRESS        Address the server should listen on. Default is  
  32. #                             INADDR_ANY.  
  33. # -p, --port=PORT             Port the server should listen on. Default=4730.  
  34. # -r, --protocol=PROTOCOL     Load protocol module.  
  35. # -t, --threads=THREADS       Number of I/O threads to use. Default=0.  
  36. # -v, --verbose               Increase verbosity level by one.  
  37. # -w, --worker-wakeup=WORKERS Number of workers to wakeup for each job received.  
  38. #                             The default is to wakeup all available workers.  
  39. GEARMAND_PARAMS="-L 127.0.0.1 --verbose=DEBUG"  

這其實並非一個很好的方案,由於當使用數據庫來實例化queue時,會增長兩個步驟:Client和worker必須鏈接到server上去讀寫job,而且數據庫在處理的速度上也會大大下降。在大併發任務量的狀況下,性能會受到直接影響,你會發現SQLite或者mysql並不能知足處理大量BLOB的性能要求,job會不斷地積攢而得不處處理,給一個任務猶如石牛如同樣海毫無反應。歸根結底,須要根據本身的應用場景,合理設計一些測試用例和自動化腳本,經過實際的運行狀態進行參數的調整。服務器

job分佈式系統一個基本的特色就是要有單點容錯能力(no  single point failure),還不能有單點性能瓶頸(no single point of bottleneck)。即:一個節點壞了不影響整個系統的業務,一個節點的性能不能決定整個系統的性能。那若是server掛了該怎麼辦?解決方法是使用多個server:

 

[plain]  view plain  copy
 
  1. gearmand -d -q libsqlite3  --listen=localhost --port=4370  
[plain]  view plain  copy
 
  1. gearmand -d -q libsqlite3  --listen=localhost --port=4371  

 

每一個client鏈接多個server,並使用負載最低的那個server,當該server掛掉以後,gearman會自動切換到另外一個server上,以下圖所示:

 

 

1.3 輪詢調度

當job不斷地增長時,咱們可能須要增長worker服務器來增長處理能力,但你可能會發現任務並非均勻地分佈在各個worker服務器上,由於server分配任務給worker的方式默認按照循序分配的,好比你現有worker-A,在server上註冊了5個worker進程,隨着任務的增長,又加了一臺worker-B,並向同一個server註冊了5個worker進程。默認狀況下,server會按照worker註冊的前後順序進行調度,即:只有給worker-A分配滿任務後纔會給worker-B分配任務,即分配方式是wA, wA,wA, wA,wA,wB, wB,wB, wB, wB。爲了可以給worker-A和worker-B均勻地分配任務,server能夠採用輪詢的方式給worker服務器分配任務,即分配方式爲: wA, wB, wA, wB ...,那麼在啓動server時加上選項:-R或者--round-robin

 

1.4 受限喚醒

 

根據gearman協議的設計,Worker 若是發現隊列中沒有任務須要處理,是能夠經過發送 PRE_SLEEP 命令給服務器,告知說本身將進入睡眠狀態。在這個狀態下,Worker 不會再去主動抓取任務,只有服務器發送 NOOP 命令喚醒後,纔會恢復正常的任務抓取和處理流程。所以 Gearmand 在收到任務時,會去嘗試喚醒足夠的 Worker 來抓取任務;此時若是 Worker 的總數超過可能的任務數,則有可能產生驚羣效應。而經過 –worker-wakeup 參數,則能夠指定收到任務時,須要喚醒多少個 Worker 進行處理,避免在 Worker 數量很是大時,發送大量沒必要要的 NOOP 報文,試圖喚醒全部的 Worker。

 

1.6 線程模型

Gearman中有三種線程:

  1. 監聽和管理線程。只有一個(負責接收鏈接,而後分配給I/O線程來處理,若是有多個I/O線程的話,同時也負責啓動和關閉服務器,採用libevent來管理socket和信號管道)
  2. I/O線程。能夠有多個(負責可讀可寫的系統調用和對包初步的解析,將初步解析的包放入各自的異步隊列中,每一個I/O線程都有本身的隊列,因此競爭不多,經過-t選項來指定I/O線程數)
  3. 處理線程。只有一個(負責管理各類信息列表和哈希表,好比跟蹤惟一鍵、工做跟蹤句柄、函數、工做隊列等。將處理結果信息包返回給I/O線程,I/O線程將該包挑選出來並向該鏈接發送數據)
其中第1, 3種線程對全局處理性能沒有直接影響,雖然處理線程有可能成爲瓶頸,但他的工做足夠簡單消耗可忽略不計,所以咱們的性能調優主要目標是在IO線程的數量。對每一個IO線程來講,它都會有一個libevent的實例;全部Gearman的操做會以異步任務方式提交處處理線程,並由IO線程獲取完成實際操做,所以IO線程的數量是與可並行處理任務數成正比。Gearmand 提供 -t 參數調整總IO線程數,須要使用 libevent 1.4 以上版本提供多線程支持。

 

 

進程句柄數

另一個影響大規模部署的是進程句柄數,Gearman會爲每個註冊的Worker分配一個fd(文件描述符),而這個fd的總數是受用戶限制的,可使用 ulimit -n 命令查看當前限制
[plain]  view plain  copy
 
  1. flier@debian:~$ ulimit -n  
  2. 1024  
  3. flier@debian:~$ ulimit -HSn 4096 // 設置進程句柄數的最大軟硬限制  
  4. 4096  
也就是說gearman缺省配置下,最多容許同時有小於1024個worker註冊上來,fd用完以後的Worker和Client會出現鏈接超時或無響應等異常狀況。所以,發生相似狀況時,咱們應首先檢查 /proc/[PID]/fd/ 目錄下的數量,是否已經超過 ulimit -n 的限制,並根據須要進行調整。而全系統的打開文件設置,能夠參考 /proc/sys/fs/file-max 文件,並經過 sysctl -w fs.file-max=[NUM] 進行修改。
[plain]  view plain  copy
 
  1. flier@debian:~$ cat /proc/sys/fs/file-max  
  2. 24372  
  3. flier@debian:~# sysctl -w fs.file-max=100000  
  4. 100000  
Gearmand 自己也提供了調整句柄數量限制的功能,啓動時則能夠經過 -f或者–file-descriptors 參數指定,但非特權進程不能設置超過soft limit的數額。"The soft limit is the value that the kernel enforces for the corresponding resource. The hard limit acts as a ceiling for the soft limit: an unprivileged process may only set its soft limit to a value in the range from 0 up to the hard limit, and (irreversibly) lower its hard limit. A privileged process (under Linux: one with the
CAP_SYS_RESOURCE capability) may make arbitrary changes to either limit value."


2. Client

對於發送單個job,python-gearman提供了一個簡單的函數:submit_job,能夠將job發送到server,其定義以下:
GearmanClient. submit_job (task, data, unique=None, priority=None, background=False, wait_until_complete=True, max_retries=0,poll_timeout=None )
 
下面來看看gearman的一個簡單樣例:
[python]  view plain  copy
 
  1. import gearman  
  2. import time  
  3. from gearman.constants import JOB_UNKNOWN  
  4.   
  5. def check_request_status(job_request):  
  6.     """check the job status"""  
  7.     if job_request.complete:  
  8.         print 'Job %s finished! Result: %s - %s' % (job_request.job.unique, job_request.state, job_request.result)  
  9.     elif job_request.time_out:  
  10.         print 'Job %s timed out!' % job_request.unique  
  11.     elif job_request.state == JOB_UNKNOWN:  
  12.         print "Job %s connection failed!" % job_request.unique  
  13.   
  14. gm_client = gearman.GearmanClient(['localhost:4730','localhost:4731'])  
  15.   
  16. complete_job_request = gm_client.submit_job("reverse", "Hello World!")  
  17. check_request_status(complete_job_request)  
gm_client鏈接到本地的4730/4731端口的server上,而後用submit_job函數將」reverse「和參數「Hello World!"傳給server,返回一個request,最後用check_request_status()函數檢查這個request的狀態。是否是很簡單?
 

2.1  task與job

task與job是有區別的區別主要在於:
  1. Task是一組job,在下發後會執行並返回結果給調用方
  2. Task內的子任務悔下發給多個work並執行
  3. client下放給server的任務爲job,而整個下方並返回結果的過程爲task,每一個job會在一個work上執行
  4. task是一個動態的概念,而job是一個靜態的概念。這有點相似「進程」和「程序」概念的區別。既然是動態的概念,就有完成(complete)、超時(time_out)、攜帶的job不識別(JOB_UNKNOWN)等狀態

 

2.2 job優先級(priority)

client在發送job的時候,能夠設定job的優先級,只須要在submit_job函數中添加選項「priority=gearman.PRIORITY_HIGH」便可建立高優先級task,priority能夠有三個選項:PRIORITY_HIGH、PRIORITY_LOW、PRIORITY_NONE(default)
 

2.3 同步與異步(background)

默認狀況下,client以同步方式發送job到server,所謂的同步,即client在向server發送完job後,不停地詢問該(組)job執行的狀況,直到server返回結果。而異步方式則是client在得知task建立完成以後,無論該task的執行結果。要使client採用異步方式,則在submit_job加入參數「background=True」便可。下面展現了gearman同步/異步的方式時的時序圖。
由上面的同步時序圖可知,client端在job執行的整個過程當中,與job server端的連接都是保持着的,這也給job完成後job server返回執行結果給client提供了通路。同時,在job執行過程中,client端還能夠發起job status的查詢。固然,這須要worker端的支持的。
由上面的異步時序圖可知,client提交完job,job server成功接收後返回JOB_CREATED響應以後,client就斷開與job server之間的連接了。後續不管發生什麼事情,client都是不關心的。一樣,job的執行結果client端也沒辦法經過Gearman消息框架 得到。
 

2.4 阻塞與非阻塞(wait_until_complete)

client建立task時,默認狀況下使用的是阻塞模式,所謂的阻塞模式在進程上的表現爲:在執行完submit_job後,卡在此處等待server返回結果。而非阻塞模式則是一旦job被server接收,程序能夠繼續向下執行,咱們能夠在後面適當的位置(程序最後或者須要用到返回結果的地方)來檢查並取回這些task的狀態和結果。要使用非阻塞模式,則在submit_job里加入選項「wait_until_complete=False」便可。
 

2.5 送多個job

 

  • GearmanClient.submit_multiple_jobs(jobs_to_submit, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None)
Takes a list of jobs_to_submit with dicts of{‘task’: task, ‘data’: data, ‘unique’: unique, ‘priority’: priority}
這裏jobs_to_submit是一組job,每一個job是上述格式的字典,這裏解釋一下unique,unique是設置task的unique key,即在小結2.1中的job_request.job.unique的值,若是不設置的話,會自動分配。
  • GearmanClient.wait_until_jobs_accepted(job_requests, poll_timeout=None)
Go into a select loop until all our jobs have moved to STATE_PENDING
  • GearmanClient.wait_until_jobs_completed(job_requests, poll_timeout=None)
Go into a select loop until all our jobs have completed or failed
  • GearmanClient.submit_multiple_requests(job_requests, wait_until_complete=True, poll_timeout=None)
Take Gearman JobRequests, assign them connections, and request that they be done.
  • GearmanClient.wait_until_jobs_accepted(job_requests, poll_timeout=None)
Go into a select loop until all our jobs have moved to STATE_PENDING
  • GearmanClient.wait_until_jobs_completed(job_requests, poll_timeout=None)
Go into a select loop until all our jobs have completed or failed

 

下面是官網給的一個同步非阻塞方式發送多個job的例子,在該例子的最後,在取得server返回結果以前,用了wait_until_jobs_completed函數來等待task中的全部job返回結果:

 

[python]  view plain  copy
 
  1. import time  
  2. gm_client = gearman.GearmanClient(['localhost:4730'])  
  3.   
  4. list_of_jobs = [dict(task="task_name", data="binary data"), dict(task="other_task", data="other binary data")]  
  5. submitted_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False, wait_until_complete=False)  
  6.   
  7. # Once we know our jobs are accepted, we can do other stuff and wait for results later in the function  
  8. # Similar to multithreading and doing a join except this is all done in a single process  
  9. time.sleep(1.0)  
  10.   
  11. # Wait at most 5 seconds before timing out incomplete requests  
  12. completed_requests = gm_client.wait_until_jobs_completed(submitted_requests, poll_timeout=5.0)  
  13. for completed_job_request in completed_requests:  
  14.     check_request_status(completed_job_request)  
下面這個例子中,用到了submit_multiple_requests函數對超時的請求再次檢查。

 

 

[python]  view plain  copy
 
  1. import time  
  2. gm_client = gearman.GearmanClient(['localhost:4730'])  
  3.   
  4. list_of_jobs = [dict(task="task_name", data="task binary string"), dict(task="other_task", data="other binary string")]  
  5. failed_requests = gm_client.submit_multiple_jobs(list_of_jobs, background=False)  
  6.   
  7. # Let's pretend our assigned requests' Gearman servers all failed  
  8. assert all(request.state == JOB_UNKNOWN for request in failed_requests), "All connections didn't fail!"  
  9.   
  10. # Let's pretend our assigned requests' don't fail but some simply timeout  
  11. retried_connection_failed_requests = gm_client.submit_multiple_requests(failed_requests, wait_until_complete=True, poll_timeout=1.0)  
  12.   
  13. timed_out_requests = [job_request for job_request in retried_requests if job_request.timed_out]  
  14.   
  15. # For our timed out requests, lets wait a little longer until they're complete  
  16. retried_timed_out_requests = gm_client.submit_multiple_requests(timed_out_requests, wait_until_complete=True, poll_timeout=4.0)  

 

2.6 序列化

默認狀況下,gearman的client只能傳輸的data只能是字符串格式的,所以,要傳輸python數據結構,必須使用序列化方法。所幸的是,GearmanClient提供了data_encoder,容許定義序列化和反序列化方法,例如:
[python]  view plain  copy
 
  1. import pickle  
  2.   
  3. class PickleDataEncoder(gearman.DataEncoder):  
  4.     @classmethod  
  5.     def encode(cls, encodable_object):  
  6.         return pickle.dumps(encodable_object)  
  7.  
  8.     @classmethod  
  9.     def decode(cls, decodable_string):  
  10.         return pickle.loads(decodable_string)  
  11.   
  12. class PickleExampleClient(gearman.GearmanClient):  
  13.     data_encoder = PickleDataEncoder  
  14.   
  15. my_python_object = {'hello': 'there'}  
  16.   
  17. gm_client = PickleExampleClient(['localhost:4730'])  
  18. gm_client.submit_job("task_name", my_python_object)  


3 worker

3.1 主要API

worker端一樣提供了豐富的API,主要有:
  • GearmanWorker.set_client_id(client_id):設置自身ID
  • GearmanWorker.register_task(task, callback_function):爲task註冊處理函數callback_function,其中callback_function的定義格式爲:
    [python]  view plain  copy
     
    1. def function_callback(calling_gearman_worker, current_job):  
    2.     return current_job.data  
  • GearmanWorker.unregister_task(task):註銷worker上定義的函數
  • GearmanWorker.work(poll_timeout=60.0): 無限次循環, 完成發送過來的job.
  • GearmanWorker.send_job_data(current_job, data, poll_timeout=None): Send a Gearman JOB_DATA update for an inflight job
  • GearmanWorker.send_job_status(current_job, numerator, denominator, poll_timeout=None):Send a Gearman JOB_STATUS update for an inflight job
  • GearmanWorker.send_job_warning(current_job, data, poll_timeout=None):Send a Gearman JOB_WARNING update for an inflight job
 

3.2 簡單示例

而worker端其實和client端差很少,也是要鏈接到server端,不一樣的是,worker端須要綁定函數來處理具體的job:
[python]  view plain  copy
 
  1. import gearman  
  2.   
  3. gm_worker = gearman.GearmanWorker(['localhost:4730'])  
  4.   
  5. def task_listener_reverse(gearman_worker, gearman_job):  
  6.     print 'Reversing string:' + gearman_job.data  
  7.     return gearman_job.data[::-1]  
  8.   
  9. gm_worker.set_client_id("worker_revers")  
  10. gm_worker.register_task("reverse", task_listener_reverse)  
  11.   
  12. gm_worker.work()  
能夠看到,在worker一樣要鏈接到本地4730端口的server,給了一個job處理函數,反序job傳來的數據並返回,register_task函數將名爲」reverse「的job和task_listener_reverse函數註冊在一塊兒,說明該函數用來處理名爲」reverse」的job的,最後調用work函數來工做。來,咱們看看效果吧,首先啓用client.py文件,此時由於worker還沒啓動,client在此阻塞住,等待task處理。而後運行worker程序,能夠看到client和worker的輸出:
 



3.2 返回結果

worker提供了3個API能夠在worker函數中發送job的數據、狀態和警告:
  • GearmanWorker.send_job_data(current_job, data, poll_timeout=None): Send a Gearman JOB_DATA update for an inflight job
  • GearmanWorker.send_job_status(current_job, numerator, denominator, poll_timeout=None): Send a Gearman JOB_STATUS update for an inflight job
  • GearmanWorker.send_job_warning(current_job, data, poll_timeout=None): Send a Gearman JOB_WARNING update for an inflight job
下面是來自官網的例子:
[python]  view plain  copy
 
  1. gm_worker = gearman.GearmanWorker(['localhost:4730'])  
  2.   
  3. # See gearman/job.py to see attributes on the GearmanJob  
  4. # Send back a reversed version of the 'data' string through WORK_DATA instead of WORK_COMPLETE  
  5. def task_listener_reverse_inflight(gearman_worker, gearman_job):  
  6.     reversed_data = reversed(gearman_job.data)  
  7.     total_chars = len(reversed_data)  
  8.   
  9.     for idx, character in enumerate(reversed_data):  
  10.         gearman_worker.send_job_data(gearman_job, str(character))  
  11.         gearman_worker.send_job_status(gearman_job, idx + 1, total_chars)  
  12.   
  13.     return None  
  14.   
  15. # gm_worker.set_client_id is optional  
  16. gm_worker.register_task('reverse', task_listener_reverse_inflight)  
  17.   
  18. # Enter our work loop and call gm_worker.after_poll() after each time we timeout/see socket activity  
  19. gm_worker.work()  
 

3.3 數據序列化

同client同樣,worker端也只能發送字符類型的數據,若是想要發送python裏的結構體,必須用序列化將其轉化成字符串。與client同樣,worker也有一個encoder,你一樣能夠在裏面定義序列化和反序列化的方法,不過值得注意的是,worker端的序列化和反序列化方法必須對應,不然client和worker端的發送的數據都不能被彼此爭取的反序列化。下面演示了使用JSON方法來進行序列化:
[python]  view plain  copy
 
  1. import json # Or similarly styled library  
  2. class JSONDataEncoder(gearman.DataEncoder):  
  3.     @classmethod  
  4.     def encode(cls, encodable_object):  
  5.         return json.dumps(encodable_object)  
  6.  
  7.     @classmethod  
  8.     def decode(cls, decodable_string):  
  9.         return json.loads(decodable_string)  
  10.   
  11. class DBRollbackJSONWorker(gearman.GearmanWorker):  
  12.     data_encoder = JSONDataEncoder  
  13.   
  14.     def after_poll(self, any_activity):  
  15.         # After every select loop, let's rollback our DB connections just to be safe  
  16.         continue_working = True  
  17.         # self.db_connections.rollback()  
  18.         return continue_working  

worker端提供了rollback函數,每次輪詢完查看socket是否活躍或者timeout時就會調用這個函數:
GearmanWorker. after_poll ( any_activity )

Polling callback to notify any outside listeners whats going on with the GearmanWorker.

Return True to continue polling, False to exit the work loop


4 admin_client

前面講了Client和Worker,對於server也提供了一些API,能夠對其進行監控和設置,好比:設置queue大小、關閉鏈接、查看狀態、查看worker等,用於操做的對象時GearmanAdminClient,其定義以下:

class gearman.admin_client.GearmanAdminClient(host_list=None,poll_timeout=10.0)
所提供的API有:
  • GearmanAdminClient.send_maxqueue(task, max_size): Sends a request to change the maximum queue size for a given task
  • GearmanAdminClient.send_shutdown(graceful=True): Sends a request to shutdown the connected gearman server
  • GearmanAdminClient.get_status():Retrieves a list of all registered tasks and reports how many items/workers are in the queue
  • GearmanAdminClient.get_version(): Retrieves the version number of the Gearman server
  • GearmanAdminClient.get_workers():Retrieves a list of workers and reports what tasks they’re operating on
  • GearmanAdminClient.ping_server(): Sends off a debugging string to execute an application ping on the Gearman server, return the response time
[python]  view plain  copy
 
  1. gm_admin_client = gearman.GearmanAdminClient(['localhost:4730'])  
  2.   
  3. # Inspect server state  
  4. status_response = gm_admin_client.get_status()  
  5. version_response = gm_admin_client.get_version()  
  6. workers_response = gm_admin_client.get_workers()  
  7. response_time = gm_admin_client.ping_server()  


5. job對象

5.1 GearmanJob

GearmanJob對象提供了發送到server的job的最基本信息,其定義以下:
class gearman.job.GearmanJob(connection, handle, task, unique, data)

 server信息

當咱們獲得一個job對象後,想知道與之相連的server信息時,就能夠調用以下兩個屬性:
  • GearmanJob.connection: GearmanConnection - Server assignment. Could be None prior to client job submission
  • GearmanJob.handle:string - Job’s server handle. Handles are NOT interchangeable across different gearman servers
 
 job參數
  • GearmanJob.task:string - Job’s task
  • GearmanJob.unique:string - Job’s unique identifier (client assigned)
  • GearmanJob.data:binary - Job’s binary payload
 

5.2  GearmanJobRequest

GearmanJobRequest是job請求的狀態跟蹤器,表明一個job請求,可用於GearmanClient中,其定義以下:
class gearman.job.GearmanJobRequest(gearman_jobinitial_priority=Nonebackground=False,max_attempts=1) 

跟蹤job發送

  • GearmanJobRequest.gearman_job:             GearmanJob - Job that is being tracked by this GearmanJobRequest object
  • GearmanJobRequest.priority:                PRIORITY_NONE [default]、PRIORITY_LOW、PRIORITY_HIGH
  • GearmanJobRequest.background:              boolean - Is this job backgrounded?
  • GearmanJobRequest.connection_attempts:     integer - Number of attempted connection attempts
  • GearmanJobRequest.max_connection_attempts: integer - Maximum number of attempted connection attempts before raising an exception
 

跟蹤job執行過程

  • GearmanJobRequest.result:    binary - Job’s returned binary payload - Populated if and only if JOB_COMPLETE
  • GearmanJobRequest.exception:    binary - Job’s exception binary payload
  • GearmanJobRequest.state:     
  • GearmanJobRequest.timed_out:    boolean - Did the client hit its polling_timeout prior to a job finishing?
  • GearmanJobRequest.complete:     boolean - Does the client need to continue to poll for more updates from this job?
其中GearmanJobRequest.state的返回值能夠有:
  • JOB_UNKNOWN - Request state is currently unknown, either unsubmitted or connection failed
  • JOB_PENDING - Request has been submitted, pending handle
  • JOB_CREATED - Request has been accepted
  • JOB_FAILED - Request received an explicit job failure (job done but errored out)
  • JOB_COMPLETE - Request received an explicit job completion (job done with results)
 

跟蹤運行中的job狀態

某些特定的GearmanJob在實際完成以前就可能發回數據。GearmanClient用一些隊列來保存跟蹤這些發回數據的時間和內容等
  • GearmanJobRequest.warning_updates: collections.deque - Job’s warning binary payloads
  • GearmanJobRequest.data_updates:        collections.deque - Job’s data binary payloads
  • GearmanJobRequest.status:                       dictionary - Job’s status
其中,GearmanJobRequest.status返回job的狀態是一個字典,內容有:
  • handle - string - Job handle
  • known - boolean - Is the server aware of this request?
  • running - boolean - Is the request currently being processed by a worker?
  • numerator - integer
  • denominator - integer
  • time_received - integer - Time last updated
相關文章
相關標籤/搜索