day09 threading, paramiko, queue 模塊

1 模擬sshpython

2 鎖 內部鎖,程序鎖,信號量mysql

3 多線程linux

4  簡單消息隊列sql

 

先來看模擬ssh  ,python 的強大之處就是由於有不少模塊,能夠很簡單的完成複雜的事情,今天咱們用paramiko 模塊來模擬一個ssh 的交互shell

ssh:  只可遠程執行linux 服務(或者是有ssh 服務的系統)服務器

1 先簡單執行命令測試下網絡

#!/usr/bin/env python3
# Author: Shen Yang
import paramiko
#help(paramiko)
#實例化一個客戶端
ssh = paramiko.SSHClient()
#容許未登錄過的登錄
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#鏈接遠程主機
ssh.connect(hostname='192.168.81.133',port=22,username='root',password='7758521')
#執行命令,收集三種結果,標準輸入,標準輸出,標準錯誤
stdin,stdout,stderr = ssh.exec_command('uptime')
#獲取結果,轉碼
result = stdout.read().decode()
result_err = stderr.read().decode()
#打印
print(result)
if result_err:
    print('error!',result_err)

 

是否是很激動! 確實,這樣感受太好了,有點相似於slat 的感受了!多線程

解釋一下上面的:併發

stdin,stdout,stderr = ssh.exec_command('uptime')

咱們看到三個變量,app

stdin 是獲取輸入的值,這裏並無。

stdout 是獲取標準輸出,就像咱們在shell 下執行命令獲取到的正確結果,爲何說是正確結果呢?由於

stderr 是獲取當命令執行錯誤後的結果的。

 

這裏並不支持交互性的命令好比top 什麼的,若是非用top 記得加上 -bn 1

 

scp: 能夠從linux上傳和下載  遠端服務器必須有ssh 服務
import paramiko
#創建一個通道
transport = paramiko.Transport(('192.168.81.133',22))
transport.connect(username='root',password='7758521')
#從這個通道開始傳輸文件
sftp = paramiko.SFTPClient.from_transport(transport)
# 將本地文件scp_client.py傳輸至遠程服務器/tmp/yuanduan.py
sftp.put('scp_client.py','/tmp/yuanduan.py')
#將遠端文件/tmp/yuanduan.py下載至本地/tmp/yuanduan.py
sftp.get('/tmp/yuanduan.py','/tmp/yuanduan.py')

結果:

 

上面的是使用密碼方式,那麼咱們是否可使用密鑰方式來鏈接遠端服務器呢?答案是確定的

使用密鑰鏈接ssh
#!/usr/bin/env python3
# Author: Shen Yang
import paramiko
#指定Key 文件路徑
key_file = paramiko.RSAKey.from_private_key_file('mykey')
host = '192.168.1.201'

#建立客戶端對象
ssh = paramiko.SSHClient()
#容許鏈接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#鏈接服務器
ssh.connect(hostname=host,port=22,username='root', key_filename='mykey')

#執行命令
stdin,stdout,stderr = ssh.exec_command('uptime')

#獲取命令結果
result = stdout.read()
print(result.decode())
#關閉鏈接
ssh.close()

  

好了,關於模擬ssh 的paramiko 模咱們先用到這裏,之後再用到其餘功能再細研究吧。。

 下面開始講線程和進程,比較重要!

先將線程,什麼是線程呢(thread)?

線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務

 畫個圖來表示一下:

每個程序的內存是獨立的

每個程序一個進程:

例如:  qq 要以一個總體的形式暴露給操做系統管理,裏面包含對各類資源的調用,內存的管理,網絡接口的調用。

對各類資源管理的集合就能夠稱爲 進程。

進程要操做cpu  必需要先建立一個線程。
進程是資源的集合,不能執行,要執行,必須先生成一個線程。

線程: 是操做系統最小的調度單位,是一串指令的集合。

 

一個進程建立一個線程 而後經過這個線程建立多個線程,子線程還能夠建立更多線程。
 
他們的關係是平等的   線程之間是獨立的 沒有隸屬關係,新建立的線程不會由於建立他的線程關閉而關閉。
 
 
進程和線程沒有可比性,不是一個東西。
可是:
啓動一個線程比啓動一個進程快!

 

對於一個主線程的修改可能會影響同一進程下的其餘線程。

 

 建立子進程至關於對父進程的克隆

 

多個子進程之間的數據不一樣,不是共享的。
修改進程1 的數據不會改變進程2的數據。。  

 

讓咱們實踐的操做一下,下面是一個簡單的事例,證實一下這是一個併發的操做:

#!/usr/bin/env python3
# Author: Shen Yang
import threading,time
def run(n):
    print('this is ',n)
    time.sleep(2)
#實例化兩個任務,   target 是要執行的任務, args 是傳入的參數,是一個元組,即使是一個參數也要使用,分割
t1 = threading.Thread(target=run,args=('t1',))
t2 = threading.Thread(target=run,args=('t2',))
#運行
t1.start()
t2.start()
結果:
sleep 2秒是同時sleep的

 

 

第二種使用方式, 不經常使用
經過類調用使用:
 
1 先寫一個類,繼承Thread
2 執行新類便可
#定義一個類,繼承Thread
class MyThread(threading.Thread):
    def __init__(self,n):
        super(MyThread, self).__init__()
        self.n = n
    def run(self):
        print('running task ',self.n)
        time.sleep(2)

#實例化兩個任務
t1 = MyThread('t1')
t2 = MyThread('t2')

#啓動
t1.start()
t2.start()

結果同上

 

那麼咱們是否能夠計算全部線程的結束時間嗎,能夠的:

