client端 import socket sk = socket.socket() sk.connect(('127.0.0.1',9001)) while True: a = input('輸入內容,q退出') if a.upper() == 'Q': sk.send(a.encode('utf-8')) break else: i = sk.recv(1024).decode('utf-8') if i.upper() == 'Q': break print(i) sk.send(a.encode('utf-8')) sk.close()
server端 import socket sk = socket.socket() ###實例化一個對象 sk.bind(('127.0.0.1',9001)) ###綁定一個地址 ip + 端口 ###127.0.0.1 永遠標識本機地址 ###不過交換機 排除一些網絡問題 sk.listen() ###開始監聽客戶的請求 conn,addr = sk.accept() ###接收一個鏈接請求 while True: a = input('請輸入內容,q退出') if a.upper() == 'Q': conn.send(a.encode('utf-8')) break else: conn.send(a.encode('utf-8')) i = conn.recv(1024).decode('utf-8') if i.upper() == 'Q': break print(i) conn.close() ###關閉與客戶端的鏈接 sk.close() ###關閉服務端
server端 from socket import socket,SOCK_DGRAM sk = socket(type= SOCK_DGRAM) sk.bind(('127.0.0.1',9001)) msg,addr = sk.recvfrom(1024) client端 from socket import socket,SOCK_DGRAM sk = socket(type= SOCK_DGRAM) ip = ('127.0.0.1',9002) sk.sendto(msg,ip)
server端 import struct import socket def proto_send(msg): msg = msg.encode('utf-8') len_msg = len(msg) proto_len = struct.pack('i', len_msg) ###把字節的長度編程4字節,i表明int conn.send(proto_len) conn.send(msg) sk = socket.socket() sk.bind(('192.168.12.26',9001)) sk.listen() conn,addr = sk.accept() msg1 = 'hello' msg2 = 'world' proto_send(msg1) proto_send(msg2) client端 import struct import socket sk = socket.socket() def proto_recv(): len_msg = sk.recv(4) len_msg = struct.unpack('i', len_msg)[0] ###解包取元組中的第0個元素就是數字 msg = sk.recv(len_msg) return msg sk.connect(('192.168.12.26',9001)) for i in range(1000000):2*i msg1 = proto_recv() print(msg1) msg2 = proto_recv() print(msg2)
server端 import socket sk = socket.socket() sk.bind(('192.168.12.26',9002)) sk.listen() conn,addr = sk.accept() with open(r'D:\Python_s25\day29\1.內容回顧.py','rb') as f: content = f.read() conn.send(content) conn.close() sk.close() client端 import socket sk = socket.socket() sk.connect(('127.0.0.1',9002)) content = sk.recv(40960) with open(r'內容回顧.py','wb') as f: f.write(content) sk.close()
import struct ret = struct.pack('i',197274000) print(ret) # 可以把一個任意大小的數據 轉換成固定的 4個字節 res = struct.unpack('i',b'\x90)\xc2\x0b') print(res[0]) # -2147483648 ~ +2147483647 # 最大2g
server端 import os import socket import hashlib SECRET_KEY = b'alexbigsb' def check_client(conn): randbytes = os.urandom(32) conn.send(randbytes) md5 = hashlib.md5(SECRET_KEY) md5.update(randbytes) code = md5.hexdigest() code_cli = conn.recv(32).decode('utf-8') return code == code_cli sk = socket.socket() sk.bind(('127.0.0.1',9001)) sk.listen() while True: conn,addr = sk.accept() if not check_client(conn):continue print('進程正常的通訊了') 一種加密方式 # import os # import hmac # hashlib # randbytes = os.urandom(32) # mac = hmac.new(SECRET_KEY,randbytes) # ret = mac.digest() # print(ret)
client端 import os import socket import hashlib SECRET_KEY = b'alexbigs' def check_client(): randbytes = sk.recv(32) md5 = hashlib.md5(SECRET_KEY) md5.update(randbytes) code = md5.hexdigest().encode('utf-8') sk.send(code) sk = socket.socket() sk.connect(('127.0.0.1',9001)) check_client() print('正常的客戶端通訊')
def processBar(num, total): """ 進度條 :param num: :param total: :return: """ rate = num / total rate_num = int(rate * 100) if rate_num == 100: r = '\r%s>%d%%\n' % ('=' * rate_num, rate_num,) else: r = '\r%s>%d%%' % ('=' * rate_num, rate_num,) sys.stdout.write(r) processBar(1024,20480)
server端 from socketserver import BaseRequestHandler,ThreadingTCPServer class Myserver(BaseRequestHandler): def handle(self): conn = self.request 內容 server = ThreadingTCPServer(('127.0.0.1',9001),Myserver) server.serve_forever()
import dis a = 1 def func(): global a a+=1 dis.dis(func)
import os import time print(os.getpid()) #本進程id print(os.getppid()) # parent process id #父進程id time.sleep(100)
import os import time from multiprocessing import Process def func(): ''' 在子進程中執行的func :return: ''' print('子進程 :',os.getpid(),os.getppid()) time.sleep(3) if __name__ == '__main__': p = Process(target=func) p.start() print('主進程 :',os.getpid())
import os import time from multiprocessing import Process def son_process(i): print('start son',os.getpid(),i) time.sleep(1) print('end son') if __name__ == '__main__': for i in range(5): Process(target=son_process,args=(i,)).start() ### strat son 12256 1 strat son 13252 0 strat son 4088 2 strat son 15644 4 strat son 12292 3 end son end son end son end son end son
from multiprocessing import Process import time def son_process(n): print('start',n) time.sleep(1) print('end',n) if __name__ == '__main__': p_l = [] for i in range(10): p = Process(target= son_process,args= (i,) ) p.start() ### start至關於告訴操做系統要開啓一個子進程,而子進程的調度是由操做系統控制的 p_l.append(p) for i in p_l: i.join() ### join 會循環查看每一個子進程是否結束,未結束時會阻塞等待這個子進程結束, 結束了就會查看下一個,都結束了纔會執行下面的 print('全部任務結束')
from multiprocessing import Process import time def son(): while True: time.sleep(1) print('is son') if __name__ == '__main__': p = Process(target= son) p.daemon = True ###將當前子程序設置爲守護進程 p.start() time.sleep(5) ###主程序會在本身的代碼執行完以後殺死守護進程並回收
面試題:求結果 from multiprocessing import Process import time def son(): while True: time.sleep(1) print('is son') def son2(): print('start son2') time.sleep(10) print('end son2') if __name__ == '__main__': p = Process(target= son) p.daemon = True p.start() Process(target= son2).start() time.sleep(5) #守護進程不會守護除了主進程代碼以外的其餘子進程 ### start son2 is son is son is son is son end son2
from multiprocessing import Process import time def son(): while True: time.sleep(1) print('in son') if __name__ == '__main__': p = Process(target=son) p.start() time.sleep(5) p.terminate() # 異步非阻塞操做 print(p.is_alive()) ###True 由於是terminate是異步的 因此 可能會同時執行 time.sleep(0.1) print(p.is_alive()) ###False print('我還能夠繼續作其餘的事情,主進程的代碼並不結束')
from multiprocessing import Process import os class MyProcess(Process): def run(self): ###重寫Process中的run方法 print(os.getpid()) if __name__ == '__main__': print('主:',os.getpid()) MyProcess().start() 傳參數: from multiprocessing import Process import os class MyProcess(Process): def __init__(self,arg1,arg2): super().__init__() ###執行父類中的init,必需要執行父類的init self.a1 = arg1 ###執行重寫的init,能夠加可是不能減 self.a2 = arg2 def run(self): ###重寫Process中的run方法 print(os.getpid(),self.a1,self.a2) if __name__ == '__main__': print('主:',os.getpid()) MyProcess(1,2).start() ###開啓子進程
n = 0 def son(i): global n n += 1 print(n,i) if __name__ == '__main__': p_l = [] for i in range(5): p = Process(target= son,args=(i,)) p.start() p_l.append(p) print(n) ### 0 1 0 1 2 1 3 1 1 1 4
from multiprocessing import Process,Lock import os def charge(lock): lock.acquire() ###上鎖 with open('lt','r')as f: content = f.read() num = int(content) num += 1 for i in range(10000):i += 1 with open('lt','w')as f: f.write(str(num)) lock.release() ###解鎖 if __name__ == '__main__': lock = Lock() ###實例化一個對象(鎖) for i in range(10): Process(target= charge,args=(lock,)).start()
from multiprocessing import Process,Queue def son(q): print(q.get(123)) ###從隊列中取出 if __name__ == '__main__': q = Queue() ###實例化一個隊列 p = Process(target= son,args=(q,)) p.start() q.put(123) ###發送到隊列中
from multiprocessing import Process,Queue import queue q = Queue(2) try: for i in range(4): q.put_nowait(i) # put_nowait 同步非阻塞方法 except queue.Full: print(i) q2 = Queue(2) try: print(q2.get_nowait()) except queue.Empty:pass q = Queue(5) ret = q.qsize() # 查看當前隊列有多少值 print(q.empty()) print(q.full())
閹割版單例模塊html
class Foo: instance = None def __init__(self, name): self.name = name def __new__(cls, *args, **kwargs): # 返回空對象 if cls.instance: return cls.instance cls.instance = object.__new__(cls) return cls.instance obj1 = Foo('小妹') obj2 = Foo('傻子') print(obj1,obj2)
爬蟲下載圖片python
1. 安裝第三方模塊 pip3 install requests 2. 使用 import requests url = 'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg' # python僞造瀏覽器向地址發送請求 rep = requests.get(url) # 請求返回回來的字節 # print(rep.content) with open('xxxxxx.jpg',mode='wb') as f: f.write(rep.content)
import requests url_list = [ 'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg', 'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg', 'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg' ] for url in url_list: ret = requests.get(url) file_name = url.rsplit('/',maxsplit=1)[-1] with open(file_name,mode='wb') as f: # 下載小文件 # f.write(ret.content) # 下載大文件 for chunk in ret.iter_content(): f.write(chunk)
線程的概念git
形象的關係 工廠--應用程序 車間--進程 工人--線程
進程和線程的區別?github
進程師計算機資源分配的最小單位 線程師計算機中能夠被cpu調度的最小單位 一個進程中能夠有多個線程,同一個進程中的線程能夠共享此進程中的資源,一個進程中至少有一個線程(一個應用程序中至少有一個進程) 在python中由於有GIL鎖, 默認進程之間沒法進行資源共享,若是進程想要通信能夠基於:文件/網絡/Queue
多線程的應用web
快速應用面試
import threading def task(arg): pass # 實例化一個線程對象 t = threading.Thread(target=task,args=('xxx',)) # 將線程提交給cpu t.start()
import threading def task(arg): ret = requests.get(arg) file_name = arg.rsplit('/', maxsplit=1)[-1] with open(file_name, mode='wb') as f: f.write(ret.content) for url in url_list: # 實例化一個線程對象 t = threading.Thread(target=task,args=(url,)) # 將線程提交給cpu t.start()
常見方法redis
t.start(),將線程提交給cpu,由cpu來進行調度算法
t.join(),等待編程
import threading 示例1 loop = 1000 number = 0 def _add(count): flobal number for i in range(count): number +=1 t = threading.Thread(target = _add,args=(loop,)) t.start() t.join() print(number)
import threading 示例2 loop = 1000 number = 0 def _add(count): global number for in in range(count): number +=1 def _sub(count): global number for i in range(count): number -= 1 t1 = threading.Thread(target=_add,args(loop,)) t2 = threading.Thread(target=_add,args(loop,)) t1.start() t2.atart() print(number)
import thrading 示例3 loop = 10000000 number = 0 def _add(count): global number for i in range(count): number += 1 def _sub(count): global number for i in range(count): number -= 1 t1 = threading.Thread(target=_add,args=(loop,)) t2 = threading.Thread(target=_sub,args=(loop,)) t1.start() t2.start() t1.join() # t1線程執行完畢,才繼續日後走 t2.join() # t2線程執行完畢,才繼續日後走 print(number)
import thrading 示例4 loop = 10000000 number = 0 def _add(count): global number for i in range(count): number += 1 def _sub(count): global number for i in range(count): number -= 1 t1 = threading.Thread(target=_add,args=(loop,)) t2 = threading.Thread(target=_sub,args=(loop,)) t1.start() t1.join() # t1線程執行完畢,才繼續日後走 t2.start() t2.join() # t2線程執行完畢,才繼續日後走 print(number)
t.setDaemon(),設置成爲守護線程小程序
import threading import time def task(arg): time.sleep(5) print('小妹') t = threading.Thread(target=task,args=(11,)) t.setDaemon(True) t.start() print('小妹')
線程名稱的設置和獲取
import threading def task(arg): #獲取當前執行此代碼的線程 name = threading.current+thread().getName() print(name) for i in range(10): t = threading.Thread(target=task,args=(11,)) t.setName('小妹-%s' %i) t.start()
run(),自定義線程時,cpu調度執行的方法
class XIAOMEI(threading.Thread): def run(self): print(‘執行此線程',self._args) obj = XIAOMEI(args=(100,)) obj.start()
練習題:基於socket 和多線程實現相似於socketserver模塊的功能
import socket import threading def task(connect,address): pass server = socket.socket() serber.bind('127.0.0.1',9000) server.listen(5) Whlie True: conn,addr = server.accept() #處理用戶請求 t= threading.thread(target=task,args=(conn,addr,)) t.start()
線程安全
Lock 互斥鎖
import threading import time def task(): global num #申請鎖 lock.acquire() num += 1 time.sleep(0.2) print(num) #釋放鎖 lock.release() with lock: num += 1 time.sleep(0.2) print(num) for i in range(10): t = threading.Thread(target=task) t.start()
RLock,遞歸鎖支持上屢次鎖
import threading import time num = 0 #線程鎖 locs = threading.RLock() def task(): golbal num #申請鎖 lock.acquire() num +=1 lock.acquire() time.sleep(0.2) print(num) #釋放鎖 lock.release() lock.release() for i in range(10): t=threading.Thread(target=task) t.start()
死鎖
import threading import time lock1 = threading.RLock() lock2 = threading.RLock() def fenglin(): lock1.acquire() time.sleep(1) lock2.acquire() def rimo(): lock2.acquire() time.sleep(1) lock1.acquire() t1 = threading.Thread(target=fenglin) t2 = threading.Thread(target=rimo) t1.start() t2.start()
基於線程鎖完成一個單例模式
import threading import time class XIAOMEI(): instance = None lock = threading.RLock() def __init__(self,name): self.name = name def __new__(cls,*args,**kwargs): if cls.instance: return cls.instance with cls.lock: if cls.instance return cls.instance time.sleep(0.1) cls.instance=object.__new__(cls) return cls.instance def task(): obj = Singleton('x') print(obj) for i in range(10): t = threading.Thread(target=task) t.start() data = Singleton('xiaomei') print(data)
GIL全局解釋鎖
同一時刻保證一個進程中只有一個線程能夠被cpu調度,因此在使用python開發時要注意: 計算密集型,用多進程 IO密集型,用多線程 GIL在列表和字典中起到了線程安全的做用
import time from concurrent.futures import ThreadPoolExecutor def task(n1, n2): time.sleep(2) print('任務') # 建立線程池 pool = ThreadPoolExecutor(10) for i in range(100): pool.submit(task, i, 1) print('END') # 等線程池中的任務執行完畢以後,再繼續往下走 pool.shutdown(True) print('其餘操做,依賴線程池執行的結果') 函數的返回值存在submit返回的對象裏面,用result()方法查看
import time from concurrent.futures import ThreadPoolExecutor def task(arg): time.sleep(2) print('任務') return 666 # 建立線程池 pool = ThreadPoolExecutor(10) ret = pool.map(task,range(1,20)) print('END',ret) pool.shutdown(True) for item in ret: print(item) 函數的返回值存在map的返回值中
回調函數
import requests from concurrent.futures import ThreadPoolExecutor def get_url(name,url): ret = requests.get(url) return name,ret.content def dump(res): with open(res.result()[0],mode='wb') as f: f.write(res.result()[1]) tp = ThreadPoolExecutor(4) dic = { 'baidu':'http://www.baidu.com', 'taobao':'http://www.taobao.com', 'jd':'http://www.jd.com', 'sina':'http://www.sina.com', 'sogou':'http://www.sogou.com', 'cnblog':'https://www.cnblogs.com/Eva-J/articles/9676220.html', } for k in dic: ret = tp.submit(get_url,k,dic[k]) ret.add_done_callback(dump) ###requests會發生阻塞,等待全部線程都回來,使用回調函數以後哪一個線程先回來就先執行哪一個
import time from concurrent.futures import ProcessPoolExecutor def task(n1, n2): time.sleep(2) print('任務') if __name__ == '__main__': # 建立線程池 pool = ProcessPoolExecutor(10) for i in range(20): pool.submit(task, i, 1) print('END')
from greenlet import greenlet def test1(): print('i1') gr2.switch() print('i3') gr2.switch() def test2(): print('i2') gr1.switch() print('i4') gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() ###單單是協程沒用 必需要加上遇到IO自動切換
from gevent import monkey monkey.patch_all() ###必須寫在前面 import gevent import time def eat(): print('eat food 1') time.sleep(3) print('eat food 2') def play(): print('play 1') time.sleep(3) print('play 2') g1 = gevent.spawn(eat) g2 = gevent.spawn(play) gevent.joinall([g1, g2])
from gevent import monkey monkey.patch_all() import gevent import requests def f1(url): print('GET: %s' % url) data = requests.get(url) print('%d bytes received from %s.' % (len(data.content), url)) def f2(url): print('GET: %s' % url) data = requests.get(url) print('%d bytes received from %s.' % (len(data.content), url)) def f3(url): print('GET: %s' % url) data = requests.get(url) print('%d bytes received from %s.' % (len(data.content), url)) gevent.joinall([ gevent.spawn(f1, 'https://www.python.org/'), gevent.spawn(f2, 'https://www.yahoo.com/'), gevent.spawn(f3, 'https://github.com/'), ])
相同性: 進程、協程、線程的區別:三個均可以提升併發 概念和關係: 進程是計算機中分配資源的最小單位,進程之間數據不共享,進程開啓和銷燬的開銷大,由操做系統調度,能夠利用多個 cpu。 線程是計算機中CPU調度的最小單位,線程之間資源共享,能夠利用多核(可是CPython中有GIL鎖),開啓和銷燬的開銷小,由操做系統負責調度。 協程又稱爲「微線程」,是基於代碼人爲創造的,資源開銷小,協程可以識別的IO操做:網絡操做,sleep,一個進程中能夠有多個線程,一個線程能夠建立多個協程 協程和cpython解釋器下的線程有啥區別 :資源開銷小 可以把單線程的效率提升 協程可以識別的io操做不如線程多 使用場景: 計算密集型,多進程; I0密集型,多線程/協程+ I0切換 單純的協程是沒辦法提升併發,只是代碼之間的來回切換,加上I0自動切換才 有意義,有I0操做用協程
from queue import Queue q = Queue() q.put('123') q.put(456) v1 = q.get() v2 = q.get() print(v1,v2) # 默認阻塞 v1 = q.get() print(v1)
from queue import LifoQueue lq = LifoQueue() lq.put(1) lq.put(2) lq.put(3) print(lq.get())
from queue import PriorityQueue pq = PriorityQueue() pq.put((10,'alex')) pq.put((5,'egon')) pq.put((15,'yuan')) print(pq.get()) print(pq.get()) print(pq.get())
1. 申請126或163郵箱 2. 開啓服務+受權碼 3. 經過代碼發送 """ import smtplib from email.mime.text import MIMEText from email.utils import formataddr # 寫郵件的內容 msg = MIMEText('老闆,我想演男一號,你想怎麼着都行。', 'plain', 'utf-8') msg['From'] = formataddr(["炮手", 'zh309603@163.com']) msg['To'] = formataddr(["老闆", '424662508@qq.com']) msg['Subject'] = "情愛的導演" server = smtplib.SMTP_SSL("smtp.163.com", 465) server.login("zh309603", "zhzzhz123") # 受權碼 server.sendmail('zh309603@163.com', ['424662508@qq.com', ], msg.as_string()) server.quit()
就是把大量的數據放到隊列中 而後有大量服務器從隊列中取出整理這些數據 import threading from queue import Queue import time q = Queue() def send(to,subject,text): import smtplib from email.mime.text import MIMEText from email.utils import formataddr # 寫郵件的內容 msg = MIMEText(text, 'plain', 'utf-8') msg['From'] = formataddr(["炮手", 'zh309603@163.com']) msg['To'] = formataddr(["老闆", to]) msg['Subject'] = subject server = smtplib.SMTP_SSL("smtp.163.com", 465) server.login("zh309603", "zhzzhz123") # 受權碼 server.sendmail('zh309603@163.com', [to, ], msg.as_string()) server.quit() def producer(i): """ 生產者 :return: """ print('生產者往隊列中放了10個任務',i) info = {'to':'424662508@qq.com', 'text':'你好','subject':'好友請求'} q.put(info) def consumer(): """ 消費者 :return: """ while True: print('消費者去隊列中取了任務') info = q.get() print(info) send(info['to'],info['subject'],info['text']) for i in range(10): t = threading.Thread(target=producer,args=(i,)) t.start() for j in range(5): t = threading.Thread(target=consumer) t.start()