Python成長之路 paramiko與線程

python3的paramiko模塊

介紹

paramiko是一個用於遠程控制的模塊,使用該模塊對遠程服務器進行命令或者文件操做python

模塊的安裝

paramiko模塊咱們能夠經過pip3 install paramiko的方式來安裝,不過我是用的是編譯器pycharm內置的功能來安裝這個模塊的,有時候安裝有可能出現錯誤,不過強大的同行會告訴你如何解決和安裝的,這裏我就不說了編程

paramiko模塊的使用

遠程鏈接分爲兩種:服務器

  • 基於用戶名和密碼的鏈接
  • 基於公鑰私鑰的鏈接

使用paramiko遠程操做的方式有兩種多線程

  • 使用SSHClient
  • 本身建立一個 transport(一般使用它來作sftp)

SSH基於用戶名和密碼的鏈接

#!/usr/bin/env python
#-*-coding:utf-8-*-

import paramiko
# 實例化SSH(建立SSH對象)
ssh = paramiko.SSHClient()
# 容許鏈接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 鏈接服務器
ssh.connect(hostname='192.168.132.66', port=22, username='yang', password='redhat')
# 執行命令
stdin, stdout, stderr = ssh.exec_command("ls")
# 獲取命令的結果
result = stdout.read().decode("utf-8")  # 接收的結果是bytes類型
err = stderr.read().decode("utf-8")
# 判斷stderr輸出是否爲空,爲空則打印執行結果,不爲空打印報錯信息
if not err:
    print("result:", result)
else:
    print("err", err)
ssh.close()

  

使用paramiko模塊中的transport建立sftp

#!/usr/bin/env python
#-*-coding:utf-8-*-

import paramiko
# 實例化SSH(建立SSH對象)
transport = paramiko.Transport('192.168.1132.66', 22)
transport.connect(username='yang', password='redhat')
sftp = paramiko.SFTPClient.from_transport(transport)
# 將F:\python\aaa.txt 上傳至服務器 /tmp目錄下並命名爲test.py
sftp.put(r'F:\python\aaa.txt', '/tmp/test.txt')
# 將'/tmp/qw.txt'下載到本地 F:\python目錄下並命名test.txt
sftp.get('/tmp/qw.txt', r'F:\python\test.txt')
transport.close()

  

SSH使用祕鑰登陸

#!/usr/bin/env python
#-*-coding:utf-8-*-

import paramiko
private_key = paramiko.RSAKey.from_private_key_file('id_rsa31.txt')
#
# 建立SSH對象
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 鏈接服務器
ssh.connect(hostname='192.168.132.88', port=22, username='rhce', pkey=private_key)
stdin, stdout, stderr = ssh.exec_command('df')
res_out = stdout.read()
print(res_out.decode())
ssh.close()

  關於上面的祕鑰問題咱們先要生成一對公鑰和私鑰,把公鑰放到咱們要登陸的服務器的指定目錄上,上面的id_rsa31.txt就是私鑰。關於公鑰和私鑰是怎麼生成的、放到什麼位置這個能夠百度一下上面會告訴你怎麼作的,這裏我就不詳細做說了。併發

 

sftp使用祕鑰登陸

#!/usr/bin/env python
#-*-coding:utf-8-*-

import paramiko
# 實例化SSH(建立SSH對象)
private_key = paramiko.RSAKey.from_private_key('id_rsa31.txt')
transport = paramiko.Transport('192.168.1132.66', 22)
transport.connect(username='yang', pkey=private_key)
sftp = paramiko.SFTPClient.from_transport(transport)
# 將F:\python\aaa.txt 上傳至服務器 /tmp目錄下並命名爲test.py
sftp.put(r'F:\python\aaa.txt', '/tmp/test.txt')
# 將'/tmp/qw.txt'下載到本地 F:\python目錄下並命名test.txt
sftp.get('/tmp/qw.txt', r'F:\python\test.txt')
transport.close()

  上面我只是簡單的說了下關於paramiko的遠程鏈接和sftp的實現,要是看的不是很明白的的能夠本身在網上搜一搜哪裏不懂,對照這看看就好了,這個我的感受不是很難。難點就是祕鑰哪裏比較難點,主要是之前沒有了解過祕鑰登陸方面因此我本身感受相比較其它的是比較難的。若是你用過祕鑰的話那就不是那麼難了,其它的都還好。app

 

