import time
import threading


def get_detail_html(url):
    """Simulate fetching one detail page (~2 seconds of work)."""
    print("get detail html start")
    time.sleep(2)
    print("get detail html stop")


def get_detail_url(url):
    """Simulate collecting detail-page URLs (~2 seconds of work)."""
    print("url start")
    time.sleep(2)
    print("url end")


if __name__ == "__main__":
    thread1 = threading.Thread(target=get_detail_html, args=("",))
    thread2 = threading.Thread(target=get_detail_url, args=("",))

    start_time = time.time()
    # thread1.setDaemon()
    # thread2.setDaemon()
    thread1.start()
    thread2.start()
    # Block the main thread until both workers are done, so the elapsed
    # time below reflects the concurrent run (~2s, not ~4s).
    thread1.join()
    thread2.join()
    print("lasttime :{}".format(time.time() - start_time))
    pass
import time
import threading


class thread_get_detail_html(threading.Thread):
    """Thread subclass variant: the work lives in an overridden run()."""

    def run(self):
        print("get detail html start")
        time.sleep(2)
        print("get detail html stop")


class thread_get_detail_url(threading.Thread):
    """Thread subclass that simulates collecting URLs."""

    def run(self):
        print("url start")
        time.sleep(2)
        print("url end")


if __name__ == "__main__":
    # thread1= threading.Thread(target=get_detail_html,args=("",))
    # thread2= threading.Thread(target=get_detail_url,args=("",))
    thread1 = thread_get_detail_html()
    thread2 = thread_get_detail_url()

    start_time = time.time()
    # thread1.setDaemon()
    # thread2.setDaemon()
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print("lasttime :{}".format(time.time() - start_time))
    pass
以上就能發現,啓動了兩個線程分別執行了thread_get_detail_html和thread_get_detail_url。
- 全局變量
- Queue消息隊列
假設咱們如今繼續來完成這個爬蟲的正常邏輯。
1. 線程間的變量傳遞
import time
import threading

# Shared, module-level work list: get_detail_url() produces URLs into it,
# get_detail_html() consumes them.  Plain list operations are not a real
# thread-safe queue; this demo relies on CPython's atomic append()/pop().
detail_url_list = []


def get_detail_html():
    """Pop one URL from the shared list and simulate fetching it.

    Returns immediately when no URL is available yet.
    """
    global detail_url_list
    # EAFP: pop() and catch IndexError instead of checking len() first.
    # The original `if len(...) == 0` check is racy when several consumer
    # threads run concurrently (the list can empty between check and pop).
    try:
        url = detail_url_list.pop()
    except IndexError:
        return
    print("get detail html start :{}".format(url))
    time.sleep(2)
    print("get detail html stop :{}".format(url))


def get_detail_url():
    """Produce 20 fake detail-page URLs into the shared list."""
    global detail_url_list
    print("url start")
    for i in range(20):
        # Fixed typo: the original appended "htttp://..." URLs.
        detail_url_list.append("http://www.baidu.com/{id}".format(id=i))
    time.sleep(2)
    print("url end")


if __name__ == "__main__":
    start_time = time.time()
    thread1 = threading.Thread(target=get_detail_url)
    thread1.start()
    for i in range(10):
        thread_2 = threading.Thread(target=get_detail_html)
        thread_2.start()
    # NOTE: no join() here, so "lasttime" prints before the workers finish.
    print("lasttime :{}".format(time.time() - start_time))
    pass
實際上,還能夠更方便。將變量做爲參數傳遞,在方法中就不須要global了。
import time
import threading

detail_url_list = []


def get_detail_html(detail_url_list):
    """Pop one URL from the given list and simulate fetching it.

    Passing the list as an argument removes the need for `global`.
    Returns immediately when the list is empty.
    """
    # EAFP: pop() and catch IndexError -- a len()==0 pre-check is racy
    # when several consumer threads share the same list.
    try:
        url = detail_url_list.pop()
    except IndexError:
        return
    print("get detail html start :{}".format(url))
    time.sleep(2)
    print("get detail html stop :{}".format(url))


