Jul/29/2023 by Yoshihisa Nitta
Mar/24/2024 by Yoshihisa Nitta
Google Colab 対応コード
[本ノートブックの実行に関する注意点]
LOAD_MODEL
を False
に設定してノートブック全体を実行すること。学習したマリオのモデルが Google Drive 上に保存される。LOAD_MODEL
を True
に設定してノートブック全体を何度も実行すること。以前に訓練したマリオのモデルをロードして追加学習し、Google Drive 上のモデルを更新する。
LOAD_MODEL = True # Set to *False* for initial training and *True* for additional training.
このページは、以下の URL に示す "PyTorch の公式チュートリアルの強化学習(Mario)" の Web ページの内容を元に、nitta が自分で改変したものである。
Train A Mario-Playing RL Agent Authors: Yuansong Feng, Suraj Subramanian, Howard Wang, Steven Guo.https://pytorch.org/tutorials/intermediate/mario_rl_tutorial.html
モデルの保存 (checkpoint) では、項目が1変数 (mario.curr_step) 増えている。 元のページの github から checkpoint ファイルをダウンロードして使う場合は、注意すること。
# by nitta
import os
is_colab = 'google.colab' in str(get_ipython()) # for Google Colab
if is_colab:
from google.colab import drive
drive.mount('/content/drive')
SAVE_PREFIX='/content/drive/MyDrive/PyTorch/ReinforcementLearning'
else:
SAVE_PREFIX='.'
# packages
if is_colab:
! apt update -qq
! apt upgrade -qq
! apt install -qq xvfb
! pip -q install pyvirtualdisplay
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
# by nitta
# Show multiple images as an animation (Colab compatible)
%matplotlib notebook
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import animation
from IPython import display
import os
import matplotlib
matplotlib.rcParams['animation.embed_limit'] = 2**128
def display_frames_as_anim(frames, filepath=None, html5=False):
"""
Displays a list of frames as a gif, with controls
"""
H, W, _ = frames[0].shape
fig, ax = plt.subplots(1, 1, figsize=(W/100.0, H/100.0))
ax.axis('off')
patch = plt.imshow(frames[0])
def animate(i):
display.clear_output(wait=True)
patch.set_data(frames[i])
return patch
anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=30, repeat=False)
if not filepath is None:
dpath, fname = os.path.split(filepath)
if dpath != '' and not os.path.exists(dpath):
os.makedirs(dpath)
anim.save(filepath)
if is_colab:
if html5:
display.display(display.HTML(anim.to_html5_video()))
else:
display.display(display.HTML(anim.to_jshtml()))
#plt.close()
else:
plt.show()
# マリオゲームの使用可能なバージョンを調べる
# エラーが表示されるが、気にしない事。
! pip install gym-super-mario-bros==
! pip install gym-super-mario-bros==7.4.0
# PyTorch の強化学習用ライブラリのバージョンを調べる
# エラーが表示されるが、気にしない事。
! pip install tensordict==
! pip install tensordict==0.3.1
# PyTorch の強化学習用ライブラリのバージョンを調べる
# エラーが表示されるが、気にしない事。
! pip install torchrl==
!pip install torchrl==0.3.1
import gym
import gym_super_mario_bros
ENV = 'SuperMarioBros-1-1-v0'
#ENV = 'SuperMarioBros-1-1-v3'
import torch
import torchvision
import PIL
import pathlib
import numpy as np
import collections
import random, datetime, os, copy
print(gym.__version__)
# 本来のマリオゲームにおけるobservation は、画面そのもの(numpy.ndarray)。
# 強化学習時の observation は、以前は「画面そのもの」だったが、Mar/25/2024時点では LazyFrames クラスのオブジェクトに変更されている
# NES Emulator for OpenAI Gym
from nes_py.wrappers import JoypadSpace
from gym_super_mario_bros.actions import RIGHT_ONLY
if gym.__version__ < '0.26':
env = gym_super_mario_bros.make(ENV, new_step_api=True)
else:
env = gym_super_mario_bros.make(ENV, render_mode='rgb', apply_api_compatibility=True)
# action-space
# 0: walk right
# 1: jump right
#env = JoypadSpace(env, [["right"], ["right", "A"]])
env = JoypadSpace(env, RIGHT_ONLY)
observation = env.reset()
print(type(observation))
print(observation.shape)
print(env.action_space)
np.random.seed(12345)
observation = env.reset()
frames = [observation.copy()] # [注意] copy() してから保存しないと、全部同じ画像になってしまう
for _ in range(100):
action = env.action_space.sample() # np.random.choice(2) # 0: walk left, 1: jump
observation, reward, done, trunc, info = env.step(action) # needs action from DNN
frames.append(observation.copy())
if done or trunc or info["flag_get"]:
break;
print(len(frames))
# アニメーション表示 by nitta
display_frames_as_anim(frames, 'mario_video0.mp4')
if is_colab: # copy to google drive
! mkdir -p {SAVE_PREFIX}
! cp mario_video0.mp4 {SAVE_PREFIX} # copy to the Google Drive
環境 の次の状態は next_state
変数でエージェントに返される。
各状態は [3, 240, 250] サイズの配列で表現されている。
状態 は、
パイプの色や空の色など、マリオの行動に関係ない情報も含んでいる。
環境から返されたデータを エージェント に渡す前に Wrapper が前処理を行う。
GrayScaleObservation は、RGB画像をグレースケール画像へ変換するときによく利用される Wrapper である。 必要な情報を失うこと無しに、状態表現のサイズを減らすことができる。 これにより、各状態 のサイズは [1, 240, 256] になる。
ResizeObservation は各観測を正方形画像へとダウンサンプルし、新しいサイズ [1, 84, 84] に変換する。
SkipFrame は独自のラッパーで、gym.Wrapper
を継承して step()
関数を実装している。
連続したフレームは変化があまりないので、多くの情報を失うことなく n-中間フレームをスキップできる。n番目のフレームはスキップされた各フレームの報酬を累積した報酬和を得る。
FrameStack は連続したフレームを一つの観測時点にまとめて学習モデルに与えることができるラッパーである。 事前の数フレームでの動作方向に基づいて、 マリオがジャンプしているのか着地しているのかを 識別できる。
(H,W,3) x 4 ---> (1, H, W) x 4 --> (1, h, w) x 4 --> (4, h, w) H = 240, W = 256 h = w = 84
class SkipFrame(gym.Wrapper):
def __init__(self, env, skip):
""" skip 枚スキップした後のフレームだけを返す """
super().__init__(env)
self._skip = skip
self.original_observations = [] # added by nitta
def step(self, action):
""" 行動を繰り返し、報酬和を求める """
total_reward = 0.0
self.original_observations = [] # added by nitta
for i in range(self._skip):
# 報酬を累積し、同じ行動を繰り返す
obs, reward, done, trunc, info = self.env.step(action)
self.original_observations.append(obs) # added by nitta
total_reward += reward
if done or trunc:
break
return obs, total_reward, done, trunc, info
def get_observations(self): # added by nitta
""" step で skip 枚の画面をまとめて処理しているが、元の画面の配列を返す。 """
return self.original_observations;
class GrayScaleObservation(gym.ObservationWrapper):
def __init__(self, env):
super().__init__(env)
obs_shape = self.observation_space.shape[:2]
self.observation_space = gym.spaces.Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)
def permute_orientation(self, observation):
# (H, W, C) array --> (C, H, W) tensor に変換する
observation = np.transpose(observation, (2, 0, 1))
observation = torch.tensor(observation.copy(), dtype=torch.float)
return observation
def observation(self, observation):
observation = self.permute_orientation(observation)
transform = torchvision.transforms.Grayscale()
observation = transform(observation)
return observation
class ResizeObservation(gym.ObservationWrapper):
def __init__(self, env, shape):
super().__init__(env)
if isinstance(shape, int):
self.shape = (shape, shape)
else:
self.shape = tuple(shape)
obs_shape = self.shape + self.observation_space.shape[2:]
self.observation_space = gym.spaces.Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)
def observation(self, observation):
transforms = torchvision.transforms.Compose([
torchvision.transforms.Resize(self.shape, antialias=True),
torchvision.transforms.Normalize(0, 255)
])
observation = transforms(observation).squeeze(0)
return observation
上の Wrapper を環境に適用すると、最終的にラップされた状態は、4枚の連続するグレースケールフレームがひとつに積み重ねた状態から構成されている。
マリオがアクションする度に、環境はこの構造の状態で応答する。 その構造は [4, 84, 84] のサイズの3次元配列で表現される。
if gym.__version__ < '0.26':
env = gym_super_mario_bros.make(ENV, new_step_api=True)
else:
env = gym_super_mario_bros.make(ENV, render_mode='rgb', apply_api_compatibility=True)
# action-space
# 0: walk right
# 1: jump right
#env = JoypadSpace(env, [["right"], ["right", "A"]])
env = JoypadSpace(env, RIGHT_ONLY)
env = SkipFrame(env, skip=4) # 1つのstep()関数で複数の step() を呼び出す
env = GrayScaleObservation(env) # グレースケール画像に変更する
env = ResizeObservation(env, shape=84) # 画面サイズを変更する
if gym.__version__ < '0.26':
env = gym.wrappers.FrameStack(env, num_stack=4, new_step_api=True)
else:
env = gym.wrappers.FrameStack(env, num_stack=4) # 連続したフレームを1つにまとめる
このゲームのエージェントを表現する Mario クラスを作成した。
Mario は以下のことができる。
act()
... 方策にしたがい、環境の現在の状態に応じた行動を選択し、実行する。Remember
... 経験を記憶する。
経験は (current_state, current_action, reward, next_state) である。
Marioは action policy を更新するために、経験を記憶 ( cache
)し、後で振り返る( recall
)learn()
... より良い方策を学ぶ。class Mario:
def __init__():
pass
def act(self, state):
"状態が与えられると、ε-greedy 法にしたがって、行動を選択する"
pass
def cache(self, experience):
""" 経験(experience) を記憶に加える """
pass
def recall(self):
"記憶から経験のサンプリングを行う"
pass
def learn(self):
"経験データのバッチを用いて、オンライン行動価値 Q 関数を更新する"
pass
各状態において、次の2通りの方法から行動を選択する。
self.exploration_rate
の値に基づく確率で、explore
を選択し、ランダムに行動を選択する。exploit
を選択した場合は、最適な行動を提供する MariNet
に基づいて行動する。
# Act
class Mario:
def __init__(self, state_dim, action_dim, save_dir):
self.state_dim = state_dim
self.action_dim = action_dim
self.save_dir = save_dir
self.device = "cuda" if torch.cuda.is_available() else "cpu"
# 最適な行動を予測する Mario の DNN (Learn セクションで定義する)
self.net = MarioNet(self.state_dim, self.action_dim).float() # define in Train
self.net = self.net.to(device=self.device)
self.exploration_rate = 1
self.exploration_rate_decay = 0.99999975
self.exploration_rate_min = 0.1
self.curr_step = 0
self.base_step = 0 # added by nitta
self.save_every = 5e5 # Mario Net を保存する経験数 (save interval)
def act(self, state):
"""
与えられた状態において、ε-greedy 法で行動を選択し、ステップ値を更新する。
入力:
state (LazyFrame) : 現在の状態に対する1つの観測。state_dim 次元。
Return:
action_idx (int): Marioが実行した行動のインデックス値
"""
if np.random.rand() < self.exploration_rate: # explore
action_idx = np.random.randint(self.action_dim)
else: # exploit
state = state[0].__array__() if isinstance(state,tuple) else state.__array__()
state = torch.tensor(state, device=self.device).unsqueeze(0)
action_values = self.net(state, model="online")
action_idx = torch.argmax(action_values, axis=1).item()
# exploration_rate を減じる
self.exploration_rate *= self.exploration_rate_decay
self.exploration_rate = max(self.exploration_rate_min, self.exploration_rate)
# step 数を増やす
self.curr_step += 1
return action_idx
cache()
... 行動を選択する度に、その経験(state, next_state, action, reward, done)を記憶する。recall()
... メモリからランダムに過去の経験をバッチサイズ個サンプリングし、学習する。# Cache and Recall
import torchrl
import tensordict
class Mario(Mario):
def __init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
#self.memory = collections.deque(maxlen=100000)
self.memory = torchrl.data.TensorDictReplayBuffer(
storage=torchrl.data.LazyMemmapStorage(
100000,
device=torch.device("cpu")
)
)
self.batch_size = 32
def cache(self, state, next_state, action, reward, done):
"""
self.memory (リプレイバッファ)に経験を保存する
Inputs:
state (LazyFrame)
next_state (LazyFrame)
action (int)
reward (float)
done (bool)
"""
def first_if_tuple(x):
return x[0] if isinstance(x, tuple) else x
state = first_if_tuple(state).__array__()
next_state = first_if_tuple(next_state).__array__()
state = torch.tensor(state)
next_state = torch.tensor(next_state)
action = torch.tensor([action])
reward = torch.tensor([reward])
done = torch.tensor([done])
# self.memory.append([state, next_state, action, reward, done,])
self.memory.add(
tensordict.TensorDict({
'state': state,
'next_state': next_state,
'action': action,
'reward': reward,
"done" :done
}, batch_size=[])
)
def recall(self):
"""
メモリから経験のバッチを取り出す
"""
batch = self.memory.sample(self.batch_size).to(self.device)
state, next_state, action, reward, done = (batch.get(key) for key in ('state', 'next_state', 'action', 'reward', 'done'))
return state, next_state, action.squeeze(), reward.squeeze(), done.squeeze()
Mario は内部で DDQN アルゴリズムを用いる。
DDQN は $Q_{online}$ と $Q_{target}$ という2つの畳み込みネットワーク (ConvNets) を使用する。これらはそれぞれ独立して最適行動値関数を近似する。
本実装では、$Q_{online}$ と $Q_{target}$ では同じ特徴生成器を使うが、全結合層 (FC, Fully Connected) 分類器としては別々に更新される。 $Q_{target}$ のパラメータである $\theta_{target}$ は逆伝播時には、更新されないように固定される。 その代わり、 定期的に $\theta_{online}$ と同期される。
# Learn (Train)
# Neural Network
class MarioNet(torch.nn.Module):
"""
単純な CNN 構造。
入力 -> (conv2d + relu) * 3 -> flatten -> (dense + relu) * 2 -> 出力
"""
def __init__(self, input_dim, output_dim):
super().__init__()
c, h, w = input_dim
if h != 84:
raise ValueError(f'Expecting input height: 84, but got {h}')
if w != 84:
raise ValueError(f'Expecting input width: 84, but got {w}')
self.online = self.__build_cnn(c, output_dim)
self.target = self.__build_cnn(c, output_dim)
self.target.load_state_dict(self.online.state_dict())
# Q_target parameters are frozen.
for p in self.target.parameters():
p.requires_grad = False
def forward(self, input, model):
if model == 'online':
return self.online(input)
elif model == 'target':
return self.target(input)
def __build_cnn(self, c, output_dim):
return torch.nn.Sequential(
torch.nn.Conv2d(in_channels=c, out_channels=32, kernel_size=8, stride=4),
torch.nn.ReLU(),
torch.nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2),
torch.nn.ReLU(),
torch.nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),
torch.nn.ReLU(),
torch.nn.Flatten(),
torch.nn.Linear(3136, 512),
torch.nn.ReLU(),
torch.nn.Linear(512, output_dim),
)
学習時には2つの値が使われる。(TD = Temporal Difference)
現在の報酬と、次の状態 $s'$ における推測された $Q^{*}$ の合計は
$\quad a' = \mbox{argmax}_{a} Q_{online}(s',a)$
$\quad \displaystyle TD_{t} = r + \gamma ~ Q^{*}_{target}(s',a')$
次の行動 $a'$ はわからないので、次状態 $s'$ において $Q_{online}$ を最大化する行動 $a'$ を使う。
ここで勾配計算を無効にするために、
td_target()
に対して
@torch_grad()
修飾子を使うことに注意する必要がある。
($\theta_{target}$ に対して逆伝播する必要がないので)
# TD Estimate and TD Target
class Mario(Mario):
def __init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.gamma = 0.9
def td_estimate(self, state, action):
current_Q = self.net(state, model='online')[
np.arange(0, self.batch_size),
action
] # Q_online(s, a)
return current_Q
@torch.no_grad()
def td_target(self, reward, next_state, done):
next_state_Q = self.net(next_state, model='online')
best_action = torch.argmax(next_state_Q, axis=1)
next_Q = self.net(next_state, model='target')[
np.arange(0, self.batch_size),
best_action
]
return (reward + (1 - done.float()) * self.gamma * next_Q).float()
$\alpha$ はオプティマイザに渡される学習率 $lr$ である。
$\quad \theta_{online} \leftarrow \theta_{online} + \alpha ~ \nabla (TD_{\epsilon} - TD_{t})$
$\theta_{target}$ は逆伝播の間は更新されない。 その代わり、定期的に$\theta_{online}$ を $\theta_{target}$ にコピーする。
$\quad \theta_{target} \leftarrow \theta_{online}$
# Update the model
class Mario(Mario):
def __init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.optimizer = torch.optim.Adam(self.net.parameters(), lr=0.00025)
self.loss_fn = torch.nn.SmoothL1Loss()
def update_Q_online(self, td_estimate, td_target):
loss = self.loss_fn(td_estimate, td_target)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
def sync_Q_target(self):
self.net.target.load_state_dict(self.net.online.state_dict())
元原稿の save()
関数は、MarioNet
のモデルと、exploration_rate
変数だけを保存するものであった。
以下のコードの save()
関数と load()
関数では、curr_step
変数も記録するように変更している。
この変数を使うと、追加で学習したときに通算で何ステップ学習したかがわかる。
元の save()
関数は save_org()
関数に変更した。また、save_org()
関数で保存したチェックポイントをロードする関数は load_org()
関数として定義した。
# Save a checkpoint
class Mario(Mario):
def save(self):
'''
save_path = (
self.save_dir / f"mario_net_{int(self.curr_step // self.save_every)}.chkpt"
)
torch.save(
dict(model=self.net.state_dict(), exploration_rate=self.exploration_rate),
save_path,
)
'''
# changed by nitta
os.makedirs(self.save_dir, exist_ok=True)
d = dict(
model=self.net.state_dict(),
exploration_rate=self.exploration_rate,
curr_step=self.curr_step,
)
save_path = self.save_dir / f"mario_net_{self.curr_step}.chkpt"
if (os.path.isfile(save_path)):
os.remove(save_path)
torch.save(d, save_path)
save_path2 = self.save_dir / "mario_net.chkpt"
if (os.path.isfile(save_path2)):
os.remove(save_path2)
torch.save(d, save_path2) # latest model
print(f"MarioNet saved to {save_path} at step {self.curr_step}")
def load(self, path): # defined by nitta
checkpoint = torch.load(load_path, map_location=(
'cuda' if torch.cuda.is_available else 'cpu'
))
mario.net.load_state_dict(checkpoint['model'])
mario.exploration_rate = checkpoint['exploration_rate']
mario.curr_step = checkpoint['curr_step']
mario.base_step = mario.curr_step
class Mario(Mario):
def save_org(self): # 元の save(self) 関数
save_path = (
self.save_dir / f"mario_net_{int(self.curr_step // self.save_every)}.chkpt"
)
torch.save(
dict(model=self.net.state_dict(), exploration_rate=self.exploration_rate),
save_path,
)
print(f"MarioNet saved to {save_path} at step {self.curr_step}")
def load_org(self, path): # defined by nitta
checkpoint = torch.load(load_path, map_location=(
'cuda' if torch.cuda.is_available else 'cpu'
))
mario.net.load_state_dict(checkpoint['model'])
mario.exploration_rate = checkpoint['exploration_rate']
mario.curr_step = 20000 * 200 # may be
mario.base_step = mario.curr_step
# Gather all into one
class Mario(Mario):
def __init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.burnin = 1e4 # 訓練の前の経験の最小値
self.learn_every = 3 # Q_online を更新する間の経験数
self.sync_every = 1e4 # Q_target と Q_online を同期させる間の経験数
def learn(self):
if self.curr_step % self.sync_every == 0:
self.sync_Q_target()
if self.curr_step % self.save_every == 0:
self.save()
if self.curr_step - self.base_step < self.burnin: # changed by nitta (base_step)
return None, None
if self.curr_step % self.learn_every != 0:
return None, None
# sampling from memory
state, next_state, action, reward, done = self.recall()
# Get TD Estimate
td_est = self.td_estimate(state, action)
# Get TD Target
td_tgt = self.td_target(reward, next_state, done)
# Backpropagate loss through Q_online
loss = self.update_Q_online(td_est, td_tgt)
return (td_est.mean().item(), loss)
import numpy as np
import time, datetime
class MetricLogger:
def __init__(self, save_dir):
os.makedirs(save_dir, exist_ok=True)
self.save_log = save_dir / "log"
with open(self.save_log, "w") as f:
f.write(
f"{'Episode':>8}{'Step':>8}{'Epsilon':>10}{'MeanReward':>15}"
f"{'MeanLength':>15}{'MeanLoss':>15}{'MeanQValue':>15}"
f"{'TimeDelta':>15}{'Time':>20}\n"
)
self.ep_rewards_plot = save_dir / "reward_plot.jpg"
self.ep_lengths_plot = save_dir / "length_plot.jpg"
self.ep_avg_losses_plot = save_dir / "loss_plot.jpg"
self.ep_avg_qs_plot = save_dir / "q_plot.jpg"
# history metrics
self.ep_rewards = []
self.ep_lengths = []
self.ep_avg_losses = []
self.ep_avg_qs = []
# moving averages, added for every call to record()
self.moving_avg_ep_rewards = []
self.moving_avg_ep_lengths = []
self.moving_avg_ep_avg_losses = []
self.moving_avg_ep_avg_qs = []
# current episode metric
self.init_episode()
# timing
self.record_time = time.time()
def log_step(self, reward, loss, q):
self.curr_ep_reward += reward
self.curr_ep_length += 1
if loss:
self.curr_ep_loss += loss
self.curr_ep_q += q
self.curr_ep_loss_length += 1
def log_episode(self):
"Mark end of episode"
self.ep_rewards.append(self.curr_ep_reward)
self.ep_lengths.append(self.curr_ep_length)
if self.curr_ep_loss_length == 0:
ep_avg_loss = 0
ep_avg_q = 0
else:
ep_avg_loss = np.round(self.curr_ep_loss / self.curr_ep_loss_length, 5)
ep_avg_q = np.round(self.curr_ep_q / self.curr_ep_loss_length, 5)
self.ep_avg_losses.append(ep_avg_loss)
self.ep_avg_qs.append(ep_avg_q)
self.init_episode()
def init_episode(self):
self.curr_ep_reward = 0.0
self.curr_ep_length = 0
self.curr_ep_loss = 0.0
self.curr_ep_q = 0.0
self.curr_ep_loss_length = 0
def record(self, episode, epsilon, step):
mean_ep_reward = np.round(np.mean(self.ep_rewards[-100:]), 3)
mean_ep_length = np.round(np.mean(self.ep_lengths[-100:]), 3)
mean_ep_loss = np.round(np.mean(self.ep_avg_losses[-100:]), 3)
mean_ep_q = np.round(np.mean(self.ep_avg_qs[-100:]), 3)
self.moving_avg_ep_rewards.append(mean_ep_reward)
self.moving_avg_ep_lengths.append(mean_ep_length)
self.moving_avg_ep_avg_losses.append(mean_ep_loss)
self.moving_avg_ep_avg_qs.append(mean_ep_q)
last_record_time = self.record_time
self.record_time = time.time()
time_since_last_record = np.round(self.record_time - last_record_time, 3)
print(
f"Episode {episode} - "
f"Step {step} - "
f"Epsilon {epsilon} - "
f"Mean Reward {mean_ep_reward} - "
f"Mean Length {mean_ep_length} - "
f"Mean Loss {mean_ep_loss} - "
f"Mean Q Value {mean_ep_q} - "
f"Time Delta {time_since_last_record} - "
f"Time {datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}"
)
with open(self.save_log, "a") as f:
f.write(
f"{episode:8d}{step:8d}{epsilon:10.3f}"
f"{mean_ep_reward:15.3f}{mean_ep_length:15.3f}"
f"{mean_ep_loss:15.3f}{mean_ep_q:15.3f}"
f"{time_since_last_record:15.3f}"
f"{datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'):>20}\n"
)
for metric in ["ep_lengths", "ep_avg_losses", "ep_avg_qs", "ep_rewards"]:
plt.clf()
plt.plot(getattr(self, f"moving_avg_{metric}"),label=f"moving_avg_{metric}")
plt.legend()
plt.savefig(getattr(self,f"{metric}_plot"))
LOAD_MODEL
が False
の場合は、新しくモデルを生成して Training (学習)を行う。
LOAD_MODEL
が True
の場合は、保存されているモデルを読み込んで、追加 Training (学習)を行う。
元の Web ページによれば、 マリオが自分の世界でのやり方を本当に学ぶには、少なくとも 40,000エピソードを繰り返す必要があるとのこと。
from pathlib import Path
use_cuda = torch.cuda.is_available()
print(f"Using CUDA: {use_cuda}\n")
save_dir = Path(SAVE_PREFIX + "/checkpoints")
#os.makedirs(save_dir, exist_ok=True)
mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir=save_dir)
logger = MetricLogger(save_dir / datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S"))
### Load the pre-trained Model of Mario
if LOAD_MODEL:
load_path = Path(SAVE_PREFIX + "/checkpoints") / "mario_net.chkpt"
mario.load(load_path)
###
mario.save_every = 10000 # 20000
episodes = 100 # 2000
for e in range(episodes):
state = env.reset()
# Play the game
while True:
# 状態の中でエージェントを動かす
action = mario.act(state)
# エージェントは行動する
next_state, reward, done, trunc, info = env.step(action)
# 記憶する
mario.cache(state, next_state, action, reward, done)
# 訓練する
q, loss = mario.learn()
# ログに記録する
logger.log_step(reward, loss, q)
# 状態を更新する
state = next_state
if done or info["flag_get"] or trunc:
break
logger.log_episode()
if ((e+1) %20 == 0) or ((e+1) == episodes):
logger.record(episode=(e+1), epsilon=mario.exploration_rate, step=mario.curr_step)
ここで、環境 env
から返される観測 observation
は LazyFrames
クラスのインスタンスであり、エージェント mario
が見ているゲーム画面を表す。
observation = env.reset()
observation, reward, done, trunc, info = env.step(action)
LazyFrames
クラスのインスタンスは
\_\_array\_\_()
で配列に変換できる。
上書きされないように copy()
しておくと安全である。
変換された配列の形式は
(C, H, W) = (4, 84, 84)
であり、4枚のグレースケール画像 ($84 \times 84$ )である。
C(=4) は時系列順にまとめてグループ化したものであり、ある画面に至る前の数ステップ分の画面があれば、画面内のキャラクタの動きを判定できる。
observation = env.reset()
print(type(observation))
print(observation.__array__().shape)
エージェントが見ている「環境」を動画にした。
時刻 $t$ においてエージェントに与えられる「環境」は、 $84 \times 84$のグレースケール画像を時系列順 ($t-3$, $t-2$, $t-1$, $t$) に4フレーム重ねた $4 \times 84 \times 84$ のデータである。
# ゲームを実行してみる
bak_exploration_rate = mario.exploration_rate
mario.exploration_rate = 0.0
observation = env.reset() # gym.wrappers.frame_stack.LazyFrames (4,84,84)
frames = [ observation.__array__().copy().transpose(1,2,0) ] # [注意] copy() してから保存しないと、全部同じ画像になってしまう
for step in range(2000):
action = mario.act(observation) # np.random.choice(2) # 0: walk left, 1: jump
observation, reward, done, trunc, info = env.step(action) # needs action from DNN
frames.append(observation.__array__().copy().transpose(1,2,0) )
if done or trunc or info["flag_get"]:
break
print(len(frames))
print(frames[0].shape)
mario.exploration_rate = bak_exploration_rate
# アニメーション表示 by nitta
display_frames_as_anim(frames, 'mario_video1.mp4')
if is_colab: # copy to google drive
! mkdir -p {SAVE_PREFIX}
! cp mario_video1.mp4 {SAVE_PREFIX} # copy to the Google Drive
「エージェント mario 」はサイズが $84\times 84$ のグレースケール画像に変換されたゲーム画面を、$skip (=4)$ 画面毎に与えられ、行動を選択する。 エージェントは $skip(=4)$ された画面の間は、同じ行動を選択し続ける。
[自分へのメモ by nitta]
SkipFrame
クラスの step()
関数は $skip (=4)$ 枚ごとに画像を処理しているが、元の画像をリストとして保存するコードを追加した。
また、 保存した元のゲーム画面を返す get_observations()
関数を追加した。
bak_exploration_rate = mario.exploration_rate
mario.exploration_rate = 0.0
frames = []
observation = env.reset()
c,h,w = observation.__array__().shape
for step in range(4000):
action = mario.act(observation)
observation, reward, done, trunc, info = env.step(action)
for color_obs in env.get_observations():
frames.append(color_obs.copy())
if done or trunc or info["flag_get"]:
break;
print(len(frames))
print(frames[0].shape)
mario.exploration_rate = bak_exploration_rate
# アニメーション表示 with HTML5 (data is too many for jstml)
display_frames_as_anim(frames, 'mario_video2.mp4', True) # save to file
if is_colab: # copy to google drive
! mkdir -p {SAVE_PREFIX}
! cp mario_video2.mp4 {SAVE_PREFIX} # copy to the Google Drive
# 最終状態のモデルを保存する
mario.save()
6500000 steps の訓練をしたあたりから、たまにマリオがゴールに到達できるようになってきた。
約8320000 steps 訓練後したモデルでは exploration_rate = 0.0 で実行して、 数回に1回の割合でゴールまでマリオが進む。 1 episode で約 200 steps 平均実行するので、これは 約 40000 episode 訓練した計算になる。