【python 多線程】下載文件分批壓縮

大致作的功能爲:json

     1.調用api接口,獲取json數據;2.json 數據轉換爲一個csv文件;3.csv文件裏的每行數據轉換成單個xml文件;4.每5個xml文件進行打包api

用到的模塊爲:安全

       csv,xml,threading,tarfile,queue多線程

       守護線程app

       

備忘錄:測試

       多線程之間通訊,能夠用一個安全的隊列queueurl

       多線程之間線程等待(wait)與通知(set)線程

       用死循環進行等待通知,跳出死循環,線程爲守護線程,給個flag ,而後break設計

'''
Created on 2018年10月4日

@author: 徐良俊


****線程都須要交給cup 隨機執行,意思就是線程所有啓動
線程間通訊或線程間通知須要人爲控制 ****

設計模塊:
    csv
    xml.etree.ElementTree
    threading
    queue
    


'''
import requests,logging,csv,time
from functools import wraps
from threading import Thread,Event
from xml.etree.ElementTree import ElementTree,tostring,Element

from queue import Queue

import tarfile
import os

class DownloadThread(Thread):
    '''下載線程
    IO密集性操做
    
    
    "http://hq.sinajs.cn/list=%s"  得到數據格式爲:
    var hq_str_sh601015="陝西黑貓,6.400,6.420,6.440,6.470,6.390,6.440,6.450,3377008,21697080.000,53120,6.440,91300,6.430,92750,6.420,131000,6.410,131800,6.400,49600,6.450,25200,6.460,15700,6.470,11700,6.480,14200,6.490,2018-09-28,15:00:00,00";
    '''
    def __init__(self,sid,queue):
        Thread.__init__(self)
        self.sid=sid
        self.url="http://hq.sinajs.cn/list=%s" % sid
        self.queue = queue
        

    def download(self,url):
        response=requests.get(url,timeout=3000)
        response.encoding="GBK"
        if response.ok:
            content=response.text 
            maincontent_list = content[content.find("\"")+1:content.rfind("\"")].split(",")
#             info="股票名稱:%s;今日開盤價:%s;昨日收盤價:%s;日期:%s" % (maincontent_list[0],maincontent_list[1],maincontent_list[2],maincontent_list[30]+" "+maincontent_list[31],)
#             logging.warn(info)
        return maincontent_list
    
    
    def run(self):
        data=self.download(self.url)
        self.queue.put((self.sid,data))
        
        
        
        
        
        
class ConvertCSVThread(Thread):

    def __init__(self,queue,csvEvent):
        Thread.__init__(self)     
        self.queue=queue   
        self.csvEvent=csvEvent
        
    def dataToCsv(self,writer):
        
        while True:
            csvdata = []
            sid,data=self.queue.get()
            if sid == -1:
                break
            if data and data[0] != '' :
                csvdata.append((data[0],data[1],data[2],data[30],))
                writer.writerows(csvdata)    
        
        
    def run(self):
        
        csvFile = open( "fundnav.csv" , "w",encoding='utf-8-sig')
        writer = csv.writer(csvFile,lineterminator='\n')
        csvdata = []
        csvdata.append(("基金名稱","開盤價","收盤價","日期",))
        writer.writerows(csvdata)    
        
        self.dataToCsv(writer)
        csvFile.close()
         
        self.csvEvent.set() #告訴其餘線程,文件已經建立好了,你能夠作事了
        

class ConvertXMLThread(Thread):
    ''' 
    把csv每行文件轉變爲單個xml  
        
    '''
    
    
    def __init__(self,cEvent,tEvent,csvEvent):
        Thread.__init__(self)     
        self.cEvent=cEvent
        self.tEvent=tEvent  
        self.csvEvent=csvEvent  
            
    def csvToXml(self,scsv):
        with open(scsv,'r',encoding='utf-8-sig') as f:
            reader=csv.reader(f)
            headers=next(reader)
            headers=list(map(lambda h:h.replace(' ',''),headers))
            
            
            readerlist=list(reader)
            
            index=0  #記錄5個一組
            sycount=len(readerlist) #總數
            itindex=0  #seek
            
            
            for row in readerlist:
                index+=1
                itindex+=1
                
                root = Element('Data')
                eRow = Element('Row')
                for tag,text in zip(headers,row):
                    e=Element(tag)
                    e.text=text
                    eRow.append(e)
                root.append(eRow)
                
#                 print(tostring(root,encoding='unicode',method ='xml' ) )
                 
                et=ElementTree(root)
                et.write("tarxml/%s.xml" % index,"utf-8")
                
                
                if(sycount-itindex) < 5 and sycount==itindex: #處理最後不足5個
                    self.cEvent.set() #通知其餘線程,轉換完畢,你能夠作事了
                    self.tEvent.wait() #阻塞
                    break
                elif index == 5: #每5個打一個包
                    self.cEvent.set()   #通知其餘線程,轉換完畢,你能夠作事了
                    self.tEvent.wait() #阻塞  
                    self.tEvent.clear()
                    index = 0
        
    def run(self):
        while True:
            self.csvEvent.wait() 
            self.csvToXml("fundnav.csv")
            break
 

class TARThread(Thread):
    '''此線程爲守護線程,主線程結束,此線程跟着結束'''
    def __init__(self,cEvent,tEvent):
        Thread.__init__(self)
        self.count=0 
        self.cEvent=cEvent
        self.tEvent=tEvent  
        self.setDaemon(True) #守護線程,主線程結束,此線程跟着結束
            
    def tarXML(self):
        self.count+=1
        tfname='%d.tgz' % self.count
        tf = tarfile.open( tfname ,'w:gz')
        for fname in os.listdir('./tarxml'):
            if fname.endswith('.xml'):
                tf.add('./tarxml/%s' % fname)
                os.remove('./tarxml/%s' % fname)
        tf.close()
         
        if not tf.members:
            os.remove(tfname) 
        
    def run(self):
        while True:
            self.cEvent.wait() #阻塞,不繼續執行,等待通知(set)
            self.tarXML() 
            self.cEvent.clear() 
            
            self.tEvent.set() 
 

  

def handle():
    '''測試
    
    多個生產者:IO生產者
一個消費者:CPU密集消費者,  只要 queque 有數據就處理 
線程之間交互經過 queque 對象
    
    '''      
    start=time.time()       
    
    queue=Queue()
    codes=['sh601006','sh601005','sh601003','sh601002','sh601001','sh601007','sh601008',
           'sh601009','sh601010','sh601011','sh601012','sh601013','sh601014','sh601015','sh601016','sh601017',
           'sh601018','sh601019','sh601020','sh601021','sh601022','sh601023','sh601024','sh601025','sh601026']       
    
    csvEvent=Event()    #控制 把csv 文件轉變成  成多個xml 文件時,確保csv文件以及存在 
    ct=ConvertCSVThread(queue,csvEvent)    
    dts=[DownloadThread(codes[i],queue) for i in range(len(codes))]
     
    #線程都啓動起來,等待cup 調用 
    for t in dts:
        t.start()
    ct.start()
    
    cEvent=Event()
    tEvent=Event()
    cxml=ConvertXMLThread(cEvent,tEvent,csvEvent)
    tart=TARThread(cEvent,tEvent)   
    cxml.start()     
    tart.start()
     
    #只有在t線程執行完,才能執行主線程 
    for t in dts:
        t.join()
      
    queue.put((-1,None)) 
     
    #只有在ct線程執行完,才能執行主線程 
    ct.join() 
     
    print("耗時 %s" % (time.time() - start) )



handle()
相關文章
相關標籤/搜索