WEEK9:Python 多線程、進程

  • 數據庫操做與paramiko模塊
    • 該模塊基於ssh用於鏈接遠程服務器並執行相關操做
      • SSHClient #用於鏈接遠程服務器並執行基本命令
        • 基於用戶名和密碼鏈接
           1 #########執行命令
           2 import paramiko
           3 # 建立SSH對象
           4 ssh = paramiko.SSHClient()
           5 # 容許鏈接不在know_hosts(~/.ssh/known_hosts)文件中的主機
           6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
           7 # 鏈接服務器
           8 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123')
           9 # 執行命令
          10 stdin, stdout, stderr = ssh.exec_command('df')
          11 # 獲取命令結果
          12 result = stdout.read()
          13 # 關閉鏈接
          14 ssh.close()
          15 
          16 #########SSHClient封裝Transport
          17 import paramiko
          18 transport = paramiko.Transport(('hostname', 22))
          19 transport.connect(username='wupeiqi', password='123')
          20 ssh = paramiko.SSHClient()
          21 ssh._transport = transport
          22 stdin, stdout, stderr = ssh.exec_command('df')
          23 print(stdout.read())
          24 transport.close()
        • 基於公鑰密鑰鏈接
           1 #########執行命令
           2 import paramiko
           3 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
           4 # 建立SSH對象
           5 ssh = paramiko.SSHClient()
           6 # 容許鏈接不在know_hosts文件中的主機
           7 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
           8 # 鏈接服務器
           9 ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', key=private_key)
          10 # 執行命令
          11 stdin, stdout, stderr = ssh.exec_command('df')
          12 # 獲取命令結果
          13 result = stdout.read()
          14 # 關閉鏈接
          15 ssh.close()
          16 
          17 #########SSHClient 封裝 Transport
          18 import paramiko
          19 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
          20 transport = paramiko.Transport(('hostname', 22))
          21 transport.connect(username='wupeiqi', pkey=private_key)
          22 ssh = paramiko.SSHClient()
          23 ssh._transport = transport
          24 stdin, stdout, stderr = ssh.exec_command('df')
          25 transport.close()
          26 
          27 #########基於私鑰字符串進行鏈接
          28 import paramiko
          29 from io import StringIO
          30 
          31 key_str = """-----BEGIN RSA PRIVATE KEY-----
          32 MIIEpQIBAAKCAQEAq7gLsqYArAFco02/55IgNg0r7NXOtEM3qXpb/dabJ5Uyky/8
          33 NEHhFiQ7deHIRIuTW5Zb0kD6h6EBbVlUMBmwJrC2oSzySLU1w+ZNfH0PE6W6fans
          34 H80whhuc/YgP+fjiO+VR/gFcqib8Rll5UfYzf5H8uuOnDeIXGCVgyHQSmt8if1+e
          35 7hn1MVO1Lrm9Fco8ABI7dyv8/ZEwoSfh2C9rGYgA58LT1FkBRkOePbHD43xNfAYC
          36 tfLvz6LErMnwdOW4sNMEWWAWv1fsTB35PAm5CazfKzmam9n5IQXhmUNcNvmaZtvP
          37 c4f4g59mdsaWNtNaY96UjOfx83Om86gmdkKcnwIDAQABAoIBAQCnDBGFJuv8aA7A
          38 ZkBLe+GN815JtOyye7lIS1n2I7En3oImoUWNaJEYwwJ8+LmjxMwDCtAkR0XwbvY+
          39 c+nsKPEtkjb3sAu6I148RmwWsGncSRqUaJrljOypaW9dS+GO4Ujjz3/lw1lrxSUh
          40 IqVc0E7kyRW8kP3QCaNBwArYteHreZFFp6XmtKMtXaEA3saJYILxaaXlYkoRi4k8
          41 S2/K8aw3ZMR4tDCOfB4o47JaeiA/e185RK3A+mLn9xTDhTdZqTQpv17/YRPcgmwz
          42 zu30fhVXQT/SuI0sO+bzCO4YGoEwoBX718AWhdLJFoFq1B7k2ZEzXTAtjEXQEWm6
          43 01ndU/jhAasdfasdasdfasdfa3eraszxqwefasdfadasdffsFIfAsjQb4HdkmHuC
          44 OeJrJOd+CYvdEeqJJNnF6AbHyYHIECkj0Qq1kEfLOEsqzd5nDbtkKBte6M1trbjl
          45 HtJ2Yb8w6o/q/6Sbj7wf/cW3LIYEdeVCjScozVcQ9R83ea05J+QOAr4nAoGBAMaq
          46 UzLJfLNWZ5Qosmir2oHStFlZpxspax/ln7DlWLW4wPB4YJalSVovF2Buo8hr8X65
          47 lnPiE41M+G0Z7icEXiFyDBFDCtzx0x/RmaBokLathrFtI81UCx4gQPLaSVNMlvQA
          48 539GsubSrO4LpHRNGg/weZ6EqQOXvHvkUkm2bDDJAoGATytFNxen6GtC0ZT3SRQM
          49 WYfasdf3xbtuykmnluiofasd2sfmjnljkt7khghmghdasSDFGQfgaFoKfaawoYeH
          50 C2XasVUsVviBn8kPSLSVBPX4JUfQmA6h8HsajeVahxN1U9e0nYJ0sYDQFUMTS2t8
          51 RT57+WK/0ONwTWHdu+KnaJECgYEAid/ta8LQC3p82iNAZkpWlGDSD2yb/8rH8NQg
          52 9tjEryFwrbMtfX9qn+8srx06B796U3OjifstjJQNmVI0qNlsJpQK8fPwVxRxbJS/
          53 pMbNICrf3sUa4sZgDOFfkeuSlgACh4cVIozDXlR59Z8Y3CoiW0uObEgvMDIfenAj
          54 98pl3ZkCgYEAj/UCSni0dwX4pnKNPm6LUgiS7QvIgM3H9piyt8aipQuzBi5LUKWw
          55 DlQC4Zb73nHgdREtQYYXTu7p27Bl0Gizz1sW2eSgxFU8eTh+ucfVwOXKAXKU5SeI
          56 +MbuBfUYQ4if2N/BXn47+/ecf3A4KgB37Le5SbLDddwCNxGlBzbpBa0=
          57 -----END RSA PRIVATE KEY-----"""
          58 
          59 private_key = paramiko.RSAKey(file_obj=StringIO(key_str))
          60 transport = paramiko.Transport(('10.0.1.40', 22))
          61 transport.connect(username='wupeiqi', pkey=private_key)
          62 ssh = paramiko.SSHClient()
          63 ssh._transport = transport
          64 stdin, stdout, stderr = ssh.exec_command('df')
          65 result = stdout.read()
          66 transport.close()
          67 print(result)
      • SFTPClient #用於鏈接遠程服務器並執行上傳下載
        • 基於用戶名和密碼上傳下載
          1 import paramiko
          2 transport = paramiko.Transport(('hostname',22))
          3 transport.connect(username='wupeiqi',password='123')
          4 sftp = paramiko.SFTPClient.from_transport(transport)
          5 # 將location.py 上傳至服務器 /tmp/test.py
          6 sftp.put('/tmp/location.py', '/tmp/test.py')
          7 # 將remove_path 下載到本地 local_path
          8 sftp.get('remove_path', 'local_path')
          9 transport.close()
        • 基於公鑰密鑰上傳下載
           1 import paramiko
           2 private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')
           3 transport = paramiko.Transport(('hostname', 22))
           4 transport.connect(username='wupeiqi', pkey=private_key )
           5 sftp = paramiko.SFTPClient.from_transport(transport)
           6 # 將location.py 上傳至服務器 /tmp/test.py
           7 sftp.put('/tmp/location.py', '/tmp/test.py')
           8 # 將remove_path 下載到本地 local_path
           9 sftp.get('remove_path', 'local_path')
          10 transport.close()
  • 進程與線程
    線程是操做系統可以進行運算調度的最小單位,他被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。
    • 進程與線程的關係
      • 一個線程只能屬於一個進程,而一個進程能夠有多個線程,但至少有一個線程
      • 資源分配給進程,同一進程的全部線程共享該進程的全部資源
      • 線程在執行過程當中,須要協做同步。不一樣進程的線程間要利用消息通訊的辦法實現同步
      • 處理機分給線程,即真正在處理機上運行的是線程
      • 線程是指進程內的一個執行單元,也是進程內的可調度實體
      • 二者都可併發執行
    • 進程與線程的區別
      • 調度:線程做爲調度和分配的基本單位,進程做爲擁有資源的基本單位。線程是處理器調度的基本單位,可是進程不是
      • 併發性:不只進程之間能夠併發執行,同一個進程的多個線程之間也能夠併發執行
      • 擁有資源:同一進程內的線程共享本進程的資源如內存、I/O、cpu等,可是進程之間的資源是獨立的,進程是擁有資源的一個獨立單位,線程不擁有系統資源,但能夠訪問隸屬於進程的資源
      • 地址空間:同一進程的線程共享本進程的地址空間,而進程之間則是獨立的地址空間
      • 系統開銷:在建立或撤銷進程的時候,因爲系統都要爲之分配和回收資源,致使系統的明顯大於建立或撤銷線程時的開銷。但進程有獨立的地址空間,進程崩潰後,在保護模式下不會對其餘的進程產生影響,而線程只是一個進程中的不一樣的執行路徑。線程有本身的堆棧和局部變量,但線程之間沒有單獨的地址空間,一個線程死掉就等於整個進程死掉,因此多進程的程序要比多線程的程序健壯,可是在進程切換時,耗費的資源較大,效率要差些。若是要求同時進行而且又要共享某些變量的併發操做,只能用線程不能用進程
      • 執行過程:每一個獨立的進程程有一個程序運行的入口、順序執行序列和程序入口。可是線程不能獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。
    • 其餘說明
      • 線程的劃分尺度小於進程,使得多線程程序的併發性高。進程在執行過程當中擁有獨立的內存單元,而多個線程共享內存,從而極大的提升了程序運行效率。線程在執行過程當中,每一個獨立的線程有一個程序運行的入口,順序執行序列和程序的出口。可是線程不可以獨立執行,必須依存在應用程序中,有應用程序提供多個線程執行控制。多線程的意義在於一個應用程序中,有多個執行部分能夠同時執行。但操做系統並無將多個線程看作多個獨立的應用,來實現進程的調度和管理以及資源分配。
      • 線程執行開銷小,可是不利於資源的管理和保護,線程適合在SMP機器(雙CPU系統)上運行。進程執行開銷大,可是可以很好的進行資源管理和保護,進程能夠跨機器前移。
      • 什麼時候使用多進程,什麼時候使用多線程?對資源的管理和保護要求高,不限制開銷和效率時,使用多進程。要求效率高,頻繁切換時,資源的保護管理要求不是很高時,使用多線程。
    • 直接調用:
      同時啓動50個線程,區分子線程和主線程
      守護線程即不重要的線程,主線程不等待守護線程,直接執行下面的線程。
       1 import threading
       2 import time
       3 
       4 def run(n):
       5     print("task ",n )
       6     time.sleep(2)
       7     print("task done",n,,threading.current_thread())
       8 
       9 start_time = time.time()
      10 t_objs = [] #存線程實例
      11 for i in range(50):
      12     t = threading.Thread(target=run,args=("t-%s" %i ,))
      t.setDaemon(True) #把當前線程設置爲守護線程,必須在線程start以前設置
      13 t.start() 14 t_objs.append(t) #爲了避免阻塞後面線程的啓動,不在這裏join,先放到一個列表裏 15 16 for t in t_objs: #循環線程實例列表,等待全部線程執行完畢 17 t.join() 18 19 print("----------all threads has finished...",threading.current_thread(),threading.active_count()) 20 print("cost:",time.time() - start_time)
    • 繼承式調用
       1 import threading
       2 import time
       3 
       4 class MyThread(threading.Thread):
       5     def __init__(self,n,sleep_time):
       6         super(MyThread,self).__init__()
       7         self.n =  n
       8         self.sleep_time = sleep_time
       9     def run(self):
      10         print("runnint task ",self.n )
      11         time.sleep(self.sleep_time)
      12         print("task done,",self.n )
      13 
      14 
      15 t1 = MyThread("t1",2)
      16 t2 = MyThread("t2",4)
      17 
      18 t1.start()
      19 t2.start()
      20 
      21 t1.join() #=wait()
      22 t2.join()
      23 
      24 print("main thread....")
    • 線程鎖Mutex
      • 互斥鎖Lock:修改全局變量或者操做同一內存時只有一個線程操做,即同一時間只容許一個線程修改數據
         1 import threading
         2 import time
         3 
         4 def run(n):
         5     lock.acquire() #修改數據前加鎖
         6     global  num
         7     num +=1
         8     time.sleep(1)
         9     lock.release() #修改後釋放
        10 
        11 
        12 lock = threading.Lock() #生成全局鎖(互斥鎖)
        13 num = 0 #設定一個共享變量
        14 t_objs = [] #存線程實例
        15 for i in range(50):
        16     t = threading.Thread(target=run,args=("t-%s" %i ,))
        17     t.start()
        18     t_objs.append(t) #爲了避免阻塞後面線程的啓動,不在這裏join,先放到一個列表裏
        19 
        20 for t in t_objs: #循環線程實例列表,等待全部線程執行完畢
        21     t.join()
        22 
        23 print("----------all threads has finished...",threading.current_thread(),threading.active_count())
        24 
        25 print("num:",num)
      • 遞歸鎖RLock:在一個大鎖中包含子鎖
         1 import threading, time
         2 
         3 def run1():
         4     print("grab the first part data")
         5     lock.acquire()
         6     global num
         7     num += 1
         8     lock.release()
         9     return num
        10 
        11 def run2():
        12     print("grab the second part data")
        13     lock.acquire()
        14     global num2
        15     num2 += 1
        16     lock.release()
        17     return num2
        18 
        19 def run3():
        20     lock.acquire()
        21     res = run1()
        22     print('--------between run1 and run2-----')
        23     res2 = run2()
        24     lock.release()
        25     print(res, res2)
        26 
        27 num, num2 = 0, 0
        28 lock = threading.RLock() #遞歸鎖
        29 for i in range(10):
        30     t = threading.Thread(target=run3)
        31     t.start()
        32 
        33 while threading.active_count() != 1:
        34     print(threading.active_count())
        35 else:
        36     print('----all threads done---')
        37     print(num, num2)
      • 信號量Semaphore:互斥鎖同一時間只容許一個線程修改數據,而信號量是同時容許必定數量的線程更改數據。
         1 import threading, time
         2 
         3 def run(n):
         4     semaphore.acquire()
         5     time.sleep(1)
         6     print("run the thread: %s\n" % n)
         7     semaphore.release()
         8 
         9 if __name__ == '__main__':
        10     semaphore = threading.BoundedSemaphore(5)  # 最多容許5個線程同時運行
        11     for i in range(22):
        12         t = threading.Thread(target=run, args=(i,))
        13         t.start()
        14 while threading.active_count() != 1:
        15     pass
        16 else:
        17     print('----all threads done---')

         

  • 事件Events
    同進程的同樣,線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其餘線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象,而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行 。
    • event.isSet():返回event的狀態值。獲取內置標誌狀態,返回True或False
    • event.wait(timeout):若是 event.isSet()==False將阻塞線程。若是標誌爲True將當即返回,不然阻塞線程至等待阻塞狀態,等待其餘線程調用set()
    • event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度。將標誌設爲True,並通知全部處於等待阻塞狀態的線程恢復運行狀態
    • event.clear():恢復event的狀態值爲False。將標誌設爲False
       1 import time
       2 import threading
       3 
       4 event = threading.Event()
       5 
       6 def lighter():
       7     count = 0
       8     event.set() #先設置綠燈
       9     while True:
      10         if count >5 and count < 10: #改爲紅燈
      11             event.clear() #把標誌位清了
      12             print("red light is on....")
      13         elif count >10:
      14             event.set() #變綠燈
      15             count = 0
      16         else:
      17             print("green light is on....")
      18         time.sleep(1)
      19         count +=1
      20         print(count)
      21 def car(name):
      22     while True:
      23         if event.is_set(): #表明綠燈
      24             print("[%s] running..."% name )
      25             time.sleep(1)
      26         else:
      27             print("[%s] sees red light , waiting...." %name)
      28             event.wait()
      29             print("[%s] green light is on, start going..." %name)
      30 
      31 
      32 light = threading.Thread(target=lighter,)
      33 light.start()
      34 
      35 car1 = threading.Thread(target=car,args=("Tesla",))
      36 car1.start()
  • 隊列
    • 做用
      解耦:使程序直接實現鬆耦合,修改一個函數,不會有串聯關係
      提升處理效率:FIFO(現進先出),LIFO(後入先出)
    • 隊列
      隊列能夠併發的派多個線程,對排列的線程處理,並切每一個須要處理線程只須要將請求的數據放入隊列容器的內存中,線程不須要等待,當排列完畢處理完數據後,線程在準時來取數據便可。請求數據的線程只與這個隊列容器存在關係,處理數據的線程down掉不會影響到請求數據的線程,隊列會派給其餘線程處理這分數據,它實現瞭解耦,提升效率。隊列內會有一個有順序的容器,列表與這個容器是有區別的,列表中數據雖然是排列的,但數據被取走後還會保留,而隊列中這個容器的數據被取後將不會保留。當必須在多個線程之間安全地交換信息時,隊列在線程編程中特別有用。
    • 四種類型的隊例
      • Queue:先進先出隊列
         1 from queue import Queue #LILO隊列
         2 #基本FIFO隊列  先進先出 FIFO即First in First Out,先進先出
         3 #maxsize設置隊列中,數據上限,小於或等於0則不限制,容器中大於這個數則阻塞,直到隊列中的數據被消掉
         4 q = Queue(maxsize=0)
         5 #寫入隊列數據
         6 q.put(0)
         7 q.put(1)
         8 q.put(2)
         9 #返回隊列大小
        10 print(q.qsize())
        11 #若是隊列爲空,返回True
        12 print(q.empty())
        13 #若是隊列滿了,返回True,大小與maxsize的值對應
        14 print(q.full())
        15 #不等待,直接獲取數據,如隊列爲空,則不等待隊列放入信息後取出數據,直接報錯
        16 print(q.get_nowait())
        17 #不等待,直接寫入數據,如隊列已滿,則不等待隊列信息取出後再放入,直接報錯
        18 q.put_nowait(3)
        19 #輸出當前隊列全部數據
        20 print(q.queue)
        21 #刪除隊列數據,並返回該數據
        22 q.get()
        23 #輸也全部隊列數據
        24 print(q.queue)
      • LifoOueue:後進先出隊列
         1 from queue import LifoQueue #LIFO隊列
         2 #LIFO即Last in First Out,後進先出。與棧的相似,使用也很簡單,maxsize用法同上
         3 lq = LifoQueue(maxsize=0)
         4 #隊列寫入數據
         5 lq.put(0)
         6 lq.put(1)
         7 lq.put(2)
         8 #輸出隊列全部數據
         9 print(lq.queue)
        10 #刪除隊尾數據,並返回該數據
        11 lq.get()
        12 #輸出隊列全部數據
        13 print(lq.queue)
      • PriorityQueue:優先隊列
         1 from queue import PriorityQueue #優先隊列
         2 # 存儲數據時可設置優先級的隊列,優先級設置數越小等級越高
         3 pq = PriorityQueue(maxsize=0)
         4 #寫入隊列,設置優先級
         5 pq.put((9,'a'))
         6 pq.put((7,'c'))
         7 pq.put((1,'d'))
         8 #輸出隊例所有數據
         9 print(pq.queue)
        10 #取隊例數據,能夠看到,是按優先級取的。
        11 pq.get()
        12 pq.get()
        13 print(pq.queue)
      • deque:雙端隊列
         1 from collections import deque   #雙端隊列
         2 #雙邊隊列
         3 dq = deque(['a','b'])
         4 #增長數據到隊尾
         5 dq.append('c')
         6 #增長一組數據到隊尾
         7 dq.extend(['d','e'])
         8 #增長數據到隊頭
         9 dq.appendleft('f')
        10 #增長一組數據到隊頭
        11 dq.extendleft(['g','h'])
        12 #指定位置插入
        13 dq.insert(2,'i')
        14 #輸出隊列全部數據
        15 print(dq)
        16 #移除隊尾並返回
        17 print(dq.pop())
        18 #移除隊頭並返回
        19 print(dq.popleft())
        20 #循環右移2次並返回
        21 print(dq.rotate(2))
        22 #循環左移3次並返回
        23 print(dq.rotate(-3))
相關文章
相關標籤/搜索