第四章

本章內容

1.動態導入模塊html

2.粘包問題python

3.paramkio(ssh連接模塊)linux

4.多線程git

5.GIL鎖github

6.互訴鎖windows

7.遞歸鎖緩存

8.Semaphore(信號量)服務器

9.事件(多線程標誌位)網絡

10.隊列(queue)多線程

11.生產者消費者模型

12.多進程

13.進程之間通信

14.進程之間數據共享

15.進程池

16. 協程

17.事件驅動

18.堵塞IO 非堵塞,同步IO,異步IO 

1.動態導入模塊

aa.py
def test():
    print("ok")

class C:
    def __init__(self):
        self.name = 'abc'


__import__

data = __import__('day5.aa')
a =data.aa
a.test()

b =data.aa.C()
print(b.name)

--------------------------------------------

import importlib

aa = importlib.import_module('day5.aa')
print(aa.C().name)
aa.test()

---------------------------------------------
conn.send(str(len(cmd_res.encode())).encode("utf-8"))

-------------------------------------------------------------

2.粘包問題

while True:
    cmd = input(">>:").strip()
    if len(cmd) == 0:continue
    if cmd.startwith("get"):
        clinet.send(cmd.encode())
        file_toal_size = int(server_response.decode())
        received_size = 0
        filename = cmd.split()[1]
        f = open(filename,'wb')
        m = hashlib.md5()


        while received_size < file_toal_size:
            if file_toal_size - received_size > 1024:
                size = 1024
            else:
                size = file_toal_size - received_size
            data = client.recv(size)
            received_size += len(data)
            f.write(data)
        else:
            new_file_md5 = m.hexdigest()
			
			
			
--------------------------------------------

import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler):
    def handler(self):
        while True:
            try:
                self.data = self.request.recv(1024).strip()
                print("{}wrote:".format(self.client_address[0]))
                print(self.data)
                self.request.send(self.data.upper())
            except ConnectionAbortedError as e:
                print("ree",e)
                break

if __name__ == "__main__":
    HOST,PORT = "localhost",9999

3.paramiko

paramiko模塊安裝

http://blog.csdn.net/qwertyupoiuytr/article/details/54098029

密碼連接

#!/usr/bin/env python
# _*_ encoding:utf-8 _*_

import paramiko

#建立SSH對象
ssh = paramiko.SSHClient()
#容許連接不在know_host文件主機中
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#連接服務器
ssh.connect(hostname='192.168.80.11',port=22,username='root',password='123.com')
#執行命令
stdin,stdout,stderr = ssh.exec_command('df')
#獲取結果
res,err = stdout.read(),stderr.read()

resilt = res if res else err

print(resilt)
#關閉鏈接
ssh.close()


-------------------------------------------------------

transport = paramiko.Transport(('192.168.80.11',22))
transport.connect(username='root',password='123.com')

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin,stdout,stderr = ssh.exec_command('df')
print(stdout.read())

transport.close()

密碼連接上傳/下載文件

transport = paramiko.Transport(('192.168.80.11',22))
transport.connect(username='root',password='123.com')

sftp = paramiko.SFTPClient.from_transport(transport)
#上傳文件
sftp.put('windows.txt','/root/win.txt')
#下載文件
#sftp.get('linux.txt','linux.txt')

transport.close()

免密碼連接

linux 拷貝公鑰
ssh-copy-id "root@192.168.80.11"

#指定公鑰
private_key = paramiko.RSAKey.from_private_key_file('id_rsa')

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

ssh.connect(hostname='192.168.80.11',port=22,username='root',pkey=private_key)

stdin,stdout,stderr = ssh.exec_command('df')
result = stdout.read()
print(result)
ssh.close()

免密碼上傳/下載文件

private_key = paramiko.RSAKey.from_private_key_file('id_rsa')

transport = paramiko.Transport(('192.168.80.11',22))
transport.connect(username='root',pkey=private_key)

sftp = paramiko.SFTPClient.from_transport(transport)

sftp.put('windows.txt','/root/windows.txt')
sftp.get('linux.txt','linux.txt')

transport.close()

4.多線程

io 操做不沾用cpu
計算佔用cpu,1+1
python多線程 不適合cpu密集操做型的任務,適合io密集型的任務操做

def run(n):
    print('task',n)
    time.sleep(2)

