【強化學習】用 pandas 與 numpy 分別實現 Q-learning、Sarsa、Sarsa(λ) 算法

本文作者:hhh5460

本文地址:http://www.javashuo.com/article/p-tkoqyyqm-g.html

特別感謝:本文的三幅圖皆來自莫凡的教程 https://morvanzhou.github.io/

 

pandas 是基於 numpy 的,但是二者之間的操作有區別,故在實現上述算法時的細節有出入,故記錄之。

幾點說明:

1). 爲了更好地說明問題,採用最簡單的例一

2). 分離了環境與個體,採用類編程的形式。

3). 調整了環境與個體的變量、函數的位置,使得 Agent 完全不需要改動

4). 個體與環境的互動邏輯更符合實際

 

〇、效果圖

 

一、pandas 實現

1.q-learning

class RLQLearning(Agent):
    '''Agent subclass that learns its Q table with Q-learning.'''

    def __init__(self, env):
        super().__init__(env)

    def learn(self, alpha=0.01, gamma=0.9, episode=100, epsilon=0.4):
        '''Run Q-learning and update ``self.Q`` in place.

        alpha   -- learning rate applied to each TD error
        gamma   -- discount factor for the next state's value
        episode -- number of episodes to run (each ends when the env reports a win)
        epsilon -- forwarded unchanged to ``self.observe``
        '''
        print('q-learning算法')
        # The table may have been created from integer zeros; make it float so
        # the fractional updates below are stored without a dtype conflict.
        self.Q = self.Q.astype(float)
        for _ in range(episode):
            s = self.env.reset()
            is_win = False
            while not is_win:
                a = self.observe(s, epsilon)
                r, s1, is_win = self.env.step(a)
                # Off-policy target: best value among the actions valid in s1.
                # ``.loc`` replaces ``.ix``, which no longer exists in pandas >= 1.0.
                best_next = self.Q.loc[s1, self.env.get_valid_actions(s1)].max()
                self.Q.loc[s, a] += alpha * (r + gamma * best_next - self.Q.loc[s, a])
                s = s1

 

2.Sarsa

class RLSaras(Agent):
    '''Agent subclass that learns its Q table with (on-policy) Sarsa.'''

    def __init__(self, env):
        super().__init__(env)

    def learn(self, alpha=0.01, gamma=0.9, episode=100, epsilon=0.4):
        '''Run Sarsa and update ``self.Q`` in place.

        alpha   -- learning rate applied to each TD error
        gamma   -- discount factor for the next state-action value
        episode -- number of episodes to run (each ends when the env reports a win)
        epsilon -- forwarded unchanged to ``self.observe``
        '''
        print('saras算法')
        # The table may have been created from integer zeros; make it float so
        # the fractional updates below are stored without a dtype conflict.
        self.Q = self.Q.astype(float)
        for _ in range(episode):
            s = self.env.reset()
            a = self.observe(s, epsilon)
            is_win = False
            while not is_win:
                r, s1, is_win = self.env.step(a)
                # On-policy: the next action is chosen first and its value is
                # used as the bootstrap target.
                a1 = self.observe(s1, epsilon)
                # ``.loc`` replaces ``.ix``, which no longer exists in pandas >= 1.0.
                td_error = r + gamma * self.Q.loc[s1, a1] - self.Q.loc[s, a]
                self.Q.loc[s, a] += alpha * td_error
                s, a = s1, a1

 

3.Sarsa(λ)

class RLSarasLambda(Agent):
    '''Agent subclass that learns with Sarsa(lambda) using eligibility traces.'''

    def __init__(self, env):
        super().__init__(env)
        # Eligibility-trace table with the same index/columns as the Q table.
        self.E = self.Q.copy()

    def learn(self, alpha=0.01, gamma=0.9, lambda_=0.9, episode=100, epsilon=0.4):
        '''Run Sarsa(lambda) and update ``self.Q`` in place.

        alpha   -- learning rate applied to each TD error
        gamma   -- discount factor for the next state-action value
        lambda_ -- trace decay; every trace is scaled by ``gamma * lambda_`` per step
        episode -- number of episodes to run (each ends when the env reports a win)
        epsilon -- forwarded unchanged to ``self.observe``
        '''
        print('saras(lambda)算法,lambda_爲衰減值')
        # Both tables may have been created from integer zeros; make them float
        # so fractional values are stored without a dtype conflict.
        self.Q = self.Q.astype(float)
        self.E = self.E.astype(float)
        for _ in range(episode):
            self.E *= 0  # traces start from zero in every episode
            s = self.env.reset()
            a = self.observe(s, epsilon)
            is_win = False
            while not is_win:
                r, s1, is_win = self.env.step(a)
                a1 = self.observe(s1, epsilon)
                # ``.loc`` replaces ``.ix``, which no longer exists in pandas >= 1.0.
                delta = r + gamma * self.Q.loc[s1, a1] - self.Q.loc[s, a]
                # Replacing trace: clear the row of s, then mark (s, a) with 1.
                # The author noted that an accumulating trace (E[s, a] += 1)
                # gave worse results than these two lines.
                self.E.loc[s] = 0.0
                self.E.loc[s, a] = 1.0
                # Every (state, action) cell is updated in proportion to its
                # trace, then all traces decay; done as whole-table operations.
                self.Q += alpha * delta * self.E
                self.E *= gamma * lambda_
                s, a = s1, a1

 

