## 導入所需的包python
import pandas as pd網絡
import numpy as npapp
import matplotlib.pyplot as pltdom
import tensorflow as tf函數
tf.reset_default_graph()學習
plt.rcParams['font.sans-serif'] = 'SimHei' ##設置字體爲SimHei顯示中文測試
plt.rcParams['axes.unicode_minus'] = False ##設置正常顯示符號字體
## 導入所需數據rest
df = pd.read_csv('美圓-人民幣.csv',encoding='gbk',engine='python')code
df['時間'] = pd.to_datetime(df['時間'],format='%Y/%m/%d')
df = df.sort_values(by='時間')
df.head()
## 用折線圖展現數據
plt.figure(figsize=(12,8))
plt.title('1999年1月1日到2018年8月21日最高價數據曲線')
plt.plot(df['時間'],df['高'])
plt.show()
### 提取測試數據
data = df.loc[:,['時間','高']]
## 標準化數據
data['高'] = (data['高']-np.mean(data['高']))/np.std(data['高'])
data['高(預)'] = data['高'].shift(-1)
data = data.iloc[:data.shape[0]-1]
data.columns = ['time','x','y']
data.head()
#獲取最高價序列
data=np.array(df['高'])
normalize_data=(data-np.mean(data))/np.std(data) #標準化
normalize_data=normalize_data[:,np.newaxis] #增長維度
#———————————————造成訓練集——————————————————
#設置常量
time_step=20 #時間步
rnn_unit=10 #hidden layer units
batch_size=60 #每一批次訓練多少個樣例
input_size=1 #輸入層維度
output_size=1 #輸出層維度
lr=0.0006 #學習率
train_x,train_y=[],[] #訓練集
for i in range(len(normalize_data)-time_step-1):
x=normalize_data[i:i+time_step]
y=normalize_data[i+1:i+time_step+1]
train_x.append(x.tolist())
train_y.append(y.tolist())
test_x = train_x[len(train_x)-31:len(train_x)-1]
test_y = train_y[len(train_y)-31:len(train_y)-1]
X=tf.placeholder(tf.float32, [None,time_step,input_size]) #每批次輸入網絡的tensor
Y=tf.placeholder(tf.float32, [None,time_step,output_size]) #每批次tensor對應的標籤
#輸入層、輸出層權重、偏置
weights={
'in':tf.Variable(tf.random_normal([input_size,rnn_unit])),
'out':tf.Variable(tf.random_normal([rnn_unit,1]))
}
biases={
'in':tf.Variable(tf.constant(0.1,shape=[rnn_unit,])),
'out':tf.Variable(tf.constant(0.1,shape=[1,]))
}
def lstm(batch): #參數:輸入網絡批次數目
w_in=weights['in']
b_in=biases['in']
input=tf.reshape(X,[-1,input_size]) #須要將tensor轉成2維進行計算,計算後的結果做爲隱藏層的輸入
input_rnn=tf.matmul(input,w_in)+b_in
input_rnn=tf.reshape(input_rnn,[-1,time_step,rnn_unit]) #將tensor轉成3維,做爲lstm cell的輸入
cell=tf.nn.rnn_cell.BasicLSTMCell(rnn_unit)
init_state=cell.zero_state(batch,dtype=tf.float32)
output_rnn,final_states=tf.nn.dynamic_rnn(cell,input_rnn,initial_state=init_state, dtype=tf.float32) #output_rnn是記錄lstm每一個輸出節點的結果,final_states是最後一個cell的結果
output=tf.reshape(output_rnn,[-1,rnn_unit]) #做爲輸出層的輸入
w_out=weights['out']
b_out=biases['out']
pred=tf.matmul(output,w_out)+b_out
return pred,final_states
def train_lstm():
global batch_size
pred,_=lstm(batch_size)
#損失函數
loss=tf.reduce_mean(tf.square(tf.reshape(pred,[-1])-tf.reshape(Y, [-1])))
train_op=tf.train.AdamOptimizer(lr).minimize(loss)
saver=tf.train.Saver(tf.global_variables())
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
#重複訓練100次
for i in range(100):
step=0
start=0
end=start+batch_size
while(end<len(train_x)): _,loss_=sess.run([train_op,loss],feed_dict={X:train_x[start:end],Y:train_y[start:end]})
start+=batch_size
end=start+batch_size
#每10步保存一次參數
if step%10==0:
print(i,step,loss_)
print("保存模型:",saver.save(sess,'.\stock.model'))
step+=1
def prediction():
pred,_=lstm(1) #預測時只輸入[1,time_step,input_size]的測試數據
saver=tf.train.Saver(tf.global_variables())
with tf.Session() as sess:
#參數恢復
module_file = tf.train.latest_checkpoint('./')
saver.restore(sess, module_file)
#取訓練集最後一行爲測試樣本。shape=[1,time_step,input_size]
prev_seq=train_x[-31]
predict=[]
#獲得以後100個預測結果
for i in range(100):
next_seq=sess.run(pred,feed_dict={X:[prev_seq]})
predict.append(next_seq[-1])
#每次獲得最後一個時間步的預測結果,與以前的數據加在一塊兒,造成新的測試樣本
prev_seq=np.vstack((prev_seq[1:],next_seq[-1]))
#以折線圖表示結果
plt.figure()
plt.plot(list(range(len(normalize_data))), normalize_data, color='b')
plt.plot(list(range(len(normalize_data), len(normalize_data) + len(predict))), predict, color='r')
plt.show()
with tf.variable_scope('train'):
train_lstm()
with tf.variable_scope('train',reuse=True):
prediction()