Python線程和協程

 寫在前面



    好好學習 每天向上



 

1、線程

  1.關於線程的補充

線程:就是一條流水線的執行過程,一條流水線必須屬於一個車間;
那這個車間的運行過程就是一個進程;

即一個進程內,至少有一個線程;

進程是一個資源單位,真正幹活的是進程裏面的線程;
線程是一個執行單位;

多線程:一個車間內有多條流水線,多個流水線共享該車間的資源;
一個進程內有多個線程,多線程共享一個進程的資源;

線程建立的開銷要遠遠小於建立進程的開銷;

進程之間更多的是一種競爭關係;
線程之間更多的是一種協做關係;


爲什麼要建立多線程?  即從線程的優勢考慮
1.共享資源
2.建立開銷比較小
> 就是爲了要實現併發的效果

  2.線程的建立(和進程相似)

    - 1.建立線程的第一種方式-示例1

# Demo: first way to create a thread — Thread(target=...); the child runs
# inside the same process, so both prints show the same PID.
from threading import Thread
import os


def work():
    # Executed by the child thread.
    print('線程:%s' % os.getpid())


if __name__ == '__main__':
    worker = Thread(target=work)
    worker.start()
    print('主線程:%s' % os.getpid())

---
線程:8028
主線程:8028

      - 示例2:子線程id等於主線程id,而且同一個進程下的多個子線程id都同樣

# Demo: all threads of one process report the same PID; only the
# auto-assigned thread names differ.
import threading
from threading import Thread
import os

def work():
    # Print this thread's name and the shared process id.
    print('線程名字:%s,PID:%s' % (threading.current_thread().name,os.getpid()))
def work2():
    print('線程名字:%s,PID:%s' % (threading.current_thread().name,os.getpid()))
if __name__ == '__main__':
    t = Thread(target=work)
    t2 = Thread(target=work2)  # fix: was target=work, which left work2 defined but unused
    t.start()
    t2.start()
    print('主線程名字:%s,PID:%s' % (threading.current_thread().name,os.getpid()))

---
線程名字:Thread-1,PID:7188
線程名字:Thread-2,PID:7188
主線程名字:MainThread,PID:7188

    - 2.建立子線程的第二種方式git

# Second way to create a thread: subclass Thread and override run().
import os
from threading import Thread
class Work(Thread):
    def __init__(self,name):
        # Let Thread manage the name instead of overwriting the attribute
        # after the fact; behavior is identical but clearer.
        super().__init__(name=name)
    def run(self):
        # Same PID/PPID as the spawning code: a thread is not a new process.
        print('子線線程, PID: %s, PPID: %s' % (os.getpid(),os.getppid()))
if __name__ == '__main__':
    t = Work('standby')
    t.start()
    print('我是主線程,PID:%s, PPID: %s' % (os.getpid(),os.getppid()))

---
子線線程, PID: 5424, PPID: 6076
我是主線程,PID:5424, PPID: 6076

  3.線程經常使用屬性

# Commonly used attributes/helpers of the threading module.
from threading import Thread
import threading
import os,time

def work():
    # time.sleep(0.5)
    print('子線程,PID:%s, PPID: %s' % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    t = Thread(target=work)
    t.start()
    t.join()
    print(threading.enumerate()) # list of currently alive Thread objects; may or may not include the child depending on timing
    print(threading.active_count()) # number of currently alive threads
    print('主線程,PID:%s, PPID: %s\t個人本質:%s' % (os.getpid(), os.getppid(),threading.current_thread().getName()))



'''
1.沒有加t.join() 而且work函數裏沒有time.sleep(0.5) 這個的狀況下:
    狀況1:
    子線程,PID:6048, PPID: 3216
    [<_MainThread(MainThread, started 5280)>, <Thread(Thread-1, started 7112)>]
    2
    主線程,PID:6048, PPID: 3216	個人本質:MainThread
    
    狀況2:
    子線程,PID:7096, PPID: 3216
    [<_MainThread(MainThread, started 4444)>]
    1
    主線程,PID:7096, PPID: 3216	個人本質:MainThread
2.沒寫join,但work函數里加上time.sleep(0.5)這一行的狀況下:
    [<Thread(Thread-1, started 7008)>, <_MainThread(MainThread, started 6604)>]
    2
    主線程,PID:6592, PPID: 3216	個人本質:MainThread
    子線程,PID:6592, PPID: 3216
3.main里加上 join,可是work裏沒有 sleep的狀況下:
    子線程,PID:3988, PPID: 3216
    [<_MainThread(MainThread, started 6516)>]
    1
    主線程,PID:3988, PPID: 3216	個人本質:MainThread
'''

    - setDaemon() 和 join()

# Case: neither daemon mode nor join() is used: the main thread finishes
# first and the process then waits for the child, which prints ~2s later.
from threading import Thread
import time
def say(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=say,args=('standby',))
    t.start()
    print('主線程')
    print(t.is_alive())  # True: the child is still inside its sleep

---執行結果---
主線程
True
standby say hello
# Case: daemon=True set, no join(): the daemon child is killed when the
# main thread exits, so its print never runs.
from threading import Thread
import time
def say(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=say,args=('standby',))
    t.daemon = True  # modern spelling; Thread.setDaemon() is deprecated since 3.10
    t.start()
    print('主線程')
    print(t.is_alive())

---執行結果:子線程裏的print操做並未執行,子線程跟隨主線程的退出而被動結束了---
主線程
True
# Case: join() without daemon: the main thread blocks until the child
# finishes, so is_alive() afterwards reports False.
from threading import Thread
import time
def say(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=say,args=('standby',))
    # t.setDaemon(True)
    t.start()
    t.join()
    print('主線程')
    print(t.is_alive())

---執行結果:主線程等待子線程執行完再往下執行---
standby say hello
主線程
False
# Case: both daemon=True and join() are set: join() blocks the main thread
# until the child finishes, so the daemon flag never gets a chance to matter.
from threading import Thread
import time
def say(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=say,args=('standby',))
    t.daemon = True  # modern spelling; Thread.setDaemon() is deprecated since 3.10
    t.start()
    t.join()
    print('主線程')
    print(t.is_alive())

---執行結果:join操做使得主線程阻塞了,即等待子線程執行完畢再執行主線程---
standby say hello
主線程
False

  4.多線程

    - 同一進程下的多個線程共享該進程的資源; web

    - 1.多線程和多進程開銷對比示例redis

# Cost comparison, part 1: start 500 threads doing trivial work.
import time
from threading import Thread

def work():
    # Throwaway local bindings that stand in for a tiny workload.
    a = 99999
    b = 101001010010101010
    str1 = 'axaxxchaxchnahxalx'
    str2 = 'axaxxcedw2312haxchnahxalx'
    str3 = '121212axaxxchaxchnahxalx'
    dic = {'k1':'v1','k2':'v2'}

if __name__ == '__main__':
    started = time.time()
    pool = []
    for _ in range(500):
        worker = Thread(target=work)
        pool.append(worker)
        worker.start()
    for worker in pool:
        worker.join()
    print('Run time is %s' % (time.time() - started))
# Run time is 0.05900001525878906

# ++++++++++++++++++++++++++++++++++

# Create 500 processes (contrast with the 500-thread version above:
# ~19.5s vs ~0.06s — process creation is far more expensive).
import time
from multiprocessing import Process
def work():
    a = 99999
    b = 101001010010101010
    str1 = 'axaxxchaxchnahxalx'
    str2 = 'axaxxcedw2312haxchnahxalx'
    str3 = '121212axaxxchaxchnahxalx'
    dic = {'k1':'v1','k2':'v2'}

if __name__ == '__main__':
    start_time = time.time()
    p_l = []
    for i in range(500):
        p=Process(target=work)
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()
    stop_time = time.time()
    print('Run time is %s' % (stop_time-start_time))
# Run time is 19.552000045776367

    - 2.多線程實現socket,改寫以前多進程的方式;

# Concurrent socket server: one worker Thread per accepted connection.
# Server side.
from threading import Thread
from socket import *

def talk(conn,addr):
    # Per-connection echo loop: upper-cases whatever the client sends.
    try:
        while True: # communication loop
            msg=conn.recv(1024)
            if not msg:break
            print('client %s:%s msg:%s' % (addr[0], addr[1], msg))
            conn.send(msg.upper())
    except Exception as e:
        print('與 ',addr,' 的通訊循環發生的異常:%s' % e)
    finally:
        conn.close()

def server(ip,port):
    # Accept loop: each accepted connection gets its own worker thread.
    server = socket(AF_INET, SOCK_STREAM)
    server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    server.bind((ip,port))
    server.listen(5)
    while True: # accept loop
        conn,addr=server.accept()
        print('client: ',addr)
        t=Thread(target=talk,args=(conn,addr))
        t.start()

if __name__ == '__main__':
    server('127.0.0.1', 8090)
# Client side: line-based REPL that sends input and prints the echoed reply.
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8090))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

    - 3.多線程示例:多線程模擬實現文件編輯器的功能json

# Three cooperating threads emulating a tiny editor pipeline.
# NOTE(review): format() and save() busy-wait on plain lists (no blocking
# queue, so they spin the CPU), and list.pop() takes from the tail, so
# items are not processed strictly in input order — fine for a demo;
# real code should use queue.Queue.
from threading import Thread
user_input_list = []
formated_list = []
def talk():          # collect user input
    while True:
        user_input = input('>>>\t').strip()
        if not user_input:
            continue
        user_input_list.append(user_input)
def format():        # format the input; here just a simple upper()
    while True:
        if user_input_list:
            res = user_input_list.pop()
            res = res.upper()
            formated_list.append(res)