4.完整代碼

  1 import pandas as pd
  2 import random
  3 import time
  4 
  5 
  6 '''
  7 -o---T
  8 # T 就是寶藏的位置, o 是探索者的位置
  9 '''
 10 
 11 # 做者:hhh5460
 12 # 時間:20181221
 13 # 地點:Tai Zi Miao
 14 
 15 class Env(object):
 16     '''環境'''
 17     def __init__(self):
 18         '''初始化'''
 19         self.board = list('-----T')
 20         self.states = range(6)
 21         self.actions = ['left', 'right']
 22         self.rewards = [0,0,0,0,0,1]
 23         
 24     def get_valid_actions(self, state):
 25         '''取當前狀態下全部的合法動做'''
 26         valid_actions = []
 27         if state != 5:              # 除末狀態(位置),皆可向右
 28             valid_actions.append('right')
 29         if state != 0:              # 除首狀態(位置),皆可向左
 30             valid_actions.append('left')
 31         return valid_actions
 32         
 33     def _step(self, action):
 34         '''執行動做,到達新狀態'''
 35         if action == 'right' and self.state != self.states[-1]: # 除末狀態(位置),向右+1
 36             self.state += 1
 37         elif action == 'left' and self.state != self.states[0]: # 除首狀態(位置),向左-1
 38             self.state -= 1
 39         
 40     def reset(self):
 41         '''重置環境,返回狀態0'''
 42         self.board = list('-----T')
 43         self.state = 0
 44         self.board[self.state] = 'o'
 45         print('\r                  ', end='')
 46         print('\r{}'.format(''.join(self.board)), end='')
 47         return self.state
 48         
 49     def step(self, action, step_time=0.1):
 50         '''執行動做 返回獎勵、新狀態、勝利標誌'''
 51         self.board[self.state] = '-' # 擦除舊位置'o'
 52         self._step(action)           # 到達新位置
 53         self.board[self.state] = 'o' # 改變新位置
 54         
 55         reward = self.rewards[self.state] # 獎勵
 56         is_win = [False, True][self.state == self.states[-1]] # 勝利標誌
 57         if is_win == True:
 58             print('\r{}  WIN!'.format(''.join(self.board)), end='') # 勝利,則加特寫鏡頭
 59         else:
 60             print('\r{}'.format(''.join(self.board)), end='')
 61         time.sleep(step_time)
 62         
 63         return reward, self.state, is_win
 64 
 65 
 66 class Agent(object):
 67     '''智能體'''
 68     def __init__(self, env):
 69         '''初始化'''
 70         # 環境
 71         self.env = env
 72         # 大腦
 73         self.Q = pd.DataFrame(data=[[0 for _ in self.env.actions] for _ in self.env.states],
 74                                     index=self.env.states, 
 75                                     columns=self.env.actions)
 76     
 77     def observe(self, state, epsilon=0.4):
 78         '''觀察'''
 79         # 根據自身所處狀態,按某種策略選擇相應的動做
 80         if random.uniform(0,1) < epsilon:   # 貪婪
 81             s = self.Q.ix[state].filter(items=self.env.get_valid_actions(state))
 82             action = random.choice(s[s==s.max()].index) # 可能多個最大值!
 83         else:                               # 探索
 84             action = random.choice(self.env.get_valid_actions(state))
 85         return action
 86         
 87     def learn(self,*args, **kw):
 88         '''學習'''
 89         pass
 90         
 91     def play(self, step_time=0.5):
 92         '''玩耍'''
 93         # 學有所成
 94         s = self.env.reset()
 95         is_win = False
 96         while not is_win:
 97             a = self.observe(s, epsilon=1.) # 1.,100%貪婪,即利用
 98             _, s1, is_win = self.env.step(a, step_time)
 99             s = s1
100         print()
101     
class RLQLearning(Agent):
    '''Agent subclass that learns its Q table with Q-learning.'''

    def __init__(self, env):
        super().__init__(env)

    def learn(self, alpha=0.01, gamma=0.9, episode=100, epsilon=0.4):
        '''Run Q-learning and update ``self.Q`` in place.

        alpha   -- learning rate applied to each TD error
        gamma   -- discount factor for the next state's value
        episode -- number of episodes to run (each ends when the env reports a win)
        epsilon -- forwarded unchanged to ``self.observe``
        '''
        print('q-learning算法')
        # The table may have been created from integer zeros; make it float so
        # the fractional updates below are stored without a dtype conflict.
        self.Q = self.Q.astype(float)
        for _ in range(episode):
            s = self.env.reset()
            is_win = False
            while not is_win:
                a = self.observe(s, epsilon)
                r, s1, is_win = self.env.step(a)
                # Off-policy target: best value among the actions valid in s1.
                # ``.loc`` replaces ``.ix``, which no longer exists in pandas >= 1.0.
                best_next = self.Q.loc[s1, self.env.get_valid_actions(s1)].max()
                self.Q.loc[s, a] += alpha * (r + gamma * best_next - self.Q.loc[s, a])
                s = s1
118     
class RLSaras(Agent):
    '''Agent subclass that learns its Q table with (on-policy) Sarsa.'''

    def __init__(self, env):
        super().__init__(env)

    def learn(self, alpha=0.01, gamma=0.9, episode=100, epsilon=0.4):
        '''Run Sarsa and update ``self.Q`` in place.

        alpha   -- learning rate applied to each TD error
        gamma   -- discount factor for the next state-action value
        episode -- number of episodes to run (each ends when the env reports a win)
        epsilon -- forwarded unchanged to ``self.observe``
        '''
        print('saras算法')
        # The table may have been created from integer zeros; make it float so
        # the fractional updates below are stored without a dtype conflict.
        self.Q = self.Q.astype(float)
        for _ in range(episode):
            s = self.env.reset()
            a = self.observe(s, epsilon)
            is_win = False
            while not is_win:
                r, s1, is_win = self.env.step(a)
                # On-policy: the next action is chosen first and its value is
                # used as the bootstrap target.
                a1 = self.observe(s1, epsilon)
                # ``.loc`` replaces ``.ix``, which no longer exists in pandas >= 1.0.
                td_error = r + gamma * self.Q.loc[s1, a1] - self.Q.loc[s, a]
                self.Q.loc[s, a] += alpha * td_error
                s, a = s1, a1
136     
class RLSarasLambda(Agent):
    '''Agent subclass that learns with Sarsa(lambda) using eligibility traces.'''

    def __init__(self, env):
        super().__init__(env)
        # Eligibility-trace table with the same index/columns as the Q table.
        self.E = self.Q.copy()

    def learn(self, alpha=0.01, gamma=0.9, lambda_=0.9, episode=100, epsilon=0.4):
        '''Run Sarsa(lambda) and update ``self.Q`` in place.

        alpha   -- learning rate applied to each TD error
        gamma   -- discount factor for the next state-action value
        lambda_ -- trace decay; every trace is scaled by ``gamma * lambda_`` per step
        episode -- number of episodes to run (each ends when the env reports a win)
        epsilon -- forwarded unchanged to ``self.observe``
        '''
        print('saras(lambda)算法,lambda_爲衰減值')
        # Both tables may have been created from integer zeros; make them float
        # so fractional values are stored without a dtype conflict.
        self.Q = self.Q.astype(float)
        self.E = self.E.astype(float)
        for _ in range(episode):
            self.E *= 0  # traces start from zero in every episode
            s = self.env.reset()
            a = self.observe(s, epsilon)
            is_win = False
            while not is_win:
                r, s1, is_win = self.env.step(a)
                a1 = self.observe(s1, epsilon)
                # ``.loc`` replaces ``.ix``, which no longer exists in pandas >= 1.0.
                delta = r + gamma * self.Q.loc[s1, a1] - self.Q.loc[s, a]
                # Replacing trace: clear the row of s, then mark (s, a) with 1.
                # The author noted that an accumulating trace (E[s, a] += 1)
                # gave worse results than these two lines.
                self.E.loc[s] = 0.0
                self.E.loc[s, a] = 1.0
                # Every (state, action) cell is updated in proportion to its
                # trace, then all traces decay; done as whole-table operations.
                self.Q += alpha * delta * self.E
                self.E *= gamma * lambda_
                s, a = s1, a1
