Jul/30/2023 updated by Yoshihisa Nitta
Feb/03/2023 written by Yoshihisa Nitta
Google Colab 対応コード
# by nitta
import os
is_colab = 'google.colab' in str(get_ipython()) # for Google Colab
if is_colab:
from google.colab import drive
drive.mount('/content/drive')
SAVE_PREFIX='/content/drive/MyDrive/DeepLearning/openai_gym_rt'
else:
SAVE_PREFIX='.'
# Check if running on Colab
#is_colab = 'google.colab' in str(get_ipython()) # for Google Colab
# packages
if is_colab:
!apt update -qq
!apt upgrade -qq
!apt install -qq xvfb
!pip -q install pyvirtualdisplay
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib
matplotlib.rcParams['animation.embed_limit'] = 2**32
# Show multiple images as animation
# Colab compatible
%matplotlib notebook
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import animation
from IPython import display
import os
import gym
def display_frames_as_anim(frames, filepath=None):
"""
Displays a list of frames as a gif, with controls
"""
H, W, _ = frames[0].shape
fig, ax = plt.subplots(1, 1, figsize=(W/100.0, H/100.0))
ax.axis('off')
patch = plt.imshow(frames[0])
def animate(i):
display.clear_output(wait=True)
patch.set_data(frames[i])
return patch
anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50, repeat=False)
if not filepath is None:
dpath, fname = os.path.split(filepath)
if dpath != '' and not os.path.exists(dpath):
os.makedirs(dpath)
anim.save(filepath)
if is_colab:
display.display(display.HTML(anim.to_jshtml()))
#plt.close()
else:
plt.show()
import gym
ENV = 'CartPole-v1'
# ランダムに行動する
np.random.seed(12345)
env = gym.make(ENV, new_step_api=True, render_mode='rgb_array') # new_step_api, render_mode
observation = env.reset()
frames = [ env.render()[0] ]
states = [ observation ]
for step in range(100):
action = env.action_space.sample() ## np.random.choice(2) # 0: left, 1: right ##
obserbation, reward, done, truncated, info = env.step(action) # when new_step_api==True, two bool (done, truncated) returned
frames.append(env.render()[0]) # env.render(mode='rgb_array') -> env.render()[0]
states.append(observation)
if done:
break
print(len(states))
print(states)
# アニメーション表示する
%matplotlib notebook
import matplotlib.pyplot as plt
#display_frames_as_anim(frames) # display now
display_frames_as_anim(frames, 'cartpole_video5a.mp4') # save to file
if is_colab: # copy to google drive
! mkdir -p {SAVE_PREFIX}
! cp cartpole_video5a.mp4 {SAVE_PREFIX} # copy to the Google Drive
ディープラーニングを使わない従来の強化学習として3通りの手法があった。
このうち、Q学習における行動価値関数 $Q(s,a)$ をディープ・ニューラルネットワークで実現する DQN が提案された。
# CartPole で観測した状態変数の値を名前をつけて保存するために namedtuple を使う
from collections import namedtuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))
# 定数
GAMMA = 0.99
MAX_STEPS = 200
NUM_EPISODES = 500
# Replay Memory
import random
class ReplayMemory:
def __init__(self, capacity):
self.capacity = capacity
self.memory = []
self.index = 0
def push(self, state, action, state_next, reward):
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.index] = Transition(state, action, state_next, reward)
self.index = (self.index + 1) % self.capacity
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
A3C (Asynchronous Advantage Actor-Critic) は3つの工夫を組み合わせたアルゴリズムである。 Asynchronous は、「複数のエージェントを用意し非同期)で分散学習する」ことを意味する。 Advance は、「(1 step だけでなく、) 2 step 以上先までの状態を使用して更新を行う」ことを意味する。 Actor-Critic は、「方策反復法と価値反復法を組み合わせる」ことを意味し、Actor は方策を出力する関数、Critic は価値を出力する関数を表す。
A2C は、A3C から非同期的要素を取り除いたものである。複数のエージェントは1つのニューラルネットワークを共有する。
A2C は A3C という手法から派生した分散学習型の深層強化学習であり、 A2C という名前は、Advantage 学習と Actor-Critic に由来する。 この2つの手法を分散型の強化学習に組み合わせて利用する。
分散学習は、エージェントを複数用意して強化学習を行が、 全エージェントが同じディープ・ニューラルネットワークを共有する。
Q学習やDQNではQ関数の更新の際、 $Q(s_t,a_t)$ が $R(t+1) + \gamma \cdot \max [ Q(s_{t+1}, a) ]$ に近づくように、Q関数を学習する。 $Q(s_t, a_t)$ の学習に 1 step 先の行動価値関数の値 $Q(s_{t+1}, a)$ を使う。 Advantage 学習はこのQ関数の更新を 1 step 先でなく、2 step 以上先まで動かして更新する。
2 step 先まで考慮した場合のQ関数の更新式は次の通り。
$Q(s_t, a_t) \rightarrow R(t+1) + \gamma \cdot R(t+2) + (\gamma^2) \cdot \max_a [Q(s_{t+2}, a)]$
ただし、何ステップも先まで Advantage 学修すると、何ステップも最適ではない Q関数で行動を決定して間違った学習をする確率が増えてしまう。 そのため、適度なステップ数で Advantage 学習をするのが一般的である。
Q学習は価値反復法の手法であるが、Actor-Critic は方策反復法と価値反復法の両方を使用する。
Actor-Critic のニューラルネットワークでは、入力は DQN と同じく状態変数である。、たとえばCartPole では、「位置」「速度」「角度」「角速度」の4変数が入力となる。
Actor-Critic の出力は Actor と Critic それぞれの出力の集合である。 Actor は行動を出力するので、出力の個数は「行動の種類数」である。 また、Critic は状態価値 $V^{\pi}_{s_t}$を出力するので、出力の個数は1である。 たとえば CartPole では行動は2種類なので、全体の出力数は3個となる。
状態価値 $V^{\pi}_{s_t}$ は、状態 $s_t$ になった場合にその先得られるであろう割引報酬和の期待値である。
誤差関数を定義する。
Actor 側で最大化したいのは、状態 $s_t$ において、結合パラメータ $\theta$ のニューラルネットワークを使用して行動し続けたときに得られる割引報酬和 $J(\theta , s_t )$ である。 方策勾配法を使うと割引報酬和は次の通り。
$J(\theta , s_t) = \mathbb{E} [ \log \pi_{\theta} (a|s) (Q^{\pi}(s,a) - V^{\pi}_{s})]$
$\mathbb{E}[]$ は期待値を計算するという意味で、実装時にはミニバッチの平均を求める。 $\log \pi_{\theta} (a|s)$ は状態 $s$ のときに行動 $a$ を採用する確率の $\log$ を計算したものである。
$Q^{\pi}(s,a)$ は状態 $s$ で行動 $a$ を採用した場合の行動価値である。 ただし、$Q^{\pi}(s,a)$ は行動 $a$ についての変数ではなく、定数として扱う。 A2C では行動価値を Advantage 学習で計算する。 $V^{\pi}_s$ は状態価値であり、Critic の出力である。
A2C および A3C では、Actor の学習に方策のエントロピー項を追加する。 エントロピー項は次の通り。
$\displaystyle \mbox{Actor}_\mbox{entropy} = \sum^{a} [\pi_{\theta} (a|s) \log \pi_{\theta} (a|s)]$
総和は行動の種類について総和を計算する意味である。 このエントロピー項は方策が行動をランダムに選択する作戦の場合(学習初期)が最大の値となる。 どれか一つの行動しか選択しない方策の場合はエントロピーが最小になる。 エントロピー項を追加することによって、学習初期は学習がゆっくりとなり、局所解に落ちるのを避けている。
Critic 側は状態価値 $V^{\pi}_s$ を正しく出力するように学習したいので、実際に行動して得られた行動価値 $Q^{\pi}(s,a)$ と出力 $V^{\pi}_s$ が一致するように学習する。 損失関数は次の通り。
$\mbox{loss}_\mbox{{Critic}} = (Q^{\pi}(s,a) - V^{\pi}_s)^2$
# p.177
NUM_PROCESSES = 32 # 同時に実行する環境
NUM_ADVANCED_STEP = 5 # 何ステップ進めて報酬和を計算するのか設定
# A2C の損失関数の計算のための定数設定
value_loss_coef = 0.5
entropy_coef = 0.01
max_grad_norm = 0.5
# p.178
# メモリクラスの定義
class RolloutStorage(object):
'''
Advantage 学習をするためのメモリクラス
'''
def __init__(self, num_steps, num_processes, obj_shape):
self.observations = torch.zeros(num_steps + 1, num_processes, 4)
self.masks = torch.ones(num_steps + 1, num_processes, 1)
self.rewards = torch.zeros(num_steps, num_processes, 1)
self.actions = torch.zeros(num_steps, num_processes, 1).long()
# 割引報酬和を格納
self.returns = torch.zeros(num_steps + 1, num_processes, 1)
self.index = 0 # insert する index
def insert(self, current_obs, action, reward, mask):
'''
次のindexにtransactionを格納する
'''
self.observations[self.index+1].copy_(current_obs)
self.masks[self.index+1].copy_(mask)
self.rewards[self.index].copy_(reward)
self.actions[self.index].copy_(action)
self.index = (self.index + 1) % NUM_ADVANCED_STEP # indexの更新
def after_update(self):
'''
Advantage する step 数が完了したら、最新のものを index0 に格納する
'''
self.observations[0].copy_(self.observations[-1])
self.masks[0].copy_(self.masks[-1])
def compute_returns(self, next_value):
'''
Advantage するステップ中の各ステップの割引報酬和を計算する。
5 step 目から逆向きに計算する。5step -> Advantage1, 4step -> Advantage2
'''
self.returns[-1] = next_value
for ad_step in reversed(range(self.rewards.size(0))):
self.returns[ad_step] = self.returns[ad_step + 1] * GAMMA * self.masks[ad_step + 1] + self.rewards[ad_step]
# p.179
# A2C の Deep Neural Network を構築する
import torch
import torch.nn as nn
import torch.nn.functional as F
class Net(torch.nn.Module):
def __init__(self, n_in, n_mid, n_out):
super().__init__()
self.fc1 = torch.nn.Linear(n_in, n_mid)
self.fc2 = torch.nn.Linear(n_mid, n_mid)
self.actor = torch.nn.Linear(n_mid, n_out) # Actor
self.critic = torch.nn.Linear(n_mid, 1) # Critic
def forward(self, x):
h1 = torch.nn.functional.relu(self.fc1(x))
h2 = torch.nn.functional.relu(self.fc2(h1))
critic_output = self.critic(h2) # 状態価値
actor_output = self.actor(h2) # 行動
return critic_output, actor_output
def act(self, x):
'''
状態xから行動を確率的に求める
'''
value, actor_output = self(x) # forward()関数をcallする
action_probs = torch.nn.functional.softmax(actor_output, dim=1)
action = action_probs.multinomial(num_samples=1)
return action
def get_value(self, x):
'''
状態xから状態価値を計算する
'''
value, actor_output = self(x)
return value
def evaluate_actions(self, x, actions):
'''
状態xから状態価値、実際の行動actionsのlog確率とエントロピーを求める
'''
value, actor_output = self(x)
log_probs = torch.nn.functional.log_softmax(actor_output, dim=1)
action_log_probs = log_probs.gather(1, actions) # 実際の行動のlog_probsを求める
probs = torch.nn.functional.softmax(actor_output, dim=1)
entropy = -(log_probs * probs).sum(-1).mean()
return value, action_log_probs, entropy
# p.180
# エージェントの頭脳クラスを定義する。全エージェントで共有する
import torch
from torch import optim
class Brain(object):
def __init__(self, actor_critic):
self.actor_critic = actor_critic
self.optimizer = torch.optim.Adam(self.actor_critic.parameters(), lr=0.01)
def update(self, rollouts):
'''
Advantage で計算した5つのstepのすべてを使って更新する
'''
obs_shape = rollouts.observations.size()[2:] # torch.Size([4, 84, 84])
num_steps = NUM_ADVANCED_STEP
num_processes = NUM_PROCESSES
values, action_log_probs, entropy = self.actor_critic.evaluate_actions(
rollouts.observations[:-1].view(-1, 4),
rollouts.actions.view(-1, 1)
)
# rollouts.observations[:-1].view(-1, 4) torch.Size([80, 4])
# rollouts.actions.view(-1, 1) torch.Size([80, 1])
# values torch.Size([80, 1])
# action_log_probs torch.Size([80, 1])
# entropy torch.Size([])
values = values.view(num_steps, num_processes, 1) # torch.Size([5, 32, 1])
action_log_probs = action_log_probs.view(num_steps, num_processes, 1)
# advantage (行動価値-状態価値)の計算
advantages = rollouts.returns[:-1] - values # torch.Size([5, 32, 1])
# Critic の loss を計算
value_loss = advantages.pow(2).mean()
# Actor の gain を計算する。あとでマイナスをかけて loss にする
action_gain = (action_log_probs * advantages.detach()).mean()
# detach して advantages を定数として扱う
# 誤差関数の総和
total_loss = (value_loss * value_loss_coef - action_gain - entropy * entropy_coef)
# 結合パラメータを更新する
self.actor_critic.train() # 訓練モードに
self.optimizer.zero_grad() # 勾配をリセット
total_loss.backward() # back propagation を計算する
torch.nn.utils.clip_grad_norm(self.actor_critic.parameters(), max_grad_norm)
# 一気に変化しないように 0.5 でその値を取り出す
self.optimizer.step() # 結合パラメータを更新する
Agent クラスは今回は用意せず、Environment クラスでマルチエージェントを取り扱うことにする。
Environment クラスではエージェントを複数生成し、Advantage 学習による報酬の計算も行う。
# p.1782
# [自分へのメモ] env.step() の返り値の変更、env.render() の引数と返り値の変更に対応した
# run() の中で動画(frames)を表示せず、frames を返すように変更した
import copy
class Environment:
def __init__(self, max_step=200):
self.max_step = max_step ## by nitta
# 同時に実行するエージェント数の環境を生成する
self.envs = [ gym.make(ENV, new_step_api=True, render_mode='rgb_array') for i in range(NUM_PROCESSES) ]
# 全エージェントが共有して持つ頭脳 Brain を生成する
self.n_in = self.envs[0].observation_space.shape[0] # 状態の数は 4
self.n_out = self.envs[0].action_space.n # 行動の数は2
self.n_mid = 32
self.actor_critic = Net(self.n_in, self.n_mid, self.n_out)
self.global_brain = Brain(self.actor_critic)
def run(self):
# 格納用変数を生成する
obs_shape = self.n_in
current_obs = torch.zeros(NUM_PROCESSES, obs_shape) # torch.Size([32, 4])
rollouts = RolloutStorage(
NUM_ADVANCED_STEP,
NUM_PROCESSES,
obs_shape
)
episode_rewards = torch.zeros([NUM_PROCESSES, 1]) # 現在の試行の報酬
final_rewards = torch.zeros([NUM_PROCESSES, 1]) # 最後の試行の報酬
obs_np = np.zeros([NUM_PROCESSES, obs_shape])
reward_np = np.zeros([NUM_PROCESSES, 1])
done_np = np.zeros([NUM_PROCESSES, 1])
truncated_np = np.zeros([NUM_PROCESSES, 1]) ## by nitta
each_step = np.zeros(NUM_PROCESSES) # 各環境のstep数を記録
episode = 0 # 環境0の試行数
# 初期状態の開始
obs = [self.envs[i].reset() for i in range(NUM_PROCESSES)]
obs = np.array(obs)
obs = torch.from_numpy(obs).float() # torch.Size([32, 4])
current_obs = obs # 最新のobs
# advanced 学習用のオブジェクト rollouts の状態の1つ目に、現在の状態を保持
rollouts.observations[0].copy_(current_obs)
# 実行ループ
for j in range(NUM_EPISODES * NUM_PROCESSES):
# advanced 学習する step 数ごとに計算する
for step in range(NUM_ADVANCED_STEP):
# 行動を決める
with torch.no_grad():
action = self.actor_critic.act(rollouts.observations[step])
# (32, 1) -> (32,) -> tensor を Numpy へ
actions = action.squeeze(1).numpy()
# 1 step の実行
for i in range(NUM_PROCESSES):
obs_np[i], reward_np[i], done_np[i], truncated_np[i], _ = self.envs[i].step(actions[i]) ## by nitta
if each_step[i]+1 >= self.max_step: ## by nitta
truncated_np[i] = True
# episode の終了評価と、state_next を設定する
if done_np[i] or truncated_np[i]: ## by nitta
# 環境0 のときのみ出力する
if i == 0:
print(f'{episode} Episode: Finished after {each_step[i]+1} steps')
episode += 1
# 報酬を設定する
if each_step[i] < self.max_step - 5:
reward_np[i] = -1.0 # 途中でこけたら罰則として報酬(-1)を与える
else:
reward_np[i] = 1.0 # 立ったまま修了は報酬(1)を与える
each_step[i] = 0 # step数のリセット
obs_np[i] = self.envs[i].reset() # 実行環境のリセット
else:
reward_np[i] = 0.0 # 普段は報酬0
each_step[i] += 1
# 報酬を tensor に変換し、試行の報酬額に足す
reward = torch.from_numpy(reward_np).float()
episode_rewards += reward
# 各実行環境それぞれについて、done ならmaskは0に、継続中ならmask は1にする
masks_ = []
for done_, truncated_ in zip(done_np, truncated_np):
if done_ or truncated_:
masks_.append([0.0])
else:
masks_.append([1.0])
masks = torch.FloatTensor(masks_)
#masks = torch.FloatTensor(
# [[0.0] if done_ or truncated_ else [1.0] for done_, truncated_in zip(done_np, truncated_np)]
#)
# 最後の試行の総報酬額を更新する
final_rewards *= masks # 継続中は1をかけてそのまま、doneでは0をかけてリセット
# 継続中は0を足す。doneではepisode_rewardsを足す
final_rewards += (1 - masks) * episode_rewards
# 試行の総報酬額を更新する
episode_rewards *= masks # 継続中は1をかけてそのまま。doneでは0に
# 現在の状態をdoneで全部0にする
current_obs *= masks
# current_obs を更新する
obs = torch.from_numpy(obs_np).float() # torch.Size([32, 4])
current_obs = obs # 最新のobsを格納する
# メモリオブジェクトに現stepのtransitionを挿入する
rollouts.insert(current_obs, action.data, reward, masks)
# advanced のfor loop終了
# advanced した最終stepの状態から予想する状態価値を計算する
with torch.no_grad():
next_value = self.actor_critic.get_value(
rollouts.observations[-1] # torch.Size([6, 32, 4])
).detach()
# 全stepの割引報酬和を計算して、rollouts の変数 returns を更新する
rollouts.compute_returns(next_value)
# ネットワークとrolloutを更新する
self.global_brain.update(rollouts)
rollouts.after_update()
# 全部のNUM_PROCESSESが200step立ち続けたら成功
if final_rewards.sum().numpy() >= NUM_PROCESSES:
print('success')
break
# main クラス
cartpole_env = Environment(max_step=300)
cartpole_env.run()
frames = []
observation = cartpole_env.envs[0].reset()
for _ in range(cartpole_env.max_step):
frames.append(cartpole_env.envs[0].render()[0])
observation = torch.from_numpy(observation.reshape(-1,4)).float()
action = cartpole_env.actor_critic.act(observation)
action = action.squeeze().numpy()
observation, reward, done, truncated, info = cartpole_env.envs[0].step(action)
if done or truncated:
break
# アニメーション表示する
%matplotlib notebook
import matplotlib.pyplot as plt
#display_frames_as_anim(frames) # display now
display_frames_as_anim(frames, 'cartpole_video5b.mp4') # save to file
if is_colab: # copy to google drive
! mkdir -p {SAVE_PREFIX}
! cp cartpole_video5b.mp4 {SAVE_PREFIX} # copy to the Google Drive