DQN(Deep Q-learning)入門教程(六)之DQN Play Flappy-bird ,MountainCar

DQN(Deep Q-learning)入門教程(四)之Q-learning Play Flappy Bird中,咱們使用q-learning算法去對Flappy Bird進行強化學習,而在這篇博客中咱們將使用神經網絡模型來代替Q-table,關於DQN的介紹,能夠參考我前一篇博客:DQN(Deep Q-learning)入門教程(五)之DQN介紹html

在這篇博客中將使用DQN作以下操做:python

  • Flappy Bird
  • MountainCar-v0

再回顧一下DQN的算法流程:git

項目地址:Githubgithub

MountainCar-v0

MountainCar的訓練好的Gif示意圖以下所示,汽車起始位置位於山的底部,最終目標是駛向右邊山的插旗的地方,其中,汽車的引擎不可以直接駛向終點,必須藉助左邊的山體的重力加速度纔可以駛向終點。算法

MountainCar-v0由OpenAI提供,python包爲gym,官網網站爲https://gym.openai.com/envs/MountainCar-v0/。在Gym包中,提供了不少能夠用於強化學習的環境(env):數組

在MountainCar-v0中,狀態有2個變量,car position(汽車的位置),car vel(汽車的速度),action一共有3種 Accelerate to the Left Don't accelerateAccelerate to the Right,而後當車達到旗幟的地方(position = 0.5)會獲得\(reward = 1\)的獎勵,若是沒有達到則爲\(-1\)。可是若是當你運行步驟超過200次的時候,遊戲就會結束。詳情能夠參考源代碼(ps:官方文檔中沒有這些說明)。網絡

下面介紹一下gym中幾個經常使用的函數:app

  • env = gym.make("MountainCar-v0")

    這個就是建立一個MountainCar-v0的遊戲環境。dom

  • state = env.reset()

    重置環境,返回重置後的state函數

  • env.render()

    將運行畫面展現在屏幕上面,當咱們在訓練的時候能夠不使用這個來提高速度。

  • next_state, reward, done, _ = env.step(action)

    執行action動做,返回下一個狀態,獎勵,是否完成,info。

初始化Agent

初始化Agent直接使用代碼說明吧,這個仍是比較簡單的:

import keras
import random
from collections import deque

import gym
import numpy as np
from keras.layers import Dense
from keras.models import Sequential


class Agent():
    def __init__(self, action_set, observation_space):
        """
        初始化
        :param action_set: 動做集合
        :param observation_space: 環境屬性,咱們須要使用它獲得state的shape
        """
        # 獎勵衰減
        self.gamma = 1.0
        # 從經驗池中取出數據的數量
        self.batch_size = 50
        # 經驗池
        self.memory = deque(maxlen=2000000)
        # 探索率
        self.greedy = 1.0
        # 動做集合
        self.action_set = action_set
        # 環境的屬性
        self.observation_space = observation_space
        # 神經網路模型
        self.model = self.init_netWork()

    def init_netWork(self):
        """
        構建模型
        :return: 模型
        """
        model = Sequential()
        # self.observation_space.shape[0],state的變量的數量
        model.add(Dense(64 * 4, activation="tanh", input_dim=self.observation_space.shape[0]))
        model.add(Dense(64 * 4, activation="tanh"))
        # self.action_set.n 動做的數量
        model.add(Dense(self.action_set.n, activation="linear"))
        model.compile(loss=keras.losses.mean_squared_error,
                      optimizer=keras.optimizers.RMSprop(lr=0.001))
        return model

咱們使用隊列來保存經驗,這樣的話新的數據就會覆蓋遠古的數據。此時咱們定義一個函數,專門用來將數據保存到經驗池中,而後定義一個函數用來更新\(\epsilon\)探索率。

def add_memory(self, sample):
    self.memory.append(sample)

def update_greedy(self):
    # 小於最小探索率的時候就不進行更新了。
    if self.greedy > 0.01:
        self.greedy *= 0.995

訓練模型

首先先看代碼:

def train_model(self):
    # 從經驗池中隨機選擇部分數據
    train_sample = random.sample(self.memory, k=self.batch_size)

    train_states = []
    next_states = []
    for sample in train_sample:
        cur_state, action, r, next_state, done = sample
        next_states.append(next_state)
        train_states.append(cur_state)

    # 轉成np數組
    next_states = np.array(next_states)
    train_states = np.array(train_states)
    # 獲得next_state的q值
    next_states_q = self.model.predict(next_states)

    # 獲得state的預測值
    state_q = self.model.predict_on_batch(train_states)

    # 計算Q現實
    for index, sample in enumerate(train_sample):
        cur_state, action, r, next_state, done = sample
        if not done:
            state_q[index][action] = r + self.gamma * np.max(next_states_q[index])
        else:
            state_q[index][action] = r
    
    self.model.train_on_batch(train_states, state_q)

你們確定從上面的代碼發現一些問題,使用了兩個for循環,why?首先先說一下兩個for循環分別的做用:

  • 第一個for循環:獲得train_statesnext_states,其中next_states是爲了計算Q現實。
  • 第二個for循環:計算Q現實

可能有人會有一個疑問,爲何我不寫成一個for循環呢?實際上寫成一個for循環是徹底沒有問題的,很🆗,可是寫成一個for循環意味着咱們要屢次調用model.predict_on_batch,這樣會耗費必定的時間(親身試驗過,這樣會比較慢),所以,咱們寫成了兩個for循環,而後只須要調用一次predict

執行動做與選擇最佳動做

執行動做的代碼以下所示:

def act(self, env, action):
    """
    執行動做
    :param env: 執行環境
    :param action: 執行的動做
    :return: ext_state, reward, done
    """
    next_state, reward, done, _ = env.step(action)

    if done:
        if reward < 0:
            reward = -100
        else:
            reward = 10
    else:
        if next_state[0] >= 0.4:
            reward += 1

    return next_state, reward, done

其中,咱們能夠修改獎勵以加快網絡收斂。

選擇最好的動做的動做以下所示,會以必定的探索率隨機選擇動做。

def get_best_action(self, state):
    if random.random() < self.greedy:
        return self.action_set.sample()
    else:
        return np.argmax(self.model.predict(state.reshape(-1, 2)))

開始訓練

關於具體的解釋,在註釋中已經詳細的說明了:

if __name__ == "__main__":
    # 訓練次數
    episodes = 10000
    # 實例化遊戲環境
    env = gym.make("MountainCar-v0")
    # 實例化Agent
    agent = Agent(env.action_space, env.observation_space)
    # 遊戲中動做執行的次數(最大爲200)
    counts = deque(maxlen=10)

    for episode in range(episodes):
        count = 0
        # 重置遊戲
        state = env.reset()

        # 剛開始不當即更新探索率
        if episode >= 5:
            agent.update_greedy()

        while True:
            count += 1
            # 得到最佳動做
            action = agent.get_best_action(state)
            next_state, reward, done = agent.act(env, action)
            agent.add_memory((state, action, reward, next_state, done))
            # 剛開始不當即訓練模型,先填充經驗池
            if episode >= 5:
                agent.train_model()
            state = next_state
            if done:
                # 將執行的次數添加到counts中
                counts.append(count)
                print("在{}輪中,agent執行了{}次".format(episode + 1, count))

                # 若是近10次,動做執行的平均次數少於160,則保存模型並退出
                if len(counts) == 10 and np.mean(counts) < 160:
                    agent.model.save("car_model.h5")
                    exit(0)
                break

訓練必定的次數後,咱們就能夠獲得模型了。而後進行測試。

模型測試

測試的代碼沒什麼好說的,以下所示:

import gym
from keras.models import load_model
import numpy as np
model = load_model("car_model.h5")

env = gym.make("MountainCar-v0")

for i in range(100):
    state = env.reset()
    count = 0
    while True:
        env.render()
        count += 1
        action = np.argmax(model.predict(state.reshape(-1, 2)))
        next_state, reward, done, _ = env.step(action)
        state = next_state
        if done:
            print("遊戲的次數:", count)
            break

部分的結果以下:

Flappy Bird

FlappyBird的代碼我就不過多贅述了,裏面的一些函數介紹能夠參照這個來看:DQN(Deep Q-learning)入門教程(四)之Q-learning Play Flappy Bird,代碼思想與訓練Mountain-Car基本是一致的。

import random
from collections import deque

import keras
import numpy as np
from keras.layers import Dense
from keras.models import Sequential
from ple import PLE
from ple.games import FlappyBird