def save():          # append each formatted line to disk
    while True:
        if formated_list:
            msg = formated_list.pop()
            with open('db.txt','a') as wf:
                wf.write("\n%s" % msg)
if __name__ == '__main__':
    t1 = Thread(target=talk)
    t2 = Thread(target=format)
    t3 = Thread(target=save)
    t1.start()
    t2.start()
    t3.start()

 

2、GIL

  1.定義

參考:http://www.dabeaz.com/python/UnderstandingGIL.pdf緩存

定義:
In CPython, the global interpreter lock, or GIL, 
is a mutex that prevents multiple native threads from executing Python bytecodes at once. 
This lock is necessary mainly because CPython’s memory management is not thread-safe. 
(However, since the GIL exists, 
other features have grown to depend on the guarantees that it enforces.)


結論:
在Cpython解釋器中:
同一個進程下開啓的多線程,同一時刻只能有一個線程執行,沒法利用多核優點;

    - 首先須要明確的一點是GIL並非Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念;

    - GIL使得同一時刻同一進程中只有一個線程被執行

    - 進程能夠利用多核,可是開銷大,而python的多線程開銷小,但卻沒法利用多核優點;

    - GIL 保護的是解釋器級別的數據,而Lock保護的是代碼級別的數據;(Python的垃圾回收機制

  2.Egon老師點睛之言

1. cpu究竟是用來作計算的,仍是用來作I/O的?  計算

2. 多cpu,意味着能夠有多個核並行完成計算,因此多核提高的是計算性能

3. 每一個cpu一旦遇到I/O阻塞,仍然須要等待,因此多核對I/O操做沒什麼用處

結論:
1.對計算來講,cpu越多越好,可是對於I/O來講,再多的cpu也沒用
2.固然對於一個程序來講,不會是純計算或者純I/O;
咱們只能相對的去看一個程序究竟是計算密集型仍是I/O密集型,
從而進一步分析python的多線程有無用武之地

  3.多進程和多線程在不一樣環境下的實際效果分析對比

分析:

咱們有四個任務須要處理,處理方式確定是要玩出併發的效果,越快完成越好;
解決方案能夠是:

    方案一:開啓四個進程

    方案二:一個進程下,開啓四個線程

    - 1.單核狀況下

若是四個任務是計算密集型,沒有多核來並行計算,方案一徒增了建立進程的開銷,方案二勝;

若是四個任務是I/O密集型,方案一建立進程的開銷大,且進程的切換速度遠不如線程,方案二勝;

    - 2.多核狀況下

若是四個任務是計算密集型,多核意味着並行計算,因此方案一能夠實現並行計算;
在Cpython中一個進程中同一時刻只有一個線程執行用不上多核,方案一勝

若是四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝

    - 3.結論

如今的計算機基本上都是多核,
python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提高,甚至不如串行(沒有大量切換);
可是,對於IO密集型的任務效率仍是有顯著提高的;

    - 4.實驗驗證(4核的機器)

# 1. CPU-bound workload - multiprocessing test
# run time is :34.567999839782715
from multiprocessing import Process
import time
def f1():
    # Pure computation: sum the first 10**8 integers.
    res=0
    for i in range(100000000):
        res += i

if __name__ == '__main__':
    p_l=[]
    start_time=time.time()
    for i in range(10):
        p=Process(target=f1)
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()
    stop_time=time.time()
    print('run time is :%s' %(stop_time-start_time))

===

# 1. CPU-bound workload - multithreading test
# run time is :66.21500015258789  (slower than processes: the GIL serializes the threads)
from threading import Thread
import time
def f1():
    res=0
    for i in range(100000000):
        res += i

if __name__ == '__main__':
    p_l=[]
    start_time=time.time()
    for i in range(10):
        p=Thread(target=f1)
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()
    stop_time=time.time()
    print('run time is :%s' %(stop_time-start_time))
# 2. I/O-bound workload - multiprocessing
# run time is 3.6579999923706055
from multiprocessing import Process
import time
import os
def work():
    # Read a small file; time is dominated by I/O and process startup.
    with open('db.txt',mode='r',encoding='utf-8') as rf:
        res = rf.read()
        print('%s --> %s' % (os.getpid(),res))

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(100):
        t=Process(target=work)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))

===

# 2. I/O-bound workload - multithreading
# run time is 0.02200007438659668  (threads win here: cheap to create, GIL released during I/O)
from threading import Thread
import time
import os
def work():
    with open('db.txt',mode='r',encoding='utf-8') as rf:
        res = rf.read()
        print('%s --> %s' % (os.getpid(),res))

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=work)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))

    - 5.應用場景

多線程用於IO密集型,如socket,爬蟲,web;
多進程用於計算密集型,如金融分析; 

3、鎖

GIL vs Lock

鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據

保護不一樣的數據就應該加不一樣的鎖。

GIL 與Lock是兩把鎖,保護的數據不同:
GIL是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據);
Lock是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理;


由於Python解釋器幫你自動按期進行內存回收,
你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的;

此時你本身的程序裏的線程和py解釋器本身的線程是併發運行的,
假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,
可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,
結果就有可能新賦值的數據被刪除了!!!

爲了解決相似的問題,python解釋器簡單粗暴的加了鎖:
即當一個線程運行時,其它線程都不能動;
這樣就解決了上述的問題,這能夠說是Python早期版本的遺留問題;

  1.互斥鎖

鎖一般被用來實現對共享資源的同步訪問。
爲每個共享資源建立一個Lock對象,
當你須要訪問該資源時,調用acquire方法來獲取鎖對象
(若是其它線程已經得到了該鎖,則當前線程需等待其被釋放),
待資源訪問完後,再調用release方法釋放鎖;

    - 1.先模擬下沒有鎖的狀況

# Race-condition demo: without a lock every thread reads n=100 during the
# sleep and then writes 99, so the final value is 99 instead of 0.
from threading import Thread
import time
n=100
def work():
    global n       # every thread reads this shared global
    temp=n
    time.sleep(0.1)
    n=temp-1   # decrement based on the (stale) copied value
 
if __name__ == '__main__':
    t_l=[]
    for i in range(100):
        t=Thread(target=work)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print(n)

---
結果是 99 而不是 0

    - 2.加互斥鎖的狀況

# Same demo with a mutex: the read-sleep-write sequence becomes atomic, so
# the 100 decrements no longer overwrite each other and n ends at 0.
from threading import Thread,Lock
import time
n=100
def work():
    with mutex:   # or call acquire()/release() explicitly
        global n
        temp=n
        time.sleep(0.1)
        n=temp-1

if __name__ == '__main__':
    mutex = Lock()  # module global; work() finds it by name at call time
    t_l=[]
    for i in range(100):
        t=Thread(target=work)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print(n)

---
結果是 0

  2.死鎖

死鎖是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象;
若無外力做用,它們都將沒法推動下去。
此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程;

    - 線程共享進程的數據,因此不用把鎖當作參數傳到子線程中

    - 示例代碼:

    - 線程1拿到A,而後拿到B,而後釋放B,而後釋放A,而後再去拿B,此時被釋放的A被線程2搶到;

    - 線程2想要線程1佔據的B鎖,線程1想要線程2佔據的A鎖,而且雙方都不釋放,最終死鎖;

