GRU是LSTM網絡的一種效果很好的變體,它較LSTM網絡的結構更加簡單,並且效果也很好,所以也是當前很是流形的一種網絡。GRU既然是LSTM的變體,所以也是能夠解決RNN網絡中的長依賴問題。html
在LSTM中引入了三個門函數:輸入門、遺忘門和輸出門來控制輸入值、記憶值和輸出值。而在GRU模型中只有兩個門:分別是更新門和重置門。具體結構以下圖所示:
圖中的update gate和reset gate分別表示更新門和重置門。
更新門的做用相似於LSTM的遺忘和輸入門。它決定前一時刻的狀態信息哪些被丟棄和哪些被更新新信息,更新門的值越大說明前一時刻的狀態信息帶入越多。
重置門控制前一狀態有多少信息被寫入到當前的候選集 h t ~ \tilde{h_{t}} ht~上,重置門越小,前一狀態的信息被寫入的越少。
詳見下圖:
python
GRU使用門控機制學習長期依賴關係的基本思想和 LSTM 一致,但仍是有一些關鍵區別:算法
一、GRU 有兩個門(重置門與更新門),而 LSTM 有三個門(輸入門、遺忘門和輸出門)。
二、GRU 並不會控制並保留內部記憶(c_t),且沒有 LSTM 中的輸出門。
三、LSTM 中的輸入與遺忘門對應於 GRU 的更新門,重置門直接做用於前面的隱藏狀態。
四、在計算輸出時並不該用二階非線性。markdown
因爲 GRU 參數更少,收斂速度更快,所以其實際花費時間要少不少,這能夠大大加速了咱們的迭代過程。 而從表現上講,兩者之間孰優孰劣並無定論,這要依據具體的任務和數據集而定,而實際上,兩者之間的 performance 差距每每並不大,遠沒有調參所帶來的效果明顯,我一般的作法,首先選擇GRU做爲基本的單元,由於其收斂速度快,能夠加速試驗進程,快速迭代,而我認爲快速迭代這一特色很重要。若是實現沒其他優化技巧,纔會嘗試將 GRU 換爲 LSTM,看看有沒有變化。網絡
實戰——使用GRU預測股票數據的下載地址:http://quotes.money.163.com/trade/lsjysj_zhishu_000001.html
首先選擇查詢數據的範圍,而後選擇下載數據:
點擊下載數據後,彈出個框,這個框裏能夠看到起止日期,數據的列,默認勾選所有,而後單擊下載。
將下載後的數據放到工程的根目錄下面,而後作一些處理。
第一步 讀入csv文件,編碼方式設置爲gbk。
第二步 將日期列轉爲「年-月-日」這樣的格式。
第三步 根據日期排序,數據默認是從降序,改成升序。
最後 保存到「sh.csv」中。
代碼以下:app
import pandas as pd df = pd.read_csv("000001.csv", encoding='gbk') df["日期"] = pd.to_datetime(df["日期"], format="%Y-%m-%d") df.sort_values(by=["日期"], ascending=True) df = df.sort_values(by=["日期"], ascending=True) df.to_csv("sh.csv", index=False, sep=',')
到這裏數據集製做完成了ide
generate_df_affect_by_n_days函數,經過一個序列來生成一個矩陣(用於處理時序的數據)。就是把當天的前n天做爲參數,當天的數據做爲label。
readData中的文件名爲:sh.csv ,參數column,表明選用的數據,本次預測只用了一列數據,列名就是column,參數n就是以前模型中所說的n,表明column前n天的數據。train_end表示的是後面多少個數據做爲測試集。函數
import pandas as pd import matplotlib.pyplot as plt import datetime import torch import torch.nn as nn import numpy as np from torch.utils.data import Dataset, DataLoader def generate_df_affect_by_n_days(series, n, index=False): if len(series) <= n: raise Exception("The Length of series is %d, while affect by (n=%d)." % (len(series), n)) df = pd.DataFrame() for i in range(n): df['c%d' % i] = series.tolist()[i:-(n - i)] df['y'] = series.tolist()[n:] if index: df.index = series.index[n:] return df def readData(column='收盤價', n=30, all_too=True, index=False, train_end=-300): df = pd.read_csv("sh.csv", index_col=0, encoding='utf-8') df.fillna(0, inplace=True) df.replace(to_replace="None", value=0) del df["股票代碼"] del df["名稱"] df.index = list(map(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d").date(), df.index)) df_column = df[column].copy() df_column_train, df_column_test = df_column[:train_end], df_column[train_end - n:] df_generate_from_df_column_train = generate_df_affect_by_n_days(df_column_train, n, index=index) print(df_generate_from_df_column_train) if all_too: return df_generate_from_df_column_train, df_column, df.index.tolist() return df_generate_from_df_column_train
模型見RNN類,採用GRU+全鏈接。隱藏層設置64,GRU的輸出是64維的,因此設置全鏈接也是64。
TrainSet是數據讀取類,將輸出數據的最後一列作標籤,前面的列作輸入,類的寫法是Pytorch的固定模式。學習
class RNN(nn.Module): def __init__(self, input_size): super(RNN, self).__init__() self.rnn = nn.GRU( input_size=input_size, hidden_size=64, num_layers=1, batch_first=True, ) self.out = nn.Sequential( nn.Linear(64, 1), ) self.hidden = None def forward(self, x): r_out, self.hidden = self.rnn(x) # None 表示 hidden state 會用全0的 state out = self.out(r_out) return out class TrainSet(Dataset): def __init__(self, data): # 定義好 image 的路徑 self.data, self.label = data[:, :-1].float(), data[:, -1].float() def __getitem__(self, index): return self.data[index], self.label[index] def __len__(self): return len(self.data)
n爲模型中的n
LR是模型的學習率
EPOCH是屢次循環
train_end這個在以前的數據集中有提到。(注意是負數)
n = 30
LR = 0.0001
EPOCH = 100
train_end = -500
loss選用mse
預測的數據選擇「收盤價」測試
n = 10 LR = 0.0001 EPOCH = 100 train_end = -500 # 數據集創建 df, df_all, df_index = readData('收盤價', n=n, train_end=train_end) df_all = np.array(df_all.tolist()) plt.plot(df_index, df_all, label='real-data') df_numpy = np.array(df) df_numpy_mean = np.mean(df_numpy) df_numpy_std = np.std(df_numpy) df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std df_tensor = torch.Tensor(df_numpy) trainset = TrainSet(df_tensor) trainloader = DataLoader(trainset, batch_size=16, shuffle=True) rnn = RNN(n) optimizer = torch.optim.Adam(rnn.parameters(), lr=LR) # optimize all cnn parameters loss_func = nn.MSELoss() for step in range(EPOCH): for tx, ty in trainloader: output = rnn(torch.unsqueeze(tx, dim=0)) loss = loss_func(torch.squeeze(output), ty) optimizer.zero_grad() # clear gradients for this training step loss.backward() # back propagation, compute gradients optimizer.step() print(step, loss) if step % 10: torch.save(rnn, 'rnn.pkl') torch.save(rnn, 'rnn.pkl') generate_data_train = [] generate_data_test = [] test_index = len(df_all) + train_end df_all_normal = (df_all - df_numpy_mean) / df_numpy_std df_all_normal_tensor = torch.Tensor(df_all_normal) for i in range(n, len(df_all)): x = df_all_normal_tensor[i - n:i] x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0) y = rnn(x) if i < test_index: generate_data_train.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean) else: generate_data_test.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean) plt.plot(df_index[n:train_end], generate_data_train, label='generate_train') plt.plot(df_index[train_end:], generate_data_test, label='generate_test') plt.legend() plt.show() plt.cla() plt.plot(df_index[train_end:-400], df_all[train_end:-400], label='real-data') plt.plot(df_index[train_end:-400], generate_data_test[:-400], label='generate_test') plt.legend() plt.show()
本實例數據集使用上證的收盤價,構建時序數據集,使用GRU對數據預測,從結果上來看,走勢有必定的滯後性,因爲只是用了價格預測價格,仍是太簡單了,能夠考慮加入更多的特徵去優化這一算法。
import pandas as pd import matplotlib.pyplot as plt import datetime import torch import torch.nn as nn import numpy as np from torch.utils.data import Dataset, DataLoader import os os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE' def generate_df_affect_by_n_days(series, n, index=False): if len(series) <= n: raise Exception("The Length of series is %d, while affect by (n=%d)." % (len(series), n)) df = pd.DataFrame() for i in range(n): df['c%d' % i] = series.tolist()[i:-(n - i)] df['y'] = series.tolist()[n:] if index: df.index = series.index[n:] return df def readData(column='收盤價', n=30, all_too=True, index=False, train_end=-300): df = pd.read_csv("sh.csv", index_col=0, encoding='utf-8') df.fillna(0, inplace=True) df.replace(to_replace="None", value=0) del df["股票代碼"] del df["名稱"] df.index = list(map(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d").date(), df.index)) df_column = df[column].copy() df_column_train, df_column_test = df_column[:train_end], df_column[train_end - n:] df_generate_from_df_column_train = generate_df_affect_by_n_days(df_column_train, n, index=index) print(df_generate_from_df_column_train) if all_too: return df_generate_from_df_column_train, df_column, df.index.tolist() return df_generate_from_df_column_train class RNN(nn.Module): def __init__(self, input_size): super(RNN, self).__init__() self.rnn = nn.GRU( input_size=input_size, hidden_size=64, num_layers=1, batch_first=True, ) self.out = nn.Sequential( nn.Linear(64, 1), ) self.hidden = None def forward(self, x): r_out, self.hidden = self.rnn(x) # None 表示 hidden state 會用全0的 state out = self.out(r_out) return out class TrainSet(Dataset): def __init__(self, data): # 定義好 image 的路徑 self.data, self.label = data[:, :-1].float(), data[:, -1].float() def __getitem__(self, index): return self.data[index], self.label[index] def __len__(self): return len(self.data) n = 10 LR = 0.0001 EPOCH = 100 train_end = -500 # 數據集創建 df, df_all, df_index = readData('收盤價', n=n, train_end=train_end) df_all = np.array(df_all.tolist()) plt.plot(df_index, df_all, label='real-data') df_numpy = np.array(df) df_numpy_mean = np.mean(df_numpy) df_numpy_std = np.std(df_numpy) df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std df_tensor = torch.Tensor(df_numpy) trainset = TrainSet(df_tensor) trainloader = DataLoader(trainset, batch_size=64, shuffle=True) rnn = RNN(n) optimizer = torch.optim.Adam(rnn.parameters(), lr=LR) # optimize all cnn parameters loss_func = nn.MSELoss() for step in range(EPOCH): for tx, ty in trainloader: output = rnn(torch.unsqueeze(tx, dim=0)) loss = loss_func(torch.squeeze(output), ty) optimizer.zero_grad() # clear gradients for this training step loss.backward() # back propagation, compute gradients optimizer.step() print(step, loss) if step % 10: torch.save(rnn, 'rnn.pkl') torch.save(rnn, 'rnn.pkl') generate_data_train = [] generate_data_test = [] test_index = len(df_all) + train_end df_all_normal = (df_all - df_numpy_mean) / df_numpy_std df_all_normal_tensor = torch.Tensor(df_all_normal) for i in range(n, len(df_all)): x = df_all_normal_tensor[i - n:i] x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0) y = rnn(x) if i < test_index: generate_data_train.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean) else: generate_data_test.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean) plt.plot(df_index[n:train_end], generate_data_train, label='generate_train') plt.plot(df_index[train_end:], generate_data_test, label='generate_test') plt.legend() plt.show() plt.cla() plt.plot(df_index[train_end:-400], df_all[train_end:-400], label='real-data') plt.plot(df_index[train_end:-400], generate_data_test[:-400], label='generate_test') plt.legend() plt.show()