人的精力是有限的,因此將目光聚焦在更小的範圍內,也許可以獲得性價比最高的效果。git
股票不少不少,可是咱們關心的也許並很少,將本身關心或者符合本身買入股票的前提條件的股票歸入一個股票池,而後再必定時間內只關注這些股票,也許是一個不錯的選擇,github
入市有風險,投資需謹慎,本文不做爲任何投資參考依據。sql
這裏主要指免費的,收費的本人也沒用過。數據庫
比較推薦的數據源有兩個tushare或者baostock.api
tushare須要註冊(若是你須要質量更高的數據的話)。對獲取頻率或者小於日線頻率的數據有限制。微信
若是你們想註冊tushare的話, 可使用個人分享連接: https://tushare.pro/register?reg=277890 這樣我能多50積分^_^多線程
使用何種方式獲取數據都是能夠的,看本身喜歡。併發
安裝很簡單app
pip install tushare pip install baostock pip install akshare pip install pytdx
這裏使用爲了接下來的操做須要將必定歷史範圍的股票數據下載下來,這裏下載起始時間爲20160101,截至時間爲運行代碼的時間範圍的歷史日線數據。ide
這裏以tushare爲例, tushare獲取歷史數據有兩種方式。
第一種是以迭代歷史交易日的方式獲取全部歷史數據,假設獲取三年的歷史數據,一年通常220個交易日左右,那麼3年須要請求660屢次左右,若是以這種方式的話,就下載數據的時間只須要1分鐘多點的樣子。
第二種是以迭代全部股票代碼的方式獲取全部歷史數據,股票數量有大概3800多個,須要請求3800屢次,可是在積分有限的狀況下一分鐘最多請求500次,也就意味着僅下載數據的時間至少須要大概8分鐘時間。
理論上,你獲取的歷史範圍超過17.3年,那麼使用第一種方式才比第二種方式快。
這兩種方式我實現了,你們能夠參考如下代碼.
首先是一些必要的設置及額外的重試機制。
DATE_FORMAT = "%Y%m%d" TS_TOKEN = "<你的Tushare Token>" ts.set_token(TS_TOKEN) pro = ts.pro_api(TS_TOKEN) def retry(fn, max_retry=3, sleep_init=14): """簡單的retry函數""" count = 1 while count < max_retry: count += 1 try: return fn() except Exception as e: # traceback.print_exc("") # 等待時間遞增 time_sleep = sleep_init * count + 1 print("遇到異常%s, 在%s秒後再次嘗試第%s次" % (e, time_sleep, count)) time.sleep(time_sleep) def save_to_csv(ret, data_path="stock"): if not path.exists(data_path): # 若是父目錄不存在不會報錯 os.makedirs(data_path) for ts_code, df in ret.items(): fname = "-".join([ts_code, ".csv"]) fp = path.join(data_path, fname) df.to_csv(fp, index=False)
第一種方式
def download_by_trade_date(start_date, end_date, data_path="by_trade_date", worker_size=2, debug=False): """ 經過交易日來遍歷時間範圍內的數據,當交易日的個數小於股票的數量時,效率較高. 一年通常220個交易日左右,可是股票卻有3800多個,那麼經過交易日來下載數據就高效的多了 """ now = datetime.now() start_time = now try: start_date_ = datetime.strptime(start_date, DATE_FORMAT) end_date_ = datetime.strptime(end_date, DATE_FORMAT) if end_date_ < start_date_: sys.exit("起始時間應該大於結束時間") if start_date_ > now: sys.exit("起始時間應該大於當前時間") if end_date_ > now: end_date = now.strftime(DATE_FORMAT) except Exception: traceback.print_exc("") sys.exit("傳入的start_date[%s]或end_date[%s]時間格式不正確, 格式應該像20200101" % (start_date, end_date)) # 獲取交易日曆 try: trade_cal = pro.trade_cal(exchange="SSE", is_open="1", start_date=start_date, end_date=end_date, fields="cal_date") except Exception: sys.exit("獲取交易日曆失敗") trade_date_lst = trade_cal.cal_date pool = ThreadPoolExecutor(max_workers=worker_size) print("準備開始獲取 %s到%s 的股票數據" % (start_date, end_date)) def worker(trade_date): # 用偏函數包裝一下 # pro = ts.pro_api(TS_TOKEN) fn = partial(pro.daily, trade_date=trade_date) return retry(fn) # 最終保存到一個列表中 ret = defaultdict(list) # future 列表 fs_lst = [] # 經過線程併發獲取數據 for trade_date in trade_date_lst: # print(trade_date) # 這裏不使用pool.map的緣由是, map返回的future列表會亂序 # submit的位置參數不須要須要放到可迭代對象裏面(通常是元組), 臥槽。。。 fs = pool.submit(worker, trade_date) fs_lst.append(fs) # break # ***************************** # 獲取每一個交易日的股票數據 # ***************************** for trade_date, fs in zip(trade_date_lst, fs_lst): if debug: print("開始獲取交易日[%s]的數據" % trade_date) # 若是有異常或者結果爲空的話 if fs.exception() or not isinstance(fs.result(), pd.DataFrame): print(fs.exception()) sys.exit("在交易日[%s]超太重試最大的次數也沒有獲取到數據" % trade_date) day_df = fs.result() columns = day_df.columns # 遍歷一個交易日的全部股票數據 # print(datetime.now()) # 遍歷day_df.values 大概2ms # 2 ms ± 63.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each) # 遍歷day_df.iterrows() 大概285ms # 285 ms ± 2.64 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) for row in day_df.values: ts_code = row[0] ret[ts_code].append(row) # print(datetime.now()) merge_start_time = datetime.now() new_ret = {} for key, value in ret.items(): new_ret[key] = pd.DataFrame(value, columns=columns) merge_end_time = datetime.now() # 組合[series...] 須要142ms # 142 ms ± 1.12 ms per loop (mean ± std. dev. of 7 runs, 10 loops each) # 組合[array....] 須要6.56ms # 6.56 ms ± 66.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) print("合併共花費時間: %s" % (merge_end_time - merge_start_time)) # ***************************** # 獲取將結果保存到本地的csv文件中 # ***************************** print("數據已經獲取完畢準備保存到本地") save_to_csv(new_ret, data_path=data_path) end_time = datetime.now() print("*****************************") print("下載完成, 共花費時間%s" % (end_time - start_time)) print("*****************************")
第二種方式
def download_by_ts_code(start_date, end_date, data_path="by_ts_code", debug=False, worker_size=3): """由於按股票代碼的方式實在太慢了(若是你寬帶速度比較快的話), 也就不必多線程了""" now = datetime.now() start_time = now try: start_date_ = datetime.strptime(start_date, DATE_FORMAT) end_date_ = datetime.strptime(end_date, DATE_FORMAT) if end_date_ < start_date_: sys.exit("起始時間應該大於結束時間") if start_date_ > now: sys.exit("起始時間應該大於當前時間") if end_date_ > now: end_date = now.strftime(DATE_FORMAT) except Exception: traceback.print_exc("") sys.exit("傳入的start_date[%s]或end_date[%s]時間格式不正確, 格式應該像20200101" % (start_date, end_date)) def worker(ts_code): fn = partial(ts.pro_bar, ts_code=ts_code, adj='qfq', start_date=start_date, end_date=end_date) return retry(fn) pool = ThreadPoolExecutor(max_workers=worker_size) print("準備開始獲取 %s到%s 的股票數據" % (start_date, end_date)) # 不指定任何參數會獲取5000條最近的數據,ts_code會重複 day = pro.daily() # 固定順序,set是沒法包裝有序的 all_ts_code = list(set(day.ts_code)) fs_lst = [] for ts_code in all_ts_code: fs_lst.append(pool.submit(worker, ts_code)) # break for ts_code, fs in zip(all_ts_code, fs_lst): # 若是有異常或者結果爲空的話 if fs.exception() or not isinstance(fs.result(), pd.DataFrame): print(fs.exception()) sys.exit("在交易日[%s]超太重試最大的次數也沒有獲取到數據" % trade_date) df = fs.result() # %timeit df.sort_index() # 192 µs ± 3.73 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each) # %timeit df.sort_index(inplace=True) # 2.54 µs ± 177 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each) df.sort_index(inplace=True, ascending=False) if not isinstance(df, pd.DataFrame): sys.exit("在股票[%s]超太重試最大的次數也沒有獲取到數據" % ts_code) save_to_csv({ts_code: df}, data_path=data_path) if debug: print("股票[%s]歷史數據下載保存完成" % ts_code) end_time = datetime.now() print("*****************************") print("下載完成, 共花費時間%s" % (end_time - start_time)) print("*****************************")
我感受我寫了好多註釋以及一些方法之間性能對比,關於各個方法之間的性能對比你們仍是須要注意的,由於雖然平時感覺不出來,可是在較屢次數的循環的時候就會發現性能差別了。
至此須要的歷史數據就獲得了。
若是對讀寫速度很在乎的話,存儲在sqlite數據庫或者其餘其餘數據會快不少。
不管是主觀交易仍是量化交易,選股的方式我的認爲大概分爲如下三類。
本文主要集中在如下幾種方式
股票的形態多種多樣,這裏以選擇股票中的黃昏(早晨)十字星爲例。
黃昏十字星與早晨十字星的區別在於所處趨勢不同。
圖示以下:
若是須要程序判斷,那麼這些長度須要量化,不能模棱兩可。因此根據十字星的定義,量化以下:
實體長度: 超過2.5%纔算長實體,並且上下影線不能超過0.3%
十字星: 實體長度不超過1.5%
趨勢: 包括十字星在內往前數6根k線,其中的第1根k線收盤價均小於它後面4根收盤價爲向上趨勢,反之趨勢向下,而且幅度超過2.5%。
這裏我只是主觀的量化沒有任何交易經驗的選擇,若是不認同的話,能夠自行修改代碼。
代碼以下:
def select_doji(): # fp = "by_ts_code/600249.SH-.csv" fp = "by_ts_code/300130.SZ-.csv" df = pd.read_csv(fp, index_col="trade_date", parse_dates=["trade_date"]) df = df[["open", "high", "low", "close"]] # k線數量 k_size = 6 # 起始幅度大小 treand_threshold = 0.025 # 長實體最小長度 entity_length_threshold = 0.025 # 長實體上下最大影線長度 entity_shadow_line_length_threshold = 0.03 # 十字星實體長度最大長度 doji_entity_length_threshold = 0.015 # 十字星上下影線長度最小長度 # doji_shadow_line_length_threshold = 0.005 trend_map = {1: "向上", -1: "向下"} def up_or_down_trend(row): """1表明向上, -1表明向下, 0表明震盪""" first = row[0] last = row[-1] if all(first > row[1:]) and first > (last * treand_threshold): return -1 elif all(first < row[1:]) and last > (first * treand_threshold): return 1 else: return 0 df["trend"] = df.close.rolling(k_size).apply(up_or_down_trend, raw=True) df.fillna(value=0, inplace=True) def k_sharp(k): """返回k線的上下影線長度, 實體長度""" open_, high, low, close = k[:4] if open_ > close: upper_line_length = (high - open_) / high lower_line_length = (close - low) / close entity_length = (open_ - close) / open_ else: upper_line_length = (high - close) / high lower_line_length = (open_ - low) / open_ entity_length = (close - open_) / close return upper_line_length, lower_line_length, entity_length def is_up_or_down_doji(k_lst, trend): # open, high, low, close if len(k_lst) != 3: sys.exit("判斷十字星須要三根K線") is_ok = False k1, k2, k3 = k_lst # 判斷是否跳空 # 經過high, close過於嚴格 if trend > 0: # 趨勢向上時,最低點是否大於兩個實體的最高價 if k2[0] < k1[1] or k2[0] < k3[1]: # if k2[0] < k1[1] : return is_ok else: # 趨勢向下時,最高點是否小於兩個實體的最高價 if k2[0] > k1[2] or k2[0] > k3[2]: return is_ok k1_sharp = k_sharp(k1) # print("k1 sharp") # print(k1_sharp) # 判斷是否爲長實體 if (k1_sharp[2] < entity_length_threshold or k1_sharp[0] > entity_shadow_line_length_threshold or k1_sharp[1] > entity_shadow_line_length_threshold): return is_ok k3_sharp = k_sharp(k3) # print("k3 sharp") # print(k3_sharp) if (k3_sharp[2] < entity_length_threshold or k3_sharp[0] > entity_shadow_line_length_threshold or k3_sharp[1] > entity_shadow_line_length_threshold): return is_ok # print("ok") # 判斷是否爲十字星 k2_sharp = k_sharp(k2) # print("k2 sharp") # print(k2_sharp) # 實體長度不超過0.2%, 上下影線長度超過0.6%, 若是規定上下影線的長度不太好找 # if (k2_sharp[2] > doji_entity_length_threshold # or k2_sharp[0] < doji_shadow_line_length_threshold # or k2_sharp[1] < doji_shadow_line_length_threshold): if k2_sharp[2] > doji_entity_length_threshold: return is_ok return True df_values = df.values ret = [] for index in range(len(df_values)): if index < k_size: continue trend = df_values[index - 1][-1] if trend == 0: continue val_slice = slice(index-2, index+1) k_lst = df_values[val_slice] if is_up_or_down_doji(k_lst, trend): ret.append(index-1) print("在>>%s<<<找到趨勢爲[%s]的十字星" % (df.index[index-1], trend_map[trend])) if not ret: print("沒有發現任何十字星") ax = ohlc_plot(df[["open", "high", "low", "close"]]) for i in ret: # print(i) mark_x = df.index[i] # mark_y = df.loc[mark_x].low # print(mark_x, mark_y) ax.axvline(mark_x) plt.show() return ret
結果以下:
說實話標準的不太好找,因此代碼的條件設置不是很嚴格。
形態選股頗有意思,之後有機會單獨出一篇文章。
經過交易額過濾,選擇股票最近一百個交易日的成交額平均值,而後進行排序。
交易額排序
def select_by_amount(data_path, top_size=20): # 或許交易日比較好 day_range = 100 all_df = load_all_local_data(data_path, tail_size=day_range) ret = [] for ts_code, df in all_df.items(): df["amount_avg"] = df.amount.rolling(day_range).mean() amount_avg = df["amount_avg"][-1] # NaN的布爾值爲True, 因此須要np.isnan判斷 if np.isnan(amount_avg): continue ret.append((ts_code, amount_avg)) ret.sort(key=lambda x:x[1]) print("交易額排名前%s的結果以下" % top_size) print(ret[-top_size:]) return ret[-top_size:]
結果以下。
[('002714.SZ', 2162871.9387400015), ('000002.SZ', 2294273.739109999), ('002460.SZ', 2372975.2177099977), ('600745.SH', 2461243.4350499995), ('601990.SH', 2551214.2825800003), ('603986.SH', 2679324.4020000002), ('600703.SH', 2759167.96963), ('002456.SZ', 2759314.642220001), ('000651.SZ', 2782179.0211499995), ('000100.SZ', 2853012.22389), ('002185.SZ', 3149773.6836400027), ('000858.SZ', 3168871.2845699983), ('002475.SZ', 3392269.47999), ('300750.SZ', 3511882.7076800014), ('600030.SH', 4039920.83439), ('000725.SZ', 4189700.4488400007), ('300059.SZ', 4447245.855129998), ('600519.SH', 4448082.42745), ('601318.SH', 4892206.13003), ('000063.SZ', 5128169.27109)]
流通市值排序
def select_by_float_market_value(trade_date, top_size=20): df = pro.daily_basic(ts_code='', trade_date=trade_date, fields="ts_code,close,float_share") ret = [] for row in df.values: ts_code = row[0] float_market_value = row[1] * row[2] if np.isnan(float_market_value) or not float_market_value: continue ret.append((ts_code, float_market_value)) ret.sort(key=lambda x:x[1]) print("流通市值名前%s的結果以下" % top_size) print(ret[-top_size:]) return ret[-top_size:]
結果以下
交易額排名前20的結果以下 [('000002.SZ', 24443367.828188002), ('000001.SZ', 25072232.462559998), ('601088.SH', 26237241.386405), ('600000.SH', 28497216.593586), ('601166.SH', 30952865.369478), ('000651.SZ', 33204757.629185997), ('603288.SH', 36720702.433056), ('600900.SH', 36894000.0), ('000333.SZ', 39063915.66100799), ('600028.SH', 40612052.69455), ('600276.SH', 42312945.987192), ('000858.SZ', 54753808.6884), ('601628.SH', 54786707.43), ('600036.SH', 69375140.114727), ('601857.SH', 70759948.006466), ('601988.SH', 72081806.077332), ('601318.SH', 75958643.459976), ('601288.SH', 99096634.04564801), ('601398.SH', 136154167.33219498), ('600519.SH', 166848191.796)]
這種方式比較簡單,可是通常還須要一些其餘的條件相互配合。
若是期待後續文章能夠關注個人微信公衆號(又耳筆記),頭條號(又耳筆記),Github(youerning).