SSHClienthtml
用於鏈接遠程服務器並執行基本命令python
基於用戶名密碼鏈接:編程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import
paramiko
# 建立SSH對象
ssh
=
paramiko.SSHClient()
# 容許鏈接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 鏈接服務器
ssh.connect(hostname
=
'c1.salt.com'
, port
=
22
, username
=
'wupeiqi'
, password
=
'123'
)
# 執行命令
stdin, stdout, stderr
=
ssh.exec_command(
'df'
)
# 獲取命令結果
result
=
stdout.read()
# 關閉鏈接
ssh.close()
|
import paramiko transport = paramiko.Transport(('hostname', 22)) transport.connect(username='wupeiqi', password='123') ssh = paramiko.SSHClient() ssh._transport = transport stdin, stdout, stderr = ssh.exec_command('df') print stdout.read() transport.close()
基於公鑰密鑰鏈接:服務器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import
paramiko
private_key
=
paramiko.RSAKey.from_private_key_file(
'/home/auto/.ssh/id_rsa'
)
# 建立SSH對象
ssh
=
paramiko.SSHClient()
# 容許鏈接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 鏈接服務器
ssh.connect(hostname
=
'c1.salt.com'
, port
=
22
, username
=
'wupeiqi'
, key
=
private_key)
# 執行命令
stdin, stdout, stderr
=
ssh.exec_command(
'df'
)
# 獲取命令結果
result
=
stdout.read()
# 關閉鏈接
ssh.close()
|
import paramiko private_key = paramiko.RSAKey.from_private_key_file('/home/auto/.ssh/id_rsa') transport = paramiko.Transport(('hostname', 22)) transport.connect(username='wupeiqi', pkey=private_key) ssh = paramiko.SSHClient() ssh._transport = transport stdin, stdout, stderr = ssh.exec_command('df') transport.close()
SFTPClient多線程
用於鏈接遠程服務器並執行上傳下載併發
基於用戶名密碼上傳下載app
1
2
3
4
5
6
7
8
9
10
11
12
|
import
paramiko
transport
=
paramiko.Transport((
'hostname'
,
22
))
transport.connect(username
=
'wupeiqi'
,password
=
'123'
)
sftp
=
paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務器 /tmp/test.py
sftp.put(
'/tmp/location.py'
,
'/tmp/test.py'
)
# 將remove_path 下載到本地 local_path
sftp.get(
'remove_path'
,
'local_path'
)
transport.close()
|
基於公鑰密鑰上傳下載less
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import
paramiko
private_key
=
paramiko.RSAKey.from_private_key_file(
'/home/auto/.ssh/id_rsa'
)
transport
=
paramiko.Transport((
'hostname'
,
22
))
transport.connect(username
=
'wupeiqi'
, pkey
=
private_key )
sftp
=
paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務器 /tmp/test.py
sftp.put(
'/tmp/location.py'
,
'/tmp/test.py'
)
# 將remove_path 下載到本地 local_path
sftp.get(
'remove_path'
,
'local_path'
)
transport.close()
|
線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務dom
A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.ssh
Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.
If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.
Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.
On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.
Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.
Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).
An executing instance of a program is called a process.
Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
上面的核心意思就是,不管你啓多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只容許一個線程運行,擦。。。,那這還叫什麼多線程呀?莫如此早的下結結論,聽我現場講。
首先須要明確的一點是GIL
並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就比如C++是一套語言(語法)標準,可是能夠用不一樣的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也同樣,一樣一段代碼能夠經過CPython,PyPy,Psyco等不一樣的Python執行環境來執行。像其中的JPython就沒有GIL。然而由於CPython是大部分環境下默認的Python執行環境。因此在不少人的概念裏CPython就是Python,也就想固然的把GIL
歸結爲Python語言的缺陷。因此這裏要先明確一點:GIL並非Python的特性,Python徹底能夠不依賴於GIL
這篇文章透徹的剖析了GIL對python多線程的影響,強烈推薦看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf
線程有2種調用方式,以下:
直接調用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import
threading
import
time
def
sayhi(num):
#定義每一個線程要運行的函數
print
(
"running on number:%s"
%
num)
time.sleep(
3
)
if
__name__
=
=
'__main__'
:
t1
=
threading.Thread(target
=
sayhi,args
=
(
1
,))
#生成一個線程實例
t2
=
threading.Thread(target
=
sayhi,args
=
(
2
,))
#生成另外一個線程實例
t1.start()
#啓動線程
t2.start()
#啓動另外一個線程
print
(t1.getName())
#獲取線程名
print
(t2.getName())
|
繼承式調用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import
threading
import
time
class
MyThread(threading.Thread):
def
__init__(
self
,num):
threading.Thread.__init__(
self
)
self
.num
=
num
def
run(
self
):
#定義每一個線程要運行的函數
print
(
"running on number:%s"
%
self
.num)
time.sleep(
3
)
if
__name__
=
=
'__main__'
:
t1
=
MyThread(
1
)
t2
=
MyThread(
2
)
t1.start()
t2.start()
|
Join & Daemon
Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it's okay to kill them off once the other, non-daemon, threads have exited.
Without daemon threads, you'd have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
#_*_coding:utf-8_*_
__author__
=
'Alex Li'
import
time
import
threading
def
run(n):
print
(
'[%s]------running----\n'
%
n)
time.sleep(
2
)
print
(
'--done--'
)
def
main():
for
i
in
range
(
5
):
t
=
threading.Thread(target
=
run,args
=
[i,])
t.start()
t.join(
1
)
print
(
'starting thread'
, t.getName())
m
=
threading.Thread(target
=
main,args
=
[])
m.setDaemon(
True
)
#將main線程設置爲Daemon線程,它作爲程序主線程的守護線程,當主線程退出時,m線程也會退出,由m啓動的其它子線程會同時退出,不論是否執行完任務
m.start()
m.join(timeout
=
2
)
print
(
"---main thread done----"
)
|
Note:Daemon threads are abruptly stopped at shutdown. Their resources (such as open files, database transactions, etc.) may not be released properly. If you want your threads to stop gracefully, make them non-daemonic and use a suitable signalling mechanism such as an .Event
線程鎖(互斥鎖Mutex)
一個進程下能夠啓動多個線程,多個線程共享父進程的內存空間,也就意味着每一個線程能夠訪問同一份數據,此時,若是2個線程同時要修改同一份數據,會出現什麼情況?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import
time
import
threading
def
addNum():
global
num
#在每一個線程中都獲取這個全局變量
print
(
'--get num:'
,num )
time.sleep(
1
)
num
-
=
1
#對此公共變量進行-1操做
num
=
100
#設定一個共享變量
thread_list
=
[]
for
i
in
range
(
100
):
t
=
threading.Thread(target
=
addNum)
t.start()
thread_list.append(t)
for
t
in
thread_list:
#等待全部線程執行完畢
t.join()
print
(
'final num:'
, num )
|
正常來說,這個num結果應該是0, 但在python 2.7上多運行幾回,會發現,最後打印出來的num結果不老是0,爲何每次運行的結果不同呢? 哈,很簡單,假設你有A,B兩個線程,此時都 要對num 進行減1操做, 因爲2個線程是併發同時運行的,因此2個線程頗有可能同時拿走了num=100這個初始變量交給cpu去運算,當A線程去處完的結果是99,但此時B線程運算完的結果也是99,兩個線程同時CPU運算的結果再賦值給num變量後,結果就都是99。那怎麼辦呢? 很簡單,每一個線程在要修改公共數據時,爲了不本身在還沒改完的時候別人也來修改此數據,能夠給這個數據加一把鎖, 這樣其它線程想修改此數據時就必須等待你修改完畢並把鎖釋放掉後才能再訪問此數據。
*注:不要在3.x上運行,不知爲何,3.x上的結果老是正確的,多是自動加了鎖
加鎖版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import
time
import
threading
def
addNum():
global
num
#在每一個線程中都獲取這個全局變量
print
(
'--get num:'
,num )
time.sleep(
1
)
lock.acquire()
#修改數據前加鎖
num
-
=
1
#對此公共變量進行-1操做
lock.release()
#修改後釋放
num
=
100
#設定一個共享變量
thread_list
=
[]
lock
=
threading.Lock()
#生成全局鎖
for
i
in
range
(
100
):
t
=
threading.Thread(target
=
addNum)
t.start()
thread_list.append(t)
for
t
in
thread_list:
#等待全部線程執行完畢
t.join()
print
(
'final num:'
, num )
|
GIL VS Lock
機智的同窗可能會問到這個問題,就是既然你以前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock? 注意啦,這裏的lock是用戶級的lock,跟那個GIL不要緊 ,具體咱們經過下圖來看一下+配合我現場講給你們,就明白了。
RLock(遞歸鎖)
說白了就是在一個大鎖中還要再包含子鎖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
import
threading,time
def
run1():
print
(
"grab the first part data"
)
lock.acquire()
global
num
num
+
=
1
lock.release()
return
num
def
run2():
print
(
"grab the second part data"
)
lock.acquire()
global
num2
num2
+
=
1
lock.release()
return
num2
def
run3():
lock.acquire()
res
=
run1()
print
(
'--------between run1 and run2-----'
)
res2
=
run2()
lock.release()
print
(res,res2)
if
__name__
=
=
'__main__'
:
num,num2
=
0
,
0
lock
=
threading.RLock()
for
i
in
range
(
10
):
t
=
threading.Thread(target
=
run3)
t.start()
while
threading.active_count() !
=
1
:
print
(threading.active_count())
else
:
print
(
'----all threads done---'
)
print
(num,num2)
|
Semaphore(信號量)
互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import
threading,time
def
run(n):
semaphore.acquire()
time.sleep(
1
)
print
(
"run the thread: %s\n"
%
n)
semaphore.release()
if
__name__
=
=
'__main__'
:
num
=
0
semaphore
=
threading.BoundedSemaphore(
5
)
#最多容許5個線程同時運行
for
i
in
range
(
20
):
t
=
threading.Thread(target
=
run,args
=
(i,))
t.start()
while
threading.active_count() !
=
1
:
pass
#print threading.active_count()
else
:
print
(
'----all threads done---'
)
print
(num)
|
Events
An event is a simple synchronization object;
the event represents an internal flag, and threads
can wait for the flag to be set, or set or clear the flag themselves.
event = threading.Event()
# a client thread can wait for the flag to be set
event.wait()
# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.
經過Event來實現兩個或多個線程間的交互,下面是一個紅綠燈的例子,即起動一個線程作交通指揮燈,生成幾個線程作車輛,車輛行駛按紅燈停,綠燈行的規則。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
import
threading,time
import
random
def
light():
if
not
event.isSet():
event.
set
()
#wait就不阻塞 #綠燈狀態
count
=
0
while
True
:
if
count <
10
:
print
(
'\033[42;1m--green light on---\033[0m'
)
elif
count <
13
:
print
(
'\033[43;1m--yellow light on---\033[0m'
)
elif
count <
20
:
if
event.isSet():
event.clear()
print
(
'\033[41;1m--red light on---\033[0m'
)
else
:
count
=
0
event.
set
()
#打開綠燈
time.sleep(
1
)
count
+
=
1
def
car(n):
while
1
:
time.sleep(random.randrange(
10
))
if
event.isSet():
#綠燈
print
(
"car [%s] is running.."
%
n)
else
:
print
(
"car [%s] is waiting for the red light.."
%
n)
if
__name__
=
=
'__main__'
:
event
=
threading.Event()
Light
=
threading.Thread(target
=
light)
Light.start()
for
i
in
range
(
3
):
t
=
threading.Thread(target
=
car,args
=
(i,))
t.start()
|
這裏還有一個event使用的例子,員工進公司門要刷卡, 咱們這裏設置一個線程是「門」, 再設置幾個線程爲「員工」,員工看到門沒打開,就刷卡,刷完卡,門開了,員工就能夠經過。
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
queue.
Queue
(maxsize=0) #先入先出
queue.
LifoQueue
(maxsize=0) #last in fisrt out
queue.
PriorityQueue
(maxsize=0) #存儲數據時可設置優先級的隊列
Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.
The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]
). A typical pattern for entries is a tuple in the form: (priority_number, data)
.
queue.
Empty
Exception raised when non-blocking get()
(or get_nowait()
) is called on a Queue
object which is empty.
queue.
Full
Exception raised when non-blocking put()
(or put_nowait()
) is called on a Queue
object which is full.
Queue.
qsize
()
Queue.
empty
() #return True if empty
Queue.
full
() # return True if full
Queue.
put
(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full
exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full
exception (timeout is ignored in that case).
Queue.
put_nowait
(item)
Equivalent to put(item, False)
.
Queue.
get
(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty
exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty
exception (timeout is ignored in that case).
Queue.
get_nowait
()
Equivalent to get(False)
.
Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.
Queue.
task_done
()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get()
used to fetch a task, a subsequent call to task_done()
tells the queue that the processing on the task is complete.
If a join()
is currently blocking, it will resume when all items have been processed (meaning that a task_done()
call was received for every item that had been put()
into the queue).
Raises a ValueError
if called more times than there were items placed in the queue.
Queue.
join
() block直到queue被消費完畢
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
下面來學習一個最基本的生產者消費者模型的例子
1 import threading 2 import queue 3 4 def producer(): 5 for i in range(10): 6 q.put("骨頭 %s" % i ) 7 8 print("開始等待全部的骨頭被取走...") 9 q.join() 10 print("全部的骨頭被取完了...") 11 12 13 def consumer(n): 14 15 while q.qsize() >0: 16 17 print("%s 取到" %n , q.get()) 18 q.task_done() #告知這個任務執行完了 19 20 21 q = queue.Queue() 22 23 24 25 p = threading.Thread(target=producer,) 26 p.start() 27 28 c1 = consumer("xxx")