# 單例模式 class A: __isinstace = None def __init__(self): print('init') # __new__方法是爲對象在內存中開闢內存空間,__init__是爲該空間封裝屬性 def __new__(cls, *args, **kwargs): print('new') if not cls.__isinstace: cls.__isinstace = super().__new__(cls) return cls.__isinstace if __name__ == '__main__': a = A() aa = A() print(id(a)) print(id(aa))
class A: def __init__(self, v): self.v = v def __getitem__(self, key): print('getitem') def __setitem__(self, key, value): print('setitem') def __getattr__(self, key): print('getattr') def __setattr__(self, key, value): print('setattr') def __delitem__(self, key): print('delitem') def __delattr__(self, key): print('delattr') if __name__ == '__main__': a = A(1) a['v'] = 1 a.v = 2 v = a.v vv = a['v'] del a.v del a['v']
class A: def __init__(self): print('init') def __enter__(self): print('enter') return 1 def __exit__(self, exception_type, exception_value, traceback): print('exit') if __name__ == '__main__': # a 爲__enter__方法返回的值 with A() as a: print(a)
首先獲取目標的全部方法、屬性集,而後再獲取。python
attr()方法就是經過參數獲取相應對象中的值。編程
其中一個很重要的功能就是hasattr()和getattr(),由於python一切皆對象,因此說全部的屬性和方法是以字典的形式保存的,即key-->value,鍵名對應着值。若要獲取一個對象的屬性的值,就是前往該對象的屬性、方法集合中去尋找相應的鍵,若存在就返回值,不然就報錯,若調用getattr就會作相應的錯誤處理(返回一句提示);若要更新一個對象中的值,則是前往屬性、方法集合中去尋找相應的鍵,並更新值,若鍵不存在就更新集合,添加一個鍵,並默認爲其賦予默認的值。緩存
class A: def __init__(self, value): self.value = value if __name__ == '__main__': a = A(1) if hasattr(a, 'b'): getattr(a, 'b') else: print('not find') print(getattr(a, 'b', 'not find')) def func1(): print('func1') def fun2(): print('func2') # globals()獲取當前文件中全部的方法和屬性 # print(globals()['func1']()) import sys # 獲取當前主進程模塊 obj = sys.modules['__main__'] print(obj.__dict__)
python中因爲GIL(全局解釋鎖)的存在,一次只能執行一個進程,並不能達到真正意義上的多進程安全
# 多進程 from multiprocessing import Process, Pool,Manager, Lock import time def func1(): print('fucn1') def func2(): print('func2') def func3(i): print('接收', i) time.sleep(1) print('結束', i) def func3(queue, i, lock): lock.acquire() queue.put(f'放入{i}') print(f'成功放入{i}') time.sleep(2) lock.release() def func4(queue): data = queue.get() print('拿出數據:', data) time.sleep(0.5) class MyProcess(Process): def __init__(self, args): self.args = args super().__init__(target=self.run, args=args) def run(self): print('獲取', self.args[0]) time.sleep(1) print('結束', self.args[0]) if __name__ == '__main__': # p1 = Process(target=func1) # p2 = Process(target=func2) start = time.time() # 進程池 # processes = [Process(target=func3, args=(i, )) for i in range(100)] # pool = Pool() # for i in range(100): # pool.apply_async(func3, (i,)) # # pool.close() # pool.join() # for p in processes: # p.start() # # for p in processes: # p.join() # p1.start() # p2.start() # # p1.join() # p2.join() # 繼承實現 # processes = [MyProcess(args=(i,)) for i in range(100)] # for p in processes: # p.start() # for p in processes: # p.join() # 進程間通訊 queue = Manager().Queue() lock = Lock() processes = [Process(target=func3, args=(queue, i, lock)) for i in range(50)] processes += [Process(target=func4, args=(queue,)) for _ in range(40)] for p in processes: p.start() for p in processes: p.join() # 鎖 end = time.time() print('主進程結束, 耗時:', end-start)
# 多線程 from threading import Thread import time a = 0 def fun1(): print('func1') def fun2(): print('func2') def fun3(): global a print('計算開始:', a) for i in range(1000000): a += 1 print('計算結束:', a) def func4(): global a print('計算開始', a) for i in range(100000): a -= 1 print('計算結果', a) class MyThread(Thread): def run(self): print('MyThread') time.sleep(1) if __name__ == '__main__': start = time.time() # 單線程 # t1 = Thread(target=fun1) # t2 = Thread(target=fun2) # # t1.start() # t2.start() # # t1.join() # t2.join() # 多線程 # threads = [MyThread() for _ in range(100)] # # for t in threads: # t.start() # for t in threads: # t.join() # 多線程通訊, 以及多線程之間的併發所形成的數據有誤 threads = [Thread(target=fun3) for _ in range(100)] # threads += [Thread(target=func4) for _ in range(100)] for t in threads: t.start() for t in threads: t.join() end = time.time() print('最終結果:',a) print('耗時:', end-start)
表示同一時間內最多幾個線程爲就緒狀態,準備被系統調度運行。服務器
from threading import Thread, Semaphore import time def func1(i): s.acquire() print(f'開始執行{i}') time.sleep(1) print(f'結束行執行{i}') s.release() if __name__ == "__main__": start = time.time() s = Semaphore(5) threads = [Thread(target=func1, args=(i, )) for i in range(100)] for t in threads: t.start() for i in threads: t.join() end = time.time() print('執行完成,耗時',end - start)
按照指定順序執行程序即爲同步,不然就爲異步。網絡
from threading import Thread, Lock import time def func1(): while True: lock1.acquire() print('func1') time.sleep(0.5) lock2.release() def func2(): while True: lock2.acquire() print('func2') time.sleep(0.5) lock3.release() def func3(): while True: lock3.acquire() print('func3') time.sleep(0.5) lock1.release() if __name__ == "__main__": lock1 = Lock() lock2 = Lock() lock2.acquire() lock3 = Lock() lock3.acquire() t1 = Thread(target=func1) t2 = Thread(target=func2) t3 = Thread(target=func3) t1.start() t2.start() t3.start()
在OSI7層模型的4層模型中,應用層一下的都是由socket封裝。多線程
# tcp 服務端 from socket import * # tcp s = socket(AF_INET, SOCK_STREAM) s.bind(('localhost', 9999)) s.listen() new_socket = s.accept() # 返回的是一個套接字和客戶端的ip+端口號 # socket, addr = new_socket print(new_socket) data = new_socket[0].recv(1024) # socket.send(message.encode('utf-8')) print(data[1][0]) if len(data) == 0: # 當客戶端斷開鏈接時會發送一個長度爲0的數據,當檢測到長度爲0時,說明客戶端已經斷開鏈接p new_socket[0].close() s.close() # 客戶端 from socket import * s = socket(AF_INET, SOCK_STREAM) s.connect(('localhost', 9999)) # 客戶端請求鏈接,鏈接成功後就能夠收發消息了 s.send('hhhhh'.encode('utf-8')) s.recv(1024) s.close()
# 服務端 from socket import * s = socket(AF_INET, SOCK_DGRAM) s.bind(('localhost', 9999)) data = s.recvfrom(1024) print(data) s.close() # 客戶端 s = socket(AF_INET, SOCK_DGRAM) s.sendto('hhhh'.encode('utf-8'), ('localhost', 9999)) s.close()
from socket import * from threading import Thread import time # 客戶端/服務端(udp) class Send(Thread): def run(self): while True: data = input('>>>') s.sendto(data.encode('utf-8'), addr) time.sleep(0.5) class Receive(Thread): def run(self): while True: data = s.recvfrom(1024) print(data[0].decode('utf-8')) time.sleep(0.5) if __name__ =='__main__': s = socket(AF_INET, SOCK_DGRAM) addr = ('localhost', 9999) s.bind(('localhost', 8888)) send = Send() rece = Receive() try: send.start() rece.start() send.join() rece.join() finally: s.close()
線程實現併發
from socket import * from threading import Thread import time def deal_with_client(socket, addr): while True: data = socket.recv(1024) if len(data) == 0: break print(data.decode('utf-8')) time.sleep(0.5) socket.close() if __name__ == '__main__': s = socket(AF_INET, SOCK_STREAM) s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) # 通常來講一個線程佔用一個端口,經過設置這個使得多個線程能夠訪問同一個端口 s.bind(('localhost', 8888)) s.listen() while True: new_socket, new_addr = s.accept() p = Thread(target=deal_with_client, args=(new_socket, new_addr)) p.start() s.close()