1、線程和進程python
#multiprocess.py import os print 'Process (%s) start....' % os.getpid() pid=os.fork() if pid==0: print 'I am child process (%s) and my parent is %s ' % (os.getpid(),os.getppid()) else: print 'i am father process'
form multiprocessing import Process import os #子進程要執行的代碼 def run_proc(name): print 'Run child process %s (%s)' %(name,os.getpid()) if __name__=='__main__': print 'Parent process %s' % os.getpid() p=Process(target=run_proc,args=('test',)) print 'Process will start' p.start() p.join() print 'Process end'. 運行結果以下: Parent process 389 Process will start Run child process test (340) Process end
from multiprocessing import Pool import os,time,random def long_time_task(name): print 'Run task %s %s' %(name,os.getpid()) start = time.time() time.sleep(random.random*3) end=time.time() print 'Task %s runs %0.2f seconds' % (name,(end-start)) if __name__=='__main__' print 'Parent process %s' % os.getpid() p=Pool() for i in range(5): p.apply_async(long_time_task,args(i,)) print 'waiting for all subprocesses done.....' p.close() p.join() print 'All subprocesses done'
from multiprocessing import Process ,Queue import os,time,random #寫數據進程執行的代碼 def write(q): for value in ['A','B','C']: print 'Put %s to queue ....' %value q.put(value) time.sleep(random.random()) #讀數據進程執行的代碼: def read(q): while True: value=q.get(True) print 'Get %s from queue' %value if __name__=='__main__': #父進程建立Queue 並傳給各個子進程 q=Queue() pw=Process(target=write,args=(q,)) pr=Process(target=read,args=(q,)) #啓動子進程 pw pw.start() #啓動子進程pr pr.start() #等待pw結束 pw.join() #pr進程裏是死循環 沒法等待結束 只能強行終止 pr.terminate()
#-*-coding:utf-8-*- import time,threading #新線程執行的代碼 def loop(): print 'Thread %s is runing ..' % threading.current_thread().name n=0 while n<5: n=n+1 print 'thread %s >>> %s' % (threading.current_thread().name,n) time.sleep(1) print 'thread %s ended' % threading.current_thread().name print 'thread %s is runing ' % threading.current_thread().name t=threading.Thread(target=loop,name='LoopThread') t.start() t.join() print 'thread %s ended:' % threading.current_thread().name
#-*-coding:utf-8-*- import time,threading #假定這是銀行存款 balance=0 lock=threading.Lock() def change_it(n): #先存後取 結果應該爲0 global balance print '%s balance = %d' %(threading.current_thread().name,balance) balance=balance+n print '%s balance = %d' %(threading.current_thread().name,balance) balance=balance-n print '%s balance = %d' %(threading.current_thread().name,balance) def run_thread(n): for x in range(100000): lock.acquire() try: change_it(n) except Exception, e: raise e finally: lock.release() t1=threading.Thread(target=run_thread,args=(5,)) t2=threading.Thread(target=run_thread,args=(8,)) t1.start() t2.start() t1.join() t2.join() print balance
def process_student(name): std =Student(name) #std 是局部變量 可是每一個函數都要用它 所以必須穿進去 do_task_1(std) do_task_2(std) def do_task_1(std) do_subtask_1(std) do_subtask_2(std) def do_task_2(std) do_subtask_1(std) do_subtask_2(std)
#-*-coding:utf-8-*- import threading #建立全局ThreadLocal對象: local_school=threading.local() def process_student(): print 'Hello ,%s (in %s)' %(local_school.student,threading.current_thread().name) def process_thread(name): #綁定threadlocal的student local_school.student=name process_student() t1=threading.Thread(target=process_thread,args=('Alice',),name='Thread-A') t2=threading.Thread(target=process_thread,args=('Bob',),name='Thread-B') t1.start() t2.start() t1.join() t2.join()
#taskmanager.py import random,time,Queue from multiprocessing.managers import BaseManager #發送任務的隊列 task_queue=Queue.Queue() #接受結果的隊列 result_queue=Queue.Queue() #從basemanager 繼承queuemanager class QueueManager(BaseManager): pass # 把兩個Queue都註冊到網上 callable參數關聯了Queue對象: QueueManager.register('get_task_queue',callable=lambda:task_queue) QueueManager.register('get_result_queue,callable=lambda:result_queue') #綁定端口50000 設置驗證碼 abc manager=QueueManager(address=('',5000),authkey='abc') #啓動Queue manager.start() #得到經過網絡訪問的Queue對象: task=manager.get_task_queue() result=manager.get_result_queue() #放幾個任務進去 for i in range(10): n=random.randint(0,10000) print('Put task %d' %n) task.put(n) #從result隊列讀取結果 print('Try get results....'): for i in range(10): r=result.get(timeout=10) print('Result:%s' % r) #關閉 manager.shutdown()
在另外一臺計算機上啓動:web
#taskworker.py import time,sys,Queue from multiprocessing.managers import BaseManager #建立相似的QueueManager class QueueManager(BaseManager): pass #因爲這個QueueManager 只從網上得到註冊時只提供了名字 QueueManager.register('get_task_queue') QueueManager.register('get_task_result') #連接到服務器 也就是運行taskmanager.py的機器 server_addr='127.0.0.1' print('connect to server %s ..' % server_addr) #端口驗證碼一致 m=QueueManager(address=(server_addr,5000),authkey='abc') #從網絡連接 m.connect() #獲取queue對象 task=m.get_task_queue() result=m.get_task_result() #從task隊列中取出數據 並把結果放到result隊列中 for i in range(10): try: n=task.get(timeout=1) print('run task %d*%d' %(n,n)) r='%d*%d=%d' % (n,n,n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty') #處理結束 print('work exit')
這樣就能夠將任務拆分 並將任務發送到幾臺 幾十臺機器上進行處理。數據庫
2、memcachewindows
理解一些概念:
Memcache是一個自由和開放源代碼、高性能、分配的內存對象緩存系統。用於加速動態web應用程序,減輕數據庫負載。它能夠應對任意多個鏈接,使用非阻塞的網絡IO。因爲它的工做機制是在內存中開闢一塊空間,而後創建一個HashTable,Memcached自管理這 些HashTable。Memcached是簡單而強大的。它簡單的設計促進迅速部署,易於發展所面臨的問題,解決了不少大型數據緩存。它的API可供最流行的語言。Memcache是該系統的項目名稱,Memcached是該系統的主程序文件,以守護程序方式運行於一個或多個服務器中,隨時接受客 戶端的鏈接操做,使用共享內存存取數據。
Memcached最吸引人的一個特性就是支持分佈式部署;也就是說能夠在一羣機器上創建一堆Memcached服務,每一個服務能夠根據具體服務器的硬件配置使用不一樣大小的內存塊,這樣一來,理論上能夠創建一個無限巨大的基於內存的cache storage系統。緩存
1.分別把memcached和libevent下載回來,放到 /tmp 目錄下
Memcache用到了libevent這個庫用於Socket的處理,因此還須要安裝libevent。服務器
cd /tmp wget http://www.danga.com/memcached/dist/memcached-1.2.0.tar.gz wget http://www.monkey.org/~provos/libevent-1.2.tar.gz
先安裝libevent網絡
$ tar zxvf libevent-1.2.tar.gz $ cd libevent-1.2 $ ./configure –prefix=/usr $ make $ make install
3.測試libevent是否安裝成功多線程
$ ls -al /usr/lib | grep libevent
2
lrwxrwxrwx 1 root root 21 11?? 12 17:38 libevent-1.2.so.1 -> libevent-1.2.so.1.0.3
3
-rwxr-xr-x 1 root root 263546 11?? 12 17:38 libevent-1.2.so.1.0.3
4
-rw-r–r– 1 root root 454156 11?? 12 17:38 libevent.a
5
-rwxr-xr-x 1 root root 811 11?? 12 17:38 libevent.la
6
lrwxrwxrwx 1 root root 21 11?? 12 17:38 libevent.so -> libevent-1.2.so.1.0.3併發
還不錯,都安裝上了。app
4.安裝memcached,同時須要安裝中指定libevent的安裝位置
1
$ cd /tmp
2
$ tar zxvf memcached-1.2.0.tar.gz
3
$ cd memcached-1.2.0
4
$ ./configure –with-libevent=/usr # 注意這個配置跟着libevent走
5
$ make
6
$ make install
若是中間出現報錯,請仔細檢查錯誤信息,按照錯誤信息來配置或者增長相應的庫或者路徑。
安裝完成後會把memcached放到 /usr/local/bin/memcached ,
5.測試是否成功安裝memcached
1
$ ls -al /usr/local/bin/mem*
2
-rwxr-xr-x 1 root root 137986 11?? 12 17:39 /usr/local/bin/memcached
3
-rwxr-xr-x 1 root root 140179 11?? 12 17:39 /usr/local/bin/memcached-debug
6 安裝Python-memcached安裝
memcached運行參數:
-d選項是啓動一個守護進程,
-m是分配給Memcache使用的內存數量,單位是MB,我這裏是10MB,
-u是運行Memcache的用戶,我這裏是root,
-l是監聽的服務器IP地址,若是有多個地址的話,我這裏指定了服務器的IP地址192.168.22.200(不指定爲本機)
-p是設置Memcache監聽的端口,我這裏設置了12000,最好是1024以上的端口,
-c選項是最大運行的併發鏈接數,默認是1024,我這裏設置了256,按照你服務器的負載量來設定,
-P是設置保存Memcache的pid文件,我這裏是保存在 /tmp/memcached.pid,
python使用例子:
#!/usr/bin/env python import memcache mc = memcache.Client(['127.0.0.1:12000'],debug=0) mc.set("foo","bar") value = mc.get("foo") print value
Python-memcached API總結整個memcache.py只有1241行,至關精簡主要方法以下:@set(key,val,time=0,min_compress_len=0)無條件鍵值對的設置,其中的time用於設置超時,單位是秒,而min_compress_len則用於設置zlib壓縮(注:zlib是提供數據壓縮用的函式庫)@set_multi(mapping,time=0,key_prefix=」,min_compress_len=0)設置多個鍵值對,key_prefix是key的前綴,完整的鍵名是key_prefix+key, 使用方法以下>>> mc.set_multi({‘k1′ : 1, ‘k2′ : 2}, key_prefix=’pfx_’) == []>>> mc.get_multi(['k1', 'k2', 'nonexist'], key_prefix=’pfx_’) == {‘k1′ : 1, ‘k2′ : 2}@add(key,val,time=0,min_compress_len=0)添加一個鍵值對,內部調用_set()方法@replace(key,val,time=0,min_compress_len=0)替換value,內部調用_set()方法@get(key)根據key去獲取value,出錯返回None@get_multi(keys,key_prefix=」)獲取多個key的值,返回的是字典。keys爲key的列表@delete(key,time=0)刪除某個key。time的單位爲秒,用於確保在特定時間內的set和update操做會失敗。若是返回非0則表明成功@incr(key,delta=1)自增變量加上delta,默認加1,使用以下>>> mc.set(「counter」, 「20″)>>> mc.incr(「counter」)@decr(key,delta=1)自減變量減去delta,默認減1