A quick note before the write-up:
I wore a light jacket for a few days recently, but it finally lost out to Guangzhou's weather (too hot). So today it's back to short sleeves and shorts; Guangzhou nights really aren't cold.
This experiment uses TensorFlow 2.3.1.
import numpy as np
import matplotlib.pyplot as plt
from pandas import read_csv
import math
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
%matplotlib inline
The data set is five years of historical Yahoo! Finance ^GSPC stock prices, from November 2015 to November 2020, 1256 records in total. Each record holds one day's price information: Date, Open, High, Low, Close, Adj Close, Volume.
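If you don't already have the CSV, here is a minimal sketch for fetching equivalent data, assuming the third-party yfinance package (my own addition; it is not used in the original post):

# Hypothetical fetch of the same ^GSPC data via yfinance
import yfinance as yf

df = yf.download('^GSPC', start='2015-11-01', end='2020-11-30')
# Columns are Open, High, Low, Close, Adj Close, Volume with Date as the
# index, so after to_csv the Close column lands at index 4, which is what
# usecols=[4] below expects.
df.to_csv('data/stock_data.csv')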
Tips: a little stock-market background
For simplicity, only the closing price is used for prediction. The figure below shows the closing prices over the past five years.
Code:
# load the dataset with pandas
dataframe = read_csv('data/stock_data.csv', usecols=[4], engine='python', skipfooter=3)
data = dataframe.values
# cast the integer values to float
data = data.astype('float32')
plt.plot(data)
plt.show()
The goal is to predict future closing prices; here we predict the last 56 data points.
The closing prices over the past five years form a time series of length $N$; define $p_0, p_1, \dots, p_{N-1}$ as the daily prices. The training and test sets are built by using the previous $i$ data points to predict the $(i+1)$-th, where $0 < i < N$. That is, we use

$X_0 = (p_0, p_1, \dots, p_{i-1})$
$X_1 = (p_i, p_{i+1}, \dots, p_{2i-1})$
$\dots$
$X_t = (p_{ti}, p_{ti+1}, \dots, p_{(t+1)i-1})$

to predict

$X_{t+1} = (p_{(t+1)i}, p_{(t+1)i+1}, \dots, p_{(t+2)i-1})$
Here we choose $i = 6$. For the LSTM this means time_steps = 6, so the training samples can be written as
Input 1 = [$p_0, p_1, p_2, p_3, p_4, p_5$], Label 1 = [$p_6$]
Input 2 = [$p_1, p_2, p_3, p_4, p_5, p_6$], Label 2 = [$p_7$]
Input 3 = [$p_2, p_3, p_4, p_5, p_6, p_7$], Label 3 = [$p_8$]
Code:
# build input/label arrays from the raw series
def create_dataset(data, time_steps):
    dataX, dataY = [], []
    for i in range(len(data) - time_steps):
        a = data[i:(i + time_steps), 0]
        dataX.append(a)
        dataY.append(data[i + time_steps, 0])
    return np.array(dataX), np.array(dataY)
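As a quick sanity check (a toy example, not part of the original post), running create_dataset on a short ramp makes the sliding window explicit:

# toy example: 10 points, windows of 3
toy = np.arange(10, dtype='float32').reshape(-1, 1)
X, y = create_dataset(toy, time_steps=3)
print(X.shape, y.shape)  # (7, 3) (7,)
print(X[0], y[0])        # [0. 1. 2.] 3.0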
Use 95.55% of the data as the training set and the rest as the test set (int(1256 × 0.9555) = 1200 training points, leaving 56 for testing):
# normalize to [0, 1]
scaler = MinMaxScaler(feature_range=(0, 1))
data = scaler.fit_transform(data)
# split into training and test sets
train_size = int(len(data) * 0.9555)
test_size = len(data) - train_size
train, test = data[0:train_size, :], data[train_size:len(data), :]
time_steps = 6
trainX, trainY = create_dataset(train, time_steps)
testX, testY = create_dataset(test, time_steps)
# reshape the model inputs to [samples, time steps, features]
trainX = np.reshape(trainX, (trainX.shape[0], trainX.shape[1], 1))
testX = np.reshape(testX, (testX.shape[0], testX.shape[1], 1))
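A quick shape check under this split (my own addition): create_dataset yields len(split) - time_steps windows per split, so with 1200 training and 56 test points the arrays should come out as follows.

# expected shapes given train_size = 1200 and test_size = 56
print(trainX.shape)  # (1194, 6, 1): 1200 - 6 windows
print(testX.shape)   # (50, 6, 1):   56 - 6 windows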
The model is a single LSTM layer with 128 hidden units and an output layer producing 1 predicted value, trained for 100 epochs.
Tips: counting LSTM parameters
(hidden_size × (hidden_size + x_dim) + hidden_size) × 4
where x_dim is the feature dimension of the input data, which is 1 here.
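Plugging this model's numbers into the formula gives a count that should match the LSTM line in model.summary():

# worked example of the formula above
hidden_size, x_dim = 128, 1
lstm_params = (hidden_size * (hidden_size + x_dim) + hidden_size) * 4
print(lstm_params)  # 66560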
代碼:
model = Sequential()
model.add(LSTM(128, input_shape=(time_steps, 1)))
model.add(Dense(1))
# accuracy is not meaningful for regression, so track only the MSE loss
model.compile(loss='mean_squared_error', optimizer='adam')
model.summary()
history = model.fit(trainX, trainY, epochs=100, batch_size=64, verbose=1)
score = model.evaluate(testX, testY, batch_size=64, verbose=1)
The training loss is plotted in the figure below. As can be seen, the loss gradually converges.
代碼:
def visualize_loss(history, title):
    loss = history.history["loss"]
    epochs = range(len(loss))
    plt.figure()
    plt.plot(epochs, loss, "b", label="Training loss")
    plt.title(title)
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

visualize_loss(history, "Training Loss")
代碼:
# predict on the training and test sets
trainPredict = model.predict(trainX)
testPredict = model.predict(testX)
# invert the normalization on predictions and labels
trainPredict = scaler.inverse_transform(trainPredict)
trainY = scaler.inverse_transform([trainY])
testPredict = scaler.inverse_transform(testPredict)
testY = scaler.inverse_transform([testY])
# compute RMSE for the training and test sets
trainScore = math.sqrt(mean_squared_error(trainY[0], trainPredict[:, 0]))
print('Train Score: %.2f RMSE' % (trainScore))
testScore = math.sqrt(mean_squared_error(testY[0], testPredict[:, 0]))
print('Test Score: %.2f RMSE' % (testScore))
# plot the predictions, shifted so they align with the original series
trainPredictPlot = np.empty_like(data)
trainPredictPlot[:, :] = np.nan
trainPredictPlot[time_steps:len(trainPredict) + time_steps, :] = trainPredict
testPredictPlot = np.empty_like(data)
testPredictPlot[:, :] = np.nan
testPredictPlot[len(trainPredict) + (time_steps * 2) - 1:len(data) - 1, :] = testPredict
plt.plot(scaler.inverse_transform(data))
plt.plot(trainPredictPlot)
plt.plot(testPredictPlot)
plt.show()
In the figure above, the blue line is the original data, and the orange and green lines are the predictions on the training and test sets, respectively.
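As a usage sketch beyond the original post, a model trained this way would produce a one-step-ahead forecast from the most recent time_steps closes; the snippet below reuses the variables defined above and is my own addition:

# hypothetical one-step-ahead forecast from the last 6 normalized closes
last_window = data[-time_steps:].reshape(1, time_steps, 1)
next_close = scaler.inverse_transform(model.predict(last_window))
print('Next predicted close: %.2f' % next_close[0, 0])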