在本文開始前,做者並無提倡LSTM是一種高度可靠的模型,它能夠很好地利用股票數據中的內在模式,或者能夠在沒有任何人蔘與的狀況下使用。寫這篇文章,純粹是出於對機器學習的熱愛。在我看來,該模型已經觀察到了數據中的某些模式,所以它能夠在大多數時候正確預測股票的走勢。可是,這個模型是否能夠用於實際,有待用更多回測和實踐去驗證。html
你想要正確地模擬股票價格,所以做爲股票買家,你能夠合理地決定何時買股票,何時賣股票。這就是時間序列建模的切入點。你須要良好的機器學習模型,能夠查看數據序列的歷史記錄,並正確地預測序列的將來元素是什麼。node
提示:股市價格高度不可預測且不穩定。這意味着在數據中沒有一致的模式可讓你近乎完美地模擬股票價格。就像普林斯頓大學經濟學家Burton Malkiel在他1973年的書中寫到的:「隨機漫步華爾街」,若是市場確實是有效的,那麼當股票價格反應的全部因素一旦被公開時,那咱們閉着眼睛均可以作的和專業投資者同樣好。ios
可是,咱們不要一直相信這只是一個隨機過程,以爲機器學習是沒有但願的。你不須要預測將來的股票確切的價格,而是股票價格的變更。作到這點就很不錯了!算法
使用如下數據源:數據庫
地址:https://www.kaggle.com/borismarjanovic/price-volume-data-for-all-us-stocks-etfs網絡
固然你也可基於Wind數據庫去研究。由於Wind數據相對於其餘平臺和數據商而言,整體上在國內算是比較全面和準確的。session
在Kaggle上找到的數據是csv文件,因此,你不須要再進行任何的預處理,所以你能夠直接將數據加載到DataFrame中。同時你還應該確保數據是按日期排序的,由於數據的順序在時間序列建模中相當重要。app
df = df.sort_values('Date') df.head()
plt.figure(figsize = (18,9)) plt.plot(range(df.shape[0]),(df['Low']+df['High'])/2.0) plt.xticks(range(0,df.shape[0],500),df['Date'].loc[::500],rotation=45) plt.xlabel('Date',fontsize=18) plt.ylabel('Mid Price',fontsize=18) plt.show()
上圖已經說明了不少東西。我選擇這家公司而不是其餘公司的具體緣由是,隨着時間的推移,這張圖中展示了不一樣的股價行爲。這將使學習更加穩健,而且能夠更改以便測試各類狀況下預測的好壞程度。dom
計算一天中最高和最低價的平均值來計算的中間價格。機器學習
high_prices = df.loc[:,'High'].as_matrix() low_prices = df.loc[:,'Low'].as_matrix() mid_prices = (high_prices+low_prices)/2.0
如今你能夠分離訓練數據和測試數據。訓練數據是時間序列的前11000個數據,其他的是測試數據。
train_data = mid_prices[:11000] test_data = mid_prices[11000:]
如今須要定義一個標準對數據進行歸一化。MinMaxScalar方法將全部數據歸到0和1之間。你還能夠將訓練和測試數據從新組爲[datasize, numfeatures]。
scaler = MinMaxScaler() train_data = train_data.reshape(-1,1) test_data = test_data.reshape(-1,1)
根據以前得數據,能夠看出不一樣時間段有不一樣的取值範圍,你能夠將整個序列拆分爲窗口來進行歸一化。若是不這樣作,早期的數據接近於0,而且不會給學習過程增長太多價值。這裏你選擇的窗口大小是2500。
當選擇窗口大小時,確保它不是過小,由於當執行窗口規範化時,它會在每一個窗口的末尾引入一箇中斷,由於每一個窗口都是**規範化的。
在本例中,4個數據點將受此影響。但假設你有11000個數據點,4個點不會引發任何問題。
smoothing_window_size = 2500 for di in range(0,10000,smoothing_window_size): scaler.fit(train_data[di:di+smoothing_window_size,:]) train_data[di:di+smoothing_window_size,:] = scaler.transform(train_data[di:di+smoothing_window_size,:]) # You normalize the last bit of remaining data scaler.fit(train_data[di+smoothing_window_size:,:]) train_data[di+smoothing_window_size:,:] = scaler.transform(train_data[di+smoothing_window_size:,:])
將數據從新塑造爲[data_size]的Shape:
train_data = train_data.reshape(-1) test_data = scaler.transform(test_data).reshape(-1)
如今可使用指數移動平均平滑數據。能夠幫助你避免股票價格數據的雜亂,併產生更平滑的曲線。咱們只使用訓練數據來訓練MinMaxScaler,經過將MinMaxScaler與測試數據進行匹配來規範化測試數據是錯誤的。
注意:你應該只平滑訓練數據。
EMA = 0.0 gamma = 0.1 for ti in range(11000): EMA = gamma*train_data[ti] + (1-gamma)*EMA train_data[ti] = EMA all_mid_data = np.concatenate([train_data,test_data],axis=0)
下面是平均結果。它很是接近股票的實際行爲。接下來您將看到一個更精確的一步預測方法:
上面的圖(和MSE)說明了什麼呢?對於很是短的predictiosn(一天以後)來講,這個模型彷佛不算太壞。考慮到股票價格在一晚上之間不會從0變化到100,這種行爲是明智的。接下來咱們來看一種更有趣的平均技術,稱爲指數移動平均。
你可能在互聯網上看到過一些文章使用很是複雜的模型來預測股票市場的行爲。可是要當心!我所看到的這些都只是視覺錯覺,不是由於學習了有用的東西。下面你將看到如何使用簡單的平均方法複製這種行爲。
window_size = 100 N = train_data.size run_avg_predictions = [] run_avg_x = [] mse_errors = [] running_mean = 0.0 run_avg_predictions.append(running_mean) decay = 0.5 for pred_idx in range(1,N): running_mean = running_mean*decay + (1.0-decay)*train_data[pred_idx-1] run_avg_predictions.append(running_mean) mse_errors.append((run_avg_predictions[-1]-train_data[pred_idx])**2) run_avg_x.append(date) print('MSE error for EMA averaging: %.5f'%(0.5*np.mean(mse_errors)))
MSE error for EMA averaging: 0.00003
若是指數移動平均線很好,爲何須要更好的模型呢?
能夠看到,它符合遵循真實分佈的完美直線(經過很是低的MSE證實了這一點)。實際上,僅憑次日的股票市值,你就作不了什麼。就我我的而言,我想要的不是次日股市的確切價格,而是將來30天股市的價格會上漲仍是下跌
讓咱們試着在窗口中進行預測(假設你預測接下來兩天的窗口,而不是次日)。而後你就會意識到EMA會有多麼的失敗。讓咱們經過一個例子來理解這一點。 無論你預測將來的步驟是多少,你都會獲得相同的答案。
輸出有用信息的一種解決方案是查看基於動量算法。他們的預測是基於過去的近期值是上升仍是降低(而不是精確的數值)。例如,若是過去幾天的價格一直在降低,次日的價格可能會更低。這聽起來很合理。然而,咱們將使用更復雜的模型:LSTM。
咱們將使用均值平方偏差來計算咱們的模型有多好。均值平方偏差(MSE)的計算方法是先計算真實值與預測值之間的平方偏差,而後對全部的預測進行平均。可是:
平均預測是一種很好的預測方法(這對股票市場的預測不是頗有用),但對將來的預測並非頗有用。
長短時記憶模型是很是強大的時間序列模型。它們能夠預測將來任意數量的步驟。LSTM模塊(或單元)有5個基本組件,能夠對長期和短時間數據進行建模。
計算方程以下:
Tensorflow爲實現時間序列模型提供了一個很好的子API。後面咱們會使用到它。
首先要實現一個數據生成器來訓練LSTM。這個數據生成器將有一個名爲unrollbatch(…)的方法,該方法將輸出一組按順序批量獲取numunrollings的輸入數據,其中批數據的大小爲[batch_size, 1]。而後每批輸入數據都有相應的輸出數據。
例如,若是numunrollings=3和batchsize=4則看起來像一組展開的批次。
輸入數據: [x0,x10,x20,x30],[x1,x11,x21,x31],[x2,x12,x22,x32]
輸出數據: [x1,x11,x21,x31],[x2,x12,x22,x32],[x3,x13,x23,x33]
下面將演示如何可視化建立一批數據。基本思想是將數據序列劃分爲N / b段,使每一個段的大小爲b,而後定義遊標每段爲1。而後對單個數據進行抽樣,咱們獲得一個輸入(當前段遊標索引)和一個真實預測(在[當前段遊標+1,當前段遊標+5]之間隨機抽樣)。請注意,咱們並不老是獲得輸入旁邊的值,就像它的預測同樣。這是一個減小過擬合的步驟。在每次抽樣結束時,咱們將光標增長1。
在本節中,將定義幾個超參數。D是輸入的維數。很簡單,你以以前的股票價格做爲輸入並預測下一個應該爲1。而後是numunrollings,它表示單個優化步驟須要考慮多少連續時間步驟。越大越好。而後是batchsize。批量處理大小是在單個時間步驟中考慮的數據樣本的數量。越大越好,由於在給定的時間內數據的可見性越好。接下來定義num_nodes,它表示每一個單元格中隱藏的神經元數量。在這個示例中,你能夠看到有三層LSTM。
D = 1 num_unrollings = 50 batch_size = 500 num_nodes = [200,200,150] n_layers = len(num_nodes) dropout = 0.2 tf.reset_default_graph()
接下來爲訓練輸入和標籤訂義佔位符。這很是簡單,由於你有一個輸入佔位符列表,其中每一個佔位符包含一批數據。 該列表包含num_unrollings佔位符,它將用於單個優化步驟。
train_inputs, train_outputs = [],[] for ui in range(num_unrollings): train_inputs.append(tf.placeholder(tf.float32, shape=[batch_size,D],name='train_inputs_%d'%ui)) train_outputs.append(tf.placeholder(tf.float32, shape=[batch_size,1], name = 'train_outputs_%d'%ui))
用三個LSTM層和一個線性迴歸層,用w和b表示,該層提取最後一個長短時間內**元的輸出並輸出對下一個時間步驟的預測。你可使用TensorFlow中的MultiRNNCell來封裝建立的三個LSTMCell對象。此外,還可使用dropout實現LSTM單元格,由於它們能夠提升性能並減小過擬合。
lstm_cells = [ tf.contrib.rnn.LSTMCell(num_units=num_nodes[li], state_is_tuple=True, initializer= tf.contrib.layers.xavier_initializer() ) for li in range(n_layers)] drop_lstm_cells = [tf.contrib.rnn.DropoutWrapper( lstm, input_keep_prob=1.0,output_keep_prob=1.0-dropout, state_keep_prob=1.0-dropout ) for lstm in lstm_cells] drop_multi_cell = tf.contrib.rnn.MultiRNNCell(drop_lstm_cells) multi_cell = tf.contrib.rnn.MultiRNNCell(lstm_cells) w = tf.get_variable('w',shape=[num_nodes[-1], 1], initializer=tf.contrib.layers.xavier_initializer()) b = tf.get_variable('b',initializer=tf.random_uniform([1],-0.1,0.1))
計算LSTM輸出並將其輸入迴歸層,獲得最終預測結果
首先建立TensorFlow變量(c和h),它將保持單元狀態和長短時間記憶單元的隱藏狀態。 而後將train_input列表轉換爲[num_unrollings, batch_size, D],使用tf.nn.dynamic_rnn計算所需輸出。而後使用tf.nn.dynamic_rnn計算LSTM輸出。並將輸出分解爲一列num_unrolling的張量。預測和真實股價之間的損失。
c, h = [],[] initial_state = [] for li in range(n_layers): c.append(tf.Variable(tf.zeros([batch_size, num_nodes[li]]), trainable=False)) h.append(tf.Variable(tf.zeros([batch_size, num_nodes[li]]), trainable=False)) initial_state.append(tf.contrib.rnn.LSTMStateTuple(c[li], h[li])) all_inputs = tf.concat([tf.expand_dims(t,0) for t in train_inputs],axis=0) all_lstm_outputs, state = tf.nn.dynamic_rnn( drop_multi_cell, all_inputs, initial_state=tuple(initial_state), time_major = True, dtype=tf.float32) all_lstm_outputs = tf.reshape(all_lstm_outputs, [batch_size*num_unrollings,num_nodes[-1]]) all_outputs = tf.nn.xw_plus_b(all_lstm_outputs,w,b) split_outputs = tf.split(all_outputs,num_unrollings,axis=0)
損失計算和優化器
如今,要計算損失。然而,在計算損失時,你應該注意到有一個獨特的特徵。對於每一批預測和真實輸出,計算均方偏差。而後把全部這些均方損失加起來(不是平均值)。最後,定義要用來優化神經網絡的優化器在這種狀況下,您可使用Adam,這是一個很是新且性能良好的優化器。
print('Defining training Loss') loss = 0.0 with tf.control_dependencies([tf.assign(c[li], state[li][0]) for li in range(n_layers)]+ [tf.assign(h[li], state[li][1]) for li in range(n_layers)]): for ui in range(num_unrollings): loss += tf.reduce_mean(0.5*(split_outputs[ui]-train_outputs[ui])**2) print('Learning rate decay operations') global_step = tf.Variable(0, trainable=False) inc_gstep = tf.assign(global_step,global_step + 1) tf_learning_rate = tf.placeholder(shape=None,dtype=tf.float32) tf_min_learning_rate = tf.placeholder(shape=None,dtype=tf.float32) learning_rate = tf.maximum( tf.train.exponential_decay(tf_learning_rate, global_step, decay_steps=1, decay_rate=0.5, staircase=True), tf_min_learning_rate) # Optimizer. print('TF Optimization operations') optimizer = tf.train.AdamOptimizer(learning_rate) gradients, v = zip(*optimizer.compute_gradients(loss)) gradients, _ = tf.clip_by_global_norm(gradients, 5.0) optimizer = optimizer.apply_gradients( zip(gradients, v)) print('\tAll done')
這裏定義了與預測相關的TensorFlow操做。首先,爲輸入(sample_inputs)定義一個佔位符,而後與訓練階段相似,定義預測的狀態變量(sample_c和sample_h)。最後用tf.nn.dynamic_rnn計算預測。後經過迴歸層(w和b)發送輸出。 還應該定義reset_sample_state操做,該操做將重置單元狀態和隱藏狀態。 每次進行一系列預測時,都應該在開始時執行此操做。
print('Defining prediction related TF functions') sample_inputs = tf.placeholder(tf.float32, shape=[1,D]) sample_c, sample_h, initial_sample_state = [],[],[] for li in range(n_layers): sample_c.append(tf.Variable(tf.zeros([1, num_nodes[li]]), trainable=False)) sample_h.append(tf.Variable(tf.zeros([1, num_nodes[li]]), trainable=False)) initial_sample_state.append(tf.contrib.rnn.LSTMStateTuple(sample_c[li],sample_h[li])) reset_sample_states = tf.group(*[tf.assign(sample_c[li],tf.zeros([1, num_nodes[li]])) for li in range(n_layers)], *[tf.assign(sample_h[li],tf.zeros([1, num_nodes[li]])) for li in range(n_layers)]) sample_outputs, sample_state = tf.nn.dynamic_rnn(multi_cell, tf.expand_dims(sample_inputs,0), initial_state=tuple(initial_sample_state), time_major = True, dtype=tf.float32) with tf.control_dependencies([tf.assign(sample_c[li],sample_state[li][0]) for li in range(n_layers)]+ [tf.assign(sample_h[li],sample_state[li][1]) for li in range(n_layers)]): sample_prediction = tf.nn.xw_plus_b(tf.reshape(sample_outputs,[1,-1]), w, b) print('\tAll done')
運行LSTM
在這裏,你將訓練和預測幾個時期的股票價格走勢,看看這些預測是否會隨着時間的推移而變得更好或更糟。按照如下步驟操做:
在時間序列上定義一組測試起點(test_points_seq)來計算LSTM
對於每個epoch
用於訓練數據的完整序列長度
展開一組num_unrollings批次
使用展開的批次LSTM進行訓練
計算平均訓練損失
對於測試集中的每一個起點
經過迭代在測試點以前找到的之前的num_unrollings數據點來更新LSTM狀態
使用先前的預測做爲當前輸入,連續預測n_predict_once步驟
計算預測到的n_predict_once點與當時股票價格之間的MSE損失
部分代碼
epochs = 30 valid_summary = 1 n_predict_once = 50 train_seq_length = train_data.size train_mse_ot = [] test_mse_ot = [] predictions_over_time = [] session = tf.InteractiveSession() tf.global_variables_initializer().run() loss_nondecrease_count = 0 loss_nondecrease_threshold = 2 print('Initialized') average_loss = 0 data_gen = DataGeneratorSeq(train_data,batch_size,num_unrollings) x_axis_seq = [] test_points_seq = np.arange(11000,12000,50).tolist() for ep in range(epochs): # ========================= Training ===================================== for step in range(train_seq_length//batch_size): u_data, u_labels = data_gen.unroll_batches() feed_dict = {} for ui,(dat,lbl) in enumerate(zip(u_data,u_labels)): feed_dict[train_inputs[ui]] = dat.reshape(-1,1) feed_dict[train_outputs[ui]] = lbl.reshape(-1,1) feed_dict.update({tf_learning_rate: 0.0001, tf_min_learning_rate:0.000001}) _, l = session.run([optimizer, loss], feed_dict=feed_dict)
可視化預測
能夠看到MSE損失是如何隨着訓練量的減小而減小的。這是一個好跡象,代表模型正在學習一些有用的東西。你能夠看到LSTM比標準平均值作得更好。標準平均(雖然不完美)合理地跟隨真實的股票價格運動。
best_prediction_epoch = 28 plt.figure(figsize = (18,18)) plt.subplot(2,1,1) plt.plot(range(df.shape[0]),all_mid_data,color='b') predictions with high alpha start_alpha = 0.25 alpha = np.arange(start_alpha,1.1,(1.0-start_alpha)/len(predictions_over_time[::3])) for p_i,p in enumerate(predictions_over_time[::3]): for xval,yval in zip(x_axis_seq,p): plt.plot(xval,yval,color='r',alpha=alpha[p_i]) plt.title('Evolution of Test Predictions Over Time',fontsize=18) plt.xlabel('Date',fontsize=18) plt.ylabel('Mid Price',fontsize=18) plt.xlim(11000,12500) plt.subplot(2,1,2) plt.plot(range(df.shape[0]),all_mid_data,color='b') for xval,yval in zip(x_axis_seq,predictions_over_time[best_prediction_epoch]): plt.plot(xval,yval,color='r') plt.title('Best Test Predictions Over Time',fontsize=18) plt.xlabel('Date',fontsize=18) plt.ylabel('Mid Price',fontsize=18) plt.xlim(11000,12500) plt.show()
儘管LSTM並不完美,但它彷佛在大多數狀況下都能正確預測股價走勢。請注意,你的預測大體在0和1之間(也就是說,不是真實的股票價格)。這是能夠的,由於你預測的是股價的走勢,而不是股價自己。
股票價格/移動預測是一項極其困難的任務。就我我的而言,我認爲任何股票預測模型都不該該被視爲理所固然,而且盲目地依賴它們。然而,模型在大多數狀況下可能可以正確預測股票價格的變更,但並不老是如此。
不要被那些預測曲線徹底與真實股價重疊的文章所迷惑。這能夠用一個簡單的平均技術來複制,但實際上它是無用的。更明智的作法是預測股價走勢。
模型的超參數對你獲得的結果很是敏感。所以,一個很是好的事情是在超參數上運行一些超參數優化技術(例如,網格搜索/隨機搜索)。這裏列出了一些最關鍵的超參數:優化器的學習率、層數、每層的隱藏單元數,優化器Adam表現最佳,模型的類型(GRU / LSTM / LSTM with peepholes)。
因爲本文因爲數據量小,咱們用測試損耗來衰減學習速率。這間接地將測試集的信息泄露到訓練過程當中。處理這個問題更好的方法是有一個單獨的驗證集(除了測試集)與驗證集性能相關的衰減學習率。
延伸閱讀: