多線程和多進程

1、線程和進程python

  多任務的實現方式有3中:
                        1:多進程模式
                        2:多線程模式
                        3:多進程+多線程模式
Python既支持多線程又支持多進程。
1:多進程:
Unix/Linux操做系統中提供一個fork()函數 在Python的OS模塊中就封裝了常見的系統調用 其中就包括 fork() 能夠再Python中輕鬆的建立子進程。
    #multiprocess.py
 import os
                    
 print 'Process (%s) start....' % os.getpid()
 pid=os.fork()
 if pid==0:
    print 'I am child process (%s) and my parent is %s ' %       (os.getpid(),os.getppid())
 else:
    print 'i am father process'
 因爲windows上面沒有fork()故上述代碼不能夠在windows上面運行。
            
因爲Python是跨平臺的 天然也提供一個跨平臺的多進程支持 multiprocessing 模塊就是跨平臺版本的多進程支持
                
multiprocessing 模塊提供了一個Process類來表明 一個進程對象 下面的例子演示了啓動一個子進程並等待其結束
 form multiprocessing import Process
                    import os
                
                    #子進程要執行的代碼
                    def run_proc(name):
                        print 'Run child process %s (%s)' %(name,os.getpid())
        
                    if __name__=='__main__':
                        print 'Parent process %s' % os.getpid()
                        p=Process(target=run_proc,args=('test',))
                        print 'Process will start'
                        p.start()
                        p.join()
                        print 'Process end'.
                    運行結果以下:
                    Parent process 389
                    Process will start
                    Run child process test (340)
                    Process end
 建立子進程時候 只須要傳入一個執行函數和函數的參數便可 穿件一個process實例 而後用start()函數啓動 join()函數等待子進程結束後在繼續往下運行。
                    
 若是要啓動大量的子進程 能夠用進程池的方式批量建立子進程:
  from multiprocessing import Pool
                            import os,time,random
            
                            def long_time_task(name):
                                print 'Run task %s %s' %(name,os.getpid())
                                start = time.time()
                                time.sleep(random.random*3)
                                end=time.time()
                                print 'Task %s runs %0.2f seconds' % (name,(end-start))
                
                            if __name__=='__main__'
                                print 'Parent process %s' % os.getpid()
                                p=Pool()
                                for i in range(5):
                                    p.apply_async(long_time_task,args(i,))
                                print 'waiting for all subprocesses done.....'
                                p.close()
                                p.join()
                                print 'All subprocesses done'
 調用join以前先調用close 調用close以後就不可以繼續添加新的Process 了
            
                    進程間的通行:
                        Python的multiprocessing模塊包裝了底層的機制 提供了Queue Pipes 等多種方式來交換數據                
                        以Queue爲例 在父進程中建立兩個子進程 一個往Queue中寫數據 一個從Queue中讀數據
from multiprocessing import Process ,Queue
                            import os,time,random
                
                            #寫數據進程執行的代碼
                            def write(q):
                                for value in ['A','B','C']:
                                    print 'Put %s to queue ....' %value
                                    q.put(value)
                                    time.sleep(random.random())
                            #讀數據進程執行的代碼:
                            def read(q):
                                while True:
                                    value=q.get(True)
                                    print 'Get %s from queue'  %value
                    
                            if __name__=='__main__':
                                #父進程建立Queue 並傳給各個子進程
                                q=Queue()
                                pw=Process(target=write,args=(q,))
                                pr=Process(target=read,args=(q,))
                                #啓動子進程 pw
                                pw.start()
                                #啓動子進程pr
                                pr.start()
                                #等待pw結束
                                pw.join()
                                #pr進程裏是死循環 沒法等待結束 只能強行終止
                                pr.terminate()
2:多線程:
                        一個進程至少有一個線程 Python的線程是真正的 Posix Thread 而不是模擬出來的線程
                        Python 的標準庫提供了兩個模塊 thread和threading 。thread是低級模塊 threading是高級模塊 對thread進行了封裝 絕大多數狀況下 咱們只須要使用threading這個高級模塊
                        啓動一個線程就是把這個函數傳入並建立thread實例 而後調用start()開始執行
 #-*-coding:utf-8-*-
import time,threading
#新線程執行的代碼
def loop():
 print 'Thread %s is runing ..' % threading.current_thread().name
 n=0
 while n<5:
  n=n+1
  print 'thread %s >>> %s' % (threading.current_thread().name,n)
  time.sleep(1)
 print 'thread %s ended' % threading.current_thread().name
