參考資料:html
https://morvanzhou.github.io/git
很是感謝莫煩老師的教程github
http://mnemstudio.org/path-finding-q-learning-tutorial.htm算法
http://www.cnblogs.com/dragonir/p/6224313.html網絡
這篇文章也是用很是簡單的說明將 Q-Learning 的過程給講解清楚了session
http://www.cnblogs.com/jinxulin/tag/%E5%A2%9E%E5%BC%BA%E5%AD%A6%E4%B9%A0/app
還有感謝這位園友,將加強學習的原理講解的很是清晰框架
深度加強學習(DRL)漫談 - 從DQN到AlphaGodom
這篇文章詳細描寫了 DQN 的演變過程,建議先看看 ide
目錄:
Deep Q-Network 學習筆記(一)—— Q-Learning
Deep Q-Network 學習筆記(二)—— Q-Learning與神經網絡結合使用
這裏將使用 tensorflow 框架重寫上一篇的示例。
Q-Learning與神經網絡結合使用就是 Deep Q-Network,簡稱 DQN。在現實中,狀態的數量極多,而且須要人工去設計特徵,並且一旦特徵設計很差,則得不到想要的結果。
神經網絡正是能處理解決這個問題,取代原來 Q 表的功能。
當神經網絡與Q-Learning結合使用的時候,又會碰到幾個問題:
加強學習是試錯學習(Trail-and-error),因爲沒有直接的指導信息,智能體要以不斷與環境進行交互,經過試錯的方式來得到最佳策略。
Q-Learning正是其中的一種,因此Q值表中表示的是當前已學習到的經驗。而根據公式計算出的 Q 值是智能體經過與環境交互及自身的經驗總結獲得的一個分數(即:目標 Q 值)。
最後使用目標 Q 值(target_q)去更新原來舊的 Q 值(q)。
而目標 Q 值與舊的 Q 值的對應關係,正好是監督學習神經網絡中結果值與輸出值的對應關係。
因此,loss = (target_q - q)^2
即:整個訓練過程其實就是 Q 值(q)向目標 Q 值(target_q)逼近的過程。
在 DQN 中有 Experience Replay 的概念,就是經驗回放。
就是先讓智能體去探索環境,將經驗(記憶)池累積到必定程度,在隨機抽取出一批樣本進行訓練。
爲何要隨機抽取?由於智能體去探索環境時採集到的樣本是一個時間序列,樣本之間具備連續性,若是每次獲得樣本就更新Q值,受樣本分佈影響,會對收斂形成影響。
從如今開始,必定要理清楚算法的全部思路,好比何時該作什麼,怎麼隨機選擇動做,神經網絡的參數是否調試完成等等,各類問題調試都沒結果的,就由於這個卡在這裏大半個星期才搞定。
1.隨機初始化一個狀態 s,初始化記憶池,設置觀察值。
2.循環遍歷(是永久遍歷仍是隻遍歷必定次數這個本身設置):
(1)根據策略選擇一個行爲(a)。
(2)執行該行動(a),獲得獎勵(r)、執行該行爲後的狀態 s`和遊戲是否結束 done。
(3)保存 s, a, r, s`, done 到記憶池裏。
(4)判斷記憶池裏的數據是否足夠(即:記憶池裏的數據數量是否超過設置的觀察值),若是不夠,則轉到(5)步。
① 在記憶池裏隨機抽取出一部分數據作爲訓練樣本。
② 將全部訓練樣本的 s`作爲神經網絡的輸入值,進行批量處理,獲得 s`狀態下每一個行爲的 q 值的表。
③ 根據公式計算出 q 值表對應的 target_q 值表。
公式:Q(s, a) = r + Gamma * Max[Q(s`, all actions)]
④ 使用 q 與 target_q 訓練神經網絡。
(5)判斷遊戲是否結束。
① 遊戲結束,給 s 隨機設置一個狀態。
① 未結束,則當前狀態 s 更新爲 s`。
首先,建立一個類來實現 DQN。
import tensorflow as tf import numpy as np from collections import deque import random class DeepQNetwork: r = np.array([[-1, -1, -1, -1, 0, -1], [-1, -1, -1, 0, -1, 100.0], [-1, -1, -1, 0, -1, -1], [-1, 0, 0, -1, 0, -1], [0, -1, -1, 1, -1, 100], [-1, 0, -1, -1, 0, 100], ]) # 執行步數。 step_index = 0 # 狀態數。 state_num = 6 # 動做數。 action_num = 6 # 訓練以前觀察多少步。 OBSERVE = 1000. # 選取的小批量訓練樣本數。 BATCH = 20 # epsilon 的最小值,當 epsilon 小於該值時,將不在隨機選擇行爲。 FINAL_EPSILON = 0.0001 # epsilon 的初始值,epsilon 逐漸減少。 INITIAL_EPSILON = 0.1 # epsilon 衰減的總步數。 EXPLORE = 3000000. # 探索模式計數。 epsilon = 0 # 訓練步數統計。 learn_step_counter = 0 # 學習率。 learning_rate = 0.001 # γ經驗折損率。 gamma = 0.9 # 記憶上限。 memory_size = 5000 # 當前記憶數。 memory_counter = 0 # 保存觀察到的執行過的行動的存儲器,即:曾經經歷過的記憶。 replay_memory_store = deque() # 生成一個狀態矩陣(6 X 6),每一行表明一個狀態。 state_list = None # 生成一個動做矩陣。 action_list = None # q_eval 網絡。 q_eval_input = None action_input = None q_target = None q_eval = None predict = None loss = None train_op = None cost_his = None reward_action = None # tensorflow 會話。 session = None def __init__(self, learning_rate=0.001, gamma=0.9, memory_size=5000): self.learning_rate = learning_rate self.gamma = gamma self.memory_size = memory_size # 初始化成一個 6 X 6 的狀態矩陣。 self.state_list = np.identity(self.state_num) # 初始化成一個 6 X 6 的動做矩陣。 self.action_list = np.identity(self.action_num) # 建立神經網絡。 self.create_network() # 初始化 tensorflow 會話。 self.session = tf.InteractiveSession() # 初始化 tensorflow 參數。 self.session.run(tf.initialize_all_variables()) # 記錄全部 loss 變化。 self.cost_his = [] def create_network(self): """ 建立神經網絡。 :return: """ pass def select_action(self, state_index): """ 根據策略選擇動做。 :param state_index: 當前狀態。 :return: """ pass def save_store(self, current_state_index, current_action_index, current_reward, next_state_index, done): """ 保存記憶。 :param current_state_index: 當前狀態 index。 :param current_action_index: 動做 index。 :param current_reward: 獎勵。 :param next_state_index: 下一個狀態 index。 :param done: 是否結束。 :return: """ pass def step(self, state, action): """ 執行動做。 :param state: 當前狀態。 :param action: 執行的動做。 :return: """ pass def experience_replay(self): """ 記憶回放。 :return: """ pass def train(self): """ 訓練。 :return: """ pass def pay(self): """ 運行並測試。 :return: """ pass if __name__ == "__main__": q_network = DeepQNetwork() q_network.pay()
1.將狀態與動做初始化成如下矩陣,以方便處理。
圖 3.1
而後,建立一個神經網絡,並使用該神經網絡來替換掉 Q 值表(上一篇中的 Q 矩陣)
神經網絡的輸入是 Agent 當前的狀態,輸出是 Agent 當前狀態能夠執行的動做的 Q 值表。
因爲總共有 6 個狀態和 6 種動做,因此,這裏將建立一個簡單 3 層的神經網絡,輸入層的參數是 6 個和輸出層輸出 6 個值,運行並調試好參數,確認能正常運行。
測試代碼:
import tensorflow as tf import numpy as np input_num = 6 output_num = 6 x_data = np.linspace(-1, 1, 300).reshape((-1, input_num)) # 轉爲列向量 noise = np.random.normal(0, 0.05, x_data.shape) y_data = np.square(x_data) + 0.5 + noise xs = tf.placeholder(tf.float32, [None, input_num]) # 樣本數未知,特徵數爲 6,佔位符最後要以字典形式在運行中填入 ys = tf.placeholder(tf.float32, [None, output_num]) neuro_layer_1 = 3 w1 = tf.Variable(tf.random_normal([input_num, neuro_layer_1])) b1 = tf.Variable(tf.zeros([1, neuro_layer_1]) + 0.1) l1 = tf.nn.relu(tf.matmul(xs, w1) + b1) neuro_layer_2 = output_num w2 = tf.Variable(tf.random_normal([neuro_layer_1, neuro_layer_2])) b2 = tf.Variable(tf.zeros([1, neuro_layer_2]) + 0.1) l2 = tf.matmul(l1, w2) + b2 # reduction_indices=[0] 表示將列數據累加到一塊兒。 # reduction_indices=[1] 表示將行數據累加到一塊兒。 loss = tf.reduce_mean(tf.reduce_sum(tf.square((ys - l2)), reduction_indices=[1])) # 選擇梯度降低法 train = tf.train.GradientDescentOptimizer(0.001).minimize(loss) # train = tf.train.AdamOptimizer(1e-1).minimize(loss) init = tf.initialize_all_variables() sess = tf.Session() sess.run(init) for i in range(100000): sess.run(train, feed_dict={xs: x_data, ys: y_data}) if i % 1000 == 0: print(sess.run(loss, feed_dict={xs: x_data, ys: y_data}))
執行後 loss 一直持續減小,確認該神經網絡正常運行就好了,注意調好學習率和神經網絡的層數及神經元個數。
確認正常後,開始實現 DeepQNetwork 類中的 def create_network(self) 函數:
def create_network(self): """ 建立神經網絡。 :return: """ self.q_eval_input = tf.placeholder(shape=[None, self.state_num], dtype=tf.float32) self.action_input = tf.placeholder(shape=[None, self.action_num], dtype=tf.float32) self.q_target = tf.placeholder(shape=[None], dtype=tf.float32) neuro_layer_1 = 3 w1 = tf.Variable(tf.random_normal([self.state_num, neuro_layer_1])) b1 = tf.Variable(tf.zeros([1, neuro_layer_1]) + 0.1) l1 = tf.nn.relu(tf.matmul(self.q_eval_input, w1) + b1) w2 = tf.Variable(tf.random_normal([neuro_layer_1, self.action_num])) b2 = tf.Variable(tf.zeros([1, self.action_num]) + 0.1) self.q_eval = tf.matmul(l1, w2) + b2 # 取出當前動做的得分。 self.reward_action = tf.reduce_sum(tf.multiply(self.q_eval, self.action_input), reduction_indices=1) self.loss = tf.reduce_mean(tf.square((self.q_target - self.reward_action))) self.train_op = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.loss) self.predict = tf.argmax(self.q_eval, 1)
這裏說明一下 loss 的計算,因爲狀態是根據圖 3.1 的矩陣的方式顯示的,好比,當前狀態若是是在 1 號房間,
則輸入參數(q_eval_input)的值是:[[0, 1, 0, 0, 0, 0]]
因爲 Agent 執行了動做 3,也就是移動到了 3 號房間,
因此 Agent 的動做參數(action_input)的值是:[[0, 0, 0, 1, 0, 0]]
由於神經網絡的輸出結果(q_eval)是 Agent 當前狀態下可執行的動做的價值,因爲每一個狀態都有 6 個動做,而狀態數也是 6 個,因此
神經網絡的輸出結果(q_eval)與輸入參數是同樣的,因此輸出的格式也同樣,
假設輸出結果(q_eval)是:[[0.81, 0.5, 0.24, 0.513, 0.9, 0.71]]
tf.multiply(self.q_eval, self.action_input)
就是矩陣的點積,也就是每一個元素分別相乘。
這裏表示的就是得到 Agent 執行了 action_input 的價值(Q 值)。
也就是 q = q_eval * action_input = [[0, 0, 0, 0.513, 0, 0]]
因此:
self.reward_action = tf.reduce_sum(tf.multiply(self.q_eval, self.action_input), reduction_indices=1)
也就至關於:q = SUM(q_eval * action_input) = SUM([[0, 0, 0, 0.513, 0, 0, 0]]) = 0.513
即:Agent 在 1 號房間執行移動到 3 號房間動做時,神經網絡給出的價值(q 值)是 0.513 分
而 q_target 在前面也提過,是 Agent 通過環境體驗後,根據公式計算出來的目標 q 值,假設該值是 1.03 分
因此:
self.loss = tf.reduce_mean(tf.square((self.q_target - self.reward_action)))
就至關於:
loss = ((1.03 - 0.513)^2) / 1 = 0.267289
而後,learning_rate 也就是學習率,通過調試,這裏是設置成 0.001 比較好。
這裏是 DQN 須要注意的地方之一,這裏的方法將直接影響到 DQN 是否能夠收斂,或者是不是陷入局部最小值等狀況。
如今在這裏選擇了最直接的方法,使用隨機的方式來選擇行動。
使用隨機的方式來選擇行動,可讓 Agent 能獲得更多的探索機會,這樣在訓練時纔能有效的跳出陷入局部最小值的狀況,當訓練時,能夠減小探索機會。
流程以下:
1.初始化 epsilon 變量,並設置它的最小值(FINAL_EPSILON)與最大值(INITIAL_EPSILON),並將 epsilon 的初始值設置成 INITIAL_EPSILON。
2.隨機生成一個數 n。
3.判斷 n 是否小於 epsilon,若是 n 小於 epsilon 則轉到 4,不然轉到 5。
4.使用隨機策略(增長探索機會),轉到 6。
隨機選擇一個在 Agent 當前狀態下能夠執行的動做。
5.使用神經網絡直接計算出結果(實際應用時也是應用這方法),轉到6。
神經網絡會輸出在當前狀態下全部動做的 Q 值,選擇其中最有價值(Q 值最大)的動做返回。
6.判斷是否開始訓練,若是是,則逐步減小 epsilon 來減小探索機會,不然跳過。
開始實現 DeepQNetwork 類中的 def select_action(self, state_index) 函數:
def select_action(self, state_index): """ 根據策略選擇動做。 :param state_index: 當前狀態。 :return: """ current_state = self.state_list[state_index:state_index + 1] if np.random.uniform() < self.epsilon: current_action_index = np.random.randint(0, self.action_num) else: actions_value = self.session.run(self.q_eval, feed_dict={self.q_eval_input: current_state}) action = np.argmax(actions_value) current_action_index = action # 開始訓練後,在 epsilon 小於必定的值以前,將逐步減少 epsilon。 if self.step_index > self.OBSERVE and self.epsilon > self.FINAL_EPSILON: self.epsilon -= (self.INITIAL_EPSILON - self.FINAL_EPSILON) / self.EXPLORE return current_action_index
這裏使用了一個先進先出的隊列,設置好隊列的 size,直接將「當前狀態」、「執行動做」、「獎勵分數」、「下一個狀態」和「遊戲是否結束」保存進去就好了。
開始實現 DeepQNetwork 類中的 def save_store(self, current_state_index, current_action_index, current_reward, next_state_index, done) 函數:
def save_store(self, current_state_index, current_action_index, current_reward, next_state_index, done): """ 保存記憶。 :param current_state_index: 當前狀態 index。 :param current_action_index: 動做 index。 :param current_reward: 獎勵。 :param next_state_index: 下一個狀態 index。 :param done: 是否結束。 :return: """ current_state = self.state_list[current_state_index:current_state_index + 1] current_action = self.action_list[current_action_index:current_action_index + 1] next_state = self.state_list[next_state_index:next_state_index + 1] # 記憶動做(當前狀態, 當前執行的動做, 當前動做的得分,下一個狀態)。 self.replay_memory_store.append(( current_state, current_action, current_reward, next_state, done)) # 若是超過記憶的容量,則將最久遠的記憶移除。 if len(self.replay_memory_store) > self.memory_size: self.replay_memory_store.popleft() self.memory_counter += 1
這裏就是取得遊戲是否結束狀態,動做獎勵和下一個狀態並返回就能夠了。
開始實現 DeepQNetwork 類中的 def step(self, state, action) 函數:
def step(self, state, action): """ 執行動做。 :param state: 當前狀態。 :param action: 執行的動做。 :return: """ reward = self.r[state][action] next_state = action done = False if action == 5: done = True return next_state, reward, done
這是 DQN 的重點之一,在記憶池裏隨機抽取出一小批的數據當作訓練樣本,並計算出目標 Q 值來訓練神經網絡。
流程以下:
1. 初始化時先設置抽取的樣本數。
2. 從記憶池裏隨機抽取出一批樣本。
3. 因爲每條樣本中,都保存有當時的數據(當前狀態,動做,獎勵分數,下一個狀態,是否結束),因此,爲了計算出這些樣本數據的目標 Q 值,就必須先取出樣本中「下一個狀態(next_state)」(注意:這裏取到的是全部這批樣本的「下一個狀態」的列表!)。
4. 將 next_state (這是批數據!!)當作參數傳入神經網絡,獲得 Agent 在 next_state 狀態時全部可執行的動做的 Q 值表(q_next),q_next 表示這批樣本中全部的 next_state 狀態的 Q 值表的集合。
5. 如今,已經拿到了
Agent 當時的狀態(state),
當時的動做(action),
當時的狀態(state)下執行動做(action)獲得的獎勵R(state, action),
當時的狀態(state)下執行動做(action)後的狀態(next_state)下全部可執行的動做的 Q 值表(q_next)。
如今就可使用上面提到的公式來計算出目標 Q 值Q(state, action)。
Q(state, action) = R(state, action) + Gamma * Max{q_next}
6. 根據遊戲狀態判斷,當前選擇的動做是不是違規(不可執行)的動做,若是是,則不作經驗計算,直接扣除分數,不然使用上面的公式來計算出Q(state, action)。違規的動做,在搜索動做的時候,是不該該能選擇出來的,即:能走到這一步的都必須是有效動做!
7. 將計算獲得的全部樣本的 Q(state, action) 保存到集合中(q_target)。
8. 將這批樣本的當前狀態的集合,動做的集合與 q_target 傳入神經網絡並進行訓練。
特別注意第 6 條的內容,若是這裏處理很差,同樣會得不到結果的,具體緣由能夠看上一篇 。
開始實現 DeepQNetwork 類中的 def experience_replay(self)函數:
def experience_replay(self): """ 記憶回放。 :return: """ # 隨機選擇一小批記憶樣本。 batch = self.BATCH if self.memory_counter > self.BATCH else self.memory_counter minibatch = random.sample(self.replay_memory_store, batch) batch_state = None batch_action = None batch_reward = None batch_next_state = None batch_done = None for index in range(len(minibatch)): if batch_state is None: batch_state = minibatch[index][0] elif batch_state is not None: batch_state = np.vstack((batch_state, minibatch[index][0])) if batch_action is None: batch_action = minibatch[index][1] elif batch_action is not None: batch_action = np.vstack((batch_action, minibatch[index][1])) if batch_reward is None: batch_reward = minibatch[index][2] elif batch_reward is not None: batch_reward = np.vstack((batch_reward, minibatch[index][2])) if batch_next_state is None: batch_next_state = minibatch[index][3] elif batch_next_state is not None: batch_next_state = np.vstack((batch_next_state, minibatch[index][3])) if batch_done is None: batch_done = minibatch[index][4] elif batch_done is not None: batch_done = np.vstack((batch_done, minibatch[index][4])) # q_next:下一個狀態的 Q 值。 q_next = self.session.run([self.q_eval], feed_dict={self.q_eval_input: batch_next_state}) q_target = [] for i in range(len(minibatch)): # 當前即時得分。 current_reward = batch_reward[i][0] # # 遊戲是否結束。 # current_done = batch_done[i][0] # 更新 Q 值。 q_value = current_reward + self.gamma * np.max(q_next[0][i]) # 當得分小於 -1 時,表示走了不可走的位置。 if current_reward <= -1: q_target.append(current_reward) else: q_target.append(q_value) _, cost, reward = self.session.run([self.train_op, self.loss, self.reward_action], feed_dict={self.q_eval_input: batch_state, self.action_input: batch_action, self.q_target: q_target}) self.cost_his.append(cost) # if self.step_index % 1000 == 0: # print("loss:", cost) self.learn_step_counter += 1
這部分在前面都分析過了,看看就好了。
實現 DeepQNetwork 類中的 def train(self)函數:
def train(self): """ 訓練。 :return: """ # 初始化當前狀態。 current_state = np.random.randint(0, self.action_num - 1) self.epsilon = self.INITIAL_EPSILON while True: # 選擇動做。 action = self.select_action(current_state) # 執行動做,獲得:下一個狀態,執行動做的得分,是否結束。 next_state, reward, done = self.step(current_state, action) # 保存記憶。 self.save_store(current_state, action, reward, next_state, done) # 先觀察一段時間累積足夠的記憶在進行訓練。 if self.step_index > self.OBSERVE: self.experience_replay() if self.step_index > 10000: break if done: current_state = np.random.randint(0, self.action_num - 1) else: current_state = next_state self.step_index += 1
實現 DeepQNetwork 類中的 def pay(self)函數:
def pay(self): """ 運行並測試。 :return: """ self.train() # 顯示 R 矩陣。 print(self.r) for index in range(5): start_room = index print("#############################", "Agent 在", start_room, "開始行動", "#############################") current_state = start_room step = 0 target_state = 5 while current_state != target_state: out_result = self.session.run(self.q_eval, feed_dict={ self.q_eval_input: self.state_list[current_state:current_state + 1]}) next_state = np.argmax(out_result[0]) print("Agent 由", current_state, "號房間移動到了", next_state, "號房間") current_state = next_state step += 1 print("Agent 在", start_room, "號房間開始移動了", step, "步到達了目標房間 5") print("#############################", "Agent 在", 5, "結束行動", "#############################")
import tensorflow as tf import numpy as np from collections import deque import random class DeepQNetwork: r = np.array([[-1, -1, -1, -1, 0, -1], [-1, -1, -1, 0, -1, 100.0], [-1, -1, -1, 0, -1, -1], [-1, 0, 0, -1, 0, -1], [0, -1, -1, 1, -1, 100], [-1, 0, -1, -1, 0, 100], ]) # 執行步數。 step_index = 0 # 狀態數。 state_num = 6 # 動做數。 action_num = 6 # 訓練以前觀察多少步。 OBSERVE = 1000. # 選取的小批量訓練樣本數。 BATCH = 20 # epsilon 的最小值,當 epsilon 小於該值時,將不在隨機選擇行爲。 FINAL_EPSILON = 0.0001 # epsilon 的初始值,epsilon 逐漸減少。 INITIAL_EPSILON = 0.1 # epsilon 衰減的總步數。 EXPLORE = 3000000. # 探索模式計數。 epsilon = 0 # 訓練步數統計。 learn_step_counter = 0 # 學習率。 learning_rate = 0.001 # γ經驗折損率。 gamma = 0.9 # 記憶上限。 memory_size = 5000 # 當前記憶數。 memory_counter = 0 # 保存觀察到的執行過的行動的存儲器,即:曾經經歷過的記憶。 replay_memory_store = deque() # 生成一個狀態矩陣(6 X 6),每一行表明一個狀態。 state_list = None # 生成一個動做矩陣。 action_list = None # q_eval 網絡。 q_eval_input = None action_input = None q_target = None q_eval = None predict = None loss = None train_op = None cost_his = None reward_action = None # tensorflow 會話。 session = None def __init__(self, learning_rate=0.001, gamma=0.9, memory_size=5000): self.learning_rate = learning_rate self.gamma = gamma self.memory_size = memory_size # 初始化成一個 6 X 6 的狀態矩陣。 self.state_list = np.identity(self.state_num) # 初始化成一個 6 X 6 的動做矩陣。 self.action_list = np.identity(self.action_num) # 建立神經網絡。 self.create_network() # 初始化 tensorflow 會話。 self.session = tf.InteractiveSession() # 初始化 tensorflow 參數。 self.session.run(tf.initialize_all_variables()) # 記錄全部 loss 變化。 self.cost_his = [] def create_network(self): """ 建立神經網絡。 :return: """ self.q_eval_input = tf.placeholder(shape=[None, self.state_num], dtype=tf.float32) self.action_input = tf.placeholder(shape=[None, self.action_num], dtype=tf.float32) self.q_target = tf.placeholder(shape=[None], dtype=tf.float32) neuro_layer_1 = 3 w1 = tf.Variable(tf.random_normal([self.state_num, neuro_layer_1])) b1 = tf.Variable(tf.zeros([1, neuro_layer_1]) + 0.1) l1 = tf.nn.relu(tf.matmul(self.q_eval_input, w1) + b1) w2 = tf.Variable(tf.random_normal([neuro_layer_1, self.action_num])) b2 = tf.Variable(tf.zeros([1, self.action_num]) + 0.1) self.q_eval = tf.matmul(l1, w2) + b2 # 取出當前動做的得分。 self.reward_action = tf.reduce_sum(tf.multiply(self.q_eval, self.action_input), reduction_indices=1) self.loss = tf.reduce_mean(tf.square((self.q_target - self.reward_action))) self.train_op = tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.loss) self.predict = tf.argmax(self.q_eval, 1) def select_action(self, state_index): """ 根據策略選擇動做。 :param state_index: 當前狀態。 :return: """ current_state = self.state_list[state_index:state_index + 1] if np.random.uniform() < self.epsilon: current_action_index = np.random.randint(0, self.action_num) else: actions_value = self.session.run(self.q_eval, feed_dict={self.q_eval_input: current_state}) action = np.argmax(actions_value) current_action_index = action # 開始訓練後,在 epsilon 小於必定的值以前,將逐步減少 epsilon。 if self.step_index > self.OBSERVE and self.epsilon > self.FINAL_EPSILON: self.epsilon -= (self.INITIAL_EPSILON - self.FINAL_EPSILON) / self.EXPLORE return current_action_index def save_store(self, current_state_index, current_action_index, current_reward, next_state_index, done): """ 保存記憶。 :param current_state_index: 當前狀態 index。 :param current_action_index: 動做 index。 :param current_reward: 獎勵。 :param next_state_index: 下一個狀態 index。 :param done: 是否結束。 :return: """ current_state = self.state_list[current_state_index:current_state_index + 1] current_action = self.action_list[current_action_index:current_action_index + 1] next_state = self.state_list[next_state_index:next_state_index + 1] # 記憶動做(當前狀態, 當前執行的動做, 當前動做的得分,下一個狀態)。 self.replay_memory_store.append(( current_state, current_action, current_reward, next_state, done)) # 若是超過記憶的容量,則將最久遠的記憶移除。 if len(self.replay_memory_store) > self.memory_size: self.replay_memory_store.popleft() self.memory_counter += 1 def step(self, state, action): """ 執行動做。 :param state: 當前狀態。 :param action: 執行的動做。 :return: """ reward = self.r[state][action] next_state = action done = False if action == 5: done = True return next_state, reward, done def experience_replay(self): """ 記憶回放。 :return: """ # 隨機選擇一小批記憶樣本。 batch = self.BATCH if self.memory_counter > self.BATCH else self.memory_counter minibatch = random.sample(self.replay_memory_store, batch) batch_state = None batch_action = None batch_reward = None batch_next_state = None batch_done = None for index in range(len(minibatch)): if batch_state is None: batch_state = minibatch[index][0] elif batch_state is not None: batch_state = np.vstack((batch_state, minibatch[index][0])) if batch_action is None: batch_action = minibatch[index][1] elif batch_action is not None: batch_action = np.vstack((batch_action, minibatch[index][1])) if batch_reward is None: batch_reward = minibatch[index][2] elif batch_reward is not None: batch_reward = np.vstack((batch_reward, minibatch[index][2])) if batch_next_state is None: batch_next_state = minibatch[index][3] elif batch_next_state is not None: batch_next_state = np.vstack((batch_next_state, minibatch[index][3])) if batch_done is None: batch_done = minibatch[index][4] elif batch_done is not None: batch_done = np.vstack((batch_done, minibatch[index][4])) # q_next:下一個狀態的 Q 值。 q_next = self.session.run([self.q_eval], feed_dict={self.q_eval_input: batch_next_state}) q_target = [] for i in range(len(minibatch)): # 當前即時得分。 current_reward = batch_reward[i][0] # # 遊戲是否結束。 # current_done = batch_done[i][0] # 更新 Q 值。 q_value = current_reward + self.gamma * np.max(q_next[0][i]) # 當得分小於 -1 時,表示走了不可走的位置。 if current_reward <= -1: q_target.append(current_reward) else: q_target.append(q_value) _, cost, reward = self.session.run([self.train_op, self.loss, self.reward_action], feed_dict={self.q_eval_input: batch_state, self.action_input: batch_action, self.q_target: q_target}) self.cost_his.append(cost) # if self.step_index % 1000 == 0: # print("loss:", cost) self.learn_step_counter += 1 def train(self): """ 訓練。 :return: """ # 初始化當前狀態。 current_state = np.random.randint(0, self.action_num - 1) self.epsilon = self.INITIAL_EPSILON while True: # 選擇動做。 action = self.select_action(current_state) # 執行動做,獲得:下一個狀態,執行動做的得分,是否結束。 next_state, reward, done = self.step(current_state, action) # 保存記憶。 self.save_store(current_state, action, reward, next_state, done) # 先觀察一段時間累積足夠的記憶在進行訓練。 if self.step_index > self.OBSERVE: self.experience_replay() if self.step_index > 10000: break if done: current_state = np.random.randint(0, self.action_num - 1) else: current_state = next_state self.step_index += 1 def pay(self): """ 運行並測試。 :return: """ self.train() # 顯示 R 矩陣。 print(self.r) for index in range(5): start_room = index print("#############################", "Agent 在", start_room, "開始行動", "#############################") current_state = start_room step = 0 target_state = 5 while current_state != target_state: out_result = self.session.run(self.q_eval, feed_dict={ self.q_eval_input: self.state_list[current_state:current_state + 1]}) next_state = np.argmax(out_result[0]) print("Agent 由", current_state, "號房間移動到了", next_state, "號房間") current_state = next_state step += 1 print("Agent 在", start_room, "號房間開始移動了", step, "步到達了目標房間 5") print("#############################", "Agent 在", 5, "結束行動", "#############################") if __name__ == "__main__": q_network = DeepQNetwork() q_network.pay()