163 
164 
if __name__ == '__main__':
    # One shared environment; each algorithm gets a fresh agent that first
    # trains for 13 episodes and then plays one episode.
    env = Env()
    for agent_cls in (RLQLearning, RLSaras, RLSarasLambda):
        agent = agent_cls(env)
        agent.learn(episode=13)
        agent.play()

 

 

二、numpy 實現

1.q-learning

2.Sarsa

3.Sarsa(λ)

4.完整代碼

  1 import numpy as np
  2 import time
  3 
  4 
  5 '''
  6 -o---T
  7 # T 就是寶藏的位置, o 是探索者的位置
  8 '''
  9 
 10 # 做者:hhh5460
 11 # 時間:20181221
 12 # 地點:Tai Zi Miao
 13 
 14 class Env(object):
 15     '''環境'''
 16     def __init__(self):
 17         '''初始化'''
 18         self.board = list('-----T')
 19         self.states = range(6)
 20         self.actions = ['left', 'right'] # 索引[0,1]
 21         self.rewards = [0,0,0,0,0,1]
 22         
 23     def get_valid_actions(self, state):
 24         '''取當前狀態下全部的合法動做(索引)'''
 25         valid_actions = []
 26         if state != self.states[0]:     # 除首狀態(位置),皆可向左
 27             valid_actions.append(self.actions.index('left'))
 28         if state != self.states[-1]:    # 除末狀態(位置),皆可向右
 29             valid_actions.append(self.actions.index('right'))
 30         return valid_actions
 31         
 32     def _step(self, action):
 33         '''執行動做(索引),到達新狀態'''
 34         if self.actions[action] == 'left' and self.state > self.states[0]:     # 除首狀態(位置),向左-1
 35             self.state = self.state - 1
 36         elif self.actions[action] == 'right' and self.state < self.states[-1]: # 除末狀態(位置),向右+1
 37             self.state = self.state + 1
 38         
 39     def reset(self):
 40         '''重置環境,返回狀態0'''
 41         self.board = list('-----T')
 42         self.state = 0
 43         self.board[self.state] = 'o'
 44         print('\r                  ', end='')
 45         print('\r{}'.format(''.join(self.board)), end='')
 46         return self.state
 47         
 48     def step(self, action, step_time=0.1):
 49         '''執行動做 返回獎勵、新狀態、勝利標誌'''
 50         self.board[self.state] = '-' # 擦除舊位置'o'
 51         self._step(action) # 到達新位置
 52         self.board[self.state] = 'o' # 改變新位置
 53         
 54         reward = self.rewards[self.state] # 獎勵
 55         is_win = [False, True][self.state == self.states[-1]] # 勝利標誌
 56         if is_win == True:
 57             print('\r{}  WIN!'.format(''.join(self.board)), end='') # 勝利,則加特寫鏡頭
 58         else:
 59             print('\r{}'.format(''.join(self.board)), end='')
 60         time.sleep(step_time)
 61         
 62         return reward, self.state, is_win
 63 
 64 
 65 class Agent(object):
 66     '''智能體'''
 67     def __init__(self, env):
 68         '''初始化'''
 69         # 環境
 70         self.env = env
 71         # 大腦
 72         self.Q = np.zeros((len(self.env.states), len(self.env.actions)), dtype=np.float32)
 73     
 74     def observe(self, state, epsilon=0.8):
 75         '''觀察'''
 76         # 根據自身所處狀態,按某種策略選擇相應的動做(索引)
 77         valid_actions = self.env.get_valid_actions(state)
 78         arr = self.Q[state, valid_actions]
 79         if (np.random.uniform() > epsilon 
 80             or arr.max() == 0
 81             or len(arr[arr==arr.max()]) > 1):
 82             action = np.random.choice(valid_actions) # 探索
 83         else:
 84             action = self.Q[state].argmax()          # 利用
 85         return action
 86     
 87     def learn(self, alpha=0.01, gamma=0.9, episode=100, epsilon=0.8):
 88         '''學習'''
 89         pass
 90     
 91     def play(self, step_time=0.5):
 92         '''玩耍'''
 93         # 學有所成
 94         s = self.env.reset()
 95         is_win = False
 96         while not is_win:
 97             a = self.observe(s, epsilon=1.) # 1.,100%貪婪,即利用
 98             _, s1, is_win = self.env.step(a, step_time)
 99             s = s1