def get_detail_url(detail_url_list):
    """Append 20 fake detail-page URLs onto the given list."""
    print("url start")
    for i in range(20):
        # Fixed typo: the original appended "htttp://..." URLs.
        detail_url_list.append("http://www.baidu.com/{id}".format(id=i))
    time.sleep(2)
    print("url end")


if __name__ == "__main__":
    start_time = time.time()
    thread1 = threading.Thread(target=get_detail_url, args=(detail_url_list,))
    thread1.start()
    for i in range(10):
        thread_2 = threading.Thread(target=get_detail_html, args=(detail_url_list,))
        thread_2.start()
    print("lasttime :{}".format(time.time() - start_time))
    pass
import time
import threading
from queue import Queue


def get_detail_html(queue):
    """Take one URL from the queue and simulate fetching it.

    Queue.get() blocks until an item is available, so no emptiness
    check is needed -- the queue handles the locking internally.
    """
    url = queue.get()
    print("get detail html start :{}".format(url))
    time.sleep(2)
    print("get detail html stop :{}".format(url))


def get_detail_url(queue):
    """Put 20 fake detail-page URLs onto the queue."""
    print("url start")
    for i in range(20):
        # Fixed typo: the original put "htttp://..." URLs.
        queue.put("http://www.baidu.com/{id}".format(id=i))
    time.sleep(2)
    print("url end")


if __name__ == "__main__":
    start_time = time.time()
    url_queue = Queue()
    thread1 = threading.Thread(target=get_detail_url, args=(url_queue,))
    thread1.start()
    for i in range(10):
        thread_2 = threading.Thread(target=get_detail_html, args=(url_queue,))
        thread_2.start()
2.線程間的同步問題
import threading

total = 0


def add():
    """Increment the shared counter 100,000,000 times -- deliberately unlocked."""
    global total
    for _ in range(100000000):
        total += 1


def desc():
    """Decrement the shared counter 100,000,000 times -- deliberately unlocked."""
    global total
    for _ in range(100000000):
        total = total - 1


if __name__ == "__main__":
    add_total = threading.Thread(target=add)
    desc_total = threading.Thread(target=desc)
    add_total.start()
    desc_total.start()
    add_total.join()
    desc_total.join()
    # The result is usually NOT 0: "+= 1" compiles to several bytecode
    # steps, so concurrent updates interleave and some are lost.
    print(total)
# input
def add1(a):
    a += 1


def desc1(a):
    a -= 1


import dis

# Disassemble both functions: each is LOAD / LOAD_CONST / INPLACE op /
# STORE -- four separate steps, which is why `total += 1` is not atomic.
print(dis.dis(add1))
print(dis.dis(desc1))

# output:
#  22           0 LOAD_FAST                0 (a)
#               2 LOAD_CONST               1 (1)
#               4 INPLACE_ADD
#               6 STORE_FAST               0 (a)
#               8 LOAD_CONST               0 (None)
#              10 RETURN_VALUE
# None
#  25           0 LOAD_FAST                0 (a)
#               2 LOAD_CONST               1 (1)
#               4 INPLACE_SUBTRACT
#               6 STORE_FAST               0 (a)
#               8 LOAD_CONST               0 (None)
#              10 RETURN_VALUE
# None
# input
def add1(a):
    a += 1


def desc1(a):
    a -= 1


import dis

# dis shows the bytecode of the two functions; note the in-place ops.
print(dis.dis(add1))
print(dis.dis(desc1))

# output:
#  22           0 LOAD_FAST                0 (a)
#               2 LOAD_CONST               1 (1)
#               4 INPLACE_ADD
#               6 STORE_FAST               0 (a)
#               8 LOAD_CONST               0 (None)
#              10 RETURN_VALUE
# None
#  25           0 LOAD_FAST                0 (a)
#               2 LOAD_CONST               1 (1)
#               4 INPLACE_SUBTRACT
#               6 STORE_FAST               0 (a)
#               8 LOAD_CONST               0 (None)
#              10 RETURN_VALUE
# None
from threading import RLock

total = 0
lock = RLock()


