Python, Let's Go: Part 9

The Paramiko Module

SSHClient

Used to connect to a remote server and execute basic commands.

Connecting with a username and password:

import paramiko

# Create an SSH client object
ssh = paramiko.SSHClient()
# Allow connecting to hosts not present in the known_hosts file
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Connect to the server
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', password='123')

# Execute a command
stdin, stdout, stderr = ssh.exec_command('df')
# Fetch the command's output
result = stdout.read()

# Close the connection
ssh.close()
import paramiko

transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', password='123')

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin, stdout, stderr = ssh.exec_command('df')
print(stdout.read())

transport.close()

Connecting with a public/private key pair:

import paramiko

private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')

# Create an SSH client object
ssh = paramiko.SSHClient()
# Allow connecting to hosts not present in the known_hosts file
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Connect to the server (note the keyword argument is pkey, not key)
ssh.connect(hostname='c1.salt.com', port=22, username='wupeiqi', pkey=private_key)

# Execute a command
stdin, stdout, stderr = ssh.exec_command('df')
# Fetch the command's output
result = stdout.read()

# Close the connection
ssh.close()
import paramiko

private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')

transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key)

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin, stdout, stderr = ssh.exec_command('df')

transport.close()

SFTPClient

Used to connect to a remote server and upload/download files.

Upload/download with a username and password

import paramiko

transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', password='123')

sftp = paramiko.SFTPClient.from_transport(transport)
# Upload the local location.py to /tmp/test.py on the server
sftp.put('/tmp/location.py', '/tmp/test.py')
# Download remote_path to the local local_path
sftp.get('remote_path', 'local_path')

transport.close()

Upload/download with a public/private key pair

import paramiko

private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa')

transport = paramiko.Transport(('hostname', 22))
transport.connect(username='wupeiqi', pkey=private_key)

sftp = paramiko.SFTPClient.from_transport(transport)
# Upload the local location.py to /tmp/test.py on the server
sftp.put('/tmp/location.py', '/tmp/test.py')
# Download remote_path to the local local_path
sftp.get('remote_path', 'local_path')

transport.close()

 

Processes and Threads

What is a thread?

A thread is the smallest unit of execution that the operating system can schedule. It lives inside a process and is the actual working unit of that process. A thread is a single sequential flow of control within a process; one process can run multiple threads concurrently, with each thread performing a different task.

A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.

Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.

If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.

Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.

On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.

Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.

Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).

What is a process?

An executing instance of a program is called a process.

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

What are the differences between a process and a thread?

  1. Threads share the address space of the process that created it; processes have their own address space.
  2. Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
  3. Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
  4. New threads are easily created; new processes require duplication of the parent process.
  5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
  6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
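Points 1 and 2 above can be demonstrated directly in Python. The sketch below (an illustration of mine, not from the original article) mutates a list from a thread and from a child process: the thread sees the parent's memory, while the process works on its own copy. It assumes a POSIX system where the 'fork' start method is available.

```python
import threading
import multiprocessing

def append_item(container):
    container.append(1)

def demo_thread_sharing():
    # Threads share the parent's address space: the mutation is visible here.
    data = []
    t = threading.Thread(target=append_item, args=(data,))
    t.start()
    t.join()
    return data  # [1]

def demo_process_isolation():
    # A forked child gets its own copy of the data: the parent's list stays empty.
    ctx = multiprocessing.get_context('fork')  # POSIX-only start method
    data = []
    p = ctx.Process(target=append_item, args=(data,))
    p.start()
    p.join()
    return data  # []

print(demo_thread_sharing())     # [1]
print(demo_process_isolation())  # []
```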

Python GIL(Global Interpreter Lock)  

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

The gist of the above: no matter how many threads you start or how many CPUs you have, Python will calmly allow only one thread to run at any given moment. So does this even deserve to be called multithreading? Don't draw conclusions so quickly; read on.

First, be clear that the GIL is not a feature of the Python language; it is a concept introduced by one particular implementation of the Python interpreter, CPython. By analogy, C++ is a language (syntax) standard that can be compiled into executable code by different compilers, such as GCC, Intel C++, or Visual C++. Likewise, the same Python code can run on different Python runtimes such as CPython, PyPy, or Psyco, and some of them, Jython for example, have no GIL at all. However, because CPython is the default Python runtime in most environments, many people equate CPython with Python and naturally blame the GIL on the Python language itself. So let's be clear up front: the GIL is not a feature of Python, and Python in no way depends on it.

This article dissects the GIL's impact on Python multithreading thoroughly and is strongly recommended: http://www.dabeaz.com/python/UnderstandingGIL.pdf

 

The Python threading Module

There are two ways to start a thread, as follows:

Direct invocation

import threading
import time

def sayhi(num):  # the function each thread will run

    print("running on number:%s" % num)

    time.sleep(3)

if __name__ == '__main__':

    t1 = threading.Thread(target=sayhi, args=(1,))  # create a thread instance
    t2 = threading.Thread(target=sayhi, args=(2,))  # create another thread instance

    t1.start()  # start the thread
    t2.start()  # start the other thread

    print(t1.getName())  # get the thread's name
    print(t2.getName())

Invocation by subclassing

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):  # the function each thread will run

        print("running on number:%s" % self.num)

        time.sleep(3)

if __name__ == '__main__':

    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

Join & Daemon

Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.

Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.
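A minimal sketch of marking a thread as a daemon (the `daemon=True` constructor argument is the modern spelling of `setDaemon(True)`; the keepalive task is illustrative):

```python
import threading
import time

def keepalive():
    while True:
        time.sleep(0.1)  # pretend to send a keepalive packet periodically

t = threading.Thread(target=keepalive, daemon=True)  # killed at interpreter exit
t.start()
print(t.daemon)    # True
print(t.is_alive())  # True -- yet the program can exit without joining it
```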

#_*_coding:utf-8_*_
__author__ = 'Alex Li'

import time
import threading


def run(n):

    print('[%s]------running----\n' % n)
    time.sleep(2)
    print('--done--')

def main():
    for i in range(5):
        t = threading.Thread(target=run, args=[i,])
        t.start()
        t.join(1)
        print('starting thread', t.getName())


m = threading.Thread(target=main, args=[])
m.setDaemon(True)  # make m a daemon of the program's main thread: when the main thread exits, m exits too, and any child threads started by m are killed along with it whether or not they have finished
m.start()
m.join(timeout=2)
print("---main thread done----")

  

Note: Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an Event.
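A minimal sketch of the non-daemonic, Event-based graceful shutdown the note recommends (the names here are illustrative):

```python
import threading

stop = threading.Event()

def worker():
    # Do periodic work until asked to stop; wait() doubles as an interruptible sleep.
    while not stop.is_set():
        stop.wait(0.05)

t = threading.Thread(target=worker)  # non-daemonic on purpose
t.start()
stop.set()   # signal the worker to finish its loop
t.join()     # and wait for it to exit cleanly
print(t.is_alive())  # False
```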

  

 

 

Thread Locks (Mutex)

A process can start multiple threads, and those threads share the parent process's memory space, which means every thread can access the same data. So what happens if two threads try to modify the same piece of data at the same time?

import time
import threading

def addNum():
    global num  # fetch the shared global variable in each thread
    print('--get num:', num)
    time.sleep(1)
    num -= 1  # decrement the shared variable

num = 100  # set up a shared variable
thread_list = []
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # wait for all threads to finish
    t.join()


print('final num:', num)

Normally this num should come out as 0, but run it a few times on Python 2.7 and you'll find the printed num isn't always 0. Why does the result vary from run to run? Simple: suppose threads A and B both want to decrement num. Because the two threads run concurrently, both may grab the initial value num=100 and hand it to the CPU. Thread A computes 99, but thread B also computes 99, and once both results are assigned back to num, the result is 99. What to do? Also simple: whenever a thread is about to modify shared data, it can take a lock on that data so no one else can change it mid-update; any other thread that wants to modify the data must wait until you finish and release the lock before it can access the data again.

*Note: don't run this on 3.x — for some reason the result there is always correct; perhaps a lock is added automatically.
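The race described above can be reproduced deterministically by widening the read-modify-write window with a sleep (an illustrative sketch of mine, not from the original article):

```python
import threading
import time

num = 100

def addNum():
    global num
    tmp = num          # read the shared value
    time.sleep(0.5)    # widen the gap so every thread reads the same stale value
    num = tmp - 1      # write back, clobbering everyone else's decrement

threads = [threading.Thread(target=addNum) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('final num:', num)  # almost always 99, never 0
```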

Version with a lock

import time
import threading

def addNum():
    global num  # fetch the shared global variable in each thread
    print('--get num:', num)
    time.sleep(1)
    lock.acquire()  # take the lock before modifying the data
    num -= 1  # decrement the shared variable
    lock.release()  # release the lock after modifying

num = 100  # set up a shared variable
thread_list = []
lock = threading.Lock()  # create a global lock
for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # wait for all threads to finish
    t.join()

print('final num:', num)

 

GIL VS Lock 

A sharp student may ask: since, as said before, Python already has a GIL guaranteeing that only one thread executes at a time, why do we still need a lock here? Note that this lock is a user-level lock and has nothing to do with the GIL, as the diagram below should make clear.

 

  

RLock (Recursive Lock)

Put simply, this is for when a lock must be acquired again by code that is already holding it — a lock nested inside a bigger lock.

import threading, time

def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num

def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2

def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)


if __name__ == '__main__':

    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)
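Why a recursive lock is needed at all: a plain `Lock` deadlocks when the thread that holds it tries to acquire it again, while an `RLock` lets the owning thread re-enter. A minimal sketch (names are illustrative):

```python
import threading

rlock = threading.RLock()

def outer():
    with rlock:         # first acquisition
        return inner()  # calls into code that takes the same lock again

def inner():
    with rlock:         # re-entered by the owning thread: fine with an RLock
        return 42

print(outer())  # 42 -- with a plain threading.Lock() this call would deadlock
```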

  

Semaphore

A mutex allows only one thread at a time to change data, whereas a semaphore allows a fixed number of threads to do so at once. Think of a restroom with 3 stalls: at most 3 people can be inside, and everyone else has to wait for someone to come out before going in.

import threading, time

def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s\n" % n)
    semaphore.release()

if __name__ == '__main__':

    num = 0
    semaphore = threading.BoundedSemaphore(5)  # allow at most 5 threads to run at once
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

while threading.active_count() != 1:
    pass  # print threading.active_count()
else:
    print('----all threads done---')
    print(num)
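That the semaphore really caps concurrency can be checked by tracking the peak number of threads inside the critical section at once (an illustrative sketch, with names of my choosing):

```python
import threading
import time

sem = threading.BoundedSemaphore(3)  # at most 3 workers inside at a time
guard = threading.Lock()             # protects the two counters below
active = 0
peak = 0

def worker():
    global active, peak
    with sem:
        with guard:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # hold the slot long enough for others to queue up
        with guard:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('peak concurrency:', peak)  # never more than 3
```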

 

Events

An event is a simple synchronization object;

the event represents an internal flag, and threads
can wait for the flag to be set, or set or clear the flag themselves.

event = threading.Event()

# a client thread can wait for the flag to be set
event.wait()

# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.
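`wait()` also accepts a timeout and returns the flag's state, which makes the blocking behaviour described above easy to observe in a small sketch:

```python
import threading

event = threading.Event()
print(event.wait(timeout=0.1))  # False -- timed out, the flag was never set
event.set()
print(event.wait())             # True  -- the flag is set, so wait returns at once
print(event.is_set())           # True
event.clear()
print(event.is_set())           # False
```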

An Event lets two or more threads interact. Below is a traffic-light example: one thread acts as the traffic light and several threads act as cars, which drive by the rule of stopping on red and going on green.

import threading, time
import random

def light():
    if not event.is_set():
        event.set()  # green light: wait() will not block
    count = 0
    while True:
        if count < 10:
            print('\033[42;1m--green light on---\033[0m')
        elif count < 13:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count < 20:
            if event.is_set():
                event.clear()
            print('\033[41;1m--red light on---\033[0m')
        else:
            count = 0
            event.set()  # turn the green light back on
        time.sleep(1)
        count += 1

def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if event.is_set():  # green light
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." % n)

if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car, args=(i,))
        t.start()

Here is another scenario that an Event can model: employees must swipe a card to enter the company gate. Set up one thread as the "gate" and several threads as "employees": an employee who sees the gate closed swipes a card; once a card is swiped, the gate opens and the employee can pass through.


 

  

  

The queue Module

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0)          # first in, first out
class queue.LifoQueue(maxsize=0)      # last in, first out
class queue.PriorityQueue(maxsize=0)  # a queue whose stored items carry a priority

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
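A quick sketch of the (priority_number, data) pattern:

```python
import queue

q = queue.PriorityQueue()
q.put((3, 'low'))
q.put((1, 'urgent'))
q.put((2, 'normal'))

first = q.get()
print(first)    # (1, 'urgent') -- the lowest priority number comes out first
print(q.get())  # (2, 'normal')
print(q.get())  # (3, 'low')
```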

exception queue.Empty

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full

Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty()  # return True if empty
Queue.full()  # return True if full
Queue.put(item, block=True, timeout=None)

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)

Equivalent to put(item, False).

Queue.get(block=True, timeout=None)

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()

Equivalent to get(False).
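The non-blocking variants and the Full/Empty exceptions above fit together like this (an illustrative sketch):

```python
import queue

q = queue.Queue(maxsize=2)
q.put('a')
q.put('b')

try:
    q.put_nowait('c')   # the queue is full, so this raises immediately
except queue.Full:
    print('queue is full')

print(q.get_nowait())   # 'a' -- FIFO order
print(q.get_nowait())   # 'b'

try:
    q.get_nowait()      # now empty, so this raises immediately
except queue.Empty:
    print('queue is empty')
```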

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join()  # blocks until every item in the queue has been consumed and processed
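task_done() and join() pair up as described above; a small sketch (the worker is illustrative):

```python
import queue
import threading

q = queue.Queue()
for i in range(3):
    q.put(i)

def worker():
    while True:
        item = q.get()   # fetch a task
        q.task_done()    # report it as processed

threading.Thread(target=worker, daemon=True).start()
q.join()  # blocks until task_done() has been called once per put()
print('all tasks processed')
```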

The Producer-Consumer Model

Using the producer-consumer pattern in concurrent programming solves the vast majority of concurrency problems. The pattern improves a program's overall data-processing throughput by balancing the working capacity of producer threads and consumer threads.

Why use the producer-consumer pattern?

In the world of threads, producers are the threads that generate data and consumers are the threads that consume it. In multithreaded development, if the producer is fast and the consumer is slow, the producer must wait for the consumer to catch up before it can produce more data; likewise, if the consumer outpaces the producer, the consumer must wait for the producer. The producer-consumer pattern was introduced to solve this problem.

What is the producer-consumer pattern?

The producer-consumer pattern uses a container to break the tight coupling between producers and consumers. They do not communicate with each other directly; instead they communicate through a blocking queue. A producer hands finished data straight to the blocking queue without waiting for a consumer, and a consumer takes data straight from the blocking queue instead of asking a producer for it. The blocking queue acts as a buffer that balances the processing capacity of producers and consumers.

 

Let's study a most basic example of the producer-consumer model:

import threading
import queue

def producer():
    for i in range(10):
        q.put("bone %s" % i)

    print("waiting for all the bones to be taken...")
    q.join()
    print("all the bones have been taken...")


def consumer(n):
    while q.qsize() > 0:
        print("%s took" % n, q.get())
        q.task_done()  # tell the queue this task is finished


q = queue.Queue()

p = threading.Thread(target=producer,)
p.start()

c1 = threading.Thread(target=consumer, args=("xxx",))
c1.start()