Jul/29/2023 by Yoshihisa Nitta
Google Colab 対応コード
# by nitta
import os
is_colab = 'google.colab' in str(get_ipython()) # for Google Colab
if is_colab:
from google.colab import drive
drive.mount('/content/drive')
SAVE_PREFIX='/content/drive/MyDrive/DeepLearning/openai_gym_rt'
else:
SAVE_PREFIX='.'
# Check if running on Colab
#is_colab = 'google.colab' in str(get_ipython()) # for Google Colab
# packages
if is_colab:
!apt update -qq
!apt upgrade -qq
!apt install -qq xvfb
!pip -q install pyvirtualdisplay
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib
matplotlib.rcParams['animation.embed_limit'] = 2**32
# Show multiple images as animation
# Colab compatible
%matplotlib notebook
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import animation
from IPython import display
import os
import gym
def display_frames_as_anim(frames, filepath=None):
"""
Displays a list of frames as a gif, with controls
"""
H, W, _ = frames[0].shape
fig, ax = plt.subplots(1, 1, figsize=(W/100.0, H/100.0))
ax.axis('off')
patch = plt.imshow(frames[0])
def animate(i):
display.clear_output(wait=True)
patch.set_data(frames[i])
return patch
anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50, repeat=False)
if not filepath is None:
dpath, fname = os.path.split(filepath)
if dpath != '' and not os.path.exists(dpath):
os.makedirs(dpath)
anim.save(filepath)
if is_colab:
display.display(display.HTML(anim.to_jshtml()))
#plt.close()
else:
plt.show()
import gym
ENV = 'CartPole-v1'
# ランダムに行動する
np.random.seed(12345)
env = gym.make(ENV, new_step_api=True, render_mode='rgb_array') # new_step_api, render_mode
observation = env.reset()
frames = [ env.render()[0] ]
states = [ observation ]
for step in range(100):
action = env.action_space.sample() ## np.random.choice(2) # 0: left, 1: right ##
obserbation, reward, done, truncated, info = env.step(action) # when new_step_api==True, two bool (done, truncated) returned
frames.append(env.render()[0]) # env.render(mode='rgb_array') -> env.render()[0]
states.append(observation)
if done:
break
print(len(states))
print(states)
# アニメーション表示する
%matplotlib notebook
import matplotlib.pyplot as plt
#display_frames_as_anim(frames) # display now
display_frames_as_anim(frames, 'cartpole_video1b.mp4') # save to file
if is_colab: # copy to google drive
! mkdir -p {SAVE_PREFIX}
! cp cartpole_video2a.mp4 {SAVE_PREFIX} # copy to the Google Drive
ディープラーニングを使わない従来の強化学習として3通りの手法があった。
このうち、Q学習における行動価値関数 $Q(s,a)$ をディープ・ニューラルネットワークで実現する DQN が提案された。
# CartPole で観測した状態変数の値を名前をつけて保存するために namedtuple を使う
from collections import namedtuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))
# 定数
GAMMA = 0.99
MAX_STEPS = 200
NUM_EPISODES = 500
# p.130
import random
class ReplayMemory:
def __init__(self, capacity):
self.capacity = capacity
self.memory = []
self.index = 0
def push(self, state, action, state_next, reward):
if len(self.memory) < self.capacity:
self.memory.append(None)
self.memory[self.index] = Transition(state, action, state_next, reward)
self.index = (self.index + 1) % self.capacity
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
# p.149
import torch
class Net(torch.nn.Module):
def __init__(self, n_in, n_mid, n_out):
super().__init__()
self.fc1 = torch.nn.Linear(n_in, n_mid)
self.fc2 = torch.nn.Linear(n_mid, n_mid)
self.fc3 = torch.nn.Linear(n_mid, n_out)
def forward(self, x):
h1 = torch.nn.functional.relu(self.fc1(x))
h2 = torch.nn.functional.relu(self.fc2(h1))
output = self.fc3(h2)
return output
# Double DQN の実装
BATCH_SIZE = 32
CAPACITY = 10000
class Brain:
def __init__(self, num_states, num_actions):
self.num_actions = num_actions
self.memory = ReplayMemory(CAPACITY) # 経験を記憶するメモリ
n_in, n_mid, n_out = num_states, 32, num_actions # main と target という2つのネットワークを生成する
self.main_q_network = Net(n_in, n_mid, n_out)
self.target_q_network = Net(n_in, n_mid, n_out)
self.optimizer = torch.optim.Adam( # 最適化
self.main_q_network.parameters(),
lr = 0.0001
)
def replay(self):
'''
Experience Replay で学習する
'''
# 1. メモリサイズを確認する
if len(self.memory) < BATCH_SIZE:
return
# 2. mini-batch を作成する
self.batch, self.state_batch, self.action_batch, self.reward_batch, self.non_final_next_states = self.make_minibatch()
# 3. 教師信号となる Q(s_t, a_t) を求める
self.expected_state_action_values = self.get_expected_state_action_values()
# 4. パラメータを更新する
self.update_main_q_network()
def decide_action(self, state, episode):
# ε-greedy 法で徐々に最適行動の選択を増やす
epsilon = 0.5 * (1 / (episode + 1))
if epsilon <= np.random.uniform(0, 1):
self.main_q_network.eval() # 推論モードに切り替える
with torch.no_grad():
# torch.max(dim=1) は最大値とインデックスのtupleが返される
# インデックスにアクセスするのは [1] or .index
# .view(1, 1) により [torch.LongTensor(1,1)] にreshapeされる
action = self.main_q_network(state).max(dim=1).indices.view(1, 1)
else:
action = torch.LongTensor(
[[ random.randrange(self.num_actions) ]] # 0 or 1 の乱数
)
return action
def make_minibatch(self):
'''
2. mini-batch を作成する
'''
# 2.1 メモリから mini-batch 分のデータを取り出す
transitions = self.memory.sample(BATCH_SIZE)
# 2.2 各変数を mini-batch に対応する形へ変形する
# trainsition = [ (state, action, state_next, reward) ...]
# これを変形して次の形式にする
# ( [state, ...], [action, ...], [state_next, ...], [reward, ...])
batch = Transition(*zip(*transitions))
# 2.3 'Transitions' named tuple から項目を取り出し、
# concatenate して torch.FloatTensor size(BATCH_SIZE, 1) に変換する
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
return batch, state_batch, action_batch, reward_batch, non_final_next_states
def get_expected_state_action_values(self):
'''
3. Q(s_t, a_t) を求める
'''
# 3.1 推論モードに切り替える
self.main_q_network.eval()
self.target_q_network.eval()
# 3.2 Q(s_t, a_t) を求める。
# self.model(state_batch) は各action(左 or 右)に対するQ値を全部返すので
# 返り値は "torch.FloatTensor size (BATCH_SIZE, 2)"
# 実行した action a_t に対応する Q 値を gather(dim, index) で引っ張り出す。
self.state_action_values = self.main_q_network(
self.state_batch
).gather(1, self.action_batch)
# 3.3 次の状態がある index のmax{Q(s_{t+1}, a)} 値を求める。
# cartpole が done になっておらず、next_state があるかをチェックするインデックスマスクを作成する
non_final_mask = torch.ByteTensor(
tuple(map(lambda s: s is not None, self.batch.next_state))
)
next_state_values = torch.zeros(BATCH_SIZE) # initial value is 0
a_m = torch.zeros(BATCH_SIZE).type(torch.LongTensor)
# 次の状態での最大の行動 a_m を main_q_network から求める。
# 出力にアクセスし、max(dim=1)で列方向の最大値の [値, index] を求める
# (注意) PyTorch で Tensor 配列の最大値を求めるには torch.max(dim=n) を使う
# dimを指定した場合は値とインデックスのtupleが返されるので
# [1] or .indices としてそのインデックスの値 (index=1 ) を出力する
# detach でその値を取り出す
a_m[non_final_mask] = self.main_q_network(
self.non_final_next_states
).detach().max(dim=1).indices
# 次の状態のあるものだけフィルターし、size 32 を (32, 1) へ変形する
a_m_non_final_next_states = a_m[non_final_mask].view(-1, 1)
# 次の状態がある index の、行動 a_m の Q 値を target Q-Network から求める
# detach() で取り出す。
# squeeze で (minibatch, 1) を (minibatch,) へ
next_state_values[non_final_mask] = self.target_q_network(
self.non_final_next_states
).gather(1, a_m_non_final_next_states).detach().squeeze()
# 3.4 教師となる Q(s_t, a_t) 値を、Q学習の式から求める。
expected_state_action_values = self.reward_batch + GAMMA * next_state_values
return expected_state_action_values
def update_main_q_network(self):
'''
4. パラメータの更新
'''
# 4.1 訓練モードに切り替える
self.main_q_network.train()
# 4.2 損失関数 (smooth_l1_loss は Huberloss)
# expected_state_action_values は形状が (minibatch,)なので、
# unsqueezeで形状を (minibatch, 1) へ
loss = torch.nn.functional.smooth_l1_loss(
self.state_action_values,
self.expected_state_action_values.unsqueeze(1)
)
# 4.3 パラメータを更新する
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
def update_target_q_network(self):
'''
target_q_network を main_q_network と同じにする
'''
self.target_q_network.load_state_dict(self.main_q_network.state_dict())
class Agent:
def __init__(self, num_states, num_actions):
self.brain = Brain(num_states, num_actions)
def update_q_function(self):
self.brain.replay()
def get_action(self, state, episode):
action = self.brain.decide_action(state, episode)
return action
def memorize(self, state, action, state_next, reward):
self.brain.memory.push(state, action, state_next, reward)
##### ADDED
def update_target_q_function(self):
self.brain.update_target_q_network()
# [自分へのメモ] env.step() の返り値の変更、env.render() の引数と返り値の変更に対応した
# run() の中で動画(frames)を表示せず、frames を返すように変更した
class Environment:
def __init__(self):
self.env = gym.make(ENV, new_step_api=True, render_mode='rgb_array')
self.num_states = self.env.observation_space.shape[0]
self.num_actions = self.env.action_space.n
self.agent = Agent(self.num_states, self.num_actions)
def run(self):
episode_10_list = np.zeros(10) # 10 試行分の立ち続けた step 数を格納し、平均ステップ数を出力に利用
complete_episodes = 0 # 195 steps 以上立ち続けた試行数
episode_final = False # 最後の試行であるか
frames = [] # 最後の試行を動画にするための配列
for episode in range(NUM_EPISODES):
observation = self.env.reset()
state = observation # 観測をそのまま状態 s として使用する
state = torch.from_numpy(state).type(torch.FloatTensor)
state = torch.unsqueeze(state, dim=0) # FloatTensor size(4) -> size(1,4)
for step in range(MAX_STEPS):
if episode_final is True:
frames.append(self.env.render()[0])
action = self.agent.get_action(state, episode)
# 行動 a_t の実行により、s_{t+1} と done フラグを求める。
# (注) new_api=True なのでtruncated も見るべし。
observation_next, _, done, truncated, _ = self.env.step(action.item())
if done or truncated:
state_next = None # 次の状態は無い
episode_10_list = np.hstack((episode_10_list[1:], step+1))
if step < 195:
reward = torch.FloatTensor([-1.0])
complete_episodes = 0
else:
reward = torch.FloatTensor([1.0])
complete_episodes = complete_episodes + 1
elif step == MAX_STEPS - 1:
state_next = None
episode_10_list = np.hstack((episode_10_list[1:], step+1))
reward = torch.FloatTensor([1.0])
complete_episodes += 1
truncated = True
else:
reward = torch.FloatTensor([0.0])
state_next = observation_next
state_next = torch.from_numpy(state_next).type(torch.FloatTensor)
# FloatTensor size (4) --> (1, 4)
state_next = torch.unsqueeze(state_next, dim=0)
# メモリに経験を追加
self.agent.memorize(state, action, state_next, reward)
# Experience Replay で Q 関数を更新する
self.agent.update_q_function()
# 観測の更新
state = state_next
# 終了時の処理
if done or truncated:
print(f'{episode} Episode: Finished after {step+1} steps: average steps = {episode_10_list.mean(): .1f}')
#########################
# Double-DQN により追加
#########################
# 2試行に一度 target_q_network を main_q_network と同じにする
if (episode % 2 == 0):
self.agent.update_target_q_function()
break
if episode_final is True:
# display_frames_as_gif(frames)
break
# 10 連続で 200 steps 立ち続けたら成功
if complete_episodes >= 10:
print('10 consecutive success')
episode_final = True
return frames
# main クラス
cartpole_env = Environment()
frames = cartpole_env.run()
# アニメーション表示する
%matplotlib notebook
import matplotlib.pyplot as plt
#display_frames_as_anim(frames) # display now
display_frames_as_anim(frames, 'cartpole_video2b.mp4') # save to file
if is_colab: # copy to google drive
! mkdir -p {SAVE_PREFIX}
! cp cartpole_video2b.mp4 {SAVE_PREFIX} # copy to the Google Drive