1、線程
基本使用
線程鎖
自定義線程池
生產者消費者模型(隊列)
2、進程
基本使用
進程鎖
進程數據共享
默認數據不共享
queues
array
Manager.dict
進程池
PS:
IO密集型-多線程
計算密集型 - 多進程
3、協程
原理:利用一個線程,分解一個線程成爲多個「微線程」==》程序級別
greenlet
gevent
pip3 install gevent
4、緩存
一、安裝軟件
二、程序:安裝其對應的模塊
Socket鏈接,
memecach
一、天生集羣
二、基本
三、gets,cas
k -> ""
redis
k -> ""
k -> [11,11,22,33,44]
k -> {"k1":xxx}
k -> [11,22]
k -> [(11,1),(13,2),]html
#使用多線程
import threading
def f1(arg):
print(arg)
t = threading.Thread(target=f1,args=(123,))
t.start()python
import threading
from time import ctime,sleep
import time
def music(func):
for i in range(2):
print("我在聽--------- %s. %s" %(func,ctime()))
time.sleep(1)git
def move(func):
for i in range(2):
print("我在看---------- %s! %s" %(func,ctime()))
time.sleep(5)github
threads = []
t1 = threading.Thread(target=music,args=(u'愛情買賣',)) #把music加入到多線程
t2 = threading.Thread(target=move,args=(u'阿凡達',)) #把move加入到多線程
threads.append(t1)
threads.append(t2)redis
for i in threads:
i.start()緩存
import threading
from time import ctime,sleep
import time
def music(func):
for i in range(2):
print("我在聽--------- %s. %s" %(func,ctime()))
#time.sleep(1)服務器
def move(func):
for i in range(2):
print("我在看---------- %s! %s" %(func,ctime()))
#time.sleep(5)多線程
t1 = threading.Thread(target=music,args=(u'愛情買賣',)) #把music加入到多線程
t2 = threading.Thread(target=move,args=(u'阿凡達',)) #把move加入到多線程
t1.start()
sleep(2)
t2.start()app
同時作兩件事
#聽歌看片一塊兒幹,想要幹兩次,此程序只幹一次,由於setDaemon不等子線程
import threading
from time import ctime,sleep
def music(func):
for i in range(2):
print("我在聽-------- %s. %s" %(func,ctime()))
sleep(1)async
def move(func):
for i in range(2):
print("我在看-------- %s! %s" %(func,ctime()))
sleep(5)
threads = []
t1 = threading.Thread(target=music,args=(u'愛情買賣',)) #把music加入到多線程
threads.append(t1)
t2 = threading.Thread(target=move,args=(u'阿凡達',)) #把move加入到多線程
threads.append(t2)
if __name__ == '__main__':
for t in threads:
t.setDaemon(True) #子線程啓動後,父線程也繼續執行下去,當父線程執行完最後一條語句print "all over %s" %ctime()後,沒有等待子線程,直接就退出了,同時子線程也一同結束。
t.start() #開始啓動線程
print("兩件事同時作完----------- %s" %ctime())
多線程完整程序,同時聽了歌又看了片
import threading
from time import ctime,sleep
def music(func):
for i in range(2):
print("我在聽---------- to %s. %s" %(func,ctime()))
sleep(1)
def move(func):
for i in range(2):
print("我在看---------- to %s! %s" %(func,ctime()))
sleep(5)
threads = []
t1 = threading.Thread(target=music,args=(u'牛逼存在',))
threads.append(t1)
t2 = threading.Thread(target=move,args=(u'鍾馗伏魔',))
threads.append(t2)
if __name__ == '__main__':
for t in threads:
t.setDaemon(True)
t.start()
t.join() #等待進程終止,子程進程沒有運行完,就不執行父進程
print("全部結束--------------------- %s" %ctime())
# 結果
我在聽---------- to 牛逼存在. Mon Jul 18 00:18:22 2016
我在看---------- to 鍾馗伏魔! Mon Jul 18 00:18:22 2016
我在聽---------- to 牛逼存在. Mon Jul 18 00:18:23 2016
我在看---------- to 鍾馗伏魔! Mon Jul 18 00:18:27 2016
全部結束--------------------- Mon Jul 18 00:18:32 2016
#自定義多線程,調用
import threading
class MyThread(threading.Thread):
def __init__(self,func,args):
self.func = func
self.args = args
super(MyThread,self).__init__() #super繼承調用父類,解決多繼承的重複調用問題
def run(self): # 自動執行run方法
self.func(self.args)
def f2(arg):
print(arg)
obj = MyThread(f2,123)
obj.start()
#結果
123
消息隊列
# queue.Queue(2) 先進先出隊列
# put放數據,是否阻塞,阻塞時的超時事件
# get取數據(默認阻塞),是否阻塞,阻塞時的超時事件
# qsize()真實個數
# maxsize 最大支持的個數
# join,task_done,阻塞進程,當隊列中任務執行完畢以後,再也不阻塞
q = queue.Queue(2) #只接收兩個隊列
print(q.empty())
q.put(11)
q.put(22)
print(q.empty())
print(q.qsize())
q.put(22)
q.put(33, block=False)
q.put(33,block=False, timeout=2)
print(q.get())
print(q.get())
print(q.get(timeout=2))
----結果
True
False
2 # 阻塞了,只接收了兩個
import queue
# queue.Queue,先進先出隊列
# queue.LifoQueue,後進先出隊列
# queue.PriorityQueue,優先級隊列
# queue.deque,雙向對隊
#後進先出隊列
q = queue.LifoQueue()
q.put(123) #放數據給隊列
q.put(456)
print(q.get()) #取一條數據
print(q.get()) #取第二條數據
# #優先級隊列
#數值小的高
q = queue.PriorityQueue()
q.put((1,"alex1")) #存數據給隊列,優化級爲1
q.put((1,"alex2"))
q.put((1,"alex3"))
q.put((3,"alex3"))
print(q.get()) #取數據
#雙向對隊
q = queue.deque()
q.append(123)
q.append(333)
q.appendleft(456)
q.pop()
q.popleft()
隊列機制
import queue
queue.Queue #先進先出隊列
queue.LifoQueue #後隊進先出隊列
queue.PriorityQueue #優先級隊列
queue.deque #雙向對隊
q = queue.LifoQueue()
q.put(123)
q.put(456)
print(q.get())
q = queue.PriorityQueue()
q.put((1,"alex1"))
q.put((1,"alex2"))
q.put((1,"alex3"))
q.put((3,"alex3"))
print(q.get())
q = queue.deque()
q.append(123)
q.append(333)
q.appendleft(456)
q.pop()
q.popleft()
生產者消費者
消息隊列解決供給問題,解決阻塞,解耦
import queue
import threading
import time
q = queue.Queue() #先進先出隊列
def productor(arg): # 生產者
"""
買票
:param arg:
:return:
"""
q.put(str(arg) + '-包子') #發包子器
for i in range(300): #300我的買包子
t = threading.Thread(target=productor,args=(i,))
t.start()
def consumer(arg): # 消費者
"""
服務器後臺
:param arg:
:return:
"""
while True: #無限生產
print(arg,q.get())
time.sleep(2)
for j in range(3): #3我的一直生產
t = threading.Thread(target=consumer,args=(j,))
t.start()
# 線程鎖
# threading.RLock和threading.Lock
# threading.Lock 產生死鎖阻塞
import threading
lock = threading.Lock() #Lock對象
lock.acquire()
lock.acquire() #產生了死鎖。
lock.release()
lock.release()
-----結果
#一直循環不退出,由於鎖住了
# threading.RLock #在同一線程內,程序不會堵塞。
import threading
rLock = threading.RLock() #RLock對象
rLock.acquire()
rLock.acquire() #在同一線程內,程序不會堵塞。
rLock.release()
rLock.release()
-----結果
Process finished with exit code 0 #不阻塞
# 上鎖和解鎖,遞歸10減到0
import threading
import time
NUM = 10
def func(l):
global NUM
# 上鎖
l.acquire()
NUM -= 1
time.sleep(2)
print(NUM)
# 開鎖
l.release()
#lock = threading.Lock()
lock = threading.RLock()
for i in range(10):
t = threading.Thread(target=func,args=(lock,))
t.start()
-----結果
全鎖,全放
#紅燈停,綠燈放行
import threading
def func(i,e):
print(i)
e.wait() # 檢測是什麼燈,若是是紅燈,停;綠燈,行
print(i+100)
event = threading.Event()
for i in range(10):
t = threading.Thread(target=func, args=(i,event,))
t.start()
#========
event.clear() # 設置成紅燈
inp = input('>>>')
if inp == "1": #輸入1放行
event.set() # 設置成綠燈
------結果
0
1
2
3
4
5
6
7
8
9
>>>1
100
103
104
107
108
101
102
105
106
109
信號量,放幾個出去
import threading
import time
NUM = 10
def func(i,l):
global NUM
# 上鎖
l.acquire()
NUM -= 1
time.sleep(2)
print(NUM,i)
# 開鎖
l.release()
#lock = threading.Lock()
#lock = threading.RLock()
lock = threading.BoundedSemaphore(5) #信號量,放幾個出去
for i in range(10):
t = threading.Thread(target=func,args=(i,lock,))
t.start()
import threading
def func(i,con):
print(i)
con.acquire()
con.wait()
print(i+100)
con.release()
c = threading.Condition()
for i in range(10):
t = threading.Thread(target=func, args=(i,c,))
t.start()
while True:
inp = input('>>>')
if inp == 'q':
break
c.acquire()
c.notify(int(inp))
c.release()
import threading
def condition():
ret = False
r = input('>>>')
if r == 'true':
ret = True
else:
ret = False
return ret
def func(i,con):
print(i)
con.acquire()
con.wait_for(condition)
print(i+100)
con.release()
c = threading.Condition()
for i in range(10):
t = threading.Thread(target=func, args=(i,c,))
t.start()
# 1秒以後打印hello world
from threading import Timer
def hello():
print("hello, world")
t = Timer(1, hello) #1秒以後打印hello world
t.start() # after 1 seconds, "hello, world" will be print
#線程池程序
import queue
import threading
import contextlib
import time
StopEvent = object() #配置空值,判斷空值終止線程
class ThreadPool(object):
def __init__(self, max_num, max_task_num = None): #配置最大線程數和任務最大個數
"""
初始化函數作三件事:
1.判斷任務最大數是否爲真,真則建立隊列存任務,假則建立不受限制隊列
2.建立當前已經建立的線程變量
3.建立當前空閒多少線程變量
"""
if max_task_num:
self.q = queue.Queue(max_task_num) #建立隊列,用來裝任務
else:
self.q = queue.Queue()
self.max_num = max_num
self.cancel = False
self.terminal = False
self.generate_list = [] #當前已經建立的線程
self.free_list = [] #當前空閒多少線程
def run(self, func, args, callback=None):
"""
線程池執行一個任務
:param func: 任務函數
:param args: 任務函數所需參數
:param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數一、任務函數執行狀態;二、任務函數返回值(默認爲None,即:不執行回調函數)
:return: 若是線程池已經終止,則返回True不然None
"""
if self.cancel:
return
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: #判斷沒有空閒線程和已經建立的線程小於線程池最大數,則建立線程
self.generate_thread() # 建立線程
w = (func, args, callback,) # 如下兩行把任務放入隊列 #FUNC 是函數 ARGS、CALLBACK是元組
self.q.put(w)
def generate_thread(self):
"""
建立一個線程
"""
t = threading.Thread(target=self.call) #建立線程執行call方法(用call方法建立線程),每建立一個線程都執行call
t.start()
def call(self):
"""
循環去獲取任務函數並執行任務函數
"""
current_thread = threading.currentThread
self.generate_list.append(current_thread) #獲取已建立的線程存入列表
event = self.q.get() #取任務
while event != StopEvent:
'''若是獲取的任務不爲空,則執行改變其狀態,循環配置每一個任務爲空閒,不然傳入的是空值,則清除線程列表'''
func, arguments, callback = event #是元組event即任務便執行
try:
result = func(*arguments) #傳參,執行ACTION函數,ACTION執行完是空閒
success = True
except Exception as e:
success = False
result = None
if callback is not None:
try:
callback(success, result)
except Exception as e:
pass
with self.worker_state(self.free_list, current_thread): #若是執行ACTION,把任務設置成空閒
if self.terminal:
event = StopEvent
else:
event = self.q.get()
else:
self.generate_list.remove(current_thread)
def close(self):
"""
執行完全部的任務後,全部線程中止
"""
self.cancel = True
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1
def terminate(self):
"""
不管是否還有任務,終止線程
"""
self.terminal = True
while self.generate_list:
self.q.put(StopEvent) # 終止前存入隊列
self.q.empty()
'''如下用contextlib裝飾器模塊爲記錄正在等待的線程數,用於call方法把了隊列所有讀出來,即實現裝飾器爲空'''
@contextlib.contextmanager #contoextlib上下文管理,實現相似with的自動關閉機制,其實就是個with封裝
def worker_state(self, state_list, worker_thread):
"""
用於記錄線程中正在等待的線程數
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)
pool = ThreadPool(5) #建立線程池
def callback(status, result):
# status, execute action status
# result, execute action return value
pass
# 建立一個讀任務函數
def action(i):
print(i)
for i in range(300): #300個任務
ret = pool.run(action, (i,), callback)
# time.sleep(5)
# print(len(pool.generate_list), len(pool.free_list))
# print(len(pool.generate_list), len(pool.free_list))
進程數據共享
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
def foo(i,arg): #定義函數兩個選項,用於後續傳參
arg.put(i) #定義寫隊列參數
print('say hi',i,arg.qsize()) #打印i寫入的隊列,arg.qsize統計隊列數
if __name__=='__main__':
li = queues.Queue(20,ctx=multiprocessing) #容許20個隊列,使用多進程處理隊列
for i in range(10): #循環PUT 10次
p = Process(target=foo,args=(i,li,)) #多進程方式處理
p.start() #啓動多進程
進程間數據共享
# Array來共享數據
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
from multiprocessing import Array
def foo(i,arg):
# arg.put(i)
# print('say hi',i,arg.qsize())
arg[i] = i + 100
for item in arg:
print(item)
print('================')
if __name__ == "__main__":
# li = []
# li = queues.Queue(20,ctx=multiprocessing)
li = Array('i', 10) #將10個進程數據存入Array中
for i in range(10):
p = Process(target=foo,args=(i,li,))
#p.daemon = True
p.start()
#p.join()
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
from multiprocessing import Manager
def foo(i,arg):
# arg.put(i)
# print('say hi',i,arg.qsize())
# arg[i] = i + 100
# for item in arg:
# print(item)
# print('====================')
arg[i] = i + 100
print(arg.values())
if __name__=="__main__":
obj = Manager()
li = obj.dict()
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()
#p.join()
import time
time.sleep(0.1)
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
from multiprocessing import Manager
def foo(i,arg):
# arg.put(i)
# print('say hi',i,arg.qsize())
# arg[i] = i + 100
# for item in arg:
# print(item)
# print('====================')
arg[i] = i + 100
print(arg.values())
if __name__=="__main__":
obj = Manager()
li = obj.dict()
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()
p.join()
# import time
# time.sleep(0.1)
串行沒有多線程
from multiprocessing import Pool
import time
def f1(arg):
time.sleep(1)
print(arg)
if __name__ == "__main__":
pool = Pool(5)
for i in range(30):
pool.apply(func=f1,args=(i,))
print('end')
五個一塊兒執行
from multiprocessing import Pool
import time
def f1(arg):
time.sleep(1)
print(arg)
if __name__ == "__main__":
pool = Pool(5)
for i in range(30):
#pool.apply(func=f1,args=(i,))
pool.apply_async(func=f1,args=(i,)) #去隊列中取任務
pool.close()
pool.join()
from multiprocessing import Pool
import time
def f1(arg):
time.sleep(1)
print(arg)
if __name__ == "__main__":
pool = Pool(5)
for i in range(30):
#pool.apply(func=f1,args=(i,))
pool.apply_async(func=f1,args=(i,)) #去隊列中取任務
time.sleep(2)
# pool.close() #全部的任務執行完畢
pool.terminate() #當即終止
pool.join()
協程
安裝gevent (WINDOWS按如下方式安裝代替pip3 install gevent)
python3 -m pip install gevent
http://www.cnblogs.com/wupeiqi/articles/5040827.html
看11天圖
協程
# 交叉使用,只使用一個線程,在一個線程中規定某個代碼塊執行順序。
# 協程的適用場景:當程序中存在大量不須要CPU的操做時(IO),適用於協程;
from greenlet import greenlet
def test1():
print(12)
gr2.switch()
print(34)
gr2.switch()
def test2():
print(56)
gr1.switch()
print(78)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
-------結果
12
56
34
78
協程 :遇到IO操做自動切換
from gevent import monkey; monkey.patch_all()
import gevent
import requests
def f(url):
print('GET: %s' % url)
resp = requests.get(url)
data = resp.text
print('%d bytes received from %s.' % (len(data), url))
gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])
-------結果
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
449397 bytes received from https://www.yahoo.com/.
25533 bytes received from https://github.com/.
47394 bytes received from https://www.python.org/.
緩存
# Memcached安裝
yum -y install libevent
yum -y install memcached
service memcached start #能夠不用這句,用下面的
# Memcached使用
import memcache
mc = memcache.Client(['172.16.0.2:11211'], debug=True)
mc.set("foo", "bar")
ret = mc.get('foo')
print(ret)
# 啓動Memcached
memcached -d -m 10 -u root -l 127.0.0.1 -p 12000 -c 256 -P /tmp/memcached.pid