PYTHON多線程行情抓取工具實現

思路

藉助python當中threading模塊與Queue模塊組合能夠方便的實現基於生產者-消費者模型的多線程模型。Jimmy大神的tushare一直是廣大python數據分析以及業餘量化愛好者喜好的免費、開源的python財經數據接口包。html

平時一直有在用阿里雲服務器經過tushare的接口自動落地相關財經數據,但日復權行情數據以往在串行下載的過程中,速度比較慢,有時遇到網絡緣由還須要重下。每隻股票的行情下載過程當中都須要完成下載、落地2個步驟,一個可能須要網絡開銷、一個須要數據庫mysql的存取開銷。2者本來就能夠獨立並行執行,是個典型的「生產者-消費者」模型。python

基於queue與threading模塊的線程使用通常採用如下的套路:mysql

producerQueue=Queue()
consumerQueue=Queue()
lock = threading.Lock()
class producerThead(threading.Thread):
    def __init__(self, producerQueue,consumerQueue):
        self.producerQueue=producerQueue
        self.consumerQueue=consumerQueue



    def run(self):
        while not self.thread_stop:
            try:
                #接收任務,若是連續20秒沒有新的任務,線程退出,不然會一直執行
                item=self.producerQueue.get(block=True, timeout=20)
                #阻塞調用進程直到有數據可用。若是timeout是個正整數,
                #阻塞調用進程最多timeout秒,
                #若是一直無數據可用,拋出Empty異常(帶超時的阻塞調用)
            except Queue.Empty:
                print("Nothing to do!thread exit!")
                self.thread_stop=True
                break
            #實現生產者邏輯,生成消費者須要處理的內容 consumerQueue.put(someItem)
            #還能夠邊處理,邊生成新的生產任務
            doSomethingAboutProducing()
            self.producerQueue.task_done()
    def stop(self):
        self.thread_stop = True

class consumerThead(threading.Thread):
    def __init__(self,lock, consumerQueue):
        self.consumerQueue=consumerQueue
    def run(self):
        while true:
            try:
                #接收任務,若是連續20秒沒有新的任務,線程退出,不然會一直執行
                item=self.consumerQueue.get(block=True, timeout=20)
                #阻塞調用進程直到有數據可用。若是timeout是個正整數,
                #阻塞調用進程最多timeout秒,
                #若是一直無數據可用,拋出Empty異常(帶超時的阻塞調用)
            except Queue.Empty:
                print("Nothing to do!thread exit!")
                self.thread_stop=True
                break
            doSomethingAboutConsuming(lock)# 處理消費者邏輯,必要時使用線程鎖 ,如文件操做等
            self.consumerQueue.task_done()
#定義主線程
def main():
    for i in range(n):#定義n個i消費者線程
        t = ThreadRead(producerQueue, consumerQueue)
        t.setDaemon(True)
        t.start()
    producerTasks=[] #定義初始化生產者任務隊列
    producerQueue.put(producerTasks)
    for i in range(n):#定義n個生產者錢程
        t = ThreadWrite(consumerQueue, lock)
        t.setDaemon(True)
        t.start()    
    stock_queue.join()
    data_queue.join()

相關接口

1,股票列表信息接口

  • 做用
    獲取滬深上市公司基本狀況。屬性包括:sql

    code,代碼
    name,名稱
    industry,所屬行業
    area,地區
    pe,市盈率
    outstanding,流通股本(億)
    totals,總股本(億)
    totalAssets,總資產(萬)
    liquidAssets,流動資產
    fixedAssets,固定資產
    reserved,公積金
    reservedPerShare,每股公積金
    esp,每股收益
    bvps,每股淨資
    pb,市淨率
    timeToMarket,上市日期
    undp,未分利潤
    perundp, 每股未分配
    rev,收入同比(%)
    profit,利潤同比(%)
    gpr,毛利率(%)
    npr,淨利潤率(%)
    holders,股東人數
    • 調用方法
import tushare as ts
ts.get_stock_basics()
  • 返回效果