#!/usr/bin/env python3
# Author: Shen Yang
import threading,time

#定義任務過程
def run(n):
    print('this is ',n)
    time.sleep(2)

#記住開始時間
start_time = time.time()

#設置空對象列表
obj_list = []
#生成每一個對象執行並把對象加入列表中
for i in range(50):
    t = threading.Thread(target=run,args=(i,))
    t.start()
    obj_list.append(t)
#循環列表裏的對象來等待全部對象執行完畢
for t in obj_list:
    t.join()
#等全部對象執行完畢後執行計算時間
print('cost:',time.time() - start_time)

 

打印主線程:

 

打印子線程:

 

打印活動的線程個數:

 

 

 

 守護進程:

守護進程是僕人,主進程是主人,主人退出,守護進程也要退出。

 

 socket  server  能夠設置守護線程,這樣在手動退出的時候不會等待其餘線程結束就退出。

 

 實踐:

#!/usr/bin/env python3
# Author: Shen Yang
import threading,time

end_list = []

def run(n):
    print('this is ',n)
    time.sleep(1)
    end_list.append(n)

start_time = time.time()

#設置空對象列表
obj_list = []
#生成每一個對象執行並把對象加入列表中
for i in range(5000):
    t = threading.Thread(target=run,args=(i,))
    t.setDaemon(True)
    t.start()
    obj_list.append(t)

#等全部對象執行完畢後執行計算時間
print('cost:',time.time() - start_time)
time.sleep(0.8)

#print(end_list)
print(len(end_list))

能夠看到,執行了一半就終止了,其餘的被強制退出了

 

接下來,講一下lock

不管你有多少核,python 同時只能運行一個線程,一個缺陷
 
 
同一時間,只有一個線程能夠在任意cpu核心執行,是gil 控制了這一點
 
pypy  jit  能夠提早把代碼預編譯爲機器碼,速度更快
 
jpython 也沒有這個鎖,由於是在jre環境運行的
 
 
鎖的存在會出現一個問題:
解釋器每100條指令(不是pyhton 命令,而是更底層的指令)切換一次鎖
因此當某個線程在執行大於100條數據的時候還沒等執行完就處於等待狀態了,就是一個變量還沒修改就被第二個線程掉取,致使數據出錯
 
解決辦法:
在用戶層再加一層鎖,保證同一時間修改同一個數據

 

 

若是有多層鎖,就會有概率用錯鑰匙解鎖,致使進入不能出來鎖,
因此須要用到 Rlock()

 

線程鎖(互斥鎖Mutex)

一個進程下能夠啓動多個線程,多個線程共享父進程的內存空間,也就意味着每一個線程能夠訪問同一份數據,此時,若是2個線程同時要修改同一份數據,會出現什麼情況?

 

 

其實信號量就是一個特殊的鎖,能夠容許多個線程的鎖

 

說白了就是和鎖相反:同一時間最多能夠有幾個線程修改數據

能夠控制同時又多少個線程在運行
 
涉及到鏈接池,和線程數量
能夠控制mysql 的鏈接數, 和socketserver的同一時間鏈接的client數量
#!/usr/bin/env python3
# Author: Shen Yang
import threading,time
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print('run the thread:{_n}\n'.format(_n=n))
    semaphore.release()
if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(5)
    for i in range(22):
        t = threading.Thread(target=run,args=(i,))
        t.start()
while threading.active_count() != 1:
    pass
else:
    print('-----all threads done-------')

  

看下效果:

 

事件:

Events

用於線程之間的數據同步,

event  就是在設置一個全局變量,只不過是再封裝了一層
另外一個線程不斷檢查這個變量,來根據這個變量進行相應的操做

 

 一個紅燈停綠燈行的例子:

實踐一下:

#!/usr/bin/env python3
# Author: Shen Yang
import threading,time

#實例化一個event
event = threading.Event()

#定義紅綠燈
def lighter():
    count = 0
    event.set() #設置標誌位
    while True:
        if count >5 and count <10:  #判斷更改標誌位
            event.clear()   #清理
            print('\033[41;1mred light is on ...\033[0m')
        elif count >10:
            event.set() #設置
            count = 0 #歸0
        else:    #其餘爲設置狀態
            print('\033[46;1mgreen light is on ...\033[0m')
        time.sleep(1)
        count += 1

#定義汽車
def car(name):
    while True:
        if event.is_set(): #有設置
            print('{_name} running ...'.format(_name = name))
            time.sleep(1)
        else:
            print('{_name} see the red light ,waiting ...'.format(_name = name))
            event.wait()
            print('\033[46;1m{_name} see green light is on,start going ...\033[0m'.format(_name = name))

#啓動燈
light = threading.Thread(target=lighter,)
light.start()

#啓動汽車
car1 = threading.Thread(target=car,args=('Tesla',))
car2 = threading.Thread(target=car,args=('Fit',))
car3 = threading.Thread(target=car,args=('Civic',))
car1.start()
car2.start()
car3.start()

看下效果:

 

 

 

 

做用:
1 解耦
2 提升工做效率
 
 
和列表相比,就是數據自動消失
 
 
數據存在內存中
 
隊列有幾種方式,以下
1 先進先出:

 

使用 get_nowait() 經過判斷異常知道隊列爲空

 

也能夠實現:

 

 能夠等待:

 

 能夠設置隊列長度:

 

卡住了
 
能夠設置阻塞狀態和等待時間 而後拋異常,判斷

後入先出:

 

 

 

設置隊列優先級:
可用於VIP 隊列:

 

優先級越低越優先

 

 

 

生產者:

 

消費者:

 

執行:

 

相關文章
相關標籤/搜索