線程

再說線程以前咱們說說什麼叫作進程。dom

程序並不能單獨運行,只有將程序裝載到內存中,系統爲它分配資源才能運行,而這種執行的程序就稱之爲進程。程序和進程的區別就在於:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬於動態概念。ssh

在多道編程中,咱們容許多個程序同時加載到內存中,在操做系統的調度下,能夠實現併發地執行。這是這樣的設計,大大提升了CPU的利用率。進程的出現讓每一個用戶感受到本身獨享CPU,所以,進程就是爲了在CPU上實現多道編程而提出的函數

然而線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。咱們能夠認爲進程是資源的集合,而線程是操做系統最小的調度單位。線程與進程運行的速度是無法比較的,由於進程自己不可以運行,是經過操做線程來實現的,因此有可比性。測試

在python3中實現多線程支持的是threading模塊,使用這個模塊能夠建立多個線程程序,而且在多線程之間進行同步和通信。

python3中建立線程的兩個方法:

  • 經過threading.Thread直接在行程中運行函數
  • 經過繼承threading.Tread類來建立線程

方法一:直接經過threading.Tread

#!/usr/bin/env python
# -*-coding-*-
import threading
import time


def run(test):
    print('This is {}'.format(test))
    time.sleep(2)

if __name__ == "__main__":
    t1 = threading.Thread(target=run, args=("test1" ,))  # 建立一個t1線程,用來執行run()
    t2 = threading.Thread(target=run, args=("test2" ,))  # 建立一個t2線程,用來執行run()
    t1.start()  # 運行t1線程
    t2.start()  # 運行t2線程
    print('全部而測試完畢') 

  

方法二:經過繼承threading.Thread類來建立線程

#!/usr/bin/env python
# -*-coding-*-
import threading
import time


class Mythread(threading.Thread):
    def __init__(self,name):
        super(Mythread, self).__init__()
        self.name = name

    def run(self):
        print("This is {}".format(self.name))
        time.sleep(2)

if __name__ == "__main__":
    t1 = Mythread("test1")  
    t2 = Mythread("test2")

    t1.start()
    t2.start()
    print("全部測試運行完畢")

 下面咱們來看一個例子

#!/usr/bin/env python
# -*-coding-*-

import threading, time


def run(s):
    print("test {}".format(s))
    time.sleep(2)
    print("----done----")

if __name__ == "__main__":
    start_time = time.time()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
    print("time", time.time() - start_time)  

# 運行結果 test 0 test 1 test 2 test 3 test 4 test 5 test 6 test 7 test 8 test 9 time 0.0020012855529785156 ----done---- ----done---- ----done---- ----done---- ----done---- ----done---- ----done---- ----done---- ----done---- ----done----

  

從上面的看出主線程結束時子線程尚未結束,在主線程啓動了子線程以後,子線程和主線程沒有什麼關係它們相互獨立的,是並行的。

現實中有時候必需要等某一個線程結束了在開始其它的線程,因此在這裏咱們引進了join()的方法。

join()方法

join()方法是阻塞程序知道某一線程執行完畢纔開始繼續的執行程序。

#!/usr/bin/env python
# -*-coding-*-

import threading, time


def run(s):
    print("test {}".format(s))
    time.sleep(2)
    print("----done----")

if __name__ == "__main__":
    t1 = threading.Thread(target=run, args=('test1',))
    t2 = threading.Thread(target=run, args=("test2",))
    t1.start()
    # t1.join()
    t2.start()
    print("測試完畢")

結果爲
test test1
test test2
測試完畢
----done----
----done----

當咱們添加t1.join()時結果爲
test test1
----done----
test test2
測試完畢
----done----

上面能夠看出當咱們添加join()方法時只有等t1線程執行結束時纔會繼續執行下面的程序。
#!/usr/bin/env python
# -*-coding-*-

import threading, time


def run(s, t):
    print("test {}".format(s))
    time.sleep(t)
    print("----done----")

if __name__ == "__main__":
    t1 = threading.Thread(target=run, args=('test1', 2))
    t2 = threading.Thread(target=run, args=("test2", 4))
    t1.start()
    t2.start()
    t1.join()
    print("測試完畢")

#結果爲
test test1
test test2
----done----
測試完畢
----done----

  

上面2個例子咱們能夠知道當咱們在某個位置爲某個線程使用join()時程序走到join()時程序就會阻塞,只有等join()的那個線程結束時,纔回執行join()下面的程序。不會等其它的線程結束,因此要想等其它的線程結束那麼就要給其它的線程也使用join()方法。

有的時候在一個循環裏面啓動線程,又要等全部的線程執行結束那咱們能夠利用

#!/usr/bin/env python
# -*-coding-*-

import threading, time


def run(s):
    print("test {}".format(s))
    time.sleep(2)
    print("----done----")

if __name__ == "__main__":
    s = []
    for i in range(10):
        t = threading.Thread(target=run, args=("test {}".format(i),))
        t.start()
        s.append(t)
    for n in s:
        n.join()

    print("測試完畢")

  上面咱們能夠在實驗中發現咱們的主線程結束後子線程若是沒有結束那麼程序會等到子線程結束後,程序纔會結束。若是咱們把子線程變爲主線程的守護線程的話,那麼不過子線程有沒有結束當咱們的主線程結束時,程序就會結束。

#!/usr/bin/env python
# -*-coding-*-

import threading, time


def run(s):
    print("test {}".format(s))
    time.sleep(2)
    print("----done----")

if __name__ == "__main__":
    s = []
    for i in range(10):
        t = threading.Thread(target=run, args=("test {}".format(i),))
        t.setDaemon(True)  # 把子線程變爲主線程的守護線程
     #設置爲守護進程必定要在start以前 t.start() s.append(t) print("測試完畢")

  

線程鎖

當一個進程擁有多個線程以後,若是他們各作各的任務互沒有關係還行,但既然屬於同一個進程,他們之間老是具備必定關係的。好比多個線程都要對某個數據進行修改,則可能會出現不可預料的結果。爲保證操做正確,就須要引入鎖來進行線程間的同步。

python3中給出了threading模塊中的RLock鎖(能夠叫作遞歸鎖下面是一個例子)

#!/usr/bin/env python
# -*-coding:utf-8-*-

import threading
import time


class Mythread(threading.Thread):
    def run(self):
        global num  # 聲明一個全局變量
        lock.acquire()  # 上鎖,acquire()和release()之間的語句一次只能有一個線程進入,其他線程在acquire()處等待
        num += 5
        print('%s:%d' % (self.name, num))
        lock.release()  # 解鎖





if __name__ == '__main__':
    num = 0
    lock = threading.RLock()  # 建立 可重入鎖
    l = []
    for i in range(5):
        t = Mythread() 
        l.append(t)  # 建立 5 個線程,並把他們放到一個列表中
    for i in l:
        i.start()  # 開啓列表中的全部線程

semaphore(信號量)

#!/usr/bin/env python
# -*-coding:utf-8-*-
import threading
import time


def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("This is {}".format(n))
    semaphore.release()


if __name__ == "__main__":
    semaphore = threading.BoundedSemaphore(5)  # 最多一次同時運行5個線程
    for i in range(30):
        t = threading.Thread(target=run, args=(i,))
        t.start()
        

while threading.active_count() != 1:
    pass
else:
    print("運行完畢")
這個執行的時候你會看到五個打印運行run()的結果這個表示一次性最多運行五個線程

  

isAlive()方法

這個是用於當前的線程有沒有在運行,有的話返回True沒有的話返回Flash

#!/usr/bin/env python
# -*-coding:utf-8-*-
import threading
import time


def run():
    time.sleep(1)

t = threading.Thread(target=run)
print(t.isAlive())  # 結果爲Flash 由於線程t還沒開始
t.start()
print(t.isAlive())  # 結果爲True 由於線程t正在運行
time.sleep(3)
print(t.isAlive())  # 結果爲Flash 由於線程t結束運行

  

現成的同步(Event)

經過Event來實現兩個或多個線程之間的交互。

下面是一個紅綠燈的例子

import threading,time
import random


def light():
    if not event.isSet():
        event.set()
    count = 0
    while True:
        if count > 8 and count < 13:
            if event.isSet():
                event.clear()  # 修改標誌清空變爲紅燈
            print("\033[41;1m**紅燈**\033[0m")
        else:
            print("\033[42;1m**綠燈**\033[0m")
            if count > 12:
                count = 0
                event.set()  # 變爲綠燈
        time.sleep(1)
        count += 1


def car():
    while 1:
        time.sleep(random.randrange(10))  # 隨機多少秒來一輛車
        if event.isSet():
            print("汽車開始啓動經過馬路")
        else:
            print("汽車開始等待紅燈")

if __name__ == "__main__":
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    Car = threading.Thread(target=car)
    Car.start()

  

queue隊列

在這裏我只是簡單的說一下隊列的使用的分類。

隊列的種類:

  • 先進先出 (queue.Queue())
  • 後進先出 (queue.LifoQueue())
  • 根據優先級判斷誰先出來 (queue.PriorityQueue())

queue的一些經常使用的方法:

empty :若是隊列爲空則返回True

full:若是隊列滿了,返回True

put : 方靜一個元素到隊列中

put_nowait:當即放入一個元素,不等待

get:取出一個元素從隊列中

get_nowait: 當即取出一個元素,不等待

join: 阻塞調用線程,直到隊列中的全部任務被處理掉

qsize: 返回隊列中元素的個數

task_done : 在完成一項任務以後,向任務以完成的隊列發送一個信號。

下面是關於隊列的簡單示例:

import queue

q = queue.Queue(maxsize=10)  # 先進先出
print(q.empty())  # 判斷隊列是否爲空
for i in range(10):
    q.put("第{}位進來".format(i))
print(q.full())  # 判斷隊列是否滿了
for i in range(20):
    print(q.get())
    if q.empty():
        break

q = queue.LifoQueue(maxsize=10)  # 後進先出
print(q.empty())  # 判斷隊列是否爲空
for i in range(10):
    q.put("第{}位進來".format(i))
print(q.full())  # 判斷隊列是否滿了
for i in range(20):
    print(q.get())
    if q.empty():
        break

q = queue.PriorityQueue()  # 優先級
q.put((1, 'qqq'))
q.put((7, 'www'))
q.put((3, 'eee'))


for i in range(q.qsize()):
    print(q.get())

  

消費者生產者模型

爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

    什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

下面的例子基於隊列實現生產者消費者模型,實現的作包子和吃包子的事情

#!/usr/bi n/env python
# -*-coding:utf-8-*-
import threading,queue,time

q = queue.Queue(maxsize=10)


def producer(content):
    i = 1
    while True:
        q.put("麪點師傅作了{}餡的包子".format(content))
        print("作了{}包子".format(i))
        time.sleep(3)
        i += 1


def consumer(name):
    while q.qsize() >= 0:
        print("{}拿了{}餡的包子吃了".format(name, q.get()))
        time.sleep(1)

p = threading.Thread(target=producer, args=('豬肉',))
p.start()

c1 = threading.Thread(target=consumer, args=("小張",))
c2 = threading.Thread(target=consumer, args=("小李",))
c1.start()
c2.start()
相關文章
相關標籤/搜索