python day11


1、線程
        基本使用
        線程鎖
        自定義線程池
       
        生產者消費者模型(隊列)
2、進程
        基本使用
        進程鎖
        進程數據共享
            默認數據不共享
            queues
            array
            Manager.dict
        進程池
   
    PS:
        IO密集型-多線程
        計算密集型 - 多進程
   
3、協程
        原理:利用一個線程,分解一個線程成爲多個「微線程」==》程序級別
        greenlet
       
        gevent
       
        pip3 install gevent
4、緩存
        一、安裝軟件
        二、程序:安裝其對應的模塊
        Socket鏈接,
        memecach
            一、天生集羣
            二、基本
            三、gets,cas
           
            k -> ""
        redis
            k -> ""
            k -> [11,11,22,33,44]
            k -> {"k1":xxx}
            k -> [11,22]
            k -> [(11,1),(13,2),]html

 


#使用多線程
import threading
def f1(arg):
    print(arg)
t = threading.Thread(target=f1,args=(123,))
t.start()python

 


import threading
from time import ctime,sleep
import time
def music(func):
    for i in range(2):
        print("我在聽--------- %s. %s" %(func,ctime()))
        time.sleep(1)git

def move(func):
    for i in range(2):
        print("我在看---------- %s! %s" %(func,ctime()))
        time.sleep(5)github

threads = []
t1 = threading.Thread(target=music,args=(u'愛情買賣',))  #把music加入到多線程
t2 = threading.Thread(target=move,args=(u'阿凡達',))     #把move加入到多線程
threads.append(t1)
threads.append(t2)redis

for i in threads:
    i.start()緩存

 

 


import threading
from time import ctime,sleep
import time
def music(func):
    for i in range(2):
        print("我在聽--------- %s. %s" %(func,ctime()))
        #time.sleep(1)服務器

def move(func):
    for i in range(2):
        print("我在看---------- %s! %s" %(func,ctime()))
        #time.sleep(5)多線程

t1 = threading.Thread(target=music,args=(u'愛情買賣',))  #把music加入到多線程
t2 = threading.Thread(target=move,args=(u'阿凡達',))     #把move加入到多線程
t1.start()
sleep(2)
t2.start()app

 

同時作兩件事
#聽歌看片一塊兒幹,想要幹兩次,此程序只幹一次,由於setDaemon不等子線程
import threading
from time import ctime,sleep
def music(func):
    for i in range(2):
        print("我在聽-------- %s. %s" %(func,ctime()))
        sleep(1)async

def move(func):
    for i in range(2):
        print("我在看-------- %s! %s" %(func,ctime()))
        sleep(5)

threads = []
t1 = threading.Thread(target=music,args=(u'愛情買賣',))  #把music加入到多線程
threads.append(t1)
t2 = threading.Thread(target=move,args=(u'阿凡達',))     #把move加入到多線程
threads.append(t2)

if __name__ == '__main__':
    for t in threads:
        t.setDaemon(True)       #子線程啓動後,父線程也繼續執行下去,當父線程執行完最後一條語句print "all over %s" %ctime()後,沒有等待子線程,直接就退出了,同時子線程也一同結束。
        t.start()               #開始啓動線程
    print("兩件事同時作完----------- %s" %ctime())

 

 

多線程完整程序,同時聽了歌又看了片
import threading
from time import ctime,sleep
def music(func):
    for i in range(2):
        print("我在聽---------- to %s. %s" %(func,ctime()))
        sleep(1)
def move(func):
    for i in range(2):
        print("我在看---------- to %s! %s" %(func,ctime()))
        sleep(5)
threads = []
t1 = threading.Thread(target=music,args=(u'牛逼存在',))
threads.append(t1)
t2 = threading.Thread(target=move,args=(u'鍾馗伏魔',))
threads.append(t2)
if __name__ == '__main__':
    for t in threads:
        t.setDaemon(True)
        t.start()
    t.join()        #等待進程終止,子程進程沒有運行完,就不執行父進程
    print("全部結束--------------------- %s" %ctime())

 

# 結果
我在聽---------- to 牛逼存在. Mon Jul 18 00:18:22 2016
我在看---------- to 鍾馗伏魔! Mon Jul 18 00:18:22 2016
我在聽---------- to 牛逼存在. Mon Jul 18 00:18:23 2016
我在看---------- to 鍾馗伏魔! Mon Jul 18 00:18:27 2016
全部結束--------------------- Mon Jul 18 00:18:32 2016

 

 

 

