股票價格預測是一件很是唬人的事情,但若是隻基於歷史數據進行預測,顯然徹底不靠譜網絡
股票價格是典型的時間序列數據(簡稱時序數據),會受到經濟環境、政府政策、人爲操做多種複雜因素的影響dom
不像氣象數據那樣具有明顯的時間和季節性模式,例如一天以內和一年以內的氣溫變化等異步
儘管如此,以股票價格爲例,介紹如何對時序數據進行預測,仍然值得一作函數
如下使用TensorFlow和Keras,對S&P 500
股價數據進行分析和預測測試
S&P 500
股價數據爬取自Google Finance API,已經進行過缺失值處理lua
加載庫,pandas主要用於數據清洗和整理code
# -*- coding: utf-8 -*- import pandas as pd import numpy as np import tensorflow as tf import matplotlib.pyplot as plt %matplotlib inline from sklearn.preprocessing import MinMaxScaler import time
用pandas讀取csv文件爲DataFrame,並用describe()
查看特徵的數值分佈orm
data = pd.read_csv('data_stocks.csv') data.describe()
還能夠用info()
查看特徵的概要視頻
data.info()
數據共502列,41266行,502列分別爲:htm
DATE
:該行數據的時間戳SP500
:能夠理解爲大盤指數查看數據的前五行
data.head()
查看時間跨度
print(time.strftime('%Y-%m-%d', time.localtime(data['DATE'].max())), time.strftime('%Y-%m-%d', time.localtime(data['DATE'].min())))
繪製大盤趨勢折線圖
plt.plot(data['SP500'])
去掉DATE
一列,訓練集測試集分割
data.drop('DATE', axis=1, inplace=True) data_train = data.iloc[:int(data.shape[0] * 0.8), :] data_test = data.iloc[int(data.shape[0] * 0.8):, :] print(data_train.shape, data_test.shape)
數據歸一化,只能使用data_train
進行fit()
scaler = MinMaxScaler(feature_range=(-1, 1)) scaler.fit(data_train) data_train = scaler.transform(data_train) data_test = scaler.transform(data_test)
同步預測是指,使用當前時刻的500支個股股價,預測當前時刻的大盤指數,即一個迴歸問題,輸入共500維特徵,輸出一維,即[None, 500] => [None, 1]
使用TensorFlow實現同步預測,主要用到多層感知機(Multi-Layer Perceptron,MLP),損失函數用均方偏差(Mean Square Error,MSE)
X_train = data_train[:, 1:] y_train = data_train[:, 0] X_test = data_test[:, 1:] y_test = data_test[:, 0] input_dim = X_train.shape[1] hidden_1 = 1024 hidden_2 = 512 hidden_3 = 256 hidden_4 = 128 output_dim = 1 batch_size = 256 epochs = 10 tf.reset_default_graph() X = tf.placeholder(shape=[None, input_dim], dtype=tf.float32) Y = tf.placeholder(shape=[None], dtype=tf.float32) W1 = tf.get_variable('W1', [input_dim, hidden_1], initializer=tf.contrib.layers.xavier_initializer(seed=1)) b1 = tf.get_variable('b1', [hidden_1], initializer=tf.zeros_initializer()) W2 = tf.get_variable('W2', [hidden_1, hidden_2], initializer=tf.contrib.layers.xavier_initializer(seed=1)) b2 = tf.get_variable('b2', [hidden_2], initializer=tf.zeros_initializer()) W3 = tf.get_variable('W3', [hidden_2, hidden_3], initializer=tf.contrib.layers.xavier_initializer(seed=1)) b3 = tf.get_variable('b3', [hidden_3], initializer=tf.zeros_initializer()) W4 = tf.get_variable('W4', [hidden_3, hidden_4], initializer=tf.contrib.layers.xavier_initializer(seed=1)) b4 = tf.get_variable('b4', [hidden_4], initializer=tf.zeros_initializer()) W5 = tf.get_variable('W5', [hidden_4, output_dim], initializer=tf.contrib.layers.xavier_initializer(seed=1)) b5 = tf.get_variable('b5', [output_dim], initializer=tf.zeros_initializer()) h1 = tf.nn.relu(tf.add(tf.matmul(X, W1), b1)) h2 = tf.nn.relu(tf.add(tf.matmul(h1, W2), b2)) h3 = tf.nn.relu(tf.add(tf.matmul(h2, W3), b3)) h4 = tf.nn.relu(tf.add(tf.matmul(h3, W4), b4)) out = tf.transpose(tf.add(tf.matmul(h4, W5), b5)) cost = tf.reduce_mean(tf.squared_difference(out, Y)) optimizer = tf.train.AdamOptimizer().minimize(cost) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) for e in range(epochs): shuffle_indices = np.random.permutation(np.arange(y_train.shape[0])) X_train = X_train[shuffle_indices] y_train = y_train[shuffle_indices] for i in range(y_train.shape[0] // batch_size): start = i * batch_size batch_x = X_train[start : start + batch_size] batch_y = y_train[start : start + batch_size] sess.run(optimizer, feed_dict={X: batch_x, Y: batch_y}) if i % 50 == 0: print('MSE Train:', sess.run(cost, feed_dict={X: X_train, Y: y_train})) print('MSE Test:', sess.run(cost, feed_dict={X: X_test, Y: y_test})) y_pred = sess.run(out, feed_dict={X: X_test}) y_pred = np.squeeze(y_pred) plt.plot(y_test, label='test') plt.plot(y_pred, label='pred') plt.title('Epoch ' + str(e) + ', Batch ' + str(i)) plt.legend() plt.show()
最後測試集的loss在0.005左右,預測結果以下
使用Keras實現同步預測,代碼量會少不少,但具體實現細節不及TensorFlow靈活
from keras.layers import Input, Dense from keras.models import Model X_train = data_train[:, 1:] y_train = data_train[:, 0] X_test = data_test[:, 1:] y_test = data_test[:, 0] input_dim = X_train.shape[1] hidden_1 = 1024 hidden_2 = 512 hidden_3 = 256 hidden_4 = 128 output_dim = 1 batch_size = 256 epochs = 10 X = Input(shape=[input_dim,]) h = Dense(hidden_1, activation='relu')(X) h = Dense(hidden_2, activation='relu')(h) h = Dense(hidden_3, activation='relu')(h) h = Dense(hidden_4, activation='relu')(h) Y = Dense(output_dim, activation='sigmoid')(h) model = Model(X, Y) model.compile(loss='mean_squared_error', optimizer='adam') model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, shuffle=False) y_pred = model.predict(X_test) print('MSE Train:', model.evaluate(X_train, y_train, batch_size=batch_size)) print('MSE Test:', model.evaluate(X_test, y_test, batch_size=batch_size)) plt.plot(y_test, label='test') plt.plot(y_pred, label='pred') plt.legend() plt.show()
最後測試集的loss在0.007左右,預測結果以下
異步預測是指,使用歷史若干個時刻的大盤指數,預測當前時刻的大盤指數,這樣才更加符合預測的定義
例如,使用前五個大盤指數,預測當前的大盤指數,每組輸入包括5個step,每一個step對應一個歷史時刻的大盤指數,輸出一維,即[None, 5, 1] => [None, 1]
使用Keras實現異步預測,主要用到循環神經網絡即RNN(Recurrent Neural Network)中的LSTM(Long Short-Term Memory)
from keras.layers import Input, Dense, LSTM from keras.models import Model output_dim = 1 batch_size = 256 epochs = 10 seq_len = 5 hidden_size = 128 X_train = np.array([data_train[i : i + seq_len, 0] for i in range(data_train.shape[0] - seq_len)])[:, :, np.newaxis] y_train = np.array([data_train[i + seq_len, 0] for i in range(data_train.shape[0] - seq_len)]) X_test = np.array([data_test[i : i + seq_len, 0] for i in range(data_test.shape[0] - seq_len)])[:, :, np.newaxis] y_test = np.array([data_test[i + seq_len, 0] for i in range(data_test.shape[0] - seq_len)]) print(X_train.shape, y_train.shape, X_test.shape, y_test.shape) X = Input(shape=[X_train.shape[1], X_train.shape[2],]) h = LSTM(hidden_size, activation='relu')(X) Y = Dense(output_dim, activation='sigmoid')(h) model = Model(X, Y) model.compile(loss='mean_squared_error', optimizer='adam') model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, shuffle=False) y_pred = model.predict(X_test) print('MSE Train:', model.evaluate(X_train, y_train, batch_size=batch_size)) print('MSE Test:', model.evaluate(X_test, y_test, batch_size=batch_size)) plt.plot(y_test, label='test') plt.plot(y_pred, label='pred') plt.legend() plt.show()
最後測試集的loss在0.0015左右,預測結果以下,一層LSTM的效果已經好很是多了
固然,還有一種可能的嘗試,使用歷史若干個時刻的500支個股股價以及大盤指數,預測當前時刻的大盤指數,即[None, 5, 501] => [None, 1]
from keras.layers import Input, Dense, LSTM from keras.models import Model output_dim = 1 batch_size = 256 epochs = 10 seq_len = 5 hidden_size = 128 X_train = np.array([data_train[i : i + seq_len, :] for i in range(data_train.shape[0] - seq_len)]) y_train = np.array([data_train[i + seq_len, 0] for i in range(data_train.shape[0] - seq_len)]) X_test = np.array([data_test[i : i + seq_len, :] for i in range(data_test.shape[0] - seq_len)]) y_test = np.array([data_test[i + seq_len, 0] for i in range(data_test.shape[0] - seq_len)]) print(X_train.shape, y_train.shape, X_test.shape, y_test.shape) X = Input(shape=[X_train.shape[1], X_train.shape[2],]) h = LSTM(hidden_size, activation='relu')(X) Y = Dense(output_dim, activation='sigmoid')(h) model = Model(X, Y) model.compile(loss='mean_squared_error', optimizer='adam') model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, shuffle=False) y_pred = model.predict(X_test) print('MSE Train:', model.evaluate(X_train, y_train, batch_size=batch_size)) print('MSE Test:', model.evaluate(X_test, y_test, batch_size=batch_size)) plt.plot(y_test, label='test') plt.plot(y_pred, label='pred') plt.legend() plt.show()
最後的loss在0.004左右,結果反而變差了
500支個股加上大盤指數的預測效果,還不如僅使用大盤指數
說明特徵並非越多越好,有時候反而會引入沒必要要的噪音
因爲並未涉及到複雜的CNN或RNN,因此在CPU上運行的速度還能夠