print 'thread %s is runing ' % threading.current_thread().name
t=threading.Thread(target=loop,name='LoopThread')
t.start()
t.join()
print 'thread %s ended:' % threading.current_thread().name
因爲任何進程默認啓動一個線程,這個線程就是主線程,MainTread 主線程又能夠啓動新的線程 Python的threading模塊有個current_thread()函數 它返回
                        當前線程的實例。子線程的名字在建立的時候指定,名字僅僅在打印的時候顯示 徹底沒有其餘的意義。
                            
                    lock:
                        多線程和多進程的最大的不一樣在與,多進程中 同一個變量,各自有一份拷貝存在於每一個進程中,互不影響,而多線程中全部的變量都有線程共享,因此 任何一個變量均可以被任何一個
                        線程修改,所以 吸納成之間共享的數據最大的危險在與多個線程同時修改一個變量 把內容該亂了
                        爲了保證多個線程不能同時執行同一條語句咱們增長 了鎖 的概念:
#-*-coding:utf-8-*-
import time,threading
#假定這是銀行存款
balance=0
lock=threading.Lock()
def change_it(n):
 #先存後取 結果應該爲0
 global balance
 print '%s balance = %d' %(threading.current_thread().name,balance)
 balance=balance+n
 print '%s balance = %d' %(threading.current_thread().name,balance)
 balance=balance-n
 print '%s balance = %d' %(threading.current_thread().name,balance)
def run_thread(n):
 for x in range(100000):
  lock.acquire()
  try:
   change_it(n)
  except Exception, e:
   raise e
  finally:
   lock.release()
