大致實現的功能爲:
1.調用 API 接口,獲取 json 數據;2.json 數據轉換爲一個 csv 文件;3.csv 文件裏的每行數據轉換成單個 xml 文件;4.每 5 個 xml 文件打成一個壓縮包
用到的模塊爲:
csv,xml,threading,tarfile,queue
守護線程
備忘錄:
多線程之間通訊,可以用一個線程安全的隊列 queue
線程之間的等待(wait)與通知(set)
用死循環等待通知;要跳出死循環,可以把線程設爲守護線程,或者給個 flag,然後 break
'''Stock-quote pipeline demo: download -> CSV -> one XML per row -> tar archives.

Created on 2018-10-04

@author: 徐良俊

All threads are started up front and are scheduled in no particular order;
hand-over between the stages is controlled explicitly with a ``Queue`` and
``Event`` objects.

Modules involved: csv, xml.etree.ElementTree, threading, queue, tarfile.
'''
import csv
import logging
import os
import tarfile
import time
from functools import wraps
from queue import Queue
from threading import Event, Thread
from xml.etree.ElementTree import Element, ElementTree, tostring

CSV_PATH = "fundnav.csv"
XML_DIR = "tarxml"
# Number of XML files that go into one archive.
BATCH_SIZE = 5
# requests timeouts are expressed in seconds; the previous value of 3000
# looks like a millisecond figure and would have waited up to 50 minutes.
REQUEST_TIMEOUT_SECONDS = 3
# A queue item whose sid equals this value tells the CSV thread to stop.
STOP_SID = -1
# Highest field index read from a quote record (the date column).
DATE_FIELD = 30


class DownloadThread(Thread):
    '''Download one quote record and put ``(sid, fields)`` on the queue.

    I/O-bound work. ``http://hq.sinajs.cn/list=<sid>`` answers with a line
    shaped like::

        var hq_str_sh601015="陝西黑貓,6.400,6.420,...,2018-09-28,15:00:00,00";

    Field 0 is the name, 1 the opening price, 2 the previous close,
    30 the date and 31 the time.
    '''

    def __init__(self, sid, queue):
        Thread.__init__(self)
        self.sid = sid
        self.url = "http://hq.sinajs.cn/list=%s" % sid
        self.queue = queue

    @staticmethod
    def parse_fields(content):
        '''Return the comma-separated fields found between the outermost quotes.'''
        return content[content.find('"') + 1:content.rfind('"')].split(",")

    def download(self, url):
        '''Fetch *url* and return its parsed field list.

        Returns ``None`` when the server answers with an error status.
        Network errors raised by ``requests`` propagate to the caller.
        '''
        # Imported here so the CSV/XML/TAR stages stay importable when the
        # third-party package is not installed.
        import requests

        response = requests.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
        response.encoding = "GBK"
        if not response.ok:
            return None
        return self.parse_fields(response.text)

    def run(self):
        data = self.download(self.url)
        self.queue.put((self.sid, data))


class ConvertCSVThread(Thread):
    '''Drain the download queue into ``fundnav.csv`` and then set *csvEvent*.'''

    def __init__(self, queue, csvEvent):
        Thread.__init__(self)
        self.queue = queue
        self.csvEvent = csvEvent

    def dataToCsv(self, writer):
        '''Write one CSV row per usable queue item until the stop item arrives.

        Items whose data is missing, has an empty name, or is too short to
        contain the date column are skipped instead of raising IndexError.
        '''
        while True:
            sid, data = self.queue.get()
            if sid == STOP_SID:
                break
            if data and len(data) > DATE_FIELD and data[0] != '':
                writer.writerow((data[0], data[1], data[2], data[DATE_FIELD]))

    def run(self):
        try:
            with open(CSV_PATH, "w", encoding='utf-8-sig') as csv_file:
                writer = csv.writer(csv_file, lineterminator='\n')
                writer.writerow(("基金名稱", "開盤價", "收盤價", "日期"))
                self.dataToCsv(writer)
        finally:
            # Always release the XML thread, even after a failure; otherwise
            # it would block on this event forever and keep the process alive.
            self.csvEvent.set()