# Deadlock demo: each thread runs f1 (A then B) followed by f2 (B then A).
# One thread can end up holding B while another holds A, each waiting for
# the other's lock — the program hangs after a few lines of output.
from threading import Thread,Lock
import time
class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()
    def f1(self):
        # Lock order: A then B.
        mutexA.acquire()
        print('\033[45m%s 拿到A鎖\033[0m' %self.name)
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' % self.name)
        mutexB.release()
        mutexA.release()
    def f2(self):
        # Lock order: B then A — the reverse of f1, which is what deadlocks.
        mutexB.acquire()
        time.sleep(1)
        print('\033[43m%s 拿到B鎖\033[0m' % self.name)
        mutexA.acquire()
        print('\033[45m%s 拿到A鎖\033[0m' % self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    mutexA=Lock()
    mutexB=Lock()
    for i in range(20):
        t=MyThread()
        t.start()

---
Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖
Thread-1 拿到B鎖

而後就卡住了...
解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量:
counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。
直到一個線程全部的acquire都被release,其餘的線程才能得到資源。
上面的例子若是使用RLock代替Lock,則不會發生死鎖:

 mutexA=mutexB=threading.RLock() 
#一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1;
這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止;

  3.遞歸鎖

# Same structure as the deadlock demo, but mutexA and mutexB are the SAME
# RLock: the owning thread may re-acquire it (internal counter goes up),
# and other threads must wait until the counter drops to zero — no deadlock.
from threading import Thread,RLock
import time
class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()
    def f1(self):
        mutexA.acquire()
        print('\033[45m%s 拿到A鎖\033[0m' %self.name)
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' % self.name)
        mutexB.release()
        mutexA.release()
    def f2(self):
        mutexB.acquire()
        time.sleep(1)
        print('\033[43m%s 拿到B鎖\033[0m' % self.name)
        mutexA.acquire()
        print('\033[45m%s 拿到A鎖\033[0m' % self.name)
        mutexA.release()
        mutexB.release()

if __name__ == '__main__':
    mutexA=mutexB=RLock()   # one reentrant lock shared under both names
    for i in range(20):
        t=MyThread()
        t.start()

---
Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-1 拿到B鎖
Thread-1 拿到A鎖
Thread-2 拿到A鎖
Thread-2 拿到B鎖
Thread-2 拿到B鎖
Thread-2 拿到A鎖
Thread-4 拿到A鎖
Thread-4 拿到B鎖
Thread-4 拿到B鎖
Thread-4 拿到A鎖
Thread-6 拿到A鎖
Thread-6 拿到B鎖
Thread-6 拿到B鎖
Thread-6 拿到A鎖
Thread-8 拿到A鎖
Thread-8 拿到B鎖
Thread-8 拿到B鎖
Thread-8 拿到A鎖
Thread-10 拿到A鎖
Thread-10 拿到B鎖
Thread-10 拿到B鎖
Thread-10 拿到A鎖
Thread-12 拿到A鎖
Thread-12 拿到B鎖
Thread-12 拿到B鎖
Thread-12 拿到A鎖
Thread-14 拿到A鎖
Thread-14 拿到B鎖
Thread-14 拿到B鎖
Thread-14 拿到A鎖
Thread-16 拿到A鎖
Thread-16 拿到B鎖
Thread-16 拿到B鎖
Thread-16 拿到A鎖
Thread-18 拿到A鎖
Thread-18 拿到B鎖
Thread-18 拿到B鎖
Thread-18 拿到A鎖
Thread-20 拿到A鎖
Thread-20 拿到B鎖
Thread-20 拿到B鎖
Thread-20 拿到A鎖
Thread-5 拿到A鎖
Thread-5 拿到B鎖
Thread-5 拿到B鎖
Thread-5 拿到A鎖
Thread-9 拿到A鎖
Thread-9 拿到B鎖
Thread-9 拿到B鎖
Thread-9 拿到A鎖
Thread-13 拿到A鎖
Thread-13 拿到B鎖
Thread-13 拿到B鎖
Thread-13 拿到A鎖
Thread-17 拿到A鎖
Thread-17 拿到B鎖
Thread-17 拿到B鎖
Thread-17 拿到A鎖
Thread-3 拿到A鎖
Thread-3 拿到B鎖
Thread-3 拿到B鎖
Thread-3 拿到A鎖
Thread-11 拿到A鎖
Thread-11 拿到B鎖
Thread-11 拿到B鎖
Thread-11 拿到A鎖
Thread-19 拿到A鎖
Thread-19 拿到B鎖
Thread-19 拿到B鎖
Thread-19 拿到A鎖
Thread-15 拿到A鎖
Thread-15 拿到B鎖
Thread-15 拿到B鎖
Thread-15 拿到A鎖
Thread-7 拿到A鎖
Thread-7 拿到B鎖
Thread-7 拿到B鎖
Thread-7 拿到A鎖

 

4、信號量Semaphore

  1.解釋

Semaphore管理一個內置的計數器,初始化的時候有一個值;
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;

計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

  2.與進程池Pool的區別

與進程池是徹底不一樣的概念:
    進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的;
    而信號量是產生一堆線程/進程,可是同時只有n個線程能夠得到semaphore並執行;

  3.代碼示例

    - 第一種寫法

# Semaphore(5): at most five of the twenty threads hold a slot (sleep and
# print) at any moment; the rest block in the with-statement.
from threading import Semaphore,Thread
import time
def work(num):
    with sem:
        time.sleep(2)
        print('%s say hello' % num)

if __name__ == '__main__':
    sem = Semaphore(5)
    for i in range(20):
        t = Thread(target=work,args=(i,))
        t.start()

    - 第二種寫法

import threading
import time

# Module-level semaphore capping concurrency at 5 workers.
semaphore = threading.Semaphore(5)

def func():
    # "with" both blocks until a slot is free and guarantees release even
    # if the body raises; the old `if semaphore.acquire():` test was
    # pointless because a blocking acquire() always returns True, and it
    # leaked the slot on an exception.
    with semaphore:
        print(threading.current_thread().name + ' get semaphore')
        time.sleep(2)

for i in range(20):
    t1 = threading.Thread(target=func)
    t1.start()

 

5、事件Event

  1.解釋

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。
若是程序中的其餘線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,
這時線程同步問題就 會變得很是棘手。

爲了解決這些問題,咱們須要使用threading庫中的Event對象。 
對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。

在初始狀況下,Event對象中的信號標誌被設置爲假。
若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,
那麼這個線程將會被一直阻塞直至該標誌爲真。

一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。
若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行;

  2.幾種用法

event.isSet():返回event的狀態值;

event.wait() :若是 event.isSet()==False將阻塞線程;

event.set()  :設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;

event.clear():恢復event的狀態值爲False。

  

 

  3.代碼示例

    - 鏈接MySQL示例

# Event demo: three "connector" threads block on the shared event until
# the "checker" thread calls e.set(), then all proceed at once.
import time
import threading
from threading import Event,Thread
def conn_mysql():
    # Waits for check_mysql() to signal readiness.
    print('%s waiting...' % threading.current_thread().getName())
    e.wait()
    print('%s start to connect mysql...' % threading.current_thread().getName())
    time.sleep(1)
def check_mysql():
    # Simulates a 2s health check, then wakes every waiter.
    print('%s checking...' % threading.current_thread().getName())
    time.sleep(2)
    e.set()

if __name__ == '__main__':
    e = Event()
    c1 = Thread(target=conn_mysql)
    c2 = Thread(target=conn_mysql)
    c3 = Thread(target=conn_mysql)
    s = Thread(target=check_mysql)
    c1.start()
    c2.start()
    c3.start()
    s.start()

---
Thread-1 waiting...
Thread-2 waiting...
Thread-3 waiting...
Thread-4 checking...
Thread-3 start to connect mysql...
Thread-1 start to connect mysql...
Thread-2 start to connect mysql...

    - 鏈接redis示例

例如,咱們有多個線程從Redis隊列中讀取數據來處理,這些線程都要嘗試去鏈接Redis的服務;
通常狀況下,若是Redis鏈接不成功,在各個線程的代碼中,都會去嘗試從新鏈接。
若是咱們想要在啓動時確保Redis服務正常,才讓那些工做線程去鏈接Redis服務器,
那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做:
    主線程中會去嘗試鏈接Redis服務,若是正常的話,觸發事件,各工做線程會嘗試鏈接Redis服務。
import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def worker(event):
    # Each worker blocks until the main thread signals that Redis is up.
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

def main():
    # NOTE(review): "readis_ready" is a typo for "redis_ready" (local name
    # only, so behavior is unaffected).
    readis_ready = threading.Event()
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()
    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()
    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3) # simulate the check progress
    readis_ready.set()

if __name__=="__main__":
    main()