def add():
    """Increment the counter under a reentrant lock.

    RLock may be acquired repeatedly by the thread that already holds it;
    a plain Lock would deadlock on the nested acquire() below.  Each
    acquire() must be balanced by one release().
    """
    global total
    global lock
    for _ in range(1000000):
        lock.acquire()
        lock.acquire()  # second, nested acquire -- legal only for RLock
        total += 1
        lock.release()
        lock.release()
# input
import threading


class XiaoAi(threading.Thread):
    """The "XiaoAi" speaker: opens the dialogue, then waits for each reply."""

    def __init__(self, cond):
        self.cond = cond
        super().__init__(name="小愛")

    def run(self):
        with self.cond:
            print("小愛: 天貓在嗎 我是小愛")
            self.cond.notify()  # finished speaking -> signal the peer
            self.cond.wait()    # wait until the peer has answered
            print("小愛: 咱們來背詩吧")
            self.cond.notify()


class TianMao(threading.Thread):
    """The "TianMao" speaker: waits first, then answers each line."""

    def __init__(self, cond):
        self.cond = cond
        super().__init__(name="天貓")

    def run(self):
        with self.cond:
            self.cond.wait()
            print("天貓: 在 我是天貓")
            self.cond.notify()
            self.cond.wait()
            print("天貓: 好啊")
            self.cond.notify()


if __name__ == "__main__":
    condition = threading.Condition()
    xiaoai = XiaoAi(condition)
    tianmao = TianMao(condition)
    # TianMao must start first so it is already wait()ing when XiaoAi
    # sends its first notify(); otherwise the signal would be lost.
    tianmao.start()
    xiaoai.start()

# output:
# 小愛: 天貓在嗎 我是小愛
# 天貓: 在 我是天貓
# 小愛: 咱們來背詩吧
# 天貓: 好啊
import threading
import time


class HtmlSpider(threading.Thread):
    """Worker thread: simulates downloading one URL, then frees a semaphore slot."""

    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("got html text success")
        # Release one slot so the producer may start the next spider.
        self.sem.release()


class UrlProducer(threading.Thread):
    """Spawns 20 spider threads, with at most 3 running at any moment."""

    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire()  # blocks while 3 spiders are already in flight
            html_test = HtmlSpider("www.baidu.com/{}".format(i), self.sem)
            html_test.start()


if __name__ == "__main__":
    sem = threading.Semaphore(3)  # cap concurrency at 3
    urlproducer = UrlProducer(sem)
    urlproducer.start()
ps:
# NOTE(review): excerpt of CPython's queue.Queue source (Lib/queue.py), quoted
# to show that Queue's thread-safety is built from one Lock shared by three
# Condition objects.  The 。。。 markers are the article's elisions, so this
# snippet is not runnable as-is.
class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        # 。。。
        # One mutex underlies all three conditions, so only one of
        # put()/get()/join() can touch the internal deque at a time.
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def put(self, item, block=True, timeout=None):
        # Waits on not_full until a slot is available (honoring block/timeout),
        # stores the item, then wakes one consumer via not_empty.notify().
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()
    # 。。。。。。
# Example: rewrite the earlier crawler simulation on top of a thread pool.
from concurrent.futures import ThreadPoolExecutor
import time


def get_html(times):
    """Pretend to download a page for `times` seconds; return `times`."""
    time.sleep(times)
    print("get html page {} successed!".format(times))
    return times


excutor = ThreadPoolExecutor(max_workers=2)

# submit() hands the callable to the pool and returns immediately
# (non-blocking); the returned Future can be queried for task state.
task1 = excutor.submit(get_html, 3)
task2 = excutor.submit(get_html, 2)
print(task1.done())
print(task2.done())

# result() blocks until the task finishes and yields its return value.
print(task1.result())
print(task2.result())

# output:
# False
# False
# get html page 2 successed!
# get html page 3 successed!
# 3
# 2
# Continuation of the previous thread-pool example: try to cancel task1.
print(task1.done())
# cancel() only succeeds while a task is still queued; with max_workers=2
# both tasks are already running here, so it returns False.
print(task1.cancel())
print(task2.done())
# result() blocks until the function returns its value.
print(task1.result())
print(task2.result())