#自定義多線程,調用
import threading
class MyThread(threading.Thread):
    def __init__(self,func,args):
        self.func = func
        self.args = args
        super(MyThread,self).__init__()          #super繼承調用父類,解決多繼承的重複調用問題
    def run(self):                            # 自動執行run方法
        self.func(self.args)
def f2(arg):
    print(arg)
obj = MyThread(f2,123)
obj.start()

 

#結果
123

 

 

 

 

消息隊列

# queue.Queue(2) 先進先出隊列
# put放數據,是否阻塞,阻塞時的超時事件
# get取數據(默認阻塞),是否阻塞,阻塞時的超時事件
# qsize()真實個數
# maxsize 最大支持的個數
# join,task_done,阻塞進程,當隊列中任務執行完畢以後,再也不阻塞
 
q = queue.Queue(2)                       #只接收兩個隊列
print(q.empty())
q.put(11)
q.put(22)
print(q.empty())
print(q.qsize())
q.put(22)
q.put(33, block=False)
q.put(33,block=False, timeout=2)
print(q.get())
print(q.get())
print(q.get(timeout=2))

 

----結果
True
False
2                    # 阻塞了,只接收了兩個

 

 

 


import queue
# queue.Queue,先進先出隊列
# queue.LifoQueue,後進先出隊列
# queue.PriorityQueue,優先級隊列
# queue.deque,雙向對隊

#後進先出隊列
q = queue.LifoQueue()
q.put(123)                #放數據給隊列
q.put(456)
print(q.get())            #取一條數據
print(q.get())            #取第二條數據


# #優先級隊列
    #數值小的高
q = queue.PriorityQueue()
q.put((1,"alex1"))      #存數據給隊列,優化級爲1
q.put((1,"alex2"))
q.put((1,"alex3"))
q.put((3,"alex3"))
print(q.get())           #取數據

#雙向對隊
q = queue.deque()
q.append(123)
q.append(333)
q.appendleft(456)
q.pop()
q.popleft()

 

 

 

隊列機制

import queue

queue.Queue #先進先出隊列
queue.LifoQueue  #後隊進先出隊列
queue.PriorityQueue #優先級隊列
queue.deque #雙向對隊

q = queue.LifoQueue()
q.put(123)
q.put(456)
print(q.get())

q = queue.PriorityQueue()
q.put((1,"alex1"))
q.put((1,"alex2"))
q.put((1,"alex3"))
q.put((3,"alex3"))
print(q.get())

q = queue.deque()
q.append(123)
q.append(333)
q.appendleft(456)
q.pop()
q.popleft()

 

 

 


生產者消費者
消息隊列解決供給問題,解決阻塞,解耦

import queue
import threading
import time

q = queue.Queue()                      #先進先出隊列

def productor(arg):                      # 生產者
    """
    買票
    :param arg:
    :return:
    """
    q.put(str(arg) + '-包子')         #發包子器

for i in range(300):                  #300我的買包子
    t = threading.Thread(target=productor,args=(i,))
    t.start()

def consumer(arg):                     # 消費者
    """
    服務器後臺
    :param arg:
    :return:
    """
    while True:                         #無限生產
        print(arg,q.get())          
        time.sleep(2)
for j in range(3):                     #3我的一直生產       
    t = threading.Thread(target=consumer,args=(j,))
    t.start()

 

 


   
   
# 線程鎖

#  threading.RLock和threading.Lock


# threading.Lock   產生死鎖阻塞
import threading 
lock = threading.Lock() #Lock對象 
lock.acquire() 
lock.acquire()     #產生了死鎖。 
lock.release() 
lock.release() 

-----結果
            #一直循環不退出,由於鎖住了
           
# threading.RLock          #在同一線程內,程序不會堵塞。
import threading
rLock = threading.RLock()  #RLock對象
rLock.acquire()
rLock.acquire()            #在同一線程內,程序不會堵塞。
rLock.release()
rLock.release()

-----結果
Process finished with exit code 0      #不阻塞

 

# 上鎖和解鎖,遞歸10減到0   
import threading
import time

NUM = 10

def func(l):
    global NUM
    # 上鎖
    l.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM)
    # 開鎖
    l.release()
#lock = threading.Lock()
lock = threading.RLock()      

for i in range(10):
    t = threading.Thread(target=func,args=(lock,))
    t.start()


   
-----結果

 

全鎖,全放

#紅燈停,綠燈放行
import threading

def func(i,e):
    print(i)
    e.wait() # 檢測是什麼燈,若是是紅燈,停;綠燈,行
    print(i+100)

event = threading.Event()

for i in range(10):
    t = threading.Thread(target=func, args=(i,event,))
    t.start()