class ConvertXMLThread(Thread):
    '''Turn every data row of the CSV file into its own XML file.

    After each batch of ``BATCH_SIZE`` files (and after the final, possibly
    smaller, batch) the thread sets *cEvent* and blocks on *tEvent* until
    the archiving thread reports that the batch has been packed.
    '''

    def __init__(self, cEvent, tEvent, csvEvent):
        Thread.__init__(self)
        self.cEvent = cEvent
        self.tEvent = tEvent
        self.csvEvent = csvEvent

    def csvToXml(self, scsv):
        '''Write ``tarxml/<n>.xml`` for each row of *scsv*, batch by batch.

        File names restart at 1 for every batch; the previous batch has
        already been archived and removed by then.
        '''
        with open(scsv, 'r', encoding='utf-8-sig') as f:
            reader = csv.reader(f)
            headers = [h.replace(' ', '') for h in next(reader)]
            rows = list(reader)

        # The output directory is not created anywhere else.
        os.makedirs(XML_DIR, exist_ok=True)

        total = len(rows)
        in_batch = 0
        for position, row in enumerate(rows, start=1):
            in_batch += 1
            root = Element('Data')
            row_element = Element('Row')
            for tag, text in zip(headers, row):
                cell = Element(tag)
                cell.text = text
                row_element.append(cell)
            root.append(row_element)
            ElementTree(root).write("%s/%s.xml" % (XML_DIR, in_batch), "utf-8")

            if position == total:
                # Last row: hand over whatever is left, full batch or not.
                self.cEvent.set()
                self.tEvent.wait()
                break
            if in_batch == BATCH_SIZE:
                self.cEvent.set()
                self.tEvent.wait()
                self.tEvent.clear()
                in_batch = 0

    def run(self):
        # Do not read the CSV file before the CSV thread has finished it.
        self.csvEvent.wait()
        self.csvToXml(CSV_PATH)


class TARThread(Thread):
    '''Pack the XML files of each batch into ``<n>.tgz``.

    Runs as a daemon thread: it loops forever waiting for batches and ends
    together with the main thread.
    '''

    def __init__(self, cEvent, tEvent):
        # daemon=True replaces setDaemon(), deprecated since Python 3.10.
        Thread.__init__(self, daemon=True)
        self.count = 0
        self.cEvent = cEvent
        self.tEvent = tEvent

    def tarXML(self):
        '''Archive and delete every ``.xml`` file currently in ``tarxml``.

        An archive that ended up with no members is removed again.
        '''
        self.count += 1
        tfname = '%d.tgz' % self.count
        added = 0
        with tarfile.open(tfname, 'w:gz') as tf:
            for fname in os.listdir('./%s' % XML_DIR):
                if fname.endswith('.xml'):
                    path = './%s/%s' % (XML_DIR, fname)
                    tf.add(path)
                    os.remove(path)
                    added += 1
        if not added:
            os.remove(tfname)

    def run(self):
        while True:
            self.cEvent.wait()  # block until a batch is announced
            try:
                self.tarXML()
            except (OSError, tarfile.TarError):
                # Keep serving later batches instead of dying silently.
                logging.exception("failed to archive XML batch %d", self.count)
            finally:
                # Reset our own trigger first, then wake the XML thread, so
                # it is never left waiting on a batch that will not finish.
                self.cEvent.clear()
                self.tEvent.set()


def handle():
    '''Run the whole pipeline for a fixed list of stock codes.

    Several I/O-bound producers (one download thread per code) feed a single
    consumer (the CSV thread) through a queue. The XML and archiving threads
    take over once the CSV file is complete. The printed elapsed time covers
    the download and CSV stages; the function returns after the XML thread
    has finished as well.
    '''
    start = time.time()
    queue = Queue()
    codes = ['sh601006', 'sh601005', 'sh601003', 'sh601002', 'sh601001',
             'sh601007', 'sh601008', 'sh601009', 'sh601010', 'sh601011',
             'sh601012', 'sh601013', 'sh601014', 'sh601015', 'sh601016',
             'sh601017', 'sh601018', 'sh601019', 'sh601020', 'sh601021',
             'sh601022', 'sh601023', 'sh601024', 'sh601025', 'sh601026']

    # Guarantees the CSV file exists before it is split into XML files.
    csvEvent = Event()
    ct = ConvertCSVThread(queue, csvEvent)
    dts = [DownloadThread(code, queue) for code in codes]

    # Start everything; the scheduler decides the execution order.
    for t in dts:
        t.start()
    ct.start()

    cEvent = Event()
    tEvent = Event()
    cxml = ConvertXMLThread(cEvent, tEvent, csvEvent)
    tart = TARThread(cEvent, tEvent)
    cxml.start()
    tart.start()

    # Every download must be finished before the stop item is queued.
    for t in dts:
        t.join()
    queue.put((STOP_SID, None))

    ct.join()
    print("耗時 %s" % (time.time() - start))

    # Do not return while XML conversion / archiving is still in progress.
    cxml.join()


if __name__ == "__main__":
    handle()