3.4.3 変分オートエンコーダの解析

GDL_code/03_04_vae_digits_analysis.ipynb

ダウンロードしたソースコードの 03_03_vae_digits.ipynb が train() でエラーを出すので、モデルを作成できなかった。

自分で checkpoint1 を取り除いて training すると、とりあえず学習は終了した。 学習済みのモデルは以下の場所にある。

ch03/run/vae/0002_digits/params.pkl
                        weights/weights.h5

In [1]:
%load_ext autoreload
%autoreload 2

ライブラリ

In [2]:
# GDL_code/utils/loaders.py
# gdl_ch03_01 と同じ
from tensorflow.keras.datasets import mnist

def load_mnist():
    """Load MNIST, scale pixels into [0, 1] and append a channel axis.

    Returns ((x_train, y_train), (x_test, y_test)) where the image arrays
    have shape (n, 28, 28, 1) and dtype float32.
    """
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    def _prepare(images):
        # uint8 [0, 255] -> float32 [0, 1], then add trailing channel dim
        scaled = images.astype('float32') / 255.0
        return scaled.reshape(scaled.shape + (1,))

    return (_prepare(x_train), y_train), (_prepare(x_test), y_test)
In [3]:
# GDL_code/utils/loaders.py
import os
import pickle

def load_model(model_class, folder):
    """Re-create a saved model from *folder*.

    Unpickles the constructor arguments from ``params.pkl``, instantiates
    ``model_class`` with them positionally, then restores the trained
    weights from ``weights/weights.h5``.
    """
    params_path = os.path.join(folder, 'params.pkl')
    with open(params_path, 'rb') as fh:
        saved_params = pickle.load(fh)

    instance = model_class(*saved_params)
    instance.load_weights(os.path.join(folder, 'weights/weights.h5'))
    return instance
In [4]:
# GDL_code/utils/callbacks.py
# gdl_ch03_01 と同じ

import os
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import Callback, LearningRateScheduler

class CustomCallback(Callback):
    """Keras callback that visualises training progress.

    Every ``print_every_n_batches`` batches it samples a random latent
    vector z ~ N(0, I), decodes it with the VAE's decoder and saves the
    resulting image under ``<run_folder>/images``.
    """

    def __init__(self, run_folder, print_every_n_batches, initial_epoch, vae):
        super().__init__()  # fix: initialise base Callback state (was missing)
        self.run_folder = run_folder
        self.print_every_n_batches = print_every_n_batches
        self.epoch = initial_epoch
        self.vae = vae

    def on_train_batch_end(self, batch, logs=None):
        # fix: logs=None instead of a shared mutable {} default (Keras convention)
        if batch % self.print_every_n_batches == 0:
            z_new = np.random.normal(size=(1, self.vae.z_dim))
            reconst = self.vae.decoder.predict(np.array(z_new))[0].squeeze()

            filepath = os.path.join(self.run_folder, 'images', 'img_'+str(self.epoch).zfill(3)+'_'+str(batch)+'.jpg')
            if len(reconst.shape) == 2:
                # single-channel image after squeeze(): save as grayscale
                plt.imsave(filepath, reconst, cmap='gray_r')
            else:
                plt.imsave(filepath, reconst)

    def on_epoch_begin(self, epoch, logs=None):
        # track the epoch number ourselves so saved filenames stay monotonic
        # even when training resumes from initial_epoch > 0
        self.epoch += 1
        
        
def step_decay_schedule(initial_lr, decay_factor=0.5, step_size=1):
    """Create a LearningRateScheduler with a step decay schedule.

    The learning rate at a given epoch is
    ``initial_lr * decay_factor ** floor(epoch / step_size)``.
    """
    return LearningRateScheduler(
        lambda epoch: initial_lr * (decay_factor ** np.floor(epoch / step_size))
    )

変分エンコーダ

In [5]:
# GDL_code/models/VAE.py
# gdl_ch03_03.ipynb と同じ
# train(), train_with_generator() においてcallbacks からcheckpoint1 をはずした by nitta


from tensorflow.keras.layers import Input, Conv2D, LeakyReLU, BatchNormalization, Dropout, Flatten, Dense, Reshape, Conv2DTranspose, Activation
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model
from tensorflow.keras.callbacks import ModelCheckpoint

import os
import pickle