# output:
# False
# False
# False
# get html page 2 successed!
# get html page 3 successed!
# 3
# 2
cancelled():返回 Future 表明的線程任務是否被成功取消。
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def get_html(uid):
    """Sleep `uid` seconds (simulated download), then return `uid`."""
    time.sleep(uid)
    url = "www.test.com/{}".format(uid)
    print("get url successed: \" {} \"".format(url))
    return uid


excutor = ThreadPoolExecutor(max_workers=2)
uids = [5, 2, 3]
future_list = [excutor.submit(get_html, uid) for uid in uids]

# as_completed() yields each future as soon as it finishes, regardless of
# the order in which the futures were submitted -- uid 2 comes out first.
for future in as_completed(future_list):
    print(future.result())

# output:
# get url successed: " www.test.com/2 "
# 2
# get url successed: " www.test.com/5 "
# 5
# get url successed: " www.test.com/3 "
# 3
as_completed():yield 全部完成的futures的全部返回。
那麼as_completed是如何作到收集全部完成的異步方法的狀態的呢?
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def get_html(uid):
    """Simulated download: sleep `uid` seconds and return `uid`."""
    time.sleep(uid)
    url = "www.test.com/{}".format(uid)
    print("get url successed: \" {} \"".format(url))
    return uid


excutor = ThreadPoolExecutor(max_workers=2)
uids = [5, 2, 3]
future_list = [excutor.submit(get_html, uid) for uid in uids]

# Results arrive in completion order, not submission order.
for future in as_completed(future_list):
    print(future.result())

# output:
# get url successed: " www.test.com/2 "
# 2
# get url successed: " www.test.com/5 "
# 5
# get url successed: " www.test.com/3 "
# 3
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time


def get_html(uid):
    """Simulated download: sleep `uid` seconds and return `uid`."""
    time.sleep(uid)
    url = "www.test.com/{}".format(uid)
    print("get url successed: \" {} \"".format(url))
    return uid


excutor = ThreadPoolExecutor(max_workers=2)
uids = [5, 2, 3]

# Unlike as_completed(), executor.map() yields results in the order of
# `uids` -- so 5 prints before 2 even though the uid-2 task finished first.
result_list = excutor.map(get_html, uids)
for result in result_list:
    print(result)

# output:
# get url successed: " www.test.com/2 "
# get url successed: " www.test.com/5 "
# 5
# 2
# get url successed: " www.test.com/3 "
# 3
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time


def get_html(uid):
    """Simulated download: sleep `uid` seconds and return `uid`."""
    time.sleep(uid)
    url = "www.test.com/{}".format(uid)
    print("get url successed: \" {} \"".format(url))
    return uid


excutor = ThreadPoolExecutor(max_workers=2)
uids = [5, 2, 3]
future_list = [excutor.submit(get_html, uid) for uid in uids]

# No join/wait here: the main thread reaches this print long before the
# pool threads have finished, so "task end" appears first.
print("task end")

# output:
# task end
# get url successed: " www.test.com/2 "
# get url successed: " www.test.com/5 "
# get url successed: " www.test.com/3 "
# "task end" was printed before the tasks finished; fix it by blocking on
# the futures first:
wait(future_list)
print("task end")

# output:
# get url successed: " www.test.com/2 "
# get url successed: " www.test.com/5 "
# get url successed: " www.test.com/3 "
# task end
# NOTE(review): excerpt of CPython's ThreadPoolExecutor.submit
# (Lib/concurrent/futures/thread.py), quoted to show how a Future is
# created, wrapped in a work item, and handed to the worker threads.
def submit(self, fn, *args, **kwargs):
    with self._shutdown_lock:
        if self._broken:
            raise BrokenThreadPool(self._broken)
        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        if _shutdown:
            raise RuntimeError('cannot schedule new futures after'
                               'interpreter shutdown')
        f = _base.Future()  # create the Future object f that is returned to the caller
        # _WorkItem bundles (future, callable, its arguments); it is the
        # work item that actually runs fn and sets the future's result.
        w = _WorkItem(f, fn, args, kwargs)
        # Enqueue the work item; the pool's worker threads pull tasks
        # from _work_queue and execute them.
        self._work_queue.put(w)
        # Adjust/spawn worker threads; each worker is started with
        # self._work_queue and loops over it executing queued items like w.
        self._adjust_thread_count()
        return f