---
(t1        ) Waiting for redis ready...
(t2        ) Waiting for redis ready...
(MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event
(t2        ) redis ready, and connect to redis server and do some work [Tue Jul  4 00:33:41 2017]
(t1        ) redis ready, and connect to redis server and do some work [Tue Jul  4 00:33:41 2017]

  4.補充:Event.wait(參數)

threading.Event的wait方法還接受一個超時參數:
默認狀況下若是事件一直沒有發生,wait方法會一直阻塞下去;
而加入這個超時參數以後,若是阻塞時間超過這個參數設定的值以後,wait方法會返回;

對應於上面的應用場景,若是Redis服務器一直沒有啓動,咱們但願子線程可以打印一些日誌來不斷地提醒咱們當前沒有一個能夠鏈接的Redis服務;
咱們就能夠經過設置這個超時參數來達成這樣的目的;
# Event.wait(timeout) demo: waiters wake every 0.5s to log a reminder and
# re-check the flag until check_mysql() sets the event.
from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    while not event.is_set():
        print('\033[44m%s 等待鏈接mysql。。。\033[0m' %threading.current_thread().getName())
        event.wait(0.5)
    print('\033[45mMysql初始化成功,%s開始鏈接。。。\033[0m' %threading.current_thread().getName())
def check_mysql():
    print('\033[41m正在檢查mysql。。。\033[0m')
    time.sleep(random.randint(1,3))
    event.set()
    time.sleep(random.randint(1,3))

if __name__ == '__main__':
    event=Event()
    t1=Thread(target=conn_mysql)
    t2=Thread(target=conn_mysql)
    t3=Thread(target=check_mysql)
    t1.start()
    t2.start()
    t3.start()

---
Thread-1 等待鏈接mysql。。。
Thread-2 等待鏈接mysql。。。
正在檢查mysql。。。
Thread-2 等待鏈接mysql。。。
Thread-1 等待鏈接mysql。。。
Thread-1 等待鏈接mysql。。。
Thread-2 等待鏈接mysql。。。
Thread-2 等待鏈接mysql。。。
Thread-1 等待鏈接mysql。。。
Thread-1 等待鏈接mysql。。。
Thread-2 等待鏈接mysql。。。
Thread-1 等待鏈接mysql。。。
Thread-2 等待鏈接mysql。。。
Mysql初始化成功,Thread-2開始鏈接。。。
Mysql初始化成功,Thread-1開始鏈接。。。

 

6、定時器Timer

  1.定時器,指定n秒後執行某操做;

  2.代碼示例

# Timer: run hello('standby') once, 3 seconds after start().
from threading import Timer
def hello(name):
    print('%s say hello' % name)
t = Timer(3,hello,args=('standby',))
t.start()

 

7、線程隊列

線程隊列queue:使用import queue,用法與進程Queue同樣

queue is especially useful in threaded programming 
when information must be exchanged safely between multiple threads.

  1. queue.Queue()   先進先出

  1 class Queue:
  2     '''Create a queue object with a given maximum size.
  3 
  4     If maxsize is <= 0, the queue size is infinite.
  5     '''
  6 
  7     def __init__(self, maxsize=0):
  8         self.maxsize = maxsize
  9         self._init(maxsize)
 10 
 11         # mutex must be held whenever the queue is mutating.  All methods
 12         # that acquire mutex must release it before returning.  mutex
 13         # is shared between the three conditions, so acquiring and
 14         # releasing the conditions also acquires and releases mutex.
 15         self.mutex = threading.Lock()
 16 
 17         # Notify not_empty whenever an item is added to the queue; a
 18         # thread waiting to get is notified then.
 19         self.not_empty = threading.Condition(self.mutex)
 20 
 21         # Notify not_full whenever an item is removed from the queue;
 22         # a thread waiting to put is notified then.
 23         self.not_full = threading.Condition(self.mutex)
 24 
 25         # Notify all_tasks_done whenever the number of unfinished tasks
 26         # drops to zero; thread waiting to join() is notified to resume
 27         self.all_tasks_done = threading.Condition(self.mutex)
 28         self.unfinished_tasks = 0
 29 
 30     def task_done(self):
 31         '''Indicate that a formerly enqueued task is complete.
 32 
 33         Used by Queue consumer threads.  For each get() used to fetch a task,
 34         a subsequent call to task_done() tells the queue that the processing
 35         on the task is complete.
 36 
 37         If a join() is currently blocking, it will resume when all items
 38         have been processed (meaning that a task_done() call was received
 39         for every item that had been put() into the queue).
 40 
 41         Raises a ValueError if called more times than there were items
 42         placed in the queue.
 43         '''
 44         with self.all_tasks_done:
 45             unfinished = self.unfinished_tasks - 1
 46             if unfinished <= 0:
 47                 if unfinished < 0:
 48                     raise ValueError('task_done() called too many times')
 49                 self.all_tasks_done.notify_all()
 50             self.unfinished_tasks = unfinished
 51 
 52     def join(self):
 53         '''Blocks until all items in the Queue have been gotten and processed.
 54 
 55         The count of unfinished tasks goes up whenever an item is added to the
 56         queue. The count goes down whenever a consumer thread calls task_done()
 57         to indicate the item was retrieved and all work on it is complete.
 58 
 59         When the count of unfinished tasks drops to zero, join() unblocks.
 60         '''
 61         with self.all_tasks_done:
 62             while self.unfinished_tasks:
 63                 self.all_tasks_done.wait()
 64 
 65     def qsize(self):
 66         '''Return the approximate size of the queue (not reliable!).'''
 67         with self.mutex:
 68             return self._qsize()
 69 
 70     def empty(self):
 71         '''Return True if the queue is empty, False otherwise (not reliable!).
 72 
 73         This method is likely to be removed at some point.  Use qsize() == 0
 74         as a direct substitute, but be aware that either approach risks a race
 75         condition where a queue can grow before the result of empty() or
 76         qsize() can be used.
 77 
 78         To create code that needs to wait for all queued tasks to be
 79         completed, the preferred technique is to use the join() method.
 80         '''
 81         with self.mutex:
 82             return not self._qsize()
 83 
 84     def full(self):
 85         '''Return True if the queue is full, False otherwise (not reliable!).
 86 
 87         This method is likely to be removed at some point.  Use qsize() >= n
 88         as a direct substitute, but be aware that either approach risks a race
 89         condition where a queue can shrink before the result of full() or
 90         qsize() can be used.
 91         '''
 92         with self.mutex:
 93             return 0 < self.maxsize <= self._qsize()
 94 
 95     def put(self, item, block=True, timeout=None):
 96         '''Put an item into the queue.
 97 
 98         If optional args 'block' is true and 'timeout' is None (the default),
 99         block if necessary until a free slot is available. If 'timeout' is
100         a non-negative number, it blocks at most 'timeout' seconds and raises
101         the Full exception if no free slot was available within that time.
102         Otherwise ('block' is false), put an item on the queue if a free slot
103         is immediately available, else raise the Full exception ('timeout'
104         is ignored in that case).
105         '''
106         with self.not_full:
107             if self.maxsize > 0:
108                 if not block:
109                     if self._qsize() >= self.maxsize:
110                         raise Full
111                 elif timeout is None:
112                     while self._qsize() >= self.maxsize:
113                         self.not_full.wait()
114                 elif timeout < 0:
115                     raise ValueError("'timeout' must be a non-negative number")
116                 else:
117                     endtime = time() + timeout
118                     while self._qsize() >= self.maxsize:
119                         remaining = endtime - time()
120                         if remaining <= 0.0:
121                             raise Full
122                         self.not_full.wait(remaining)
123             self._put(item)
124             self.unfinished_tasks += 1
125             self.not_empty.notify()
126 
127     def get(self, block=True, timeout=None):
128         '''Remove and return an item from the queue.
129 
130         If optional args 'block' is true and 'timeout' is None (the default),
131         block if necessary until an item is available. If 'timeout' is
132         a non-negative number, it blocks at most 'timeout' seconds and raises
133         the Empty exception if no item was available within that time.
134         Otherwise ('block' is false), return an item if one is immediately
135         available, else raise the Empty exception ('timeout' is ignored
136         in that case).
137         '''
138         with self.not_empty:
139             if not block:
140                 if not self._qsize():
141                     raise Empty
142             elif timeout is None:
143                 while not self._qsize():
144                     self.not_empty.wait()
145             elif timeout < 0:
146                 raise ValueError("'timeout' must be a non-negative number")
147             else:
148                 endtime = time() + timeout
149                 while not self._qsize():
150                     remaining = endtime - time()
151                     if remaining <= 0.0:
152                         raise Empty
153                     self.not_empty.wait(remaining)
154             item = self._get()
155             self.not_full.notify()
156             return item
157 
158     def put_nowait(self, item):
159         '''Put an item into the queue without blocking.
160 
161         Only enqueue the item if a free slot is immediately available.
162         Otherwise raise the Full exception.
163         '''
164         return self.put(item, block=False)
165 
166     def get_nowait(self):
167         '''Remove and return an item from the queue without blocking.
168 
169         Only get an item if one is immediately available. Otherwise
170         raise the Empty exception.
171         '''
172         return self.get(block=False)
173 
174     # Override these methods to implement other queue organizations
175     # (e.g. stack or priority queue).
176     # These will only be called with appropriate locks held
177 
178     # Initialize the queue representation
179     def _init(self, maxsize):
180         self.queue = deque()
181 
182     def _qsize(self):
183         return len(self.queue)
184 
185     # Put a new item in the queue
186     def _put(self, item):
187         self.queue.append(item)
188 
189     # Get an item from the queue
190     def _get(self):
191         return self.queue.popleft()
queue.Queue()類屬性和方法

  2.queue.LifoQueue()   先進後出(堆棧)

 1 class LifoQueue(Queue):
 2     '''Variant of Queue that retrieves most recently added entries first.'''
 3 
 4     def _init(self, maxsize):
 5         self.queue = []
 6 
 7     def _qsize(self):
 8         return len(self.queue)
 9 
10     def _put(self, item):
11         self.queue.append(item)
12 
13     def _get(self):
14         return self.queue.pop()
queue.LifoQueue()類屬性和方法

  3.queue.PriorityQueue()   按照優先級出隊列

 1 class PriorityQueue(Queue):
 2     '''Variant of Queue that retrieves open entries in priority order (lowest first).
 3 
 4     Entries are typically tuples of the form:  (priority number, data).
 5     '''
 6 
 7     def _init(self, maxsize):
 8         self.queue = []
 9 
10     def _qsize(self):
11         return len(self.queue)
12 
13     def _put(self, item):
14         heappush(self.queue, item)
15 
16     def _get(self):
17         return heappop(self.queue)
queue.PriorityQueue()類屬性和方法

    - queue.PriorityQueue()示例

# Priority-queue demo: items are (priority, payload) tuples and the lowest
# priority number comes out first.
from queue import PriorityQueue

q = PriorityQueue()
# Lower number == higher priority (non-numeric priorities also work as
# long as they are mutually comparable).
q.put((20,'beijing'))
q.put((10,('hello','liu',)))
q.put((30,{'k1':'v1'}))

for _ in range(3):
    print(q.get())

---結果---
(10, ('hello', 'liu'))
(20, 'beijing')
(30, {'k1': 'v1'})

 

8、協程

1 協程:單線程下的併發,又稱微線程;用戶態的輕量級線程;
2 > 相似 yield
3     在用戶代碼級別上實現保存狀態,並切換到同線程其餘任務去執行;
4     要實現協程,關鍵在於用戶程序本身控制程序切換,
5     切換以前必須由用戶程序本身保存協程上一次調用時的狀態,
6     如此,每次從新調用時,可以從上次的位置繼續執行
課上記的筆記

  1.什麼是協程?

協程是單線程下的併發,又稱微線程,纖程。英文名Coroutine;
即:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的;


須要強調的是:
  1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程一旦遇到io就被迫交出cpu執行權限,切換其餘線程運行)

  2. 單線程內開啓協程,一旦遇到io,從應用程序級別(而非操做系統)控制切換;

對比操做系統控制線程的切換,用戶在單線程內控制協程的切換,優勢以下:
  1.  協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級;

  2. 單線程內就能夠實現併發的效果,最大限度地利用cpu;


要實現協程,關鍵在於用戶程序本身控制程序切換;
切換以前必須由用戶程序本身保存協程上一次調用時的狀態;
如此,每次從新調用時,可以從上次的位置繼續執行


詳細的:
協程擁有本身的寄存器上下文和棧。
協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧;

  2.協程函數yield

    - 示例1:

# Without yield: every call creates (and tears down) a fresh frame and
# local namespace — that repeated setup cost is what is measured here.
import time
def consumer(item):
    # print(item)
    x=1212
    b=12121212133435
    c=999999999
    str='xsxhaxhalxalxalxmalx'
    str2='zsxhaaaaaxhalx121alxalxmalx'
    str3='sxh1212axwqwqhalxalxalxmalx'
    str4='szzzzxhaxhalxalxsa111alxmalx'
    pass
