【轉】【強化學習】Deep Q Network(DQN)算法詳解

 原文地址:https://blog.csdn.net/qq_30615903/article/details/80744083算法

DQN(Deep Q-Learning)是將深度學習deeplearning與強化學習reinforcementlearning相結合,實現了從感知到動做的端到端的革命性算法。使用DQN玩遊戲的話簡直6的飛起,其中fladdy bird這個遊戲就已經被DQN玩壞了。當咱們的Q-table他過於龐大沒法創建的話,使用DQN是一種很好的選擇ruby

一、算法思想

       DQN與Qleanring相似都是基於值迭代的算法,可是在普通的Q-learning中,當狀態和動做空間是離散且維數不高時可以使用Q-Table儲存每一個狀態動做對的Q值,而當狀態和動做空間是高維連續時,使用Q-Table不動做空間和狀態太大十分困難。markdown


       因此在此處能夠把Q-table更新轉化爲一函數擬合問題,經過擬合一個函數function來代替Q-table產生Q值,使得相近的狀態獲得相近的輸出動做。所以咱們能夠想到深度神經網絡對複雜特徵的提取有很好效果,因此能夠將DeepLearning與Reinforcement Learning結合。這就成爲了DQN
這裏寫圖片描述
DL與RL結合存在如下問題 :網絡

  • DL是監督學習須要學習訓練集,強化學習不須要訓練集只經過環境進行返回獎勵值reward,同時也存在着噪聲和延遲的問題,因此存在不少狀態state的reward值都是0也就是樣本稀疏
  • DL每一個樣本之間互相獨立,而RL當前狀態的狀態值是依賴後面的狀態返回值的。
  • 當咱們使用非線性網絡來表示值函數的時候可能出現不穩定的問題

DQN中的兩大利器解決了以上問題app

  • 經過Q-Learning使用reward來構造標籤
  • 經過experience replay(經驗池)的方法來解決相關性及非靜態分佈問題
  • 使用一個MainNet產生當前Q值,使用另一個Target產生Target Q

 

二、experience replay 經驗池

       經驗池DQN中的記憶庫用來學習以前的經歷,又由於Q learning 是一種 off-policy 離線學習法, 它能學習當前經歷着的, 也能學習過去經歷過的, 甚至是學習別人的經歷,因此在學習過程當中隨機的加入以前的經驗會讓神經網絡更有效率。 dom


       因此經驗池解決了相關性及非靜態分佈問題。他經過在每一個timestep下agent與環境交互獲得的轉移樣本 $(s_t,a_t,r_t,s_{t+1})$ 儲存到回放記憶網絡,要訓練時就隨機拿出一些(minibatch)來訓練所以打亂其中的相關性。函數

 

三、Q-target 目標網絡

       Q-targets的做用其實也是一種打亂相關性的機制,使用Q-targets會使得DQN中出現兩個結構徹底相同可是參數卻不一樣的網絡,預測Q估計的的網絡MainNet使用的是最新的參數,而預測Q現實的神經網絡TargetNet參數使用的倒是好久以前的,$Q(s,a;θ_i)$表示當前網絡MainNet的輸出,用來評估當前狀態動做對的值函數;$Q(s,a;θ^−_i)$ 表示TargetNet的輸出,能夠解出targetQ並根據LossFunction更新MainNet的參數,每通過必定次數的迭代,將MainNet的參數複製給TargetNet。 post


       引入TargetNet後,再一段時間裏目標Q值使保持不變的,必定程度下降了當前Q值和目標Q值的相關性,提升了算法穩定性。學習

 

四、算法流程

4.一、前置公式

      DQN的更新方式和Qlearning同樣,詳細的值函數與動做值函數此處再也不推導,在Qlearning中有詳細講解不瞭解的請移步上一篇博客ui

$$Q(s,a)←Q(s,a)+α[r+γmax_{a′}Q(s′,a′)−Q(s,a)]$$

DQN的損失函數以下 θ表示網絡參數爲均方偏差損失

$$L(θ)=E[(TargetQ−Q(s,a;θ))^2]$$

$$TargetQ=r+γmax_{a′}Q(s′,a′;θ)$$
這裏寫圖片描述