name    industry    area       pe   outstanding     totals  totalAssets
code
600606   金豐投資     房產服務   上海     0.00     51832.01   51832.01    744930.44
002285    世聯行     房產服務   深圳    71.04     76352.17   76377.60    411595.28
000861   海印股份     房產服務   廣東   126.20     83775.50  118413.84    730716.56
000526   銀潤投資     房產服務   福建  2421.16      9619.50    9619.50     20065.32
000056    深國商     房產服務   深圳     0.00     14305.55   26508.14    787195.94
600895   張江高科     園區開發   上海   171.60    154868.95  154868.95   1771040.38
600736   蘇州高新     園區開發   江蘇    48.68    105788.15  105788.15   2125485.75
600663    陸家嘴     園區開發   上海    47.63    135808.41  186768.41   4562074.50
600658    電子城     園區開發   北京    19.39     58009.73   58009.73    431300.19
600648    外高橋     園區開發   上海    65.36     81022.34  113534.90   2508100.75
600639   浦東金橋     園區開發   上海    57.28     65664.88   92882.50   1241577.00
600604   市北高新     園區開發   上海   692.87     33352.42   56644.92    329289.50

2,日復權行情接口

  • 做用

提供股票上市以來全部歷史數據,默認爲前復權,讀取後存到本地,做爲後續分析的基礎數據庫

  • 調用方法
ts.get_h_data('002337', start='2015-01-01', end='2015-03-16') #兩個日期之間的前復權數據

parameter:
code:string,股票代碼 e.g. 600848
start:string,開始日期 format:YYYY-MM-DD 爲空時取當前日期
end:string,結束日期 format:YYYY-MM-DD 爲空時取去年今日
autype:string,復權類型,qfq-前復權 hfq-後復權 None-不復權,默認爲qfq
index:Boolean,是不是大盤指數,默認爲False
retry_count : int, 默認3,如遇網絡等問題重複執行的次數
pause : int, 默認 0,重複請求數據過程當中暫停的秒數,防止請求間隔時間過短出現的問題

return:
date : 交易日期 (index)
open : 開盤價
high : 最高價
close : 收盤價
low : 最低價
volume : 成交量
amount : 成交金額
  • 返回結果
open   high  close    low     volume      amount
date
2015-03-16  13.27  13.45  13.39  13.00   81212976  1073862784
2015-03-13  13.04  13.38  13.37  13.00   40548836   532739744
2015-03-12  13.29  13.95  13.28  12.96   71505720   962979904
2015-03-11  13.35  13.48  13.15  13.00   59110248   780300736
2015-03-10  13.16  13.67  13.59  12.72  105753088  1393819776
2015-03-09  13.77  14.73  14.13  13.70  139091552  1994454656
2015-03-06  12.17  13.39  13.39  12.17   89486704  1167752960
2015-03-05  12.79  12.80  12.17  12.08   26040832   966927360
2015-03-04  13.96  13.96  13.30  12.58   26636174  1060270720
2015-03-03  12.17  13.10  13.10  12.05   19290366   733336768

實現

廢話很少說,直接上代碼,服務器

  • 生產者線程,讀取行情
class ThreadRead(threading.Thread):
    def __init__(self, queue, out_queue):
        '''
        用於根據股票代碼、須要讀取的日期,讀取增量的日行情數據,
        :param queue:用於保存須要讀取的股票代碼、起始日期的列表
        :param out_queue:用於保存須要寫入到數據庫表的結果集列表
        :return:
        '''
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue
    def run(self):
        while true:
            item = self.queue.get()
            time.sleep(0.5)
            try:
                df_h_data = ts.get_h_data(item['code'], start=item['startdate'], retry_count=10, pause=0.01)
                if df_h_data is not None and len(df_h_data)>0:
                    df_h_data['secucode'] = item['code']
                    df_h_data.index.name = 'date'
                    print df_h_data.index,item['code'],item['startdate']
                    df_h_data['tradeday'] = df_h_data.index.strftime('%Y-%m-%d')
                    self.out_queue.put(df_h_data)
            except Exception, e:
                print str(e)
                self.queue.put(item) # 將沒有爬取成功的數據放回隊列裏面去,以便下次重試。
                time.sleep(10)
                continue

            self.queue.task_done()
  • 消費者線程,本地存儲
class ThreadWrite(threading.Thread):
    def __init__(self, queue, lock, db_engine):
        '''
        :param queue: 某種形式的任務隊列,此處爲tushare爲每一個股票返回的最新日復權行情數據
        :param lock:  暫時用鏈接互斥操做,防止mysql高併發,後續可嘗試去掉
        :param db_engine:  mysql數據庫的鏈接對象
        :return:no
        '''
        threading.Thread.__init__(self)
        self.queue = queue
        self.lock = lock
        self.db_engine = db_engine

    def run(self):
        while True:
            item = self.queue.get()
            self._save_data(item)
            self.queue.task_done()

    def _save_data(self, item):
            with self.lock:
                try:
                    item.to_sql('cron_dailyquote', self.db_engine, if_exists='append', index=False)
                except Exception, e:  # 若是是新股,則有可能df_h_data是空對象,所以須要跳過此類狀況不處理
                    print str(e)
  • 定義主線程
from Queue import Queue
stock_queue = Queue()
data_queue = Queue()
lock = threading.Lock()
def main():
    '''
    用於測試多線程讀取數據
    :return:
    '''
    #獲取環境變量,取得相應的環境配置,上線時不須要再變動代碼
    global stock_queue
    global data_queue
    config=os.getenv('FLASK_CONFIG')
    if config == 'default':
        db_url='mysql+pymysql://root:******@localhost:3306/python?charset=utf8mb4'
    else:
        db_url='mysql+pymysql://root:******@localhost:3306/test?charset=utf8mb4'
    db_engine = create_engine(db_url, echo=True)
    conn = db_engine.connect()
    #TODO 增長ts.get_stock_basics()報錯的處理,若是取不到信息則直接用數據庫中的股票代碼信息,來獲取增量信息
    #TODO 增長一個標誌,若是一個股票代碼的最新日期不是最新日期,則需標記該代碼不須要從新獲取數據,即記錄該股票更新日期到了最新工做日,
    df = ts.get_stock_basics()
    df.to_sql('stock_basics',db_engine,if_exists='replace',dtype={'code': CHAR(6)})
    # 計算距離當前日期最大的工做日,以便每日定時更新
    today=time.strftime('%Y-%m-%d',time.localtime(time.time()))
    s1=("select max(t.date) from cron_tradeday t where flag=1 and t.date <='"+ today+"'")
    selectsql=text(s1)
    maxTradeay = conn.execute(selectsql).first()
    # 計算每隻股票當前加載的最大工做日期,支持重跑
    s = ("select secucode,max(t.tradeday) from cron_dailyquote t group by secucode ")
    selectsql = text(s)
    result = conn.execute(selectsql)  # 執行查詢語句
    df_result = pd.DataFrame(result.fetchall())
    df_result.columns=['stockcode','max_tradeday']
    df_result.set_index(df_result['stockcode'],inplace=True)
    # 開始歸檔前復權歷史行情至數據庫當中,以即可以方便地計算後續選股模型

    for i in range(3):#使用3個線程
        t = ThreadRead(stock_queue, data_queue)
        t.setDaemon(True)
        t.start()
    for code in set(list(df.index)):
        try:
            #若是當前股票已是最新的行情數據,則直接跳過,方便重跑。
            #print maxTradeay[0],df_result.loc[code].values[1]
            if df_result.loc[code].values[1] == maxTradeay[0]:
                continue
            startdate=getLastNdate(df_result.loc[code].values[1],1)
        except Exception, e:
            #若是某隻股票沒有相關的行情,則默認開始日期爲2015年1月1日
            startdate='2015-01-01'
        item={}
        item['code']=code
        item['startdate']=startdate
        stock_queue.put(item) # 生成生產者任務隊列
    for i in range(3):
        t = ThreadWrite(data_queue, lock, db_engine)
        t.setDaemon(True)
        t.start()
    stock_queue.join()
    data_queue.join()
  • 執行效果

本來須要2,3個小時才能執行完成的每日復權行情增量落地,有效縮短至了1小時之內,這裏線程數並不上越多越好,因爲復權行情讀的是新浪接口,在高併發狀況下會返回HTTP 503服務器過載的錯誤,另外高併發下可能須要使用IP代理池,下載的時段也須要嘗試多個時段進行。初次嘗試,若是有更好的方法或者哪裏有考慮不周的地方歡迎留言建議或者指正。網絡

相關文章
相關標籤/搜索