class VariationalAutoencoder():
    """Convolutional variational autoencoder.

    Builds an encoder that maps an image to (mu, log_var, z) — z drawn via
    the reparameterization trick (see Sampling) — and a decoder that maps a
    latent vector z back to image space.  The trainable model is a VAEModel,
    whose custom train_step combines reconstruction and KL loss.
    """

    def __init__(self,
                 input_dim,
                 encoder_conv_filters,
                 encoder_conv_kernel_size,
                 encoder_conv_strides,
                 decoder_conv_t_filters,
                 decoder_conv_t_kernel_size,
                 decoder_conv_t_strides,
                 z_dim,
                 r_loss_factor,   # weight of reconstruction loss relative to KL loss
                 use_batch_norm=False,
                 use_dropout=False
                 ):
        self.name = 'variational_autoencoder'
        self.input_dim = input_dim  # e.g. (28, 28, 1)
        self.encoder_conv_filters = encoder_conv_filters
        self.encoder_conv_kernel_size = encoder_conv_kernel_size
        self.encoder_conv_strides = encoder_conv_strides
        self.decoder_conv_t_filters = decoder_conv_t_filters
        self.decoder_conv_t_kernel_size = decoder_conv_t_kernel_size
        self.decoder_conv_t_strides = decoder_conv_t_strides
        self.z_dim = z_dim
        self.r_loss_factor = r_loss_factor

        self.use_batch_norm = use_batch_norm
        self.use_dropout = use_dropout

        self.n_layers_encoder = len(encoder_conv_filters)
        self.n_layers_decoder = len(decoder_conv_t_filters)

        self._build()

    def _build(self):
        """Build the encoder, the decoder and the combined VAEModel."""
        ### THE ENCODER
        encoder_input = Input(shape=self.input_dim, name='encoder_input')
        x = encoder_input

        for i in range(self.n_layers_encoder):
            conv_layer = Conv2D(
                filters=self.encoder_conv_filters[i],
                kernel_size=self.encoder_conv_kernel_size[i],
                strides=self.encoder_conv_strides[i],
                padding='same',
                name='encoder_conv_' + str(i)
            )
            x = conv_layer(x)

            # NOTE: layer order is the opposite of the plain AutoEncoder:
            #   AE : LeakyReLU -> BatchNorm
            #   VAE: BatchNorm -> LeakyReLU
            if self.use_batch_norm:
                x = BatchNormalization()(x)
            x = LeakyReLU()(x)

            if self.use_dropout:
                x = Dropout(rate=0.25)(x)

        # remember the conv feature-map shape so the decoder can mirror it
        shape_before_flattening = K.int_shape(x)[1:]

        x = Flatten()(x)

        # Two dense heads parameterise the posterior N(mu, exp(log_var));
        # Sampling draws z with the reparameterization trick.
        self.mu = Dense(self.z_dim, name='mu')(x)
        self.log_var = Dense(self.z_dim, name='log_var')(x)
        self.z = Sampling(name='encoder_output')([self.mu, self.log_var])

        self.encoder = Model(encoder_input, [self.mu, self.log_var, self.z], name='encoder')

        ### THE DECODER
        decoder_input = Input(shape=(self.z_dim,), name='decoder_input')
        x = Dense(np.prod(shape_before_flattening))(decoder_input)
        x = Reshape(shape_before_flattening)(x)

        for i in range(self.n_layers_decoder):
            conv_t_layer = Conv2DTranspose(
                filters=self.decoder_conv_t_filters[i],
                kernel_size=self.decoder_conv_t_kernel_size[i],
                strides=self.decoder_conv_t_strides[i],
                padding='same',
                name='decoder_conv_t_' + str(i)
            )
            x = conv_t_layer(x)

            if i < self.n_layers_decoder - 1:
                # same BatchNorm -> LeakyReLU order as in the encoder
                if self.use_batch_norm:
                    x = BatchNormalization()(x)
                x = LeakyReLU()(x)
                if self.use_dropout:
                    x = Dropout(rate=0.25)(x)
            else:
                # final layer: sigmoid maps pixel values into [0, 1]
                x = Activation('sigmoid')(x)

        decoder_output = x
        self.decoder = Model(decoder_input, decoder_output, name='decoder')

        ### THE FULL AUTOENCODER
        # VAEModel implements the custom train_step (reconstruction + KL loss)
        self.model = VAEModel(self.encoder, self.decoder, self.r_loss_factor)

    def compile(self, learning_rate):
        """Compile the model; the loss itself lives in VAEModel.train_step."""
        self.learning_rate = learning_rate
        optimizer = Adam(learning_rate=learning_rate)  # fix: 'lr' alias is deprecated in TF2
        self.model.compile(optimizer=optimizer)

    def save(self, folder):
        """Save the constructor arguments (params.pkl) and architecture PNGs.

        The pickled list must match __init__'s positional parameter order so
        that load_model() can re-instantiate via ``model_class(*params)``.
        """
        if not os.path.exists(folder):
            os.makedirs(folder)
            os.makedirs(os.path.join(folder, 'viz'))
            os.makedirs(os.path.join(folder, 'weights'))
            os.makedirs(os.path.join(folder, 'images'))

        with open(os.path.join(folder, 'params.pkl'), 'wb') as f:
            pickle.dump([
                self.input_dim,
                self.encoder_conv_filters,
                self.encoder_conv_kernel_size,
                self.encoder_conv_strides,
                self.decoder_conv_t_filters,
                self.decoder_conv_t_kernel_size,
                self.decoder_conv_t_strides,
                self.z_dim,
                self.r_loss_factor,  # fix: was missing, so *params on reload shifted
                                     # use_batch_norm into the r_loss_factor slot
                self.use_batch_norm,
                self.use_dropout
            ], f)

        self.plot_model(folder)

    def plot_model(self, run_folder):
        """Write model/encoder/decoder architecture diagrams to <run_folder>/viz."""
        # ensure the viz directory exists (added by nitta)
        path = os.path.join(run_folder, 'viz')
        if not os.path.exists(path):
            os.makedirs(path)
        plot_model(self.model, to_file=os.path.join(run_folder, 'viz/model.png'), show_shapes=True, show_layer_names=True)
        plot_model(self.encoder, to_file=os.path.join(run_folder, 'viz/encoder.png'), show_shapes=True, show_layer_names=True)
        plot_model(self.decoder, to_file=os.path.join(run_folder, 'viz/decoder.png'), show_shapes=True, show_layer_names=True)

    def load_weights(self, filepath):
        """Restore weights previously saved by ModelCheckpoint."""
        self.model.load_weights(filepath)

    def train(self, x_train, batch_size, epochs, run_folder, print_every_n_batches=100, initial_epoch=0, lr_decay=1):
        """Fit on an in-memory array; targets equal inputs (autoencoder)."""
        custom_callback = CustomCallback(run_folder, print_every_n_batches, initial_epoch, self)
        lr_sched = step_decay_schedule(initial_lr=self.learning_rate, decay_factor=lr_decay, step_size=1)

        # NOTE(nitta): the original checkpoint1 — a filename formatted with
        # {epoch:03d}-{loss:.2f} — made training fail, so only the fixed-name
        # checkpoint is kept.
        checkpoint2 = ModelCheckpoint(os.path.join(run_folder, 'weights/weights.h5'), save_weights_only=True, verbose=1)
        callbacks_list = [checkpoint2, custom_callback, lr_sched]
        self.model.fit(
            x_train,
            x_train,
            batch_size=batch_size,
            shuffle=True,
            epochs=epochs,
            initial_epoch=initial_epoch,
            callbacks=callbacks_list)

    def train_with_generator(self, data_flow, epochs, steps_per_epoch, run_folder, print_every_n_batches=100, initial_epoch=0, lr_decay=1):
        """Fit from a generator.

        data_flow yields the inputs (and targets), so no explicit y is passed
        to fit(); steps_per_epoch tells Keras how many batches make one epoch.
        """
        custom_callback = CustomCallback(run_folder, print_every_n_batches, initial_epoch, self)
        lr_sched = step_decay_schedule(initial_lr=self.learning_rate, decay_factor=lr_decay, step_size=1)
        # same checkpoint policy as train(): only the fixed-name file (nitta)
        checkpoint2 = ModelCheckpoint(os.path.join(run_folder, 'weights/weights.h5'), save_weights_only=True, verbose=1)
        callbacks_list = [checkpoint2, custom_callback, lr_sched]
        self.model.fit(
            data_flow,
            shuffle=True,
            epochs=epochs,
            initial_epoch=initial_epoch,
            callbacks=callbacks_list,
            steps_per_epoch=steps_per_epoch)