4.二、算法僞代碼

      DQN中存在兩個結構徹底相同可是參數卻不一樣的網絡,預測Q估計的網絡MainNet使用的是最新的參數,而預測Q現實的神經網絡TargetNet參數使用的倒是好久以前的, $Q(s,a;θ_i)$表示當前網絡MainNet的輸出,用來評估當前狀態動做對的值函數; $Q(s,a;θ^−_i)$表示TargetNet的輸出,能夠解出targetQ,所以當agent對環境採起動做a時就能夠根據上述公式計算出Q並根據LossFunction更新MainNet的參數,每通過必定次數的迭代,將MainNet的參數複製給TargetNet。這樣就完成了一次學習過程


算法僞代碼

 

4.三、算法流程圖

 

算法流程圖

五、代碼實現

根據morvan老師的例子所得

class DeepQNetwork:
    def __init__(
            self,
            n_actions,
            n_features,
            learning_rate=0.01,
            reward_decay=0.9,
            e_greedy=0.9,
            replace_target_iter=300,
            memory_size=500,
            batch_size=32,
            e_greedy_increment=None,
            output_graph=True,
    ):
        self.n_actions = n_actions
        self.n_features = n_features
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon_max = e_greedy
        self.replace_target_iter = replace_target_iter
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.epsilon_increment = e_greedy_increment
        self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max

        # 統計訓練次數
        self.learn_step_counter = 0

        # 初始化記憶 memory [s, a, r, s_]
        self.memory = np.zeros((self.memory_size, n_features * 2 + 2))

        # 有兩個網絡組成 [target_net, evaluate_net]
        self._build_net()
        t_params = tf.get_collection('target_net_params')
        e_params = tf.get_collection('eval_net_params')
        self.replace_target_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)]

        self.sess = tf.Session()

        if output_graph:
            # 開啓tensorboard
            # $ tensorboard --logdir=logs
            # tf.train.SummaryWriter soon be deprecated, use following
            tf.summary.FileWriter(r'D:\logs', self.sess.graph)

        self.sess.run(tf.global_variables_initializer())
        self.cost_his = []

    def _build_net(self):
        # -------------- 建立 eval 神經網絡, 及時提高參數 --------------
        self.s = tf.placeholder(tf.float32, [None, self.n_features], name='s')  # 用來接收 observation
        self.q_target = tf.placeholder(tf.float32, [None, self.n_actions],
                                       name='Q_target')  # 用來接收 q_target 的值, 這個以後會經過計算獲得
        with tf.variable_scope('eval_net'):
            # c_names(collections_names) 是在更新 target_net 參數時會用到
            c_names, n_l1, w_initializer, b_initializer = \
                ['eval_net_params', tf.GraphKeys.GLOBAL_VARIABLES], 10, \
                tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1)  # config of layers

            # eval_net 的第一層. collections 是在更新 target_net 參數時會用到
            with tf.variable_scope('l1'):
                w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
                b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
                l1 = tf.nn.relu(tf.matmul(self.s, w1) + b1)

            # eval_net 的第二層. collections 是在更新 target_net 參數時會用到
            with tf.variable_scope('l2'):
                w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
                b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names)
                self.q_eval = tf.matmul(l1, w2) + b2

        with tf.variable_scope('loss'):  # 求偏差
            self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval))
        with tf.variable_scope('train'):  # 梯度降低
            self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)

        # ---------------- 建立 target 神經網絡, 提供 target Q ---------------------
        self.s_ = tf.placeholder(tf.float32, [None, self.n_features], name='s_')  # 接收下個 observation
        with tf.variable_scope('target_net'):
            # c_names(collections_names) 是在更新 target_net 參數時會用到
            c_names = ['target_net_params', tf.GraphKeys.GLOBAL_VARIABLES]

            # target_net 的第一層. collections 是在更新 target_net 參數時會用到
            with tf.variable_scope('l1'):
                w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
                b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
                l1 = tf.nn.relu(tf.matmul(self.s_, w1) + b1)

            # target_net 的第二層. collections 是在更新 target_net 參數時會用到
            with tf.variable_scope('l2'):
                w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
                b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names)
                self.q_next = tf.matmul(l1, w2) + b2

    def store_transition(self, s, a, r, s_):
        # 判斷是否包含對應屬性 沒有就賦予初值
        if not hasattr(self, 'memory_counter'):
            self.memory_counter = 0

        # 縱向延伸
        transition = np.hstack((s, [a, r], s_))

        # 使用新的記憶替換掉舊網絡的記憶
        index = self.memory_counter % self.memory_size
        self.memory[index, :] = transition

        self.memory_counter += 1

    def choose_action(self, observation):
        # 給觀測值加上batch_size維度
        observation = observation[np.newaxis, :]

        if np.random.uniform() < self.epsilon:
            # forward feed the observation and get q value for every actions
            actions_value = self.sess.run(self.q_eval, feed_dict={self.s: observation})
            action = np.argmax(actions_value)
        else:
            action = np.random.randint(0, self.n_actions)
        return action

    def learn(self):
        # 判斷是否應該更新target-net網絡了
        if self.learn_step_counter % self.replace_target_iter == 0:
            self.sess.run(self.replace_target_op)
            print('\ntarget_params_replaced\n')

        # 從之前的記憶中隨機抽取一些記憶
        if self.memory_counter > self.memory_size:
            sample_index = np.random.choice(self.memory_size, size=self.batch_size)
        else:
            sample_index = np.random.choice(self.memory_counter, size=self.batch_size)
        batch_memory = self.memory[sample_index, :]

        q_next, q_eval = self.sess.run(
            [self.q_next, self.q_eval],
            feed_dict={
                self.s_: batch_memory[:, -self.n_features:],  # fixed params
                self.s: batch_memory[:, :self.n_features],  # newest params
            })

        # change q_target w.r.t q_eval's action
        q_target = q_eval.copy()

        # 下面這幾步十分重要. q_next, q_eval 包含全部 action 的值,
        # 而咱們須要的只是已經選擇好的 action 的值, 其餘的並不須要.
        # 因此咱們將其餘的 action 值全變成 0, 將用到的 action 偏差值 反向傳遞回去, 做爲更新憑據.
        # 這是咱們最終要達到的樣子, 好比 q_target - q_eval = [1, 0, 0] - [-1, 0, 0] = [2, 0, 0]
        # q_eval = [-1, 0, 0] 表示這一個記憶中有我選用過 action 0, 而 action 0 帶來的 Q(s, a0) = -1, 因此其餘的 Q(s, a1) = Q(s, a2) = 0.
        # q_target = [1, 0, 0] 表示這個記憶中的 r+gamma*maxQ(s_) = 1, 並且無論在 s_ 上咱們取了哪一個 action,
        # 咱們都須要對應上 q_eval 中的 action 位置, 因此就將 1 放在了 action 0 的位置.

        # 下面也是爲了達到上面說的目的, 不過爲了更方面讓程序運算, 達到目的的過程有點不一樣.
        # 是將 q_eval 所有賦值給 q_target, 這時 q_target-q_eval 全爲 0,
        # 不過 咱們再根據 batch_memory 當中的 action 這個 column 來給 q_target 中的對應的 memory-action 位置來修改賦值.
        # 使新的賦值爲 reward + gamma * maxQ(s_), 這樣 q_target-q_eval 就能夠變成咱們所需的樣子.
        # 具體在下面還有一個舉例說明.

        batch_index = np.arange(self.batch_size, dtype=np.int32)
        eval_act_index = batch_memory[:, self.n_features].astype(int)
        reward = batch_memory[:, self.n_features + 1]

        q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)

        """
               假如在這個 batch 中, 咱們有2個提取的記憶, 根據每一個記憶能夠生產3個 action 的值:
               q_eval =
               [[1, 2, 3],
                [4, 5, 6]]

               q_target = q_eval =
               [[1, 2, 3],
                [4, 5, 6]]

               而後根據 memory 當中的具體 action 位置來修改 q_target 對應 action 上的值:
               好比在:
                   記憶 0 的 q_target 計算值是 -1, 並且我用了 action 0;
                   記憶 1 的 q_target 計算值是 -2, 並且我用了 action 2:
               q_target =
               [[-1, 2, 3],
                [4, 5, -2]]

               因此 (q_target - q_eval) 就變成了:
               [[(-1)-(1), 0, 0],
                [0, 0, (-2)-(6)]]

               最後咱們將這個 (q_target - q_eval) 當成偏差, 反向傳遞會神經網絡.
               全部爲 0 的 action 值是當時沒有選擇的 action, 以前有選擇的 action 纔有不爲0的值.
               咱們只反向傳遞以前選擇的 action 的值,
        """

        # 訓練eval網絡
        _, self.cost = self.sess.run([self._train_op, self.loss],
                                     feed_dict={self.s: batch_memory[:, :self.n_features],
                                                self.q_target: q_target})
        self.cost_his.append(self.cost)

        # 由於在訓練過程當中會逐漸收斂因此此處動態設置增加epsilon
        self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max
        self.learn_step_counter += 1
相關文章
相關標籤/搜索