def storFile(data, fileName, method='a'):
    with open(fileName, method, newline='') as f:
        f.write(data)

storFile('123', '1.txt')
with open('1.txt', 'r') as f:
    print(f.read())
Serialization turns in-memory data into something that can be saved and shared, preserving program state. cPickle is implemented in C and is faster, so it is preferred; fall back to pickle if cPickle is not available. pickle serializes with dump and dumps.
try:
    import cPickle as pickle
except ImportError:
    import pickle

d = dict(url='index.html', title='1', content='2')
f = open('2.txt', 'wb')
pickle.dump(d, f)
f.close()
print(pickle.dumps(d))
Deserialize with load:
try:
    import cPickle as pickle
except ImportError:
    import pickle

f = open('2.txt', 'rb')
d = pickle.load(f)
f.close()
print(d)
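loads is the in-memory counterpart of load: it deserializes from a bytes object rather than a file. A minimal round-trip sketch:

import pickle

d = dict(url='index.html', title='1', content='2')
s = pickle.dumps(d)      # serialize to a bytes object
print(pickle.loads(s))   # deserialize back to a dict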
For multiprocessing, os.fork() clones the current process exactly; it returns 0 in the child process and the child's pid in the parent. It is only available on Linux/Unix.
import os

if __name__ == '__main__':
    pid = os.fork()
    if pid < 0:
        print('error pid')
    elif pid == 0:
        # Child process: print its own pid and the parent's pid
        print('child, child/parent pid', os.getpid(), os.getppid())
    else:
        # Parent process: print its own pid and the newly created child's pid
        print('parent pid, created child', os.getpid(), pid)
Use the multiprocessing module to create processes: start() launches a process and join() waits for it to finish.
import os
from multiprocessing import Process

def run_proc(name):
    print('name, child pid running', name, os.getpid())

if __name__ == '__main__':
    print('parent pid', os.getpid())
    for i in range(5):
        p = Process(target=run_proc, args=(str(i),))
        print('Process will start')
        p.start()
    p.join()
    print('end')
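Note that the loop above only joins the last Process it created. To make the parent wait for all five children, keep the Process objects and join each one; a minimal sketch:

import os
from multiprocessing import Process

def run_proc(name):
    print('child', name, 'pid', os.getpid())

if __name__ == '__main__':
    procs = [Process(target=run_proc, args=(str(i),)) for i in range(5)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()   # wait for every child, not just the last one
    print('all children finished')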
Use Pool from the multiprocessing module to limit the number of worker processes.
import os
import random, time
from multiprocessing import Pool

def run_proc(name):
    print('name, child pid running', name, os.getpid())
    time.sleep(random.random() * 10)
    print('name, child pid running end', name, os.getpid())

if __name__ == '__main__':
    print('parent pid', os.getpid())
    p = Pool(processes=3)
    for i in range(10):
        p.apply_async(run_proc, args=(i,))
    print('wait')
    p.close()
    p.join()
    print('end')
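apply_async returns an AsyncResult object, so return values from the workers can be collected with get() once the tasks finish. A minimal sketch (the square function here is only for illustration):

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=3) as pool:
        results = [pool.apply_async(square, args=(i,)) for i in range(10)]
        # get() blocks until the corresponding task has completed
        print([r.get() for r in results])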
Queue is used for communication between processes, via the put and get methods.
import os
import time, random
from multiprocessing import Process, Queue

def write_proc(q, urls):
    print('w processing', os.getpid(), 'is running')
    for u in urls:
        q.put(u)
        print('put:', u)
        time.sleep(random.random())

def read_proc(q):
    print('r processing', os.getpid(), 'is running')
    while True:
        u = q.get(True)
        print('get:', u)

if __name__ == '__main__':
    q = Queue()
    w1 = Process(target=write_proc, args=(q, ['u1', 'u2', 'u3']))
    w2 = Process(target=write_proc, args=(q, ['u4', 'u5', 'u6']))
    r1 = Process(target=read_proc, args=(q,))
    w1.start()
    w2.start()
    r1.start()
    w1.join()
    w2.join()
    r1.terminate()
Pipe() returns two connection objects, conn1 and conn2. In full-duplex mode (controlled by the duplex parameter of Pipe) both ends can send and receive, using send and recv.
import os
import time, random
from multiprocessing import Process, Pipe

def send_proc(p, urls):
    print('s processing', os.getpid(), 'is running')
    for u in urls:
        p.send(u)
        print('send:', u)
        time.sleep(random.random())

def receive_proc(p):
    print('r processing', os.getpid(), 'is running')
    while True:
        u = p.recv()
        print('receive:', u)

if __name__ == '__main__':
    p = Pipe()
    p1 = Process(target=send_proc, args=(p[0], ['u1', 'u2', 'u3']))
    p2 = Process(target=receive_proc, args=(p[1],))
    p1.start()
    p2.start()
    p1.join()
    p2.terminate()
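When duplex=False is passed, the pipe is one-way: the first connection can only receive and the second can only send. A minimal sketch:

from multiprocessing import Process, Pipe

def sender(conn):
    for u in ['u1', 'u2', 'u3']:
        conn.send(u)
    conn.close()

if __name__ == '__main__':
    # With duplex=False, recv_end can only receive and send_end can only send
    recv_end, send_end = Pipe(duplex=False)
    p = Process(target=sender, args=(send_end,))
    p.start()
    for _ in range(3):
        print('receive:', recv_end.recv())
    p.join()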
A few notes on multithreading. Use the threading module to create threads:
import time, random, threading

def run_proc(url):
    print('threading name', threading.current_thread().name)
    for u in url:
        print(threading.current_thread().name, '----->', u)
        time.sleep(random.random())
    print('end', threading.current_thread().name)

if __name__ == '__main__':
    print('running:', threading.current_thread().name)
    w1 = threading.Thread(target=run_proc, name='T1', args=(['u1', 'u2', 'u3'],))
    w2 = threading.Thread(target=run_proc, name='T2', args=(['u4', 'u5', 'u6'],))
    w1.start()
    w2.start()
    w1.join()
    w2.join()
    print('end')
Create a thread class by subclassing threading.Thread. Code source: https://github.com/qiyeboy/SpiderBook
import random
import threading
import time

class myThread(threading.Thread):
    def __init__(self, name, urls):
        threading.Thread.__init__(self, name=name)
        self.urls = urls

    def run(self):
        print('Current %s is running...' % threading.current_thread().name)
        for url in self.urls:
            print('%s ---->>> %s' % (threading.current_thread().name, url))
            time.sleep(random.random())
        print('%s ended.' % threading.current_thread().name)

print('%s is running...' % threading.current_thread().name)
t1 = myThread(name='Thread_1', urls=['url_1', 'url_2', 'url_3'])
t2 = myThread(name='Thread_2', urls=['url_4', 'url_5', 'url_6'])
t1.start()
t2.start()
t1.join()
t2.join()
print('%s ended.' % threading.current_thread().name)
Threads must be synchronized to protect shared data; the two main options are Lock and RLock. Note also that the Global Interpreter Lock (GIL) prevents threads from running Python bytecode in parallel, so prefer multiple processes for CPU-bound work and multiple threads for I/O-bound work.
import threading

mylock = threading.RLock()
num = 0

class myThread(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self, name=name)

    def run(self):
        global num
        while True:
            mylock.acquire()
            print('%s locked, Number: %d' % (threading.current_thread().name, num))
            if num >= 100:
                mylock.release()
                print('%s released, Number: %d' % (threading.current_thread().name, num))
                break
            num += 1
            print('%s released, Number: %d' % (threading.current_thread().name, num))
            mylock.release()

if __name__ == '__main__':
    thread1 = myThread('Thread_1')
    thread2 = myThread('Thread_2')
    thread1.start()
    thread2.start()
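For comparison, the same kind of counter can be protected with threading.Lock; using the lock as a context manager releases it automatically. A minimal sketch:

import threading

lock = threading.Lock()
num = 0

def worker():
    global num
    for _ in range(100):
        with lock:        # acquired on entry, released on exit
            num += 1

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print('Number:', num)     # always 200

Unlike RLock, a Lock cannot be acquired again by the thread that already holds it, which is why code that may lock recursively uses RLock.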
from gevent import monkey; monkey.patch_all()
import gevent
import urllib.request as urllib2

def run_task(url):
    print('Visit --> %s' % url)
    try:
        response = urllib2.urlopen(url)
        data = response.read()
        print('%d bytes received from %s.' % (len(data), url))
    except Exception as e:
        print(e)

if __name__ == '__main__':
    urls = ['https://github.com/', 'https://www.python.org/', 'http://www.cnblogs.com/']
    greenlets = [gevent.spawn(run_task, url) for url in urls]
    gevent.joinall(greenlets)
gevent also provides pool support:
from gevent import monkey
monkey.patch_all()
import urllib.request as urllib2
from gevent.pool import Pool

def run_task(url):
    print('Visit --> %s' % url)
    try:
        response = urllib2.urlopen(url)
        data = response.read()
        print('%d bytes received from %s.' % (len(data), url))
    except Exception as e:
        print(e)
    return 'url:%s --->finish' % url

if __name__ == '__main__':
    pool = Pool(2)
    urls = ['https://github.com/', 'https://www.python.org/', 'http://www.cnblogs.com/']
    results = pool.map(run_task, urls)
    print(results)
Create the service process (Windows); code from the same source as above.
import queue as Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# Number of tasks
task_number = 10
# Task queue and result queue
task_queue = Queue.Queue(task_number)
result_queue = Queue.Queue(task_number)

def get_task():
    return task_queue

def get_result():
    return result_queue

# Subclass BaseManager
class QueueManager(BaseManager):
    pass

def win_run():
    # On Windows the registered callables cannot be lambdas,
    # so named functions are defined above and bound here
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # Bind the port and set the authkey; on Windows the IP address must be given,
    # on Linux an empty address defaults to the local host
    manager = QueueManager(address=('127.0.0.1', 8001), authkey=b'qiye')
    # Start the manager
    manager.start()
    try:
        # Obtain the task and result queues exposed over the network
        task = manager.get_task_queue()
        result = manager.get_result_queue()
        # Add tasks
        for url in ["ImageUrl_" + str(i) for i in range(10)]:
            print('put task %s ...' % url)
            task.put(url)
        print('try get result...')
        for i in range(10):
            print('result is %s' % result.get(timeout=10))
    except:
        print('Manager error')
    finally:
        # Always shut down, otherwise an unclosed-pipe error is raised
        manager.shutdown()

if __name__ == '__main__':
    # Multiprocessing on Windows can be problematic; this call helps
    freeze_support()
    win_run()
Create the task (worker) process, adjusted for later Python versions:
# coding:utf-8
import time
from multiprocessing.managers import BaseManager

# Subclass BaseManager
class QueueManager(BaseManager):
    pass

# Step 1: register the names of the queue-getter methods with QueueManager
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# Step 2: connect to the server
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# The port and authkey must match the service process exactly
m = QueueManager(address=(server_addr, 8001), authkey=b'qiye')
# Connect over the network
m.connect()

# Step 3: obtain the Queue objects
task = m.get_task_queue()
result = m.get_result_queue()

# Step 4: take tasks from the task queue and write results to the result queue
while not task.empty():
    image_url = task.get(True, timeout=5)
    print('run task download %s...' % image_url)
    time.sleep(1)
    result.put('%s--->success' % image_url)

# Done
print('worker exit.')
Create the Linux version of the service process (untested):
# Service process (taskManager.py), Linux version
import random, time
import queue as Queue
from multiprocessing.managers import BaseManager

# Step 1: create task_queue and result_queue to hold tasks and results
task_queue = Queue.Queue()
result_queue = Queue.Queue()

class Queuemanager(BaseManager):
    pass

# Step 2: register the two queues on the network with register(); the callable
# parameter ties each name to a Queue object so it is exposed over the network
Queuemanager.register('get_task_queue', callable=lambda: task_queue)
Queuemanager.register('get_result_queue', callable=lambda: result_queue)

# Step 3: bind port 8001 and set the authkey 'qiye'; this initializes the manager
manager = Queuemanager(address=('', 8001), authkey=b'qiye')

# Step 4: start the manager and listen on the channel
manager.start()

# Step 5: obtain the network-accessible Queue objects through the manager
task = manager.get_task_queue()
result = manager.get_result_queue()

# Step 6: add tasks
for url in ["ImageUrl_" + str(i) for i in range(10)]:
    print('put task %s ...' % url)
    task.put(url)

# Collect the results
print('try get result...')
for i in range(10):
    print('result is %s' % result.get(timeout=10))

# Shut down the manager
manager.shutdown()
Create the TCP server:
# coding:utf-8
import socket
import threading
import time

def dealClient(sock, addr):
    # Step 4: receive data from the client and send data back
    print('Accept new connection from %s:%s...' % addr)
    sock.send(b'Hello,I am server!')
    while True:
        data = sock.recv(1024)
        time.sleep(1)
        if not data or data.decode('utf-8') == 'exit':
            break
        print('-->>%s!' % data.decode('utf-8'))
        sock.send(('Loop_Msg: %s!' % data.decode('utf-8')).encode('utf-8'))
    # Step 5: close the socket
    sock.close()
    print('Connection from %s:%s closed.' % addr)

if __name__ == "__main__":
    # Step 1: create an IPv4 TCP socket and bind it to
    # 127.0.0.1 (the local host) and port 999
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('127.0.0.1', 999))
    # Step 2: listen for connections
    s.listen(5)
    print('Waiting for connection...')
    while True:
        # Step 3: accept a new connection
        sock, addr = s.accept()
        # Handle each TCP connection in a new thread
        t = threading.Thread(target=dealClient, args=(sock, addr))
        t.start()
Create the TCP client:
# coding:utf-8
import socket

# Initialize the socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect to the target IP and port
s.connect(('127.0.0.1', 999))
# Receive a message
print('-->>' + s.recv(1024).decode('utf-8'))
# Send a message
s.send(b'Hello,I am a client')
print('-->>' + s.recv(1024).decode('utf-8'))
s.send(b'exit')
# Close the socket
s.close()
UDP server and client:
# UDP server
import socket

# Create the socket and bind it to the given IP and port.
# SOCK_DGRAM makes this a UDP socket; binding works the same way as for TCP.
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(('127.0.0.1', 9999))
print('Bind UDP on 9999...')
while True:
    # Receive data and reply directly; no connection is needed
    data, addr = s.recvfrom(1024)
    print('Received from %s:%s.' % addr)
    s.sendto(b'Hello, %s!' % data, addr)

# UDP client (run as a separate script)
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for data in [b'Hello', b'World']:
    # Send data
    s.sendto(data, ('127.0.0.1', 9999))
    # Receive the reply
    print(s.recv(1024).decode('utf-8'))
s.close()