#========
event.clear() # 設置成紅燈
inp = input('>>>')
if inp == "1":                   #輸入1放行
    event.set() # 設置成綠燈  

   
   
   
------結果
0
1
2
3
4
5
6
7
8
9
>>>1
100
103
104
107
108
101
102
105
106
109

 

 

 


信號量,放幾個出去
import threading
import time
NUM = 10

def func(i,l):
    global NUM
    # 上鎖
    l.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM,i)
    # 開鎖
    l.release()
#lock = threading.Lock()
#lock = threading.RLock()
lock = threading.BoundedSemaphore(5)   #信號量,放幾個出去

for i in range(10):
    t = threading.Thread(target=func,args=(i,lock,))
    t.start()


   
   
   
   
import threading
def func(i,con):
    print(i)
    con.acquire()
    con.wait()
    print(i+100)
    con.release()

c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=func, args=(i,c,))
    t.start()

while True:
    inp = input('>>>')
    if inp == 'q':
        break
    c.acquire()
    c.notify(int(inp))
    c.release()
   

   
   
   
import threading

def condition():
    ret = False
    r = input('>>>')
    if r == 'true':
        ret = True
    else:
        ret = False
    return ret


def func(i,con):
    print(i)
    con.acquire()
    con.wait_for(condition)
    print(i+100)
    con.release()

c = threading.Condition()
for i in range(10):
    t = threading.Thread(target=func, args=(i,c,))
    t.start()

   
   
   
   
   
   
# 1秒以後打印hello world
from threading import Timer
def hello():
    print("hello, world")

t = Timer(1, hello)      #1秒以後打印hello world
t.start()  # after 1 seconds, "hello, world" will be print

   
   
   
   
   
   
#線程池程序

import queue
import threading
import contextlib
import time

StopEvent = object()                                        #配置空值,判斷空值終止線程

class ThreadPool(object):

    def __init__(self, max_num, max_task_num = None):      #配置最大線程數和任務最大個數
        """
        初始化函數作三件事:
        1.判斷任務最大數是否爲真,真則建立隊列存任務,假則建立不受限制隊列
        2.建立當前已經建立的線程變量
        3.建立當前空閒多少線程變量
        """
        if max_task_num:
            self.q = queue.Queue(max_task_num)             #建立隊列,用來裝任務
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []                            #當前已經建立的線程
        self.free_list = []                                   #當前空閒多少線程

    def run(self, func, args, callback=None):
        """
        線程池執行一個任務
        :param func: 任務函數
        :param args: 任務函數所需參數
        :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數)
        :return: 若是線程池已經終止,則返回True不然None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:      #判斷沒有空閒線程和已經建立的線程小於線程池最大數,則建立線程
            self.generate_thread()     # 建立線程
        w = (func, args, callback,)    # 如下兩行把任務放入隊列   #FUNC 是函數 ARGS、CALLBACK是元組
        self.q.put(w)

    def generate_thread(self):
        """
        建立一個線程
        """
        t = threading.Thread(target=self.call)             #建立線程執行call方法(用call方法建立線程),每建立一個線程都執行call
        t.start()

    def call(self):
        """
        循環去獲取任務函數並執行任務函數
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)       #獲取已建立的線程存入列表

        event = self.q.get()                            #取任務

        while event != StopEvent:
            '''若是獲取的任務不爲空,則執行改變其狀態,循環配置每一個任務爲空閒,不然傳入的是空值,則清除線程列表'''
            func, arguments, callback = event              #是元組event即任務便執行
            try:
                result = func(*arguments)                 #傳參,執行ACTION函數,ACTION執行完是空閒
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):    #若是執行ACTION,把任務設置成空閒
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        執行完全部的任務後,全部線程中止
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        不管是否還有任務,終止線程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)        # 終止前存入隊列

        self.q.empty()


    '''如下用contextlib裝飾器模塊爲記錄正在等待的線程數,用於call方法把了隊列所有讀出來,即實現裝飾器爲空'''
    @contextlib.contextmanager                                #contoextlib上下文管理,實現相似with的自動關閉機制,其實就是個with封裝
    def worker_state(self, state_list, worker_thread):
        """
        用於記錄線程中正在等待的線程數
        """
        state_list.append(worker_thread) 
        try:
            yield
        finally:
            state_list.remove(worker_thread)


pool = ThreadPool(5)                                #建立線程池

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass


# 建立一個讀任務函數
def action(i):
    print(i)

for i in range(300):                                  #300個任務
    ret = pool.run(action, (i,), callback)

# time.sleep(5)
# print(len(pool.generate_list), len(pool.free_list))
# print(len(pool.generate_list), len(pool.free_list))

 

 

 

進程數據共享


from multiprocessing import Process
from multiprocessing import queues
import multiprocessing


def foo(i,arg):                 #定義函數兩個選項,用於後續傳參
    arg.put(i)                  #定義寫隊列參數
    print('say hi',i,arg.qsize())      #打印i寫入的隊列,arg.qsize統計隊列數

if __name__=='__main__':
    li = queues.Queue(20,ctx=multiprocessing)    #容許20個隊列,使用多進程處理隊列
    for i in range(10):                          #循環PUT 10次
        p = Process(target=foo,args=(i,li,))     #多進程方式處理
        p.start()                                #啓動多進程

       
       
       
       
進程間數據共享       
# Array來共享數據       
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
from multiprocessing import Array       

def foo(i,arg):
    # arg.put(i)
    # print('say hi',i,arg.qsize())
    arg[i] = i + 100
    for item in arg:
        print(item)
    print('================')

if __name__ == "__main__":
    # li = []
    # li = queues.Queue(20,ctx=multiprocessing)
    li = Array('i', 10)                             #將10個進程數據存入Array中
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        #p.daemon = True
        p.start()
        #p.join()

       
       
       

       

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
from multiprocessing import Manager


def foo(i,arg):
    # arg.put(i)
    # print('say hi',i,arg.qsize())
    # arg[i] = i + 100
    # for item in arg:
    #     print(item)
    # print('====================')
    arg[i] = i + 100
    print(arg.values())

if __name__=="__main__":
    obj = Manager()
    li = obj.dict()
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        #p.join()
    import time
    time.sleep(0.1)
   
   
   
   
   
   
   
   
   
   
   
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
from multiprocessing import Manager


def foo(i,arg):
    # arg.put(i)
    # print('say hi',i,arg.qsize())
    # arg[i] = i + 100
    # for item in arg:
    #     print(item)
    # print('====================')
    arg[i] = i + 100
    print(arg.values())

if __name__=="__main__":
    obj = Manager()
    li = obj.dict()
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        p.start()
        p.join()
    # import time
    # time.sleep(0.1)

 

 


   
串行沒有多線程
from multiprocessing import Pool
import time

def f1(arg):
    time.sleep(1)
    print(arg)

if __name__ == "__main__":
    pool =  Pool(5)
    for i in range(30):
        pool.apply(func=f1,args=(i,))
    print('end')

 

 

 

 

五個一塊兒執行
from multiprocessing import Pool
import time

def f1(arg):
    time.sleep(1)
    print(arg)

if __name__ == "__main__":
    pool =  Pool(5)
    for i in range(30):
        #pool.apply(func=f1,args=(i,))
        pool.apply_async(func=f1,args=(i,))      #去隊列中取任務

    pool.close()
    pool.join()

   
   
   
   

from multiprocessing import Pool
import time

def f1(arg):
    time.sleep(1)
    print(arg)

if __name__ == "__main__":
    pool =  Pool(5)
    for i in range(30):
        #pool.apply(func=f1,args=(i,))
        pool.apply_async(func=f1,args=(i,))      #去隊列中取任務

    time.sleep(2)
    # pool.close()                          #全部的任務執行完畢
    pool.terminate()                      #當即終止
    pool.join()

   
   


協程
安裝gevent  (WINDOWS按如下方式安裝代替pip3 install gevent)
python3 -m pip install gevent
   
   
   
http://www.cnblogs.com/wupeiqi/articles/5040827.html
看11天圖


協程
# 交叉使用,只使用一個線程,在一個線程中規定某個代碼塊執行順序。
# 協程的適用場景:當程序中存在大量不須要CPU的操做時(IO),適用於協程;
from greenlet import greenlet


def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()


def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

 

-------結果
12
56
34
78

 

 


協程 :遇到IO操做自動切換
from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])

 

 


-------結果
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
449397 bytes received from https://www.yahoo.com/.
25533 bytes received from https://github.com/.
47394 bytes received from https://www.python.org/.

 

 

 

 

 

 

 


緩存


# Memcached安裝
yum -y install libevent
yum -y install memcached
service memcached start  #能夠不用這句,用下面的

# Memcached使用
import memcache
mc = memcache.Client(['172.16.0.2:11211'], debug=True)
mc.set("foo", "bar")
ret = mc.get('foo')
print(ret)

# 啓動Memcached
memcached -d -m 10    -u root -l 127.0.0.1 -p 12000 -c 256 -P /tmp/memcached.pid

http://www.cnblogs.com/wupeiqi/articles/5132791.html

相關文章
相關標籤/搜索