t1=threading.Thread(target=run_thread,args=(5,))
t2=threading.Thread(target=run_thread,args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print balance
當多個線程執行到lock.acquire()時 只有一個線程可以成功的獲取鎖 而後繼續執行代碼 其餘線程等待知道得到鎖爲止。
                        得到鎖的線程用完後必定要釋放鎖 不然那些苦苦等待的進程將永遠等待下去 成爲死進程 
                        鎖保證了某段關鍵代碼只能有一個線程從頭至尾的完整執行 壞處就是阻止 了多線程的併發 下降了效率 其次就是因爲能夠存在多個鎖 不一樣線程持有不一樣的鎖 可能致使死鎖的發生。
 
                    多核CPU:
                        一個死循環會100%佔用一個CPU 若是有兩個死循環 在多核CPU中 能夠監控到會佔用200%的CPU 也就是佔用兩個CPU核心
                        要想把N個CPU的核心都爆滿 就必須啓動N個死循環線程
                        因爲Python中代碼有一個GIL鎖 任何Python線程執行前 必須得到GIL鎖 而後每執行100條字節碼 解釋器自動釋放GIL鎖 讓別的線程又機會執行。這是歷史遺留問題
                        所以 在Python中可使用多線程 可是不要期望可以有效的利用多核  能夠經過多進程來實現多核任務。
                   
                4:ThreadLocal
                    在多線程的環境下 每一個線程都有本身的數據 一個線程使用本身的局部變量比使用全局變量好 由於局部變量只有本身可以看見 不會影響其餘的線程而全局變量的修改必須加鎖
                    可是局部變量也有問題 就是在函數調用的時候 傳遞起來麻煩
def process_student(name):
                        std =Student(name)
                        #std 是局部變量 可是每一個函數都要用它 所以必須穿進去
                        do_task_1(std)
                        do_task_2(std)
            
                    def do_task_1(std)
                        do_subtask_1(std)
                        do_subtask_2(std)
            
                    def do_task_2(std)
                        do_subtask_1(std)
                        do_subtask_2(std)
 這樣一級一級傳遞極爲麻煩
                    所以 ThreadLocal 就應運而生 
#-*-coding:utf-8-*-
import threading
#建立全局ThreadLocal對象:
local_school=threading.local()
def process_student():
 print 'Hello ,%s (in %s)' %(local_school.student,threading.current_thread().name)
def process_thread(name):
 #綁定threadlocal的student
 local_school.student=name
 process_student()
t1=threading.Thread(target=process_thread,args=('Alice',),name='Thread-A')
t2=threading.Thread(target=process_thread,args=('Bob',),name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
全局變量local_student 就是一個ThreadLocal 對象 每一個thread對他均可以讀寫student屬性 可是相互之間不影響 
                            
                   
             5 分佈式進程:
                    在thread和process中應當優先選擇process 由於process跟穩定 並且process能夠分佈到多臺機器上 而thread最多隻能在一臺機器的多個CPU上
                    Python的multiprocess模塊不但支持多進程 其中。managers子模塊還支持把多個進程分佈到多臺機器上 一個服務進程做爲調度 將任務分佈到其餘的進程中 依靠網絡通訊
                    因爲managers 封裝的很好 沒必要了解網絡通訊的細節 就能夠很容易的編寫分佈式多進程程序
                    例如:  把發送任務的進程和處理任務的進程分佈到兩個機器上:
                    服務進程 啓動Queue 把Queue 註冊到網上 而後往   Queue裏面寫入任務:
 #taskmanager.py
                        import random,time,Queue
                        from multiprocessing.managers import BaseManager
            
                        #發送任務的隊列
                        task_queue=Queue.Queue()
                        #接受結果的隊列
                        result_queue=Queue.Queue()
                    
                        #從basemanager 繼承queuemanager
                        class QueueManager(BaseManager):
                            pass
                
                        # 把兩個Queue都註冊到網上 callable參數關聯了Queue對象:
                        QueueManager.register('get_task_queue',callable=lambda:task_queue)
                        QueueManager.register('get_result_queue,callable=lambda:result_queue')
            
                        #綁定端口50000 設置驗證碼 abc
                        manager=QueueManager(address=('',5000),authkey='abc')
                
                        #啓動Queue
                        manager.start()
                        
                        #得到經過網絡訪問的Queue對象:
                        task=manager.get_task_queue()
                        result=manager.get_result_queue()
                        #放幾個任務進去
                        for i in range(10):
                            n=random.randint(0,10000)
                            print('Put task %d' %n)
                            task.put(n)
                        #從result隊列讀取結果
                        print('Try get results....'):
                        for i in range(10):
                            r=result.get(timeout=10)
                            print('Result:%s' % r)
                        #關閉
                        manager.shutdown()

在另外一臺計算機上啓動:web

 #taskworker.py
                        import time,sys,Queue
                        from multiprocessing.managers import BaseManager
                        #建立相似的QueueManager
                        class QueueManager(BaseManager):
                            pass
            
                        #因爲這個QueueManager 只從網上得到註冊時只提供了名字
                        QueueManager.register('get_task_queue')
                        QueueManager.register('get_task_result')
                    
                        #連接到服務器 也就是運行taskmanager.py的機器
                        server_addr='127.0.0.1'
                        print('connect to server %s ..' % server_addr)
        
                        #端口驗證碼一致
                        m=QueueManager(address=(server_addr,5000),authkey='abc')
                        #從網絡連接
                        m.connect()
                        #獲取queue對象
                        task=m.get_task_queue()
                        result=m.get_task_result()
                        #從task隊列中取出數據 並把結果放到result隊列中
                        for i in range(10):
                            try:
                                n=task.get(timeout=1)
                                print('run task %d*%d' %(n,n))
                                r='%d*%d=%d' % (n,n,n*n)
                                time.sleep(1)
                                result.put(r)
                            except Queue.Empty:
                                print('task queue is empty')
                        #處理結束
                        print('work exit')

這樣就能夠將任務拆分 並將任務發送到幾臺 幾十臺機器上進行處理。數據庫

2、memcachewindows

理解一些概念:
Memcache是一個自由和開放源代碼、高性能、分配的內存對象緩存系統。用於加速動態web應用程序,減輕數據庫負載。它能夠應對任意多個鏈接,使用非阻塞的網絡IO。因爲它的工做機制是在內存中開闢一塊空間,而後創建一個HashTable,Memcached自管理這 些HashTable。Memcached是簡單而強大的。它簡單的設計促進迅速部署,易於發展所面臨的問題,解決了不少大型數據緩存。它的API可供最流行的語言。Memcache是該系統的項目名稱,Memcached是該系統的主程序文件,以守護程序方式運行於一個或多個服務器中,隨時接受客 戶端的鏈接操做,使用共享內存存取數據。
Memcached最吸引人的一個特性就是支持分佈式部署;也就是說能夠在一羣機器上創建一堆Memcached服務,每一個服務能夠根據具體服務器的硬件配置使用不一樣大小的內存塊,這樣一來,理論上能夠創建一個無限巨大的基於內存的cache storage系統。緩存

1.分別把memcached和libevent下載回來,放到 /tmp 目錄下
Memcache用到了libevent這個庫用於Socket的處理,因此還須要安裝libevent。服務器

cd /tmp
wget http://www.danga.com/memcached/dist/memcached-1.2.0.tar.gz
 wget http://www.monkey.org/~provos/libevent-1.2.tar.gz

先安裝libevent網絡

$ tar zxvf libevent-1.2.tar.gz
$ cd libevent-1.2
$ ./configure –prefix=/usr
$ make
$ make install

3.測試libevent是否安裝成功多線程

$ ls -al /usr/lib | grep libevent
2
lrwxrwxrwx 1 root root 21 11?? 12 17:38 libevent-1.2.so.1 -> libevent-1.2.so.1.0.3
3
-rwxr-xr-x 1 root root 263546 11?? 12 17:38 libevent-1.2.so.1.0.3
4
-rw-r–r– 1 root root 454156 11?? 12 17:38 libevent.a
5
-rwxr-xr-x 1 root root 811 11?? 12 17:38 libevent.la
6
lrwxrwxrwx 1 root root 21 11?? 12 17:38 libevent.so -> libevent-1.2.so.1.0.3併發

還不錯,都安裝上了。app

4.安裝memcached,同時須要安裝中指定libevent的安裝位置

1
$ cd /tmp
2
$ tar zxvf memcached-1.2.0.tar.gz
3
$ cd memcached-1.2.0
4
$ ./configure –with-libevent=/usr # 注意這個配置跟着libevent走
5
$ make
6
$ make install

若是中間出現報錯,請仔細檢查錯誤信息,按照錯誤信息來配置或者增長相應的庫或者路徑。
安裝完成後會把memcached放到 /usr/local/bin/memcached ,
5.測試是否成功安裝memcached

1
$ ls -al /usr/local/bin/mem*
2
-rwxr-xr-x 1 root root 137986 11?? 12 17:39 /usr/local/bin/memcached
3
-rwxr-xr-x 1 root root 140179 11?? 12 17:39 /usr/local/bin/memcached-debug

6 安裝Python-memcached安裝

$ sudo apt-get install python-memcache

memcached運行參數:

/usr/local/memcached/bin/memcached -d -m 128 -u root -l 192.168.0.97 -c 256 -P /tmp/memcached.pid

-d選項是啓動一個守護進程,
-m是分配給Memcache使用的內存數量,單位是MB,我這裏是10MB,
-u是運行Memcache的用戶,我這裏是root,
-l是監聽的服務器IP地址,若是有多個地址的話,我這裏指定了服務器的IP地址192.168.22.200(不指定爲本機)
-p是設置Memcache監聽的端口,我這裏設置了12000,最好是1024以上的端口,
-c選項是最大運行的併發鏈接數,默認是1024,我這裏設置了256,按照你服務器的負載量來設定,
-P是設置保存Memcache的pid文件,我這裏是保存在 /tmp/memcached.pid,

python使用例子:

#!/usr/bin/env python
import memcache
mc = memcache.Client(['127.0.0.1:12000'],debug=0)
mc.set("foo","bar")
value = mc.get("foo")
print value

 

Python-memcached API總結整個memcache.py只有1241行,至關精簡主要方法以下:@set(key,val,time=0,min_compress_len=0)無條件鍵值對的設置,其中的time用於設置超時,單位是秒,而min_compress_len則用於設置zlib壓縮(注:zlib是提供數據壓縮用的函式庫)@set_multi(mapping,time=0,key_prefix=」,min_compress_len=0)設置多個鍵值對,key_prefix是key的前綴,完整的鍵名是key_prefix+key, 使用方法以下>>> mc.set_multi({‘k1′ : 1, ‘k2′ : 2}, key_prefix=’pfx_’) == []>>> mc.get_multi(['k1', 'k2', 'nonexist'], key_prefix=’pfx_’) == {‘k1′ : 1, ‘k2′ : 2}@add(key,val,time=0,min_compress_len=0)添加一個鍵值對,內部調用_set()方法@replace(key,val,time=0,min_compress_len=0)替換value,內部調用_set()方法@get(key)根據key去獲取value,出錯返回None@get_multi(keys,key_prefix=」)獲取多個key的值,返回的是字典。keys爲key的列表@delete(key,time=0)刪除某個key。time的單位爲秒,用於確保在特定時間內的set和update操做會失敗。若是返回非0則表明成功@incr(key,delta=1)自增變量加上delta,默認加1,使用以下>>> mc.set(「counter」, 「20″)>>> mc.incr(「counter」)@decr(key,delta=1)自減變量減去delta,默認減1

相關文章
相關標籤/搜索