class Agent():
    def __init__(self, action_set):
        self.gamma = 1
        self.model = self.init_netWork()
        self.batch_size = 128
        self.memory = deque(maxlen=2000000)
        self.greedy = 1
        self.action_set = action_set

    def get_state(self, state):
        """
        提取遊戲state中咱們須要的數據
        :param state: 遊戲state
        :return: 返回提取好的數據
        """
        return_state = np.zeros((3,))
        dist_to_pipe_horz = state["next_pipe_dist_to_player"]
        dist_to_pipe_bottom = state["player_y"] - state["next_pipe_top_y"]
        velocity = state['player_vel']
        return_state[0] = dist_to_pipe_horz
        return_state[1] = dist_to_pipe_bottom
        return_state[2] = velocity
        return return_state

    def init_netWork(self):
        """
        構建模型
        :return:
        """
        model = Sequential()
        model.add(Dense(64 * 4, activation="tanh", input_shape=(3,)))
        model.add(Dense(64 * 4, activation="tanh"))
        model.add(Dense(2, activation="linear"))

        model.compile(loss=keras.losses.mean_squared_error,
                      optimizer=keras.optimizers.RMSprop(lr=0.001))
        return model

    def train_model(self):
        if len(self.memory) < 2500:
            return

        train_sample = random.sample(self.memory, k=self.batch_size)
        train_states = []
        next_states = []

        for sample in train_sample:
            cur_state, action, r, next_state, done = sample
            next_states.append(next_state)
            train_states.append(cur_state)
        # 轉成np數組
        next_states = np.array(next_states)
        train_states = np.array(train_states)

        # 獲得下一個state的q值
        next_states_q = self.model.predict(next_states)
        # 獲得預測值
        state_q = self.model.predict_on_batch(train_states)

        for index, sample in enumerate(train_sample):
            cur_state, action, r, next_state, done = sample
            # 計算Q現實
            if not done:
                state_q[index][action] = r + self.gamma * np.max(next_states_q[index])
            else:
                state_q[index][action] = r
        self.model.train_on_batch(train_states, state_q)

    def add_memory(self, sample):
        self.memory.append(sample)

    def update_greedy(self):
        if self.greedy > 0.01:
            self.greedy *= 0.995

    def get_best_action(self, state):
        if random.random() < self.greedy:
            return random.randint(0, 1)
        else:
            return np.argmax(self.model.predict(state.reshape(-1, 3)))

    def act(self, p, action):
        """
        執行動做
        :param p: 經過p來向遊戲發出動做命令
        :param action: 動做
        :return: 獎勵
        """
        r = p.act(self.action_set[action])
        if r == 0:
            r = 1
        if r == 1:
            r = 100
        else:
            r = -1000
        return r


if __name__ == "__main__":
    # 訓練次數
    episodes = 20000
    # 實例化遊戲對象
    game = FlappyBird()
    # 相似遊戲的一個接口,能夠爲咱們提供一些功能
    p = PLE(game, fps=30, display_screen=False)
    # 初始化
    p.init()
    # 實例化Agent,將動做集傳進去
    agent = Agent(p.getActionSet())
    max_score = 0
    scores = deque(maxlen=10)

    for episode in range(episodes):
        # 重置遊戲
        p.reset_game()
        # 得到狀態
        state = agent.get_state(game.getGameState())
        if episode > 150:
            agent.update_greedy()
        while True:
            # 得到最佳動做
            action = agent.get_best_action(state)
            # 而後執行動做得到獎勵
            reward = agent.act(p, action)
            # 得到執行動做以後的狀態
            next_state = agent.get_state(game.getGameState())
            agent.add_memory((state, action, reward, next_state, p.game_over()))
            agent.train_model()
            state = next_state
            if p.game_over():
                # 得到當前分數
                current_score = p.score()
                max_score = max(max_score, current_score)
                scores.append(current_score)
                print('第%s次遊戲,得分爲: %s,最大得分爲: %s' % (episode, current_score, max_score))
                if len(scores) == 10 and np.mean(scores) > 150:
                    agent.model.save("bird_model.h5")
                    exit(0)
                break

該部分相比較於Mountain-Car須要更長的時間,目前的我尚未訓練出比較好的效果,截至寫完這篇博客,最新的數據以下所示:

emm,我又不想讓個人電腦一直開着,😔。

總結

上面的兩個例子即是DQN最基本最基本的使用,咱們還能夠將上面的FlappyBird的問題稍微複雜化一點,好比說咱們沒法直接的知道環境的狀態,咱們則可使用CNN網絡去從遊戲圖片入手(關於這種作法,網絡上有不少人寫了相對應的博客)。

項目地址:Github

參考

相關文章
相關標籤/搜索