Python學習day07 多線程多進程及主機管理

#Created on 2016年12月30日python

第一課 課前廢話 28minutesshell

能夠經過Queue來限制線程的數量,就像生產者和消費者緩存

第二課 上節內容回顧 20minutes
上節回顧:
socket文件傳送
多線程:
計算密集型:多進程,須要CPU
I/O密集型:多線程 ,不需CPU
線程安全:線程鎖 GLL
event:線程間通訊
線程事件 event.set(),event.wait()
兩種寫法:直接調用函數,另外一種直接繼承類
生產者消費者模型:加快效率,鬆耦合
paramiko
ssh通訊原理:認證加密,通訊加密

本節內容:
多進程
Paramiko
審計開發
Select異步模型安全

第三課 多進程使用 14minutes
Multiprocessing
進程:由系統控制bash

簡單的多進程
a = range(10)

def main(n):
time.sleep(1)
print n*n
return n*n

if __name__ == '__main__':
p = Pool(5)
print p.map(main,a)多線程


第四課 子進程與父進程的關係 25minutes併發

每個程序都有主進程和子進程
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import sys,time,re,os
from multiprocessing import Pool,Process

def aa(x):
print x
print 'ppid:',os.getppid()
time.sleep(1)
print 'pid:',os.getpid()

aa('fffff')
print '-----------'
p = Process(target=aa,args=('sssss',))
p.start()
p.join()

結果:
[root@Rsyslog day7]# python multiprocess.py
fffff
ppid: 7730
pid: 7760
-----------
sssss
ppid: 7760
pid: 7761
[root@Rsyslog day7]# app


第五課 進程間的內存同步方法Queue 31minutesssh

如下例子多進程沒法通訊的,但在Win下老死機,須要Linux測試
def aa(info_list,n):
info_list.append(n)
print info_list

info = []
for i in range(10):
#進程不共享內存,因此每次只出來一個
p = Process(target=aa,args=(info,i))
p.start()
[root@Rsyslog day7]# python multiprocess-1.py
[1]
[0]
[2]
[3]
[4]
[5]
[7]
[6]
[8]
[9]
[root@Rsyslog day7]#


#若是用多線程,它由於是共享內存的,因此會出來不少
p = threading.Thread(target=aa,args=(info,i))
p.start()


[root@Rsyslog day7]# python multiprocess-1.py
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5, 6, 7]
[0, 1, 2, 3, 4, 5, 6, 7, 8]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

--------------------------------------------------
問題:若是經過Queue解決上面的問題
下面例子中,主進程和子進程的Q是不同的結果,相互獨立
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import sys,time,re,os
from multiprocessing import Pool,Process,Queue異步

def f(q,n):
q.put([n,'hello'])
print q.get()

q = Queue()
for i in range(5):
p = Process(target=f,args=(q,i))
p.start()
p.join()


[root@Rsyslog day7]# python multi-2.py
[0, 'hello']
[1, 'hello']
[2, 'hello']
[3, 'hello']
[4, 'hello']
[root@Rsyslog day7]#
-------------------------------------------------------

第六課 進程間的內存同步方法manager 29minutes

進程裏的會直接Copy外部變量的數據,因此子進程內部和外面的就是結果是不同

共享內存方法:Value,Array-------須要練習
只能進行數字和列表共享
from multiprocessing import Array,Value

def run(v,a):
v.value = 3.1415926
for i in range(len(a)):
a[i] = -a[i]

if __name__ == '__main__':
v = Value('d',0.0)
a = Array('i',range(10))

p = Process(target=run,args=(v,a))
p.start()
p.join()

print v.value
print a[:]
結果:
3.1415926
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

manager 支持更多的共享
可以使用Manager方式更換數據
def run(d,l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()

if __name__ == '__main__':
man= Manager()
d = man.Dict()
l = man.list(range(10))
p = Process(target=run,args=(d,l))
p.start()
p.join()

print d
print l
結果:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]


