Jul/30/2023 updated by Yoshihisa Nitta
Feb/03/2023 written by Yoshihisa Nitta
Google Colab 対応コード
# by nitta
import os
is_colab = 'google.colab' in str(get_ipython()) # for Google Colab
if is_colab:
from google.colab import drive
drive.mount('/content/drive')
SAVE_PREFIX='/content/drive/MyDrive/DeepLearning/openai_gym_rt'
else:
SAVE_PREFIX='.'
# Check if running on Colab
#is_colab = 'google.colab' in str(get_ipython()) # for Google Colab
# packages
if is_colab:
!apt update -qq
!apt upgrade -qq
!apt install -qq xvfb
!pip -q install pyvirtualdisplay
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib
matplotlib.rcParams['animation.embed_limit'] = 2**32
# Show multiple images as animation
# Colab compatible
%matplotlib notebook
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import animation
from IPython import display
import os
import gym
def display_frames_as_anim(frames, filepath=None):
"""
Displays a list of frames as a gif, with controls
"""
H, W, _ = frames[0].shape
fig, ax = plt.subplots(1, 1, figsize=(W/100.0, H/100.0))
ax.axis('off')
patch = plt.imshow(frames[0])
def animate(i):
display.clear_output(wait=True)
patch.set_data(frames[i])
return patch
anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50, repeat=False)
if not filepath is None:
dpath, fname = os.path.split(filepath)
if dpath != '' and not os.path.exists(dpath):
os.makedirs(dpath)
anim.save(filepath)
if is_colab:
display.display(display.HTML(anim.to_jshtml()))
#plt.close()
else:
plt.show()
import gym
ENV = 'CartPole-v1'
# ランダムに行動する
np.random.seed(12345)
env = gym.make(ENV, new_step_api=True, render_mode='rgb_array') # new_step_api, render_mode
observation = env.reset()
frames = [ env.render()[0] ]
states = [ observation ]
for step in range(100):
action = env.action_space.sample() ## np.random.choice(2) # 0: left, 1: right ##
obserbation, reward, done, truncated, info = env.step(action) # when new_step_api==True, two bool (done, truncated) returned
frames.append(env.render()[0]) # env.render(mode='rgb_array') -> env.render()[0]
states.append(observation)
if done:
break
print(len(states))
print(states)
# アニメーション表示する
%matplotlib notebook
import matplotlib.pyplot as plt
#display_frames_as_anim(frames) # display now
display_frames_as_anim(frames, 'cartpole_video4a.mp4') # save to file
if is_colab: # copy to google drive
! mkdir -p {SAVE_PREFIX}
! cp cartpole_video4a.mp4 {SAVE_PREFIX} # copy to the Google Drive
ディープラーニングを使わない従来の強化学習として3通りの手法があった。
このうち、Q学習における行動価値関数 $Q(s,a)$ をディープ・ニューラルネットワークで実現する DQN が提案された。
Prioritized Experience Replay は、DQN や DDQN における "Experience Replay" を工夫した手法である。学習に使用する transition をランダムに選択するのではなく、優先順位によって選択する。 教師信号との差である $| R_{t+1} + \gamma \max_{a} Q_t(S_{t+1}, a) - Q(S_t, a_t) |$ が大きい場合は、その行動価値関数 $Q(S_t, a_t)$ に対して学習が進んでいないことになるので、優先的に学習する。
# CartPole で観測した状態変数の値を名前をつけて保存するために namedtuple を使う
from collections import namedtuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))
# 定数
GAMMA = 0.99
MAX_STEPS = 200
NUM_EPISODES = 500
# Replay Memory
import random
class ReplayMemory:
def __init__(self, capacity):
self.capacity = capacity
self.memory = []
self.index = 0
def push(self, state, action, state_next, reward):
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.index] = Transition(state, action, state_next, reward)
self.index = (self.index + 1) % self.capacity
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
TD 誤差を記録するためのクラス TDerrorMemory を実装する。
基本的には ReplayMemory クラスと同様だが、関数 get_prioritized_indexes
と関数 update_td_error
を用意する。
get_prioritized_indexes()
...
メモリに格納されている TD 誤差の大きさに応じて確率的に index を求める。
ただし、TD 誤差の絶対値を求める際に微小値 TD_ERROR_EPSILON を加算している。
update_td_error()
...
メモリに格納されている TD 誤差を更新する。
# p.163
TD_ERROR_EPSILON = 0.0001 # error to add to the bias
class TDerrorMemory:
def __init__(self, CAPACITY):
self.capacity = CAPACITY
self.memory = []
self.index = 0
def push(self, td_error):
'''
TD 誤差をメモリに保存する
'''
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.index] = td_error
self.index = (self.index + 1) % self.capacity
def __len__(self):
return len(self.memory)
def get_prioritized_indexes(self, batch_size):
'''
TD 誤差に応じた確率で index を取得する
'''
# TD 誤差の和を計算する
sum_absolute_td_error = np.sum(np.absolute(self.memory))
sum_absolute_td_error += TD_ERROR_EPSILON * len(self.memory)
# batch_size 分の乱数を生成して、昇順に並べる
rand_list = np.random.uniform(0, sum_absolute_td_error, batch_size)
rand_list = np.sort(rand_list)
# 作成した乱数で串刺しにして、インデックスを求める
indexes = []
idx = 0
tmp_sum_absolute_td_error = 0
for rand_num in rand_list:
while tmp_sum_absolute_td_error < rand_num:
tmp_sum_absolute_td_error += abs(self.memory[idx]) + TD_ERROR_EPSILON
idx += 1
# 微小値を計算に使用したため、index がメモリの長さを超えた場合の補正
if idx >= len(self.memory):
idx = len(self.memory) - 1
indexes.append(idx)
return indexes
# p.159
import torch
class Net(torch.nn.Module):
def __init__(self, n_in, n_mid, n_out):
super().__init__()
self.fc1 = torch.nn.Linear(n_in, n_mid)
self.fc2 = torch.nn.Linear(n_mid, n_mid)
##################
# Dueling Network
##################
self.fc3_adv = torch.nn.Linear(n_mid, n_out) # Dueling Network
self.fc3_v = torch.nn.Linear(n_mid, 1) # 価値 V
def forward(self, x):
h1 = torch.nn.functional.relu(self.fc1(x))
h2 = torch.nn.functional.relu(self.fc2(h1))
################
# Dueling Network (どちらも ReLU しないことに注意)
################
adv = self.fc3_adv(h2) # shape (minibatch, 2) , adv.size(2)==2
val = self.fc3_v(h2).expand(-1, adv.size(1)) # shapeを変換しておく (minibatch, 1)-->(minibatch, 2)
# val+adv から advの平均を引く
output = val + adv - adv.mean(dim=1, keepdim=True).expand(-1, adv.size(1))
return output # shape (minibatch, 2)
関数 replay()
を Prioritized Experience Replay に変更するが、
学習の初期段階(episolde < 30)ではまだ学習が進んでいないので従来通り乱数で選択する。
このため、引数に変数 episode
を追加している。
関数 make_minibatch()
にも引数に変数 episode
を追加している。
関数 update_td_error_memory()
では、メモリオブジェクトに保存された全 trainsition の TD 誤差を再計算する。
PyTorch で計算した結果は Tensor 型なので、一旦 Numpy のデータ型にしてから、Python の list 型に変換する。
# p.164
BATCH_SIZE = 32
CAPACITY = 10000
class Brain:
def __init__(self, num_states, num_actions):
self.num_actions = num_actions
self.memory = ReplayMemory(CAPACITY) # 経験を記憶するメモリ
n_in, n_mid, n_out = num_states, 32, num_actions # main と target という2つのネットワークを生成する
self.main_q_network = Net(n_in, n_mid, n_out)
self.target_q_network = Net(n_in, n_mid, n_out)
self.optimizer = torch.optim.Adam( # 最適化
self.main_q_network.parameters(),
lr = 0.0001
)
##############################
# Prioritzed Experience Replay
##############################
self.td_error_memory = TDerrorMemory(CAPACITY) # !!! TD誤差のメモリオブジェクトを生成する
### End of change ###
def replay(self, episode): # !!! added episode argument
'''
Experience Replay で学習する
'''
# 1. メモリサイズを確認する
if len(self.memory) < BATCH_SIZE:
return
# 2. mini-batch を作成する
self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch(episode) # !!! argument added
# 3. 教師信号となる Q(s_t, a_t) を求める
self.expected_state_action_values = self.get_expected_state_action_values()
# 4. パラメータを更新する
self.update_main_q_network()
def decide_action(self, state, episode):
# ε-greedy 法で徐々に最適行動の選択を増やす
epsilon = 0.5 * (1 / (episode + 1))
if epsilon <= np.random.uniform(0, 1):
self.main_q_network.eval() # 推論モードに切り替える
with torch.no_grad():
# torch.max(dim=1) は最大値とインデックスのtupleが返される
# インデックスにアクセスするのは [1] or .index
# .view(1, 1) により [torch.LongTensor(1,1)] にreshapeされる
action = self.main_q_network(state).max(dim=1).indices.view(1, 1)
else:
action = torch.LongTensor(
[[ random.randrange(self.num_actions) ]] # 0 or 1 の乱数
)
return action
def make_minibatch(self, episode): # !!! argument episode added
'''
2. mini-batch を作成する
'''
# 2.1 メモリから mini-batch 分のデータを取り出す
###############################
# Prioritized Experience Replay
###############################
if episode < 30:
transitions = self.memory.sample(BATCH_SIZE)
else:
# TD 誤差に応じてミニバッチを取り出す
indexes = self.td_error_memory.get_prioritized_indexes(BATCH_SIZE)
transitions = [ self.memory.memory[n] for n in indexes ]
# transitions = self.memory.sample(BATCH_SIZE)
### End of change ###
# 2.2 各変数を mini-batch に対応する形へ変形する
# trainsition = [ (state, action, state_next, reward) ...]
# これを変形して次の形式にする
# ( [state, ...], [action, ...], [state_next, ...], [reward, ...])
batch = Transition(*zip(*transitions))
# 2.3 'Transitions' named tuple から項目を取り出し、
# concatenate して torch.FloatTensor size(BATCH_SIZE, 1) に変換する
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
return batch, state_batch, action_batch, reward_batch, non_final_next_states
def get_expected_state_action_values(self):
'''
3. Q(s_t, a_t) を求める
'''
# 3.1 推論モードに切り替える
self.main_q_network.eval()
self.target_q_network.eval()
# 3.2 Q(s_t, a_t) を求める。
# self.model(state_batch) は各action(左 or 右)に対するQ値を全部返すので
# 返り値は "torch.FloatTensor size (BATCH_SIZE, 2)"
# 実行した action a_t に対応する Q 値を gather(dim, index) で引っ張り出す。
self.state_action_values = self.main_q_network(
self.state_batch
).gather(1, self.action_batch)
# 3.3 次の状態がある index のmax{Q(s_{t+1}, a)} 値を求める。
# cartpole が done になっておらず、next_state があるかをチェックするインデックスマスクを作成する
non_final_mask = torch.ByteTensor(
tuple(map(lambda s: s is not None, self.batch.next_state))
)
next_state_values = torch.zeros(BATCH_SIZE) # initial value is 0
a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)
# 次の状態での最大の行動 a_m を main_q_network から求める。
# 出力にアクセスし、max(dim=1)で列方向の最大値の [値, index] を求める
# (注意) PyTorch で Tensor 配列の最大値を求めるには torch.max(dim=n) を使う
# dimを指定した場合は値とインデックスのtupleが返されるので
# [1] or .indices としてそのインデックスの値 (index=1 ) を出力する
# detach でその値を取り出す
a_m[non_final_mask] = self.main_q_network(
self.non_final_next_states
).detach().max(dim=1).indices
# 次の状態のあるものだけフィルターし、size 32 を (32, 1) へ変形する
a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)
# 次の状態がある index の、行動 a_m の Q 値を target Q-Network から求める
# detach() で取り出す。
# squeeze で (minibatch, 1) を (minibatch,) へ
next_state_values[non_final_mask] = self.target_q_network(
self.non_final_next_states
).gather(1, a_m_non_final_next_states).detach().squeeze()
# 3.4 教師となる Q(s_t, a_t) 値を、Q学習の式から求める。
expected_state_action_values = self.reward_batch + GAMMA * next_state_values
return expected_state_action_values
def update_main_q_network(self):
'''
4. パラメータの更新
'''
# 4.1 訓練モードに切り替える
self.main_q_network.train()
# 4.2 損失関数 (smooth_l1_loss は Huberloss)
# expected_state_action_values は形状が (minibatch,)なので、
# unsqueezeで形状を (minibatch, 1) へ
loss = torch.nn.functional.smooth_l1_loss(
self.state_action_values,
self.expected_state_action_values.unsqueeze(1)
)
# 4.3 パラメータを更新する
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
def update_target_q_network(self):
'''
target_q_network を main_q_network と同じにする
'''
self.target_q_network.load_state_dict(self.main_q_network.state_dict())
###############################
# Prioritized Experience Replay
###############################
def update_td_error_memory(self):
'''
TD 誤差メモリに格納されている TD 誤差を更新する
'''
self.main_q_network.eval()
self.target_q_network.eval()
# 全メモリでミニバッチを作成する
transitions = self.memory.memory
batch = Transition(*zip(*transitions))
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
# ネットワークが出力する Q(s_t, a_t)
state_action_values = self.main_q_network(
state_batch
).gather(1, action_batch)
# next_state があるかをチェックするインデックスマスク
non_final_mask = torch.ByteTensor(
tuple(map(lambda s: s is not None, batch.next_state))
)
next_state_values = torch.zeros(len(self.memory))
a_m = torch.zeros(len(self.memory)).type(torch.LongTensor)
# 次の状態での最大Q値の行動 a_m を Main Q-Network から求める
a_m[non_final_mask] = self.main_q_network(
non_final_next_states
).detach().max(dim=1).indices
# 次の状態があるものだけにフィルターし、shape (32,) --> (32, 1)
a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)
#次の状態がある index の、行動 a_m の Q 値を target Q-Network から求める
next_state_values[non_final_mask] = self.target_q_network(
non_final_next_states
).gather(1, a_m_non_final_next_states).detach().squeeze()
# TD 誤差を求める
td_errors = (reward_batch + GAMMA * next_state_values) - state_action_values.squeeze()
# TD 誤差メモリを更新する
self.td_error_memory.memory = td_errors.detach().numpy().tolist()
Agent クラスに、関数 memorize_td_error()
と関数 update_td_error_memory()
を追加する。
関数 memorize_td_error()
はその step での TD 誤差を格納する。
関数 update_td_error_memory()
は、各試行の最後に実行され、TD誤差を更新する。
Brain クラスの関数 replay()
の引数に episode
を追加したので、関数 update_q_function()
の引数にも追加する。
# p.170
class Agent:
def __init__(self, num_states, num_actions):
self.brain = Brain(num_states, num_actions)
def update_q_function(self, episode):
self.brain.replay(episode)
def get_action(self, state, episode):
action = self.brain.decide_action(state, episode)
return action
def memorize(self, state, action, state_next, reward):
self.brain.memory.push(state, action, state_next, reward)
def update_target_q_function(self):
self.brain.update_target_q_network()
###############################
# Prioritized Experience Replay
###############################
def memorize_td_error(self, td_error):
self.brain.td_error_memory.push(td_error)
def update_td_error_memory(self):
self.brain.update_td_error_memory()
Environment クラスは、関数 run()
の内容を 3 箇所変更する。
各ステップでの TD 誤差を TD 誤差メモリに追加する。 ただし、今回は 0 を保存している。 各試行の終了時に TD 誤差メモリの中身を更新するので、そのタイミングで正しい値が格納される。
Q Network の更新関数で、episode
変数を追加している。
各試行の最後で TD 誤差メモリの内容を更新する。
# p.171
# [自分へのメモ] env.step() の返り値の変更、env.render() の引数と返り値の変更に対応した
# run() の中で動画(frames)を表示せず、frames を返すように変更した
class Environment:
def __init__(self):
self.env = gym.make(ENV, new_step_api=True, render_mode='rgb_array')
self.num_states = self.env.observation_space.shape[0]
self.num_actions = self.env.action_space.n
self.agent = Agent(self.num_states, self.num_actions)
def run(self):
episode_10_list = np.zeros(10) # 10 試行分の立ち続けた step 数を格納し、平均ステップ数を出力に利用
complete_episodes = 0 # 195 steps 以上立ち続けた試行数
episode_final = False # 最後の試行であるか
frames = [] # 最後の試行を動画にするための配列
for episode in range(NUM_EPISODES):
observation = self.env.reset()
state = observation # 観測をそのまま状態 s として使用する
state = torch.from_numpy(state).type(torch.FloatTensor)
state = torch.unsqueeze(state, dim=0) # FloatTensor size(4) -> size(1,4)
for step in range(MAX_STEPS):
if episode_final is True:
frames.append(self.env.render()[0])
action = self.agent.get_action(state, episode)
# 行動 a_t の実行により、s_{t+1} と done フラグを求める。
# (注) new_api=True なのでtruncated も見るべし。
observation_next, _, done, truncated, _ = self.env.step(action.item())
if done or truncated:
state_next = None # 次の状態は無い
episode_10_list = np.hstack((episode_10_list[1:], step+1))
if step < 195:
reward = torch.FloatTensor([-1.0])
complete_episodes = 0
else:
reward = torch.FloatTensor([1.0])
complete_episodes = complete_episodes + 1
elif step == MAX_STEPS - 1:
state_next = None
episode_10_list = np.hstack((episode_10_list[1:], step+1))
reward = torch.FloatTensor([1.0])
complete_episodes += 1
truncated = True
else:
reward = torch.FloatTensor([0.0])
state_next = observation_next
state_next = torch.from_numpy(state_next).type(torch.FloatTensor)
# FloatTensor size (4) --> (1, 4)
state_next = torch.unsqueeze(state_next, dim=0)
# メモリに経験を追加
self.agent.memorize(state, action, state_next, reward)
###############################
# Prioritized Experience Replay
###############################
self.agent.memorize_td_error(0) # 本当はTD誤差だが、一旦0としておく
###########################################
# Prioritized Experience Replay で Q 関数を更新する
############################################
self.agent.update_q_function(episode)
# 観測の更新
state = state_next
# 終了時の処理
if done or truncated:
print(f'{episode} Episode: Finished after {step+1} steps: average steps = {episode_10_list.mean(): .1f}')
################################
# Prioritiezed Experience Replay
################################
self.agent.update_td_error_memory()
#########################
# Double-DQN により追加
#########################
# 2試行に一度 target_q_network を main_q_network と同じにする
if (episode % 2 == 0):
self.agent.update_target_q_function()
break
if episode_final is True:
# display_frames_as_gif(frames)
break
# 10 連続で 200 steps 立ち続けたら成功
if complete_episodes >= 10:
print('10 consecutive success')
episode_final = True
return frames
# main クラス
cartpole_env = Environment()
frames = cartpole_env.run()
# アニメーション表示する
%matplotlib notebook
import matplotlib.pyplot as plt
#display_frames_as_anim(frames) # display now
display_frames_as_anim(frames, 'cartpole_video4b.mp4') # save to file
if is_colab: # copy to google drive
! mkdir -p {SAVE_PREFIX}
! cp cartpole_video4b.mp4 {SAVE_PREFIX} # copy to the Google Drive