1 import multiprocessing 2 import time,threading 3 4 def thread_run(): #線程函數 5 print(threading.get_ident()) 6 7 def run(name): #進程函數 8 time.sleep(2) 9 print("hello",name) 10 t=threading.Thread(target=thread_run,) #在進程中啓動線程 11 t.start() 12 13 if __name__ == "__main__": 14 for i in range(10): 15 p=multiprocessing.Process(target=run,args=('bob %s' %i,)) #啓動進程 16 p.start()
1 from multiprocessing import Process 2 import os 3 4 def info(title): 5 print(title) 6 print('module name:', __name__) #獲取模塊名稱即父進程名稱 7 print('parent process:', os.getppid()) #獲取父進程ID 8 print('process id:', os.getpid()) #獲取本進程的ID 9 print("\n\n") 10 11 def f(name): 12 info('called from child process function f') 13 print('hello', name) 14 15 if __name__ == '__main__': 16 info('main process line') #父進程爲主函數'__main__', 17 p = Process(target=f, args=('bob',)) #父進程爲info函數 18 p.start()
1 from multiprocessing import Process, Queue #進程間通訊使用的是multiprocessing中的Queue 2 import threading 3 4 def f(qq): 5 print("in child:",qq.qsize()) 6 qq.put([42, None, 'hello']) #發數據 7 8 if __name__ == '__main__': 9 q = Queue() 10 q.put("test123") 11 p = Process(target=f, args=(q,)) 12 p.start() 13 p.join() 14 print("444",q.get_nowait()) #收數據 15 print("444",q.get_nowait()) #收數據
1 from multiprocessing import Process, Pipe 2 3 def f(conn): 4 conn.send([42, None, 'hello from child']) 5 conn.send([42, None, 'hello from child2']) 6 print("from parent:",conn.recv()) # prints "from parent:張洋可好" 7 conn.close() 8 9 if __name__ == '__main__': 10 parent_conn, child_conn = Pipe() #生成管道實例 11 p = Process(target=f, args=(child_conn,)) 12 p.start() 13 print(parent_conn.recv()) # prints "[42, None, 'hello from child']" 14 print(parent_conn.recv()) # prints "[42, None, 'hello from child2']" 15 parent_conn.send("張洋可好") 16 p.join()
1 from multiprocessing import Process, Manager 2 import os 3 def f(d, l): 4 d[os.getpid()] =os.getpid() #在字典中添加進程ID 5 l.append(os.getpid()) #在列表中添加進程ID 6 7 if __name__ == '__main__': 8 with Manager() as manager: 9 d = manager.dict() #生成一個字典,可在多個進程間共享和傳遞,不能使用d={} 10 l = manager.list(range(5))#生成一個列表,可在多個進程間共享和傳遞 11 p_list = [] 12 for i in range(10): 13 p = Process(target=f, args=(d, l)) 14 p.start() 15 p_list.append(p) 16 for res in p_list: #等待結果 17 res.join() 18 print(d) #打印字典 19 print(l) #打印列表
1 from multiprocessing import Process, Lock 2 3 def f(l, i): 4 l.acquire() 5 print('hello world', i) 6 l.release() 7 8 if __name__ == '__main__': 9 lock = Lock() 10 for num in range(100): 11 Process(target=f, args=(lock, num)).start()
1 from multiprocessing import Pool 2 import time,os 3 def Foo(i): 4 time.sleep(2) 5 print("in process",os.getpid()) 6 return i + 100 7 8 def Bar(arg): 9 print('-->exec done:', arg,os.getpid()) 10 11 if __name__ == '__main__': #手動執行時執行這句下面的指令,可是使用其餘腳本調用時,下面的指令不執行 12 pool = Pool(5) #容許進程池同時放入5個進程,Pool(processes=5) 13 print("主進程",os.getpid()) 14 for i in range(10): 15 pool.apply_async(func=Foo, args=(i,), callback=Bar) #callback=回調(由主進程負責調用),執行完Foo以後再去執行Bar 16 #pool.apply(func=Foo, args=(i,)) #串行 17 #pool.apply_async(func=Foo, args=(i,)) #異步,並行 18 print('end') 19 pool.close() #先close在join,和進程、線程的先join再close方法相反 20 pool.join() #進程池中進程執行完畢後再關閉,若是不join,那麼程序直接關閉。
1 from greenlet import greenlet 2 def test1(): 3 print(12) 4 gr2.switch() #切換到gr2 5 print(34) 6 gr2.switch() 7 def test2(): 8 print(56) 9 gr1.switch() 10 print(78) 11 12 gr1 = greenlet(test1) #啓動一個攜程 13 gr2 = greenlet(test2) 14 gr1.switch() #切換到gr1(test1)開始執行 15 #先打印12;test1中切換到gr2,打印56;test2中切換到gr1,打印34;test1中再次切換到gr2,打印78
1 import gevent 2 def foo(): 3 print('Running in foo') 4 gevent.sleep(2) 5 print('Explicit context switch to foo again') 6 def bar(): 7 print('Explicit精確的 context內容 to bar') 8 gevent.sleep(1) 9 print('Implicit context switch back to bar') 10 def func3(): 11 print("running func3 ") 12 gevent.sleep(0) 13 print("running func3 again ") 14 15 gevent.joinall([ 16 gevent.spawn(foo), #以列表的形式生成協程實例 17 gevent.spawn(bar), 18 gevent.spawn(func3), 19 ]) 20 21 # 輸出結果: 22 # Running in foo 23 # Explicit精確的 context內容 to bar 24 # running func3 25 # running func3 again 26 # Implicit context switch back to bar 27 # Explicit context switch to foo again
1 import gevent,time 2 from urllib.request import urlopen 3 from gevent import monkey 4 #不需使用monkey.patch_all(),不然使用的協程gevent檢測不到urllib中的IO操做 5 monkey.patch_all() #把當前程序的全部的IO操做給我單獨的作上標記 6 7 def f(url): 8 print('GET: %s' % url) 9 resp = urlopen(url) 10 data = resp.read() 11 print('%d bytes received from %s.' % (len(data), url)) 12 13 #並行 14 async_time_start = time.time() 15 gevent.joinall([ 16 gevent.spawn(f, 'https://www.python.org/'), 17 gevent.spawn(f, 'https://www.yahoo.com/'), 18 gevent.spawn(f, 'https://github.com/'), 19 ]) 20 print("異步cost",time.time()-async_time_start ) 21 22 #串行 23 time_start = time.time() 24 urls = [ 'https://www.python.org/', 25 'https://www.yahoo.com/', 26 'https://github.com/' 27 ] 28 for url in urls: 29 f(url) 30 print("同步cost",time.time() - time_start)
1 import gevent 2 from gevent import socket, monkey 3 monkey.patch_all() 4 5 def server(port): 6 s = socket.socket() 7 s.bind(('0.0.0.0', port)) 8 s.listen(500) 9 while True: 10 cli, addr = s.accept() 11 gevent.spawn(handle_request, cli) 12 13 def handle_request(conn): 14 try: 15 while True: 16 data = conn.recv(1024) 17 print("recv:", data) 18 conn.send(data) 19 if not data: 20 conn.shutdown(socket.SHUT_WR) 21 except Exception as ex: 22 print(ex) 23 finally: 24 conn.close() 25 26 if __name__ == '__main__': 27 server(8001)
1 import socket 2 HOST = 'localhost' 3 PORT = 8001 4 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 5 s.connect((HOST, PORT)) 6 while True: 7 msg = bytes(input(">>:"), encoding="utf8") 8 s.sendall(msg) 9 data = s.recv(1024) 10 print('Received', data) 11 s.close()
1 import select,socket,queue 2 #建立服務端實例 3 server = socket.socket() 4 server.bind(('localhost',9000)) 5 server.listen(1000) 6 #設置爲非阻塞模式 7 server.setblocking(False) 8 #初始化一個隊列,後面存要返回給客戶端的數據 9 msg_dic={} 10 #存儲連接的列表 11 inputs = [server,] #inputs = [server,conn,conn2] #[conn2,] 12 outputs = [] #outputs = [r1,] 13 14 while inputs: 15 #(inputs, outputs, inputs) 16 #內核須要監測的連接,須要返回的鏈接隊列(須要server端給發數據的客戶端),返回異常的連接(須要將須要監控的連接傳給它,因此和input是同樣的) 17 readable ,writeable,exceptional= select.select(inputs, outputs, inputs ) 18 # print(readable,writeable,exceptional) 19 for r in readable: 20 if r is server: #若是readable中返回的是server,則表明是新連接 21 conn,addr = server.accept() 22 print("來了個新鏈接:",addr) 23 #由於這個新創建的鏈接還沒發數據過來,如今就接收的話程序就報錯了, 24 #因此要想實現客戶端發數據來時server端能知道,就須要讓select再監測這個conn 25 inputs.append(conn) 26 #給新連接初始化一個隊列,用來存服務端給客戶端傳送的數據 27 msg_dic[conn]=queue.Queue() 28 #將這個連接設置爲非阻塞模式 29 conn.setblocking(False) 30 else: #不然就是舊的連接,直接收數據便可 31 data = r.recv(1024) 32 if data: #接收到數據,說明連接是正常的 33 print("收到來自",r.getpeername(),"的數據:",data) 34 if r not in outputs: 35 outputs.append(r) # 將連接放入須要返回數據的連接隊列裏 36 msg_dic[r].put(data.upper()) #將接收到的數據變爲大寫後存在字典中 37 else: #不然連接是斷開的 38 print("連接關閉:",addr) 39 #刪除斷開的連接的全部數據 40 if r in outputs: 41 outputs.remove(r) 42 inputs.remove(r) 43 r.close() 44 del msg_dic[r] 45 46 #服務端接收到客戶端的數據以後下一次循環的時候再發送給客戶端 47 for w in writeable: #要返回給客戶端的鏈接列表 48 try: 49 data_to_client = msg_dic[w].get_nowait() #將字典中要發送的數據取出 50 except queue.Empty: 51 print("此連接無數據要返回:",w) 52 outputs.remove(w) 53 else: 54 w.send(data_to_client) #返回給客戶端源數據 55 #確保下次循環的時候writeable,不返回這個已經處理完的鏈接了 56 outputs.remove(w) 57 58 #刪除錯誤連接的全部內容 59 for e in exceptional: 60 print("此連接出錯:",e) 61 inputs.remove(e) 62 if e in outputs: 63 outputs.remove(e) 64 e.close() 65 del msg_dic[e]
客戶端node
1 import socket 2 HOST = 'localhost' 3 PORT = 9000 4 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 5 s.connect((HOST, PORT)) 6 while True: 7 msg = bytes(input(">>:"), encoding="utf8") 8 s.sendall(msg) 9 data = s.recv(1024) 10 print('Received', data) 11 s.close()
# 添加以下的行
* soft noproc 11000
* hard noproc 11000
* soft nofile 4100
* hard nofile 4100python
說明:* 表明針對全部用戶,noproc 是表明最大進程數,nofile 是表明最大文件打開數linux
ulimit -u 10000
ulimit -n 4096
ulimit -d unlimited
ulimit -m unlimited
ulimit -s unlimited
ulimit -t unlimited
ulimit -v unlimitedgit
加上:
* soft nofile 8192
* hard nofile 20480github
1 import selectors 2 import socket 3 4 sel = selectors.DefaultSelector() #生成selectors對象 5 6 def accept(sock, mask): 7 conn, addr = sock.accept() # Should be ready 8 print('accepted', conn, 'from', addr,mask) 9 conn.setblocking(False) #將連接設置爲非阻塞模式 10 sel.register(conn, selectors.EVENT_READ, read) #新鏈接註冊read回調函數 11 12 def read(conn, mask): 13 data = conn.recv(1024) # Should be ready 14 if data: 15 print('echoing', repr(data), 'to', conn) 16 conn.send(data) # Hope it won't block 17 else: 18 print('closing', conn) 19 sel.unregister(conn) 20 conn.close() 21 22 sock = socket.socket() 23 sock.bind(('localhost', 9998)) 24 sock.listen(100) 25 sock.setblocking(False) 26 sel.register(sock, selectors.EVENT_READ, accept) 27 28 while True: 29 events = sel.select() #默認阻塞,有活動鏈接就返回活動的鏈接列表 30 for key, mask in events: 31 callback = key.data #調用accept 32 callback(key.fileobj, mask) #key.fileobj就是那個socket連接
1 import socket 2 messages = [ b'This is the message. ', 3 b'It will be sent ', 4 b'in parts.', 5 ] 6 server_address = ('localhost', 9998) 7 8 # Create a TCP/IP socket 9 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(100)] 10 print(socks) 11 # Connect the socket to the port where the server is listening 12 print('connecting to %s port %s' % server_address) 13 for s in socks: 14 s.connect(server_address) 15 16 for message in messages: 17 # Send messages on both sockets 18 for s in socks: 19 print('%s: sending "%s"' % (s.getsockname(), message) ) 20 s.send(message) 21 # Read responses on both sockets 22 for s in socks: 23 data = s.recv(1024) 24 print( '%s: received "%s"' % (s.getsockname(), data) ) 25 if not data: 26 print( 'closing socket', s.getsockname() )