t1=threading.Thread(target=run,args=('t1',))
t2=threading.Thread(target=run,args=('t2',))

t1.start()
t2.start()


############
import threading
import time

class MyThread(threading.Thread):
    def __init__(self,n):
        super(MyThread,self).__init__()
        self.n = n
    def run(self):
        print("runnint task",self.n)

t1 = MyThread("t1")
t1.start()


################
start_time = time.time()

t_objs = []

def run(n):
    print('task',n)
    time.sleep(2)

for i in range(50):
    t = threading.Thread(target=run,args=('t-%s'%i,))
    t.start()
    t_objs.append(t)

for t in t_objs:
    t.join()

print('--------------all-------------')
print("cost",time.time() - start_time)

查看當前運行的主進程和主進程個數

print('-all--',threading.current_thread(),threading.active_count())

守護線程

import threading
import time

start_time = time.time()

t_objs = []

def run(n):
    print('task',n)
    time.sleep(2)


for i in range(50):
    t = threading.Thread(target=run,args=('t-%s'%i,))
    t.setDaemon(True)           #把子線程變成守護線程(守護線程,主線程執行完推出,不等待守護線程執行結束)
    t.start()
    t_objs.append(t)

for t in t_objs:
    t.join()

print('--------------all-------------',threading.current_thread(),threading.active_count())
print("cost",time.time() - start_time)

5.GIL鎖

python只能執行一個進程,因此在執的多進程工做室,是利用上下切花來完成的

由於python是調用C語言原始的進程接口,不能夠調整 進程工做的順序,在同一時間內只有一個進程在處理數據

6.互訴鎖

互訴鎖 防止上下切換覆蓋數據
 
import threading
import time

def run(n):
    lock.acpuire()   #加鎖
    global num
    num += 1
    time.sleep(1)
    lock.release()   #釋放鎖

lock = threading.Lock() #調用鎖

num = 0

t_objs = []

for i in range(50):
    t = threading.Thread(target=run,args=("t-%s"%i,))
    t.start()
    t_objs.append(t)


for  t in t_objs:
    t.join()

print('num',num)

7.遞歸鎖

遞歸鎖 防止鎖順序錯亂

import threading,time

def run1():
    print("grab the first part dara")
    lock.acquire()
    global num
    num +=1
    lock.release()
    return num

def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2+=2
    lock.release()
    return num2

def run3():
    lock.acquire()
    res = run1()
    print("----run1 run2-----")
    res2 = run2()
    lock.release()
    print(res,res2)

if __name__ == '__main__':

    num,num2 = 0,0
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

while threading.active_count() !=1:
    print(threading.active_count())

else:
    print(num,num2)

8.Semaphore(信號量)

互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。

import threading,time

def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the threading:%s\n"%n)
    semaphore.release()

if __name__ == '__main__':


    semaphore = threading.BoundedSemaphore(5)  #容許5個線程同時運行
    for i in range(20):
        t = threading.Thread(target=run,args=(i,))
        t.start()

while threading.active_count() !=1:
    pass
else:
    print('----all-------')

9.事件(多線程標誌位)

import threading,time
event = threading.Event()

def lighter():
    count = 0
    event.set()
    while True:
        if count >5 and count <10:
            event.clear()
            print("\033[41m--->紅燈\033[0m")
        elif count >10:
            event.set()
            count = 0

        else:
            print("\033[42m--->綠燈\033[0m")
        time.sleep(1)
        count +=1

def car(name):
    while True:
        if event.is_set():
            print("[%s] running..."% name)
            time.sleep(1)
        else:
            print("[%s]sees red light waiting.."%name)
            event.wait()
            print("\033[34m[%s] green ligth is on start going....\033[0m"%name)


light = threading.Thread(target=lighter,)
light.start()

car1 = threading.Thread(target=car,args=("Tesla",))
car1.start()

10.隊列(queue)

解耦,使程序直接耦合,提升程序效率,一個進程修改不影響其餘進程

10.1.先進先出

q = queue.Queue()
# q = queue.Queue(maxsize=3)  設置隊列數量
q.put(1)                      傳數據
q.put(2)
q.put(3)

print(q.qsize())              查看隊列大小
print(q.get())                取數據
print(q.get())
# print(q.get_nowait())       取數據爲空時不會卡住
# print(q.get(block=False))   設置false取數據爲空時不會卡住
print(q.get(timeout=1))       設置其數據時間爲1秒