#Created on 2017年01月01日
第七課 經過Pool產生多進程1 20minutes
第8課 經過Pool產生多進程2 9minutes

多進程數量和CPU核數相對便可,不可太多.

def run(x):
print x*x
time.sleep(1)

pool = Pool(processes=4)
re_list = []
# for i in range(10):
# res = pool.apply_async(run, [i,]) #異步
# #res = pool.apply(run, [i,]) #同步,基本不用
# res.get()
#上面For循環等同於如下命令
print pool.map(run,range(10))
-----------------------------------------------
一個簡單的多進程的池的練習
[root@Rsyslog test_script]# cat process_test_ping.py
#!/usr/bin/env python
import multiprocessing
import subprocess
host_list = ['192.168.100.254','1.1.1.1','192.168.100.253','114.28.127.2','114.28.127.72','114.28.127.70','114.28.127.12','114.28.127.56','114.28.127.102']
if len(host_list) > 30:
process_number = 30
else:
process_number = len(host_list)
def ping_host(ipaddr):
if subprocess.call('ping -c1 -W 1 %s > /dev/null' % ipaddr, shell=True) == 0:
print '%s is OK' % ipaddr
else:
print '%s is DOWN' % ipaddr
pool = multiprocessing.Pool(processes=process_number)
for ip in host_list:
pool.apply_async(ping_host,(ip,))
#pool.map(ping_host,host_list)
pool.close()
pool.join()
[root@Rsyslog test_script]#

#Created on 2017年01月03日
第九課 開發審計堡壘機 56minutes

主要用Paramiko源碼下的demo.py文件

第十課 開發審計堡壘機修改Paramiko源碼記錄操做 29minutes
修改Demo下的一個文件

第十一課 審計堡壘的安全控制 15minutes

登陸後自動啓動腳本
將腳本放到用戶的環境變量中
在/home/allan/.bashrc 用戶環境變量
將用戶的腳本放到用戶的環境變量中
shellinabox -t 不用證書


第十二課 select-Poll-epoll介紹 47minutes
Select VS poll &epoll
異步I/O模型
Nginx可經過異步支持上萬個併發

ulimit -n 顯示當前Shell終端同時最大打開多少個文件

Select單進程多併發,文件描述符限制爲1024
Select目前基本不用,無太大併發可用

Poll改進版:本質同Select,無文件描述符限制
水平觸發:以上兩種將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用
Select和poll的時候再次報告這些描述符,因此通常不會丟失,這就是水平觸發。
這種操做速度慢,且耗時,若是有一萬個描述符,要輪循好久。

Epoll:水平觸發和邊緣觸發
邊緣觸發:只告訴進程哪些文件描述符就緒,只說一次,若是沒有采起行動,不會再次告知。

 

#Created on 2017年01月03日
第十三課 select代碼實例分析 54minutes

select異步例子,須要實例化操做。

第十四課 審計做業 14minutes

審計開發
用戶可登陸和操做指定機器,機器組
用戶可經過不一樣的遠程賬號登陸遠程主機:
在選擇主機後可選擇收號登陸,如Root或其餘

做業二:
Select代碼做業,加註釋

Server:

#!/usr/bin/env python
# encoding: utf-8

#Created on 2017年1月11日
#@author: Administrator

#--------------select異步說明--------------------------
import select
import socket
import Queue
import sys