In [6]:
# GDL_code/models/VAE.py

from tensorflow.keras.layers import Layer
from tensorflow.keras import backend as K

class Sampling(Layer):
    """Reparameterization trick: draw z = mu + sigma * eps with eps ~ N(0, I).

    Expects ``inputs`` to be the pair [mu, log_var]; sigma is recovered as
    exp(log_var / 2).
    """

    def call(self, inputs):
        mu, log_var = inputs
        eps = K.random_normal(shape=K.shape(mu))  # defaults: mean 0, stddev 1
        sigma = K.exp(log_var / 2)
        return mu + sigma * eps
In [7]:
import tensorflow as tf

class VAEModel(Model):
    """Keras Model combining encoder and decoder with a custom VAE train step.

    Per-sample loss = r_loss_factor * MSE(input, reconstruction)
                      + KL(N(mu, exp(log_var)) || N(0, I)).
    """

    def __init__(self, encoder, decoder, r_loss_factor, **kwargs):
        super(VAEModel, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.r_loss_factor = r_loss_factor

    def train_step(self, data):
        # fit(x, x) delivers (inputs, targets); an autoencoder only needs inputs
        if isinstance(data, tuple):
            data = data[0]
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            # per-sample mean squared error over H, W, C
            reconstruction_loss = tf.reduce_mean(
                tf.square(data - reconstruction), axis=[1, 2, 3]
            )
            reconstruction_loss *= self.r_loss_factor
            # closed-form KL divergence against the standard normal prior
            kl_loss = 1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
            kl_loss = tf.reduce_sum(kl_loss, axis=1)
            kl_loss *= -0.5
            total_loss = reconstruction_loss + kl_loss
        # gradient of a non-scalar target differentiates its sum (as before)
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        # fix: report scalar means — per-sample vectors break log consumers
        # such as ModelCheckpoint filenames formatted with {loss:.2f}
        return {
            "loss": tf.reduce_mean(total_loss),
            "reconstruction_loss": tf.reduce_mean(reconstruction_loss),
            "kl_loss": tf.reduce_mean(kl_loss),
        }

    def call(self, inputs):
        # fix: the encoder returns [mu, log_var, z]; only z feeds the decoder.
        # Passing the whole list broke the full-model forward pass.
        _, _, z = self.encoder(inputs)
        return self.decoder(z)

定数

In [8]:
# Location of the trained model for this run: run/vae/0002_digits
SECTION = 'vae'
RUN_ID = '0002'
DATA_NAME = 'digits'
RUN_FOLDER = f'run/{SECTION}/{RUN_ID}_{DATA_NAME}'

モデルのロード

In [9]:
# Rebuild the VAE from the pickled constructor args and load the trained weights.
vae = load_model(VariationalAutoencoder, RUN_FOLDER)

データのロード

In [10]:
# x_* scaled to [0, 1] with shape (n, 28, 28, 1); y_* are integer digit labels.
(x_train, y_train), (x_test, y_test) = load_mnist()

Reconstructing original paintings

テスト用画像から 10 個を選択し、一旦エンコードしてから、デコードして画像を生成してみる。上が元画像で、数字はエンコードされた2次元座標(潜在変数 $z$)、下が生成された画像。

In [11]:
# Pick 10 random test images, encode them, then decode the latent points back.
n_to_show = 10
example_idx = np.random.choice(range(len(x_test)), n_to_show)
example_images = x_test[example_idx]

# the encoder outputs [mu, log_var, z]; keep only the sampled z
_, _, z_points = vae.encoder.predict(example_images)
reconst_images = vae.decoder.predict(z_points)
In [12]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

# Top row: original test images with their encoded latent coordinates printed
# underneath; bottom row: the decoder's reconstructions.
fig, ax = plt.subplots(2, n_to_show, figsize=(1.4 * n_to_show, 1.4 * 3))

for i in range(n_to_show):
    img = example_images[i].squeeze();
    ax[0][i].imshow(img, cmap='gray_r')
    ax[0][i].axis('off')
    # print the rounded 2-D latent coordinates below the original image
    ax[0][i].text(0.5, -0.35, str(np.round(z_points[i],1)), fontsize=12, ha='center', transform=ax[0][i].transAxes)
    
    img2 = reconst_images[i].squeeze();
    ax[1][i].imshow(img2, cmap='gray_r')
    
plt.subplots_adjust(hspace=1)
plt.show()

Mr N. Coder's Wall

5000 個のテスト用画像をランダムに選択して、エンコードしてみる。 エンコードされた2次元座標は、まず、白黒で表示してみる。

In [13]:
# Encode 5000 random test images; keep their labels for colouring the plots.
n_to_show = 5000
example_idx = np.random.choice(range(len(x_test)), n_to_show)
example_images = x_test[example_idx]
example_labels = y_test[example_idx]

# keep only the sampled z from the encoder's [mu, log_var, z] outputs
_, _, z_points = vae.encoder.predict(example_images)
In [14]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

min_x = min(z_points[:, 0])
max_x = max(z_points[:, 0])
min_y = min(z_points[:, 1])
max_x = max(z_points[:, 1])

fig, ax = plt.subplots(1, 1, figsize=(12, 12))
ax.scatter(z_points[:, 0], z_points[:, 1], c='black', alpha=0.5, s=2)
plt.show()

正解ラベルに応じて、点の色を変化させると、次のようになる。 同じ正解ラベルを持つ画像が、連続された近い領域に写像されていることがわかる。

In [15]:
%matplotlib inline
import matplotlib.pyplot as plt

# Same scatter as above, but coloured by each image's true digit label.
fig, ax = plt.subplots(1, 1, figsize=(12, 12))
map = ax.scatter(z_points[:, 0], z_points[:, 1], c=example_labels, cmap='rainbow', alpha=0.5, s=2)

plt.colorbar(map)   # NB: a bare plt.colorbar() fails here — nothing was drawn via plt itself, so the Mappable returned by ax.scatter must be passed explicitly.
plt.show()

The new generated art exhibition

$15 \times 2 = 30$ 個の座標をランダムに生成し、decoder で画像を生成してみる。

In [16]:
import numpy as np

# Sample 15 x 2 = 30 random latent coordinates from a standard normal.
n_rows = 15
n_lines = 2

x = np.random.normal(size = n_rows * n_lines)
y = np.random.normal(size = n_rows * n_lines)

# pair them into a (30, 2) array and decode each point into an image
z = np.array(list(zip(x, y)))
reconst = vae.decoder.predict(z)

まず、テスト用画像から5000個をランダムに選択して、エンコード結果を正解ラベル毎に色分けして潜在空間に表示する。

潜在空間から、$15 \times 2 = 30$ 個の点の座標をランダムに選択し、 デコードして画像を生成してみる。 選んだ点を潜在空間に黒い点として表示すると次の図となる。

In [17]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

fig, ax = plt.subplots(1, 1, figsize=(12, 12))

# encoded test images, coloured by their true digit label
map = ax.scatter(z_points[:, 0], z_points[:, 1], c=example_labels, cmap='rainbow', alpha=0.5, s=2)

# the 30 randomly drawn latent points, overlaid in black
ax.scatter(z[:, 0], z[:, 1], c='black', alpha=1, s=20)

plt.colorbar(map)   # NB: a bare plt.colorbar() fails here — nothing was drawn via plt itself, so the Mappable returned by ax.scatter must be passed explicitly.
plt.show()

$15 \times 2 = 30$ 個の座標から画像をデコーダによって生成した。 生成された画像と、元の座標を示す。

In [18]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

fig, ax = plt.subplots(n_lines, n_rows, figsize=(1.4 * n_rows, 1.4 * (n_lines + 1)))
plt.subplots_adjust(hspace=1)

for y in range(n_lines):
    for x in range(n_rows):
        idx = y * n_rows + x
        img = reconst[idx].squeeze()
        ax[y][x].imshow(img, cmap='gray_r')
        ax[y][x].text(0.5, -0.35, str(np.round(z[idx], 1)), fontsize=12, ha='center', transform=ax[0][x].transAxes)
        ax[y][x].axis('off')
        
plt.show()

累積分布関数 (Cumulative Distribution Function) norm.cdf()

norm.cdf(x, loc=0, scale=1)

  • loc: 平均
  • scale: 標準偏差
  • 返り値: x 以下の値が発生する確率 (0 から 1 の値)
  • loc, scale が省略された場合は、標準正規分布に対する累積分布関数となる。

この関数を使うことにより、$x$ の値が $X$ のとき、それ以下の値が発生する確率を計算できる。

In [19]:
import numpy as np
from scipy.stats import norm

n_to_show = 5000

# Re-encode 5000 random test images.
example_idx = np.random.choice(range(len(x_test)), n_to_show)
example_images = x_test[example_idx]
example_labels = y_test[example_idx]

_, _, z_points = vae.encoder.predict(example_images)
# Map each latent coordinate through the standard normal CDF, squashing the
# point cloud into the unit square [0, 1] x [0, 1].
p_points = norm.cdf(z_points)
In [20]:
%matplotlib inline
import matplotlib.pyplot as plt

# Left: raw latent points; right: the same points after the normal-CDF warp.
fig, ax = plt.subplots(1,2,figsize=(8*2, 8))

map1 = ax[0].scatter(z_points[:, 0], z_points[:, 1], cmap='rainbow', c=example_labels, alpha=0.5, s=2)
plt.colorbar(map1)

map2 = ax[1].scatter(p_points[:, 0], p_points[:, 1], cmap='rainbow', c=example_labels, alpha=0.5, s=5)

plt.show()