10.2.先進後出

import queue
q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(2)
print(q.get())
print(q.get())
print(q.get())

10.3.標記優先級

import queue
q = queue.PriorityQueue()
q.put((-1,"a"))
q.put((3,"b"))
q.put((6,"c"))

print(q.get())
print(q.get())
print(q.get())

11.生產者消費者模型

import threading
import queue


q = queue.Queue()

def producer():
    for i in range(10):
        q.put("骨頭%s"%i)
    print("開始等待骨頭被取走。。。")
    q.join()
    print("全部骨頭被取完了。。。")


def consumer(n):
    while q.qsize() >0:
        print("%s 取到"%n,q.get())
        q.task_done()


p = threading.Thread(target=producer,)
p.start()
# b = threading.Thread(target=consumer,args=("abc",))
# b.start()
consumer("abc")

12.多進程

def run(name):
    time.sleep(2)
    print("hello",name)

if __name__ == '__main__':
    for i in range(10):
        p = multiprocessing.Process(target=run,args=("bob %s"%i,))
        p.start()

def thread_run():
    print(threading.get_ident())  #返回當前線程的「線程標識符

def run(name):
    time.sleep(2)
    print("hello",name)
    t = threading.Thread(target=thread_run,)
    t.start()

if __name__ == '__main__':
    for i in range(10):
        p = multiprocessing.Process(target=run,args=("bob %s"%i,))
        p.start()

13.進程之間通信

from multiprocessing import Process,Queue

def f(qq):
    qq.put([42,None,'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f,args=(q,))
    p.start()
    print(q.get())
    p.join()


####################
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name',__name__)
    print('parent process',os.getppid())  打印父進程id
    print('process id:',os.getpid())      打印子進程id
    print("\n\n")

def f(name):
    info('\033[31mfunction f\033[0m')
    print('hello',name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f,args=('bob',))
    p.start()
    p.join()


	
#Pipes
#Pipe是經過管道傳送和接受數據的
from multiprocessing import Process,Pipe

def f(conn):
    conn.send([42,None,"hello"])
    conn.close()

if __name__ == '__main__':
    parent_conn,child_conn = Pipe()
    p = Process(target=f,args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()

14.進程之間數據共享

from multiprocessing  import Process,Manager
import os

def f(d,l):

    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)
    print(d)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))

        p_list =[]
        for i in range(10):
            p = Process(target=f,args=(d,l,))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()

        # print(d)
        # print(l)
		
----------------------------------------------

from multiprocessing import Process,Lock
def f(l,i):
    l.acquire()
    try:
        print('hello world',i)
    finally:
        l.release()

if __name__ == '__main__':
     lock = Lock()

     for num in range(10):
         Process(target=f,args=(lock,num)).start()

15.進程池

from multiprocessing import process,Pool,freeze_support   (windows 須要加,freeze_support)
import time,os

def Foo(i):
    time.sleep(2)
    print('in process',os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:',arg,os.getpid())

if __name__ == '__main__':
    pool = Pool(processes=3)      容許進程池同時放入5個進程
    print("主進程",os.getpid())
    for i in range(10):
        pool.apply_async(func=Foo,args=(i,),callback=Bar)  #同步(並行)    (callback方法 執行完Foo執行Bar 避免重複的長鏈接)
        #pool.apply(func=Foo,args=(i,)) 串行    
    print('end')
    pool.close()
    pool.join()

 

16.協程

import time
import queue
def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name,new_baozi))
        #time.sleep(1)
 
def producer():
 
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n +=1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" %n )
 
 
if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    producer()

#########################

from greenlet import greenlet

def test1():
    print(12)      #2
    gr2.switch()    #切換
    print(34)       #4
    gr2.switch()    #切換

def test2():
    print(56)       #3
    gr1.switch()    #切換
    print(78)       #5

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()         #1

  

16.1.自動切換

協程切換原理 遇到IO操做就切換,執行時間短的先執行,(IO 爲等待時間)

import gevent

def foo():
    print('foo 1')   #1
    gevent.sleep(2)
    print('foo 2') #6

def bar():
    print('bar 1')  #2
    gevent.sleep(1)
    print('bar 2') #5

def func3():
    print("func 1") #3 
    gevent.sleep(0)
    print('func 2') #4 


gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
    gevent.spawn(func3),
])

  

16.2.利用協程爬蟲

#!/usr/bin/env python
# _*_ encoding:utf-8 _*_

from greenlet import greenlet
from urllib import request
import gevent,time
from gevent import monkey
monkey.patch_all()           #把當前程序全部的IO操做作上標記(不然gevent沒法識別)

def f(url):
    print('GET: %s'% url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s'%(len(data),url))

urls = ['https://www.python.org/ ',
        'https://www.yahoo.com/ ',
       'https://github.com/ ']

time_start = time.time()
for url in urls:
    f(url)

print("cost",time.time() - time_start)

async_time_start = time.time()

gevent.joinall([
    gevent.spawn(f,'https://www.python.org/ '),
    gevent.spawn(f,'https://www.yahoo.com/ '),
    gevent.spawn(f,'https://github.com/ '),])

print("cost",time.time() - async_time_start)

  

16.3.協程socket

server

#!/usr/bin/env python
# _*_ encoding:utf-8 _*_

import sys
import socket
import time
import gevent

from gevent import socket,monkey
monkey.patch_all()

def server(port):
    s = socket.socket()
    s.bind(('localhost',port))
    s.listen(500)
    while True:
        cli,addr = s.accept()
        gevent.spawn(handle_request,cli)


def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print('recv:',data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)
    except Exception as ex:
        print(ex)
    finally:
        conn.close()

if __name__ == '__main__':
    server(8001)


clinet

import socket

HOST = 'localhost'
PORT = 8001
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect((HOST,PORT))

while True:
    msg = bytes(input(">>"),encoding='utf8')
    s.sendall(msg)
    data = s.recv(1024)
    print('Received',data)

s.close()

  

17.事件驅動

對事件作處理 例如:點擊鼠標 放到一個時間列表 按鍵盤放到一個時間列表 有一個進程來處理

  

18.堵塞IO 非堵塞,同步IO,異步IO 

http://www.cnblogs.com/alex3714/articles/5876749.html

18.1 .文件描述

服務收到一個事件,會放到對應的列表裏面,文件描述符就是對應的索引,而索引對應是文件句柄(文件對象)

18.2.緩存io

程序不能夠直接調用系內核,程序打開文件都是調用內核來完成的,例如拷貝文件,是先拷貝到內核緩存區 而後再拷貝到io

18.3.阻塞IO (blocking IO)

一個進程正在執行,另外一個進程在等待,就形成了堵塞。

18.4 .非阻塞IO (nibulocking IO)

當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,
而是馬上返回一個error。從用戶進程角度講 ,它發起一個read操做後,並不須要等待,
而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,
因而它能夠再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,
那麼它立刻就將數據拷貝到了用戶內存,而後返回。

因此,nonblocking IO的特色是用戶進程須要不斷的主動詢問kernel數據好了沒有。

18.5.IO多路複用(IO multipexing)

IO multiplexing就是咱們說的select,poll,epoll,
有些地方也稱這種IO方式爲event driven IO。
select/epoll的好處就在於單個process就能夠同時處理多個網絡鏈接的IO。
它的基本原理就是select,poll,
epoll這個function會不斷的輪詢所負責的全部socket,
當某個socket有數據到達了,就通知用戶進程。


當用戶進程調用了select,那麼整個進程會被block,
而同時,kernel會「監視」全部select負責的socket,
當任何一個socket中的數據準備好了,select就會返回。
這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。

因此,I/O 多路複用的特色是經過一種機制一個進程能同時等待多個文件描述符,
而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()函數就能夠返回。


18.6.IO多路複用之 select,poll,epoll的區別

select,poll,epoll是IO多路復中監視數據

select

例若有100個連接過來 內核檢測到其中只有兩個有數據,內核不會告訴select
所以select 須要本身循環查找消耗事件,
select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,
不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。

poll

poll在1986年誕生於System V Release 3,
它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。


epoll

它幾乎具有了以前所說的一切優勢,
被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。

例若有100個連接過來 內核檢測到其中只有兩個有數據,
內核會直接告訴epoll只有兩個有數據 就不用本身查找。

 

18.7.異步IO (asyncchronous IO)

用戶進程發起read操做以後,馬上就能夠開始去作其它的事。
而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,
首先它會馬上返回,因此不會對用戶進程產生任何block。而後,
kernel會等待數據準備完成,而後將數據拷貝到用戶內存,
當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。

 

阻塞IO,非阻塞IO,IO多路複用:都爲同步IO
異步IO:異步IO

 

  IO多路複用之select模式

 1 #!/usr/bin/env python
 2 # _*_ encoding:utf-8 _*_
 3 
 4 import socket
 5 import queue
 6 import select
 7 
 8 msg_dic = {}
 9 
10 server = socket.socket()
11 server.bind(('localhost',9000))
12 server.listen(1000)
13 
14 #不阻塞
15 server.setblocking(False)
16 
17 inputs = [server,]
18 outputs = []
19 
20 while True:       #新鏈接 ,下次循環執行,異常
21     readable,writeable,execeptional=select.select(inputs,outputs,inputs)    #io多路複用select模式
22     for r in readable:
23         if r is server:  #表明來了一個新鏈接
24             conn,addr = server.accept()
25             print("來了新鏈接",addr)
26             inputs.append(conn)   #是由於這個新建的連接尚未發送數據過來,如今就接收,
27             #因此要想實現這個客戶端發送數據來時server能知道,就讓server再監測這個conn.
28 
29             msg_dic[conn]= queue.Queue()  #初始化一個隊列,後面存要返給這個客戶端的數據
30         else:
31              data = r.recv(1024)
32              print('收到數據',data)
33              msg_dic[r].put(data)  #把新來的連接添加隊列
34              outputs.append(r)     #添加outputs下次循環執行
35 
36     for w in writeable:    #要返回給客戶端的連接列表
37         data_to_client = msg_dic[w].get() #取隊列數據
38         w.send(data_to_client)            #發送數據
39         outputs.remove(w)                 #確保下次循環的時候writeabke,不反回已經處理完的連接
40 
41     for e in execeptional:
42         if e in outputs:
43             outputs.remove(e)   #刪除
44 
45         inputs.remove(e)      #刪除
46 
47         del msg_dic[e]  #刪除 
48 
49 server
server
 1 import socket
 2 
 3 HOST = 'localhost'
 4 PORT = 9000
 5 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 6 s.connect((HOST,PORT))
 7 
 8 while True:
 9     msg = bytes(input(">>"),encoding='utf8')
10     s.sendall(msg)
11     data = s.recv(1024)
12     print('Received',data)
13 
14 s.close()
client

 IO多路複用之epoll模式

 1 #!/usr/bin/env python
 2 # _*_ encoding:utf-8 _*_
 3 import selectors
 4 import socket
 5 
 6 sel = selectors.DefaultSelector()
 7 
 8 def accept(sock,mask):
 9     conn,addr = sock.accept()
10     conn.setblocking(False)
11     sel.register(conn,selectors.EVENT_READ,read)
12 
13 def read(conn,mask):
14     data = conn.recv(1000)
15     if data:
16         print('echoing', repr(data), 'to', conn)
17         conn.send(data)
18     else:
19         print('closing', conn)
20         sel.unregister(conn)
21         conn.close()
22 
23 
24 
25 sock = socket.socket()
26 sock.bind(('localhost',9999))
27 sock.listen(100)
28 sock.setblocking(False)       #設置非堵塞
29 sel.register(sock,selectors.EVENT_READ,accept)  #註冊
30 
31 while True:
32     events = sel.select()  #默認堵塞,有活動連接就返回活動連接列表
33     for key,mask in events:  #有連接過來
34         callback = key.data    #accept
35         callback(key.fileobj,mask) #執行accept函數 key.fileobj=conn
server
 1 import socket
 2 import sys
 3 
 4 messages = ['This is the mess',
 5             'It will be sent',
 6             'in parts',
 7             ]
 8 
 9 server_address = ('localhost',9999)
10 
11 socks = [socket.socket(socket.AF_INET,socket.SOCK_STREAM) for i in range(5)]
12 
13 for s in socks:
14     s.connect(server_address)
15 
16 for message in messages:
17     for s in socks:
18         s.send(message.encode())
19         print('send %s %s' % s.getsockname(),message)
20 
21     for s in socks:
22         data = s.recv(1024)
23         print('recv %s %s'% s.getsockname(),data)
24         if not data:
25             print('not data %s %s'%s.getsockname(),data)
26             s.close()
client
相關文章
相關標籤/搜索