def select_server():
    server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    #設定SOcket爲非阻塞模式,當前容許鏈接,但會話需等待,不設定有鏈接時不容許鏈接
    server.setblocking(0)  
    address_port = ('127.0.0.1',9999)
    print >>sys.stderr,'starting up on %s port %s' % address_port
    server.bind(address_port)
    server.listen(5)
    
    '''
    select()方法接收並監控3個通訊列表, 第一個是全部的輸入的data,就是指外部發過來的數據,
    第2個是監控和接收全部要發出去的data(outgoing data),第3個監控錯誤信息,接下來咱們需
    要建立2個列表來包含輸入和輸出信息來傳給select().
    '''
    inputs = [server]
    outputs = []
    '''
    全部客戶端的進來的鏈接和數據將會被server的主循環程序放在上面的list中處理,咱們如今的
    server端須要等待鏈接可寫(writable)以後才能過來,而後接收數據並返回(所以不是在接收到
    數據以後就馬上返回),由於每一個鏈接要把輸入或輸出的數據先緩存到queue裏,而後再由select取
    出來再發出去。
    '''
    message_que = {}
    
    while inputs:
        #select將輪循三張表,將結果返回給前面的三個參數
        print '\nnow wait for connect.......'
        readable,writable,exception = select.select(inputs,outputs,inputs)
        
        
        for s in readable:  #輪循inputs表
            #第一種狀況是S是Server自己,則開始接收客戶端
            if s is server:  #若是目標是當前Server,就是socket對象
                #接收鏈接
                connection,address = s.accept()
                #將鏈接對象放到表中
                inputs.append(connection)
                #將鏈接對象放到Queue中
                message_que[connection] = Queue.Queue()
            #第二種狀況:若是已鏈接,則接收數據
            else:  #若是已鏈接,則接收數據
                data = s.recv(1024)
                if data: #若是有數據
                    message_que[s].put(data) #將數據放到Queue中
                    if s not in outputs:   #若是s不在outputs中,則加入
                        outputs.append(s)
                #第三種狀況,客戶端已斷開,接收數據爲空,則關閉客戶端
                else:
                    inputs.remove(s)  #刪除inputs中的鏈接
                    if s in outputs:  #刪除outputs中的鏈接
                        outputs.remove(s)
                    s.close()      #關閉鏈接
                    del message_que[s]   #刪除Queue中的數據
        '''
    對於writable list中的socket,也有幾種狀態,若是這個客戶端鏈接在跟它對應的queue裏有數據,
    就把這個數據取出來再發回給這個客戶端,不然就把這個鏈接從output list中移除,這樣下一次循環
    select()調用時檢測到outputs list中沒有這個鏈接,那就會認爲這個鏈接還處於非活動狀態
        '''            
    
        for s in writable:
            try:  #取出Queue中的數據
                next_msg = message_que[s].get_nowait()
            except Queue.Empty: #若是數據爲空,則出現錯誤
                #print >>sys.stderr,'output queue for',s.getpeername(),'is empty'
                outputs.remove(s)     #刪除putputs中的鏈接       
            else:  #若是正常取出,則發送給客戶端
                #print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername())
                s.send(next_msg.upper()+'  =====> from server')
                
        for s in exception:  #若是出現錯誤
            inputs.remove(s)  #刪除inputs表的項
            if s in outputs:
                outputs.remove(s) #刪除outputs表的項
            s.close()    #關閉鏈接
            del message_que[s]   #刪除Queue中的項
            
if __name__ == '__main__':
    select_server()    

 Client:

#!/usr/bin/env python
# encoding: utf-8

#Created on 2017年1月11日
#@author: Administrator


import socket
import sys

#開兩個Socket鏈接
socks  = [socket.socket(socket.AF_INET,socket.SOCK_STREAM),
          socket.socket(socket.AF_INET,socket.SOCK_STREAM)]
address_port = ('127.0.0.1',9999)
#定義發送的信息
message = ['helle,this is first data',
           'helle,this is second data',
           'helle,this is thrid data']

#經過迭代方式打開兩個Socket進程
for s in socks:
    try:
        s.connect(address_port)
    except:
        print 'error'

#迭代發送信息
for msg in message:
    #發送
    for s in socks:
        s.send(msg)
        print >>sys.stderr,'%s sending "%s" '%(s.getsockname(),msg)
    #接收
    for s in socks:
        data = s.recv(1024)
        print >>sys.stderr,'%s recevid "%s"' % (s.getsockname(),data)
        if not data:
            print >>sys.stderr,'closeing socket',s.getsockname()
相關文章
相關標籤/搜索