def producer(target,seq):
    for item in seq:
        target(item)  # a new namespace per call, repeated 10**8 times — expensive


start_time = time.time()
producer(consumer,range(100000000))
stop_time = time.time()
print('Run time is : %s' % (stop_time-start_time))

# Run time is : 17.020999908447266

    - 示例2:

# With yield: the generator keeps one frame alive and resumes it on every
# send(), avoiding the per-call namespace setup — noticeably cheaper.
import time
def consumer():
    x=1212
    b=12121212133435
    c=999999999
    str='xsxhaxhalxalxalxmalx'
    str2='zsxhaaaaaxhalx121alxalxmalx'
    str3='sxh1212axwqwqhalxalxalxmalx'
    str4='szzzzxhaxhalxalxsa111alxmalx'
    while True:
        item = yield
        # print(item)
        pass
def producer(target,seq):
    for item in seq:
        target.send(item)  # resume from the last yield; no new namespace

g = consumer()
next(g)   # prime the generator up to its first yield
start_time = time.time()
producer(g,range(100000000))
stop_time = time.time()
print('Run time is : %s' % (stop_time-start_time))

# Run time is : 12.491999864578247

    - 協程的缺點

缺點:
1.協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程

2.協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程

    - 協程的定義

協程的定義(知足1,2,3就可稱爲協程):

1.必須在只有一個單線程裏實現併發
2.修改共享數據不需加鎖
3.用戶程序裏本身保存多個控制流的上下文棧
4.附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))



另外:
yield切換在沒有io的狀況下或者沒有重複開闢內存空間的操做,對效率沒有什麼提高,甚至更慢;

  3.greenlet模塊

    - 1.greenlet介紹

greenlet是一個用C實現的協程模塊;

相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator;

    - 2.示例1:

# Explicit cooperative switching with greenlet: switch() transfers
# control to the target greenlet and resumes it where it last paused.
from greenlet import greenlet
def glet1():
    print('test1,first')
    gr2.switch()  # hand control to glet2
    print('test1,sencod')
    gr2.switch()
def glet2():
    print('test2,111111111')
    gr1.switch()  # hand control back into glet1, after its first switch
    print('test2,222222222')
if __name__ == '__main__':
    gr1=greenlet(glet1)
    gr2=greenlet(glet2)
    gr1.switch()  # start glet1; output ping-pongs between the two

---結果---
test1,first
test2,111111111
test1,sencod
test2,222222222

    - 2.示例:傳參數

# Passing arguments between greenlets via switch().
from greenlet import greenlet
def eat(name):
    print('%s eat food 1' %name)
    gr2.switch('liu')  # 'liu' becomes play_phone's `name` argument
    print('%s eat food 2' %name)
    gr2.switch()
def play_phone(name):
    print('%s play 1' %name)
    gr1.switch()
    print('%s play 2' %name)

gr1=greenlet(eat)
gr2=greenlet(play_phone)
gr1.switch(name='standby')  # arguments may be passed on the first switch only; later switches need none

---結果---
standby eat food 1
liu play 1
standby eat food 2
liu play 2

    - 3.greenlet只是提供了一種比generator更加便捷的切換方式,仍然是沒有解決遇到IO自動切換的問題

  4.gevent模塊

Python經過yield提供了對協程的基本支持,可是不徹底。而第三方的gevent爲Python提供了比較完善的協程支持。

gevent是第三方庫,經過greenlet實現協程,其基本思想是:
1.當一個greenlet遇到IO操做時,好比訪問網絡,就自動切換到其餘的greenlet,等到IO操做完成,再在適當的時候切換回來繼續執行。
2.因爲IO操做很是耗時,常常使程序處於等待狀態,有了gevent爲咱們自動切換協程,就保證總有greenlet在運行,而不是等待IO。
3.因爲切換是在IO操做時自動完成,因此gevent須要修改Python自帶的一些標準庫,這一過程在啓動時經過monkey patch完成:

    - 建立協程對象:g1=gevent.spawn();

    - spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的;

    - 示例1:沒有IO阻塞的狀況,能夠看到兩個協程對象是順序執行,沒有進行切換(由於沒有I/O相關操做);