100         print()
101 
class RLQLearning(Agent):
    '''Agent subclass that learns its Q table with Q-learning.'''

    def __init__(self, env):
        '''Delegate all setup to the base Agent.'''
        super().__init__(env)

    def learn(self, alpha=0.01, gamma=0.9, episode=100, epsilon=0.8):
        '''Run Q-learning for ``episode`` episodes, updating ``self.Q`` in place.

        alpha is the learning rate, gamma the discount factor, and epsilon is
        forwarded unchanged to ``self.observe``.
        '''
        print('q-learning算法')
        for _ in range(episode):
            state = self.env.reset()
            done = False
            while not done:
                action = self.observe(state, epsilon)
                reward, next_state, done = self.env.step(action)
                # Best value among the actions that are valid in the next state.
                next_valid = self.env.get_valid_actions(next_state)
                best_next = self.Q[next_state, next_valid].max()
                self.Q[state, action] += alpha * (reward + gamma * best_next - self.Q[state, action])
                state = next_state
119             
class RLSaras(Agent):
    '''Agent subclass that learns its Q table with (on-policy) Sarsa.'''

    def __init__(self, env):
        super().__init__(env)

    def learn(self, alpha=0.01, gamma=0.9, episode=100, epsilon=0.4):
        '''Run Sarsa for ``episode`` episodes, updating ``self.Q`` in place.

        alpha is the learning rate, gamma the discount factor, and epsilon is
        forwarded unchanged to ``self.observe``.
        '''
        print('saras算法')
        for _ in range(episode):
            state = self.env.reset()
            action = self.observe(state, epsilon)
            done = False
            while not done:
                reward, next_state, done = self.env.step(action)
                # The next action is picked before the update and then reused
                # as the action actually taken on the following iteration.
                next_action = self.observe(next_state, epsilon)
                self.Q[state, action] += alpha * (
                    reward + gamma * self.Q[next_state, next_action] - self.Q[state, action]
                )
                state = next_state
                action = next_action
137     
class RLSarasLambda(Agent):
    '''Agent subclass that learns with Sarsa(lambda) using eligibility traces.'''

    def __init__(self, env):
        super().__init__(env)
        # Eligibility-trace array with the same shape and dtype as the Q table.
        self.E = self.Q.copy()

    def learn(self, alpha=0.01, gamma=0.9, lambda_=0.9, episode=100, epsilon=0.4):
        '''Run Sarsa(lambda) and update ``self.Q`` in place.

        alpha   -- learning rate applied to each TD error
        gamma   -- discount factor for the next state-action value
        lambda_ -- trace decay; every trace is scaled by ``gamma * lambda_`` per step
        episode -- number of episodes to run (each ends when the env reports a win)
        epsilon -- forwarded unchanged to ``self.observe``
        '''
        print('saras(lambda)算法,lambda_爲衰減值')
        for _ in range(episode):
            self.E *= 0  # traces start from zero in every episode
            s = self.env.reset()
            a = self.observe(s, epsilon)
            is_win = False
            while not is_win:
                r, s1, is_win = self.env.step(a)
                a1 = self.observe(s1, epsilon)
                delta = r + gamma * self.Q[s1, a1] - self.Q[s, a]
                # Replacing trace: clear the row of s, then mark (s, a) with 1.
                # The author noted that an accumulating trace (E[s, a] += 1)
                # gave worse results than these two lines.
                self.E[s] = 0
                self.E[s, a] = 1
                # Every (state, action) cell is updated in proportion to its
                # trace, then all traces decay; one array expression replaces
                # the per-cell Python loops.
                self.Q += alpha * delta * self.E
                self.E *= gamma * lambda_
                s, a = s1, a1
164 
if __name__ == '__main__':
    # One shared environment; each algorithm gets a fresh agent that first
    # trains for 13 episodes and then plays one episode.
    env = Env()
    for agent_cls in (RLQLearning, RLSaras, RLSarasLambda):
        agent = agent_cls(env)
        agent.learn(episode=13)
        agent.play()
相關文章
相關標籤/搜索