# gevent without any I/O: the two greenlets run to completion one after
# the other — no switching happens because nothing ever blocks.
import gevent
import os,threading
def eat(name):
    print('%s eat food first, pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
    print('%s eat food second, pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
def play(name):
    print('%s play phone 1 pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
    print('%s play phone 2 pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
g1 = gevent.spawn(eat,'standby')   # first coroutine object: run eat('standby')
g2 = gevent.spawn(play,'liu')      # second coroutine object: run play('liu')
g1.join()
g2.join()
print('主線程,PID:%s, Name: %s' % (os.getpid(),threading.current_thread().getName()))

---結果---
standby eat food first, pid: 7132, mem: <Greenlet at 0x1105d58: eat('standby')>
standby eat food second, pid: 7132, mem: <Greenlet at 0x1105d58: eat('standby')>
liu play phone 1 pid: 7132, mem: <Greenlet at 0x1105f20: play('liu')>
liu play phone 2 pid: 7132, mem: <Greenlet at 0x1105f20: play('liu')>
主線程,PID:7132, Name: MainThread

    - 示例2:在eat函數和play函數里加入的 gevent.sleep() 模擬I/O操做,能夠看到進行的切換;

# gevent with simulated I/O: gevent.sleep() yields control to the hub,
# so execution interleaves between the two greenlets.
import gevent
import os,threading
def eat(name):
    print('%s eat food first, pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
    gevent.sleep(1)  # simulated I/O: triggers a switch to the other greenlet
    print('%s eat food second, pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
def play(name):
    print('%s play phone 1 pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
    gevent.sleep(3)  # simulated I/O
    print('%s play phone 2 pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
g1 = gevent.spawn(eat,'standby')
g2 = gevent.spawn(play,'liu')
g1.join()
g2.join()
print('主線程,PID:%s, Name: %s' % (os.getpid(),threading.current_thread().getName()))

---結果---
standby eat food first, pid: 1216, mem: <Greenlet at 0x10f3d58: eat('standby')>
liu play phone 1 pid: 1216, mem: <Greenlet at 0x10f3f20: play('liu')>
standby eat food second, pid: 1216, mem: <Greenlet at 0x10f3d58: eat('standby')>
liu play phone 2 pid: 1216, mem: <Greenlet at 0x10f3f20: play('liu')>
主線程,PID:1216, Name: MainThread

    - 示例3:給gevent模塊以外的IO操做打補丁

# Without monkey-patching: gevent does not recognize time.sleep() as
# cooperative I/O, so no switching happens and the greenlets run serially.
import gevent
import os,threading,time
def eat(name):
    print('%s eat food first, pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
    time.sleep(1)  # blocking sleep — invisible to gevent here
    print('%s eat food second, pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
def play(name):
    print('%s play phone 1 pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
    time.sleep(2)
    print('%s play phone 2 pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
g1 = gevent.spawn(eat,'standby')
g2 = gevent.spawn(play,'liu')
g1.join()
g2.join()
print('主線程,PID:%s, Name: %s' % (os.getpid(),threading.current_thread().getName()))

---結果---
standby eat food first, pid: 6292, mem: <Greenlet at 0x1134d58: eat('standby')>
standby eat food second, pid: 6292, mem: <Greenlet at 0x1134d58: eat('standby')>
liu play phone 1 pid: 6292, mem: <Greenlet at 0x1134f20: play('liu')>
liu play phone 2 pid: 6292, mem: <Greenlet at 0x1134f20: play('liu')>
主線程,PID:6292, Name: MainThread


# After monkey-patching: patch_all() replaces blocking stdlib calls
# (time.sleep, socket, ...) with gevent-aware versions, so the greenlets
# now switch on time.sleep() and the output interleaves.
import gevent
from gevent import monkey;monkey.patch_all() # patch I/O performed outside gevent (the stdlib) so it cooperates
import os,threading,time
def eat(name):
    print('%s eat food first, pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
    time.sleep(1)  # now a gevent-aware sleep thanks to the patch
    print('%s eat food second, pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
def play(name):
    print('%s play phone 1 pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
    time.sleep(2)
    print('%s play phone 2 pid: %s, mem: %s' % (name,os.getpid(),gevent.getcurrent()))
g1 = gevent.spawn(eat,'standby')
g2 = gevent.spawn(play,'liu')
g1.join()
g2.join()
print('主線程,PID:%s, Name: %s' % (os.getpid(),threading.current_thread().getName()))

---結果---
standby eat food first, pid: 6804, mem: <Greenlet at 0x2c215a0: eat('standby')>
liu play phone 1 pid: 6804, mem: <Greenlet at 0x2c21768: play('liu')>
standby eat food second, pid: 6804, mem: <Greenlet at 0x2c215a0: eat('standby')>
liu play phone 2 pid: 6804, mem: <Greenlet at 0x2c21768: play('liu')>
主線程,PID:6804, Name: MainThread

    - 應用1:單線程爬取網頁內容

      - 示例1:沒有使用gevent的狀況

# Serial version: pages are fetched one after another, so the total run
# time is the sum of all request latencies.
import requests
import time
def get_page(url):
    # Fetch *url* and print its body on HTTP 200.
    print('Get page: %s' % url)
    response = requests.get(url)
    if 200 == response.status_code:
        print(response.text)
start_time = time.time()
get_page('https://www.python.org')
get_page('https://www.yahoo.com')
get_page('https://www.github.com')
get_page('https://www.baidu.com/')
get_page('https://www.stanford.edu/')
get_page('http://www.hitwh.edu.cn/')
stop_time = time.time()
print('Run time is : %s' % (stop_time-start_time))

# Run time is : 6.61899995803833

      - 示例2:使用gevent實現的協程併發

# Concurrent version: monkey.patch_all() makes the socket I/O inside
# requests cooperative, so the six downloads overlap in one thread.
from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time
def get_page(url):
    # Fetch *url* and print its body on HTTP 200.
    print('Get page: %s' % url)
    response = requests.get(url)
    if 200 == response.status_code:
        print(response.text)
start_time = time.time()
g1 = gevent.spawn(get_page,url='https://www.python.org')
g2 = gevent.spawn(get_page,url='https://www.yahoo.com')
g3 = gevent.spawn(get_page,url='https://www.github.com')
g4 = gevent.spawn(get_page,url='https://www.baidu.com/')
g5 = gevent.spawn(get_page,url='https://www.stanford.edu/')
g6 = gevent.spawn(get_page,url='http://www.hitwh.edu.cn/')
gevent.joinall([g1,g2,g3,g4,g5,g6])
stop_time = time.time()
print('Run time is : %s' % (stop_time-start_time))

# Run time is : 3.629999876022339

  

from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊以前;

或者咱們乾脆記憶成:要用gevent,須要將from gevent import monkey;monkey.patch_all()放到文件的開頭;

    - 應用2:經過gevent實現單線程下(多協程)的socket併發;

# server side: single thread, one greenlet per connection
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
def server(ip,port):
    # Accept loop: spawn a greenlet running talk() for each client.
    s = socket(AF_INET, SOCK_STREAM)
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    s.bind((ip,port))
    s.listen(5)
    while True:  # accept loop
        conn, addr = s.accept()
        print('client',addr)
        gevent.spawn(talk,conn,addr)
def talk(conn,addr):
    # Echo loop for one client: reply with the upper-cased message.
    try:
        while True:  # message loop
            res=conn.recv(1024)
            if not res:
                break
            print('client %s:%s msg:%s' %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print('與 ', addr, ' 的通訊循環發生的異常:%s' % e)
    finally:
        conn.close()

if __name__ == '__main__':
    server('127.0.0.1', 8090)
# client side
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8090))

while True:
    msg=input('>>: ').strip()
    if not msg:continue  # empty send would leave recv blocking forever
    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

    - 模擬500個client同時請求,單線程下開啓gevent協程實現併發徹底能夠支撐

# Stress test: 500 threaded clients hammering the single-threaded
# gevent server above; the coroutine-based server keeps up.
from threading import Thread
from socket import *
import threading

def client(server_ip,port):
    # Endless echo conversation with the server, tagged by thread name.
    c=socket(AF_INET,SOCK_STREAM)
    c.connect((server_ip,port))
    count=0
    while True:
        c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
        msg=c.recv(1024)
        print(msg.decode('utf-8'))
        count+=1
if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client,args=('127.0.0.1',8090))
        t.start()

 

9、socketserver

基於tcp的套接字,關鍵就是兩個循環,一個連接循環,一個通訊循環

socketserver模塊中分兩大類:server類(解決連接問題)和request類(解決通訊問題)

  server類:

  request類:

   繼承關係:

   - 簡單的代碼示例:基於TCP的多線程socketserver

# server side
import socketserver
class MyHandler(socketserver.BaseRequestHandler): # per-connection message loop
    def handle(self):
        # Echo upper-cased messages until the client disconnects/errors.
        while True:
            try:
                res=self.request.recv(1024)
                print('client %s msg:%s' %(self.client_address,res))
                self.request.send(res.upper())
            except Exception as e:
                print('與 ', self.client_address, ' 的通訊循環發生的異常:%s' % e)
                break

if __name__ == '__main__':
    s=socketserver.ThreadingTCPServer(('127.0.0.1',8090),MyHandler)
    s.serve_forever() # accept loop: one thread per client connection

# client side
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8090))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

  - 詳細關係請參照源碼;

 

10、基於udp的socket

  - 參考:http://www.cnblogs.com/linhaifeng/articles/6129246.html#_label8

  - 因爲udp無鏈接,因此能夠同時多個客戶端去跟服務端通訊;

# Part1
發消息,都是將數據發送到己端的發送緩衝中,收消息都是從己端的緩衝區中收;
只有TCP有粘包現象,UDP永遠不會粘包;

1. tcp:send發消息,recv收消息
2. udp:sendto發消息,recvfrom收消息

# Part2
send與sendto
tcp是基於數據流的,而udp是基於數據報的:
send(bytes_data)
發送數據流,數據流bytes_data若爲空,本身這段的緩衝區也爲空,操做系統不會控制tcp協議發空包;
sendto(bytes_data,ip_port)
發送數據報,bytes_data爲空,還有ip_port,
所以即使是發送空的bytes_data,數據報其實也不是空的,本身這端的緩衝區收到內容,操做系統就會控制udp協議發包;

# Part3

單獨udp的客戶端,發現並不會報錯,相反tcp卻會報錯;
由於udp協議只負責把包發出去,對方收不收,我根本無論;
而tcp是基於連接的,必須有一個服務端先運行着,
客戶端去跟服務端創建連接而後依託於連接才能傳遞消息,任何一方試圖把連接摧毀都會致使對方程序的崩潰;

udp程序:服務端有幾個recvfrom就要對應幾個sendto;


TCP 可靠:     有鏈接,發完消息,對方回一個ack以後,纔會清空本地的緩存區;
UDP 不可靠:    無鏈接,發送消息,不須要對方回一個ack;

  - 基於UDP的socket,使用 recvfrom 和 sendto來收發消息;

# server side: SOCK_DGRAM — recvfrom/sendto, no listen/accept needed
from socket import *
s=socket(AF_INET,SOCK_DGRAM)
s.bind(('127.0.0.1',8090))

while True:
    client_msg,client_addr=s.recvfrom(1024)
    print('Client: %s' % client_msg.decode('utf-8'))
    s.sendto(client_msg.upper(),client_addr)

# client side
from socket import *
c=socket(AF_INET,SOCK_DGRAM)

while True:
    msg=input('>>: ').strip()
    c.sendto(msg.encode('utf-8'),('127.0.0.1',8090))
    server_msg,server_addr=c.recvfrom(1024)
    print('Server:%s, Response: %s' %(server_addr,server_msg.decode('utf-8')))

  - 基於UDP的socketserver

# server side: for UDP handlers, self.request is a (data, socket) pair
import socketserver

class MyUDPhandler(socketserver.BaseRequestHandler):
    def handle(self):
        client_msg,s=self.request
        print('Client addr: %s, Msg: %s' % (self.client_address,client_msg.decode('utf-8')))
        s.sendto(client_msg.upper(),self.client_address)

if __name__ == '__main__':
    s=socketserver.ThreadingUDPServer(('127.0.0.1',8090),MyUDPhandler)
    s.serve_forever()

# client side
from socket import *
c=socket(AF_INET,SOCK_DGRAM)

while True:
    msg=input('>>: ').strip()
    c.sendto(msg.encode('utf-8'),('127.0.0.1',8090))
    server_msg,server_addr=c.recvfrom(1024)
    print('Server:%s, Response: %s' %(server_addr,server_msg.decode('utf-8')))

 

11、I/O多路複用

參考1:python中的IO多路複用

 

12、練習

要求:

基於socketserver的ftp程序,支持多併發;

代碼實現:

 1 ftp
 2 ├─bin
 3 │  │_ run.py
 4  5 ├─conf
 6 │  │_ config.py
 7  8 ├─db
 9 │  ├─accounts
10 │  │  ├─alex
11 │  │  │      .pwd
12 │  │  │      1.txt
13 │  │  │      123.txt
14 │  │  │      a.txt
15 │  │  │      b.txt
16 │  │  │      haha.py
17 │  │  │
18 │  │  ├─liu
19 │  │  │      .pwd
20 │  │  │      1.txt
21 │  │  │      123.txt
22 │  │  │      a.txt
23 │  │  │      b.txt
24 │  │  │      coding02.py
25 │  │  │
26 │  │  └─xin
27 │  │          .pwd
28 │  │          1.txt
29 │  │          a.txt
30 │  │          b.txt
31 │  │
32 │  └─public
33 │          1.txt
34 │          123.txt
35 │          666.txt
36 │          a.txt
37 │          b.txt
38 │          coding02.py
39 │          haha.py
40 │          本節內容
41 │          課前回顧
42 43 └─src
44     │  common.py
45     │  ftp_server.py
46     │_ user.py
代碼結構

 具體代碼:

#!/usr/bin/python3
# -*- coding:utf-8 -*-

# config.py
# Shared constants for the FTP exercise: paths, protocol sentinels, menus.

PWD_FILE = '.pwd'  # pickled User object stored inside each account directory
BASE_DIR = r"D:\soft\work\Python_17\day10\hoemwork\db\accounts"  # per-user home directories
PUBLIC_DIR = r"D:\soft\work\Python_17\day10\hoemwork\db\public"  # shared public files
DONE = b'file_send_done'  # end-of-stream sentinel for file transfers
PUT_OK = b'PUT_OK'    # server reply: uploaded file verified (size + MD5)
PUT_ERR = b'PUT_ERR'  # server reply: upload size/MD5 mismatch
TOTAL_AVAILABLE_SIZE = 1000000  # default quota in bytes for new accounts

# Main menu shown before login/registration.
INFO = """
=======================
Welcome to ftp system
1.Login.
2.Register new account.
=======================
"""
# Per-session menu shown after a successful login.
FTP_INFO = """
1.List all the file you possessed.
2.Show available space.
3.List all public file.
4.Get/Put one file.
5.Increse your capacity.
"""

  

#!/usr/bin/python3
# -*- coding:utf-8 -*-

# run.py
# Entry point: console login / registration front-end for the FTP client.

import os,sys,time,pickle,getpass,json,struct,socket,hashlib
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)  # make conf/ and src/ importable when run from bin/
from conf.config import *
from src.common import *
from src.user import User

def ftp_run(user_obj):
    """Interactive command loop for a logged-in user.

    Dispatches menu choices to the helpers imported from src.common;
    'q'/'Q' exits via bye().
    """
    while True:
        cmd_dict = {
            'Q':bye,
            '1':show_my_files,
            '2':show_my_available_space,
            '3':show_all_files,
            '4':ftp_option,
            '5':increse_my_capacity,
        }
        print(FTP_INFO)
        option = input('Input your choice, q/Q to exit>>>\t').strip().upper()
        if option not in cmd_dict:
            print('Input invalid, bye...')
        else:
            cmd_dict[option](user_obj)

# start here...
print(INFO)
option = input('Input option number>>>\t').strip()
if option.isdigit() and 1 == int(option):
    # Login: the pickled User object in the home dir is the credential store.
    user_name = input('Input your name>>>\t').strip()
    user_list = os.listdir(BASE_DIR)
    if user_name not in user_list:
        print('No user: %s exist.' % user_name)
        exit(2)
    user_obj_file = r"%s%s%s%s%s" % (BASE_DIR,os.sep,user_name,os.sep,PWD_FILE)
    user_obj = pickle.load(open(user_obj_file,'rb'))
    user_pwd = getpass.getpass('Input your passwd>>>\t').strip()
    if user_pwd == user_obj.passwd:  # NOTE: plain-text password comparison
        print('\nWelcome %s' % user_obj.name)
        print('Your leave space is %sbytes.' % user_obj.available_space)
        ftp_run(user_obj)
    else:
        print('Password is incorrect')
        exit(2)

elif option.isdigit() and 2 == int(option):
    # Registration: create and persist a new User unless the name exists.
    name = input('Input your name>>>\t').strip()
    pwd = getpass.getpass('Input your passwd>>>\t').strip()
    capacity = input('Input your capacity, unit is Byte(default:1000000)>>>\t').strip()
    if not capacity:
        capacity = TOTAL_AVAILABLE_SIZE
    elif not capacity.isdigit():
        print('Capacity input invalid.')
        exit(2)
    user_list = os.listdir(BASE_DIR)
    user = User(name, pwd, capacity)
    if user.name not in user_list:
        user.save()
        print('%s created successfully' % user.name)
    else:
        print('%s already exist...' % user.name)
else:
    print('Input invalid.')

  

#!/usr/bin/python3
# -*- coding:utf-8 -*-

# ftp_server.py

'''
1.server run and listen...
2.client conn
3.client send cmd
4.server recv and analyse
5.start to transmission
'''

import os,sys,time,json,struct,hashlib
import socketserver
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from conf.config import *

class Ftp_Server:
    """Server side of the toy FTP: serves get/put of files in PUBLIC_DIR.

    Wire protocol: metadata is a 4-byte struct-packed length followed by
    a UTF-8 JSON dict; file bodies are streamed in 100-byte chunks
    terminated by the DONE sentinel.
    """
    def __init__(self,ip,port):
        self.ip = ip      # address to bind
        self.port = port  # port to bind
    @staticmethod
    def get(conn,filename):
        """Send *filename* from PUBLIC_DIR to the client over *conn*."""
        print('Start to download the %s' % filename)
        public_file_list = Ftp_Server.get_all_public_file()
        # Check whether the requested file exists; if not, reply with a
        # header whose flag is False so the client can abort cleanly.
        if filename not in public_file_list:
            print("%s does't exist, exit." % filename)
            file_dict = {
                'flag': False,
                'filename': filename,
                'hash_value': None,
                'file_total_size': None
            }
            file_json = json.dumps(file_dict)
            file_byte = file_json.encode('utf-8')
            conn.send(struct.pack('i', len(file_byte)))
            conn.send(file_byte)
            return
        # First send the file's attributes: total size and MD5 digest.
        file_total_size = os.path.getsize(r'%s%s%s' % (PUBLIC_DIR,os.sep,filename))
        with open(r'%s%s%s' % (PUBLIC_DIR,os.sep,filename),mode='rb') as rf:
            md5_obj = hashlib.md5()
            md5_obj.update(rf.read())
            file_hash = md5_obj.hexdigest()
        file_dict = {
            'flag': True,
            'filename': filename,
            'hash_value': file_hash,
            'file_total_size': file_total_size
        }
        file_json = json.dumps(file_dict)
        file_byte = file_json.encode('utf-8')
        conn.send(struct.pack('i', len(file_byte)))
        conn.send(file_byte)
        # Now stream the actual file content in 100-byte chunks.
        with open(r'%s%s%s' % (PUBLIC_DIR,os.sep,filename),mode='rb') as rf:
            while True:
                data = rf.read(100)
                if not data:
                    # brief pause so the DONE sentinel is less likely to be
                    # coalesced with the data tail by TCP (ad-hoc framing)
                    time.sleep(0.1)
                    conn.send(DONE)
                    break
                else:
                    conn.send(data)
                    # print('>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        print('%s download done' % filename)
        conn.close()
    @staticmethod
    def put(conn,filename):
        """Receive *filename* from the client and store it in PUBLIC_DIR."""
        print('Start to upload the %s' % filename)
        # Receive the length-prefixed metadata header (file_dict).
        file_struct = conn.recv(4)
        file_len = struct.unpack('i', file_struct)[0]
        file_byte = conn.recv(file_len)
        file_json = file_byte.decode('utf-8')
        file_dict = json.loads(file_json)
        # Receive chunks until the DONE sentinel, writing them to disk.
        with open(r'%s%s%s' % (PUBLIC_DIR,os.sep,filename), mode='wb') as wf:
            data = conn.recv(100)
            while True:
                # print(data)
                if DONE == data:
                    break
                wf.write(data)
                data = conn.recv(100)
        # Compare received size and MD5 against the client's metadata.
        recv_file_total_size = os.path.getsize(r'%s%s%s' % (PUBLIC_DIR,os.sep,filename))
        with open(r'%s%s%s' % (PUBLIC_DIR,os.sep,filename), mode='rb') as rf:
            md5_obj = hashlib.md5()
            md5_obj.update(rf.read())
        recv_file_hash = md5_obj.hexdigest()
        if recv_file_hash == file_dict['hash_value'] and recv_file_total_size == file_dict['file_total_size']:
            conn.send(PUT_OK)
        else:
            conn.send(PUT_ERR)
        print('%s upload done.' % filename)
        conn.close()

    @classmethod
    def get_all_public_file(cla):
        """Return the names of all files currently in PUBLIC_DIR."""
        return os.listdir(PUBLIC_DIR)
    def start(self):
        """Run a threaded TCP server: one thread per client connection."""
        server = socketserver.ThreadingTCPServer((self.ip, self.port), MyHandler)
        server.serve_forever()  # accept loop

class MyHandler(socketserver.BaseRequestHandler): # per-connection handler (message loop)
    def handle(self):
        # Read one length-prefixed JSON command, dispatch it to
        # Ftp_Server.get / Ftp_Server.put by reflection, then close.
        # The unconditional break in `finally` means one request per
        # connection.
        while True:
            try:
                print('Client: ', self.client_address)
                print(self.request)
                cmd_struct = self.request.recv(4)
                cmd_len = struct.unpack('i', cmd_struct)[0]
                cmd_byte = self.request.recv(cmd_len)
                cmd_json = cmd_byte.decode('utf-8')
                cmd_dict = json.loads(cmd_json)
                t = time.strftime('%Y-%m-%d %X')
                print('User: %s\tTime: %s\tCMD: %s' % (cmd_dict['user'], t, cmd_dict['cmd'] + " " + cmd_dict['filename']))
                # Reflect onto the Ftp_Server static method named by 'cmd'.
                func = getattr(Ftp_Server, cmd_dict['cmd'])
                func(self.request, cmd_dict['filename'])
            except Exception as e:
                print('與 ', self.client_address, ' 的通訊循環發生的異常:%s' % e)
            finally:
                # One transfer per connection: close it and leave the loop.
                self.request.close()
                break


if __name__ == '__main__':
    ftp_server = Ftp_Server('127.0.0.1', 8090)
    ftp_server.start() # accept loop (serve_forever)

  

#!/usr/bin/python3
# -*- coding:utf-8 -*-

# user.py

import os,pickle
from conf.config import *

class User():
    """An FTP account: name, password and a storage quota in bytes.

    Instances are pickled into ``BASE_DIR/<name>/.pwd`` (``PWD_FILE``),
    which doubles as the account's existence marker.
    """
    def __init__(self,name,passwd,capacity):
        self.name = name          # account / home-directory name
        self.passwd = passwd      # NOTE: stored in plain text, not hashed
        self.capacity = capacity  # total quota in bytes (str or int)
    def save(self):
        """Create the user's home directory if needed and pickle this object."""
        user_path = r"%s%s%s" % (BASE_DIR,os.sep,self.name)
        if not os.path.exists(user_path):
            # BUGFIX: the mode must be octal 0o700 (rwx for the owner only);
            # the original passed decimal 700, i.e. the unintended 0o1274.
            os.mkdir(user_path,0o700)
        pwd_file = r"%s%s%s" % (user_path, os.sep, PWD_FILE)
        pickle.dump(self, open(pwd_file, 'wb'))
    def show_all_file(self):
        """Print every visible (non-dot) file in the user's home directory."""
        file_list = os.listdir(r"%s%s%s" % (BASE_DIR, os.sep, self.name))
        print('\n%s have files below:' % self.name)
        for file in file_list:
            if file.startswith('.'):
                continue
            print(file)
    # Remaining quota of the user's home directory, in bytes.
    @property
    def available_space(self):
        used_size = 0
        path = r"%s%s%s%s" % (BASE_DIR,os.sep,self.name,os.sep)
        try:
            for root, dirs, files in os.walk(path):
                for fle in files:
                    # BUGFIX: join against the directory actually being
                    # walked; the original used the top-level path for every
                    # file, so sizes inside sub-directories were wrong (or
                    # raised and fell into the except branch).
                    used_size += os.path.getsize(os.path.join(root, fle))
            return int(self.capacity) - used_size
        except Exception as err:
            # Best effort: report the problem; implicitly returns None.
            print(err)

 

#!/usr/bin/python3
# -*- coding:utf-8 -*-

# common.py

import os,sys,json,hashlib,struct,socket,time
from conf.config import *
from src.ftp_server import Ftp_Server

# Convert a raw byte count into a human-readable kb / M / G string.
def format_size(bytes):
    """Return *bytes* formatted as '%fkb', '%fM' or '%fG'.

    Prints a notice and returns the string "Error" when *bytes* cannot
    be converted to a number.  (The parameter name shadows the builtin
    ``bytes``; it is kept unchanged for interface compatibility.)
    """
    try:
        kb = float(bytes) / 1024
    except (TypeError, ValueError):
        # Narrowed from a bare ``except``: only conversion failures are
        # expected here; anything else should surface as a real bug.
        print("傳入的字節格式不對")
        return "Error"
    if kb < 1024:
        return "%fkb" % (kb)
    mb = kb / 1024
    if mb < 1024:
        return "%fM" % (mb)
    return "%fG" % (mb / 1024)

def bye(user_obj = None):
    """Print a farewell for *user_obj* and terminate with exit status 0."""
    farewell = 'See you, %s' % user_obj.name
    print(farewell)
    raise SystemExit(0)

def show_my_files(user_obj = None):
    """List every file in *user_obj*'s home directory (delegates to User)."""
    return user_obj.show_all_file()

def show_my_available_space(user_obj = None):
    """Print the user's remaining quota in a human-readable unit."""
    print(format_size(user_obj.available_space))

def show_all_files(user_obj = None):
    """Print the name of every file in the shared public directory."""
    entries = os.listdir(PUBLIC_DIR)
    print('==========Public file===========')
    for entry in entries:
        print(entry)
    print('================================')

def ftp_option(user_obj = None):
    """Prompt for a '<get|put> <filename>' command and dispatch it."""
    tokens = input('[get/put] filename>>>\t').strip().split()
    if len(tokens) != 2:
        print('Input invalid, input like this:\nget file\nput file\n')
        return
    if not hasattr(Ftp_Server, tokens[0]):
        print('No %s option.' % tokens[0])
        return
    client_to_run(user_obj, tokens)

def increse_my_capacity(user_obj = None):
    """Show the current quota usage and add a user-supplied number of bytes."""
    print('Hello %s, your capacity information: %s/%s' % \
          (user_obj.name,format_size(user_obj.available_space),format_size(user_obj.capacity)))
    amount = input('How much do you wanna increse(byte) >>>\t').strip()
    if not amount.isdigit():
        print('Invalid input, must be a number.')
        return
    user_obj.capacity = int(user_obj.capacity) + int(amount)
    user_obj.save()  # persist the enlarged quota
    print('Increased successfully\nYour capacity information: %s/%s' % \
          (format_size(user_obj.available_space),format_size(user_obj.capacity)))

def client_to_run(user_obj, input_list):
    """Open a connection to the FTP server and perform one get/put.

    input_list is ['get'|'put', filename].  The command is sent as a
    4-byte struct-packed length followed by a UTF-8 JSON dict, mirroring
    the server's protocol; file bodies are 100-byte chunks terminated by
    the DONE sentinel.
    """
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('127.0.0.1', 8090))
    cmd_dict = {
        'user':user_obj.name,
        'cmd':input_list[0],
        'filename':input_list[1]
    }
    cmd_json = json.dumps(cmd_dict)
    cmd_byte = cmd_json.encode('utf-8')
    client.send(struct.pack('i', len(cmd_byte)))
    client.send(cmd_byte)
    # Download a file from the public directory into the user's home.
    if 'get' == input_list[0].lower():
        # Receive the length-prefixed metadata header (file_dict).
        file_struct = client.recv(4)
        file_len = struct.unpack('i', file_struct)[0]
        file_byte = client.recv(file_len)
        file_json = file_byte.decode('utf-8')
        file_dict = json.loads(file_json)
        # flag is False when the file does not exist server-side.
        if not file_dict['flag']:
            print("%s does't exist, exit." % file_dict['filename'])
            return
        # Make sure the user's remaining quota can hold the file.
        if user_obj.available_space < file_dict['file_total_size']:
            print('You are have %s byte available only\n%s is %s, download failed.' % (user_obj.available_space,input_list[1],file_dict['file_total_size']))
            return
        recv_size = 0
        # Receive chunks until the DONE sentinel, writing them to disk.
        with open(r'%s%s%s%s%s' % (BASE_DIR,os.sep,user_obj.name,os.sep,input_list[1]),mode='wb') as wf:
            data = client.recv(100)
            f = sys.stdout
            while True:
                if DONE == data:
                    break
                # print(data)
                wf.write(data)
                recv_size += len(data)
                # Draw the download progress bar.
                pervent = recv_size / file_dict['file_total_size']
                percent_str = "%.2f%%" % (pervent * 100)
                n = round(pervent * 60)
                s = ('#' * n).ljust(60, '-')
                f.write(percent_str.ljust(8, ' ') + '[' + s + ']')
                f.flush()
                # time.sleep(0.1)
                f.write('\r')
                data = client.recv(100)
            f.write('\n')
        recv_file_total_size = os.path.getsize(r'%s%s%s%s%s' % (BASE_DIR,os.sep,user_obj.name,os.sep,input_list[1]))
        with open(r'%s%s%s%s%s' % (BASE_DIR,os.sep,user_obj.name,os.sep,input_list[1]),mode='rb') as rf:
            md5_obj = hashlib.md5()
            md5_obj.update(rf.read())
        recv_file_hash = md5_obj.hexdigest()
        print('%s %s done.' %(input_list[0],input_list[1]))
        # Verify integrity: size and MD5 must match the server's header.
        if recv_file_total_size == file_dict['file_total_size'] and recv_file_hash == file_dict['hash_value']:
            print('%s md5 is ok.' % input_list[1])
        else:
            print('%s md5 err.' % input_list[1])
        # print(file_dict['filename'],file_dict['hash_value'],file_dict['file_total_size'])
    # Upload a file from the user's home into the public directory.
    elif 'put' == input_list[0].lower():
        # First make sure the file to upload actually exists locally.
        if not os.path.exists(r'%s%s%s%s%s' % (BASE_DIR,os.sep,user_obj.name,os.sep,input_list[1])):
            print('%s not exist, please check.' % input_list[1])
            return
        # Send the file's attributes first: total size and MD5 digest.
        file_total_size = os.path.getsize(r'%s%s%s%s%s' % (BASE_DIR,os.sep,user_obj.name,os.sep,input_list[1]))
        with open(r'%s%s%s%s%s' % (BASE_DIR,os.sep,user_obj.name,os.sep,input_list[1]), mode='rb') as rf:
            md5_obj = hashlib.md5()
            md5_obj.update(rf.read())
            file_hash = md5_obj.hexdigest()
        file_dict = {
            'flag': True,
            'filename': input_list[1],
            'hash_value': file_hash,
            'file_total_size': file_total_size
        }
        file_json = json.dumps(file_dict)
        file_byte = file_json.encode('utf-8')
        client.send(struct.pack('i', len(file_byte)))
        client.send(file_byte)

        send_size = 0
        # Now stream the actual file content in 100-byte chunks.
        with open(r'%s%s%s%s%s' % (BASE_DIR,os.sep,user_obj.name,os.sep,input_list[1]),mode='rb') as rf:
            while True:
                data = rf.read(100)
                if not data:
                    time.sleep(0.1)
                    client.send(DONE)
                    break
                client.send(data)
                # print('one more chunk uploaded')
                send_size += len(data)
                # Draw the upload progress bar.
                f = sys.stdout
                pervent = send_size / file_dict['file_total_size']
                percent_str = "%.2f%%" % (pervent * 100)
                n = round(pervent * 60)
                s = ('#' * n).ljust(60, '-')
                f.write(percent_str.ljust(8, ' ') + '[' + s + ']')
                f.flush()
                # time.sleep(0.1)
                f.write('\r')
        f.write('\n')
        print('File upload done')
        upload_res = client.recv(1024)
        if upload_res == PUT_OK:
            print('%s upload ok.' % input_list[1])
        elif upload_res == PUT_ERR:
            print('%s upload err.' % input_list[1])
        else:
            print('ERROR: %s' % upload_res)
    client.close()

# Public API of this helper module; client_to_run is internal and
# deliberately omitted from the export list.
__all__ = ['format_size','bye','show_my_files','show_my_available_space',\
           'show_all_files','ftp_option','increse_my_capacity']
相關文章
相關標籤/搜索