Updated 19/Nov/2021 by Yoshihisa Nitta  

Variational Auto Encoder Analysis for the MNIST Dataset with TensorFlow 2 on Google Colab

This notebook assumes the state left after training with VAE_MNIST_Train.ipynb, i.e. that the trained model and its weights have already been saved to the run folders on Google Drive.

In [1]:
#! pip install tensorflow==2.7.0
In [2]:
%tensorflow_version 2.x

import tensorflow as tf
print(tf.__version__)
2.7.0

Check the Google Colab runtime environment

In [3]:
! nvidia-smi
! cat /proc/cpuinfo
! cat /etc/issue
! free -h
Mon Nov 22 09:20:10 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 495.44       Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|===============================+======================+======================|
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   34C    P0    26W / 250W |      0MiB / 16280MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                                  |
|  GPU   GI   CI        PID   Type   Process name                  GPU Memory |
|        ID   ID                                                   Usage      |
|=============================================================================|
|  No running processes found                                                 |
+-----------------------------------------------------------------------------+
processor	: 0
vendor_id	: GenuineIntel
cpu family	: 6
model		: 79
model name	: Intel(R) Xeon(R) CPU @ 2.20GHz
stepping	: 0
microcode	: 0x1
cpu MHz		: 2199.998
cache size	: 56320 KB
physical id	: 0
siblings	: 2
core id		: 0
cpu cores	: 1
apicid		: 0
initial apicid	: 0
fpu		: yes
fpu_exception	: yes
cpuid level	: 13
wp		: yes
flags		: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single ssbd ibrs ibpb stibp fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm rdseed adx smap xsaveopt arat md_clear arch_capabilities
bugs		: cpu_meltdown spectre_v1 spectre_v2 spec_store_bypass l1tf mds swapgs taa
bogomips	: 4399.99
clflush size	: 64
cache_alignment	: 64
address sizes	: 46 bits physical, 48 bits virtual
power management:

processor	: 1
vendor_id	: GenuineIntel
cpu family	: 6
model		: 79
model name	: Intel(R) Xeon(R) CPU @ 2.20GHz
stepping	: 0
microcode	: 0x1
cpu MHz		: 2199.998
cache size	: 56320 KB
physical id	: 0
siblings	: 2
core id		: 0
cpu cores	: 1
apicid		: 1
initial apicid	: 1
fpu		: yes
fpu_exception	: yes
cpuid level	: 13
wp		: yes
flags		: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single ssbd ibrs ibpb stibp fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm rdseed adx smap xsaveopt arat md_clear arch_capabilities
bugs		: cpu_meltdown spectre_v1 spectre_v2 spec_store_bypass l1tf mds swapgs taa
bogomips	: 4399.99
clflush size	: 64
cache_alignment	: 64
address sizes	: 46 bits physical, 48 bits virtual
power management:

Ubuntu 18.04.5 LTS \n \l

              total        used        free      shared  buff/cache   available
Mem:            12G        738M          9G        1.2M        2.0G         11G
Swap:            0B          0B          0B

Mount Google Drive from Google Colab

In [4]:
from google.colab import drive
drive.mount('/content/drive')
Mounted at /content/drive
In [5]:
! ls /content/drive
MyDrive  Shareddrives

Download the source file from Google Drive or nw.tsuda.ac.jp

By default, download from Google Drive with gdown. Fall back to downloading from nw.tsuda.ac.jp only if the Google Drive specifications change and the gdown download no longer works.

In [6]:
# Download source file
nw_path = './nw'
! rm -rf {nw_path}
! mkdir -p {nw_path}

if True:   # from Google Drive
    url_model =  'https://drive.google.com/uc?id=1ZCihR7JkMOity4wCr66ZCp-3ZOlfwwo3'
    ! (cd {nw_path}; gdown {url_model})
else:      # from nw.tsuda.ac.jp
    URL_NW = 'https://nw.tsuda.ac.jp/lec/GoogleColab/pub'
    url_model = f'{URL_NW}/models/VariationalAutoEncoder.py'
    ! wget -nd {url_model} -P {nw_path}
Downloading...
From: https://drive.google.com/uc?id=1ZCihR7JkMOity4wCr66ZCp-3ZOlfwwo3
To: /content/nw/VariationalAutoEncoder.py
100% 18.7k/18.7k [00:00<00:00, 14.9MB/s]
In [7]:
! cat {nw_path}/VariationalAutoEncoder.py
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

import os
import pickle
import datetime

class Sampling(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs):
        mu, log_var = inputs
        epsilon = tf.keras.backend.random_normal(shape=tf.keras.backend.shape(mu), mean=0., stddev=1.)
        return mu + tf.keras.backend.exp(log_var / 2) * epsilon


class VAEModel(tf.keras.models.Model):
    def __init__(self, encoder, decoder, r_loss_factor, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.r_loss_factor = r_loss_factor


    @tf.function
    def loss_fn(self, x):
        z_mean, z_log_var, z = self.encoder(x)
        reconstruction = self.decoder(z)
        reconstruction_loss = tf.reduce_mean(
            tf.square(x - reconstruction), axis=[1,2,3]
        ) * self.r_loss_factor
        kl_loss = tf.reduce_sum(
            1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var),
            axis = 1
        ) * (-0.5)
        total_loss = reconstruction_loss + kl_loss
        return total_loss, reconstruction_loss, kl_loss


    @tf.function
    def compute_loss_and_grads(self, x):
        with tf.GradientTape() as tape:
            total_loss, reconstruction_loss, kl_loss = self.loss_fn(x)
        grads = tape.gradient(total_loss, self.trainable_weights)
        return total_loss, reconstruction_loss, kl_loss, grads


    def train_step(self, data):
        if isinstance(data, tuple):
            data = data[0]
        total_loss, reconstruction_loss, kl_loss, grads = self.compute_loss_and_grads(data)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return {
            "loss": tf.math.reduce_mean(total_loss),
            "reconstruction_loss": tf.math.reduce_mean(reconstruction_loss),
            "kl_loss": tf.math.reduce_mean(kl_loss),
        }

    def call(self,inputs):
        _, _, z = self.encoder(inputs)
        return self.decoder(z)


class VariationalAutoEncoder():
    def __init__(self, 
                 input_dim,
                 encoder_conv_filters,
                 encoder_conv_kernel_size,
                 encoder_conv_strides,
                 decoder_conv_t_filters,
                 decoder_conv_t_kernel_size,
                 decoder_conv_t_strides,
                 z_dim,
                 r_loss_factor,   ### added
                 use_batch_norm = False,
                 use_dropout = False,
                 epoch = 0
                ):
        self.name = 'variational_autoencoder'
        self.input_dim = input_dim
        self.encoder_conv_filters = encoder_conv_filters
        self.encoder_conv_kernel_size = encoder_conv_kernel_size
        self.encoder_conv_strides = encoder_conv_strides
        self.decoder_conv_t_filters = decoder_conv_t_filters
        self.decoder_conv_t_kernel_size = decoder_conv_t_kernel_size
        self.decoder_conv_t_strides = decoder_conv_t_strides
        self.z_dim = z_dim
        self.r_loss_factor = r_loss_factor   ### added
            
        self.use_batch_norm = use_batch_norm
        self.use_dropout = use_dropout

        self.epoch = epoch
            
        self.n_layers_encoder = len(encoder_conv_filters)
        self.n_layers_decoder = len(decoder_conv_t_filters)
            
        self._build()
 

    def _build(self):
        ### THE ENCODER
        encoder_input = tf.keras.layers.Input(shape=self.input_dim, name='encoder_input')
        x = encoder_input
        
        for i in range(self.n_layers_encoder):
            x = conv_layer = tf.keras.layers.Conv2D(
                filters = self.encoder_conv_filters[i],
                kernel_size = self.encoder_conv_kernel_size[i],
                strides = self.encoder_conv_strides[i],
                padding  = 'same',
                name = 'encoder_conv_' + str(i)
            )(x)

            if self.use_batch_norm:                                ### The order of layers is opposite to AutoEncoder
                x = tf.keras.layers.BatchNormalization()(x)        ###   AE: LeakyReLU -> BatchNorm
            x = tf.keras.layers.LeakyReLU()(x)                     ###   VAE: BatchNorm -> LeakyReLU
            
            if self.use_dropout:
                x = tf.keras.layers.Dropout(rate = 0.25)(x)
        
        shape_before_flattening = tf.keras.backend.int_shape(x)[1:]
        
        x = tf.keras.layers.Flatten()(x)
        
        self.mu = tf.keras.layers.Dense(self.z_dim, name='mu')(x)
        self.log_var = tf.keras.layers.Dense(self.z_dim, name='log_var')(x) 
        self.z = Sampling(name='encoder_output')([self.mu, self.log_var])
        
        self.encoder = tf.keras.models.Model(encoder_input, [self.mu, self.log_var, self.z], name='encoder')
        
        
        ### THE DECODER
        decoder_input = tf.keras.layers.Input(shape=(self.z_dim,), name='decoder_input')
        x = decoder_input
        x = tf.keras.layers.Dense(np.prod(shape_before_flattening))(x)
        x = tf.keras.layers.Reshape(shape_before_flattening)(x)
        
        for i in range(self.n_layers_decoder):
            x = conv_t_layer =   tf.keras.layers.Conv2DTranspose(
                filters = self.decoder_conv_t_filters[i],
                kernel_size = self.decoder_conv_t_kernel_size[i],
                strides = self.decoder_conv_t_strides[i],
                padding = 'same',
                name = 'decoder_conv_t_' + str(i)
            )(x)
            
            if i < self.n_layers_decoder - 1:
                if self.use_batch_norm:                           ### The order of layers is opposite to AutoEncoder
                    x = tf.keras.layers.BatchNormalization()(x)   ###     AE: LeakyReLU -> BatchNorm
                x = tf.keras.layers.LeakyReLU()(x)                ###      VAE: BatchNorm -> LeakyReLU                
                if self.use_dropout:
                    x = tf.keras.layers.Dropout(rate=0.25)(x)
            else:
                x = tf.keras.layers.Activation('sigmoid')(x)
       
        decoder_output = x
        self.decoder = tf.keras.models.Model(decoder_input, decoder_output, name='decoder')  ### added (name)
        
        ### THE FULL AUTOENCODER
        self.model = VAEModel(self.encoder, self.decoder, self.r_loss_factor)
        
        
    def save(self, folder):
        self.save_params(os.path.join(folder, 'params.pkl'))
        self.save_weights(folder)


    @staticmethod
    def load(folder, epoch=None):  # VariationalAutoEncoder.load(folder)
        params = VariationalAutoEncoder.load_params(os.path.join(folder, 'params.pkl'))
        VAE = VariationalAutoEncoder(*params)
        if epoch is None:
            VAE.load_weights(folder)
        else:
            VAE.load_weights(folder, epoch-1)
            VAE.epoch = epoch
        return VAE

        
    def save_params(self, filepath):
        dpath, fname = os.path.split(filepath)
        if dpath != '' and not os.path.exists(dpath):
            os.makedirs(dpath)
        with open(filepath, 'wb') as f:
            pickle.dump([
                self.input_dim,
                self.encoder_conv_filters,
                self.encoder_conv_kernel_size,
                self.encoder_conv_strides,
                self.decoder_conv_t_filters,
                self.decoder_conv_t_kernel_size,
                self.decoder_conv_t_strides,
                self.z_dim,
                self.r_loss_factor,
                self.use_batch_norm,
                self.use_dropout,
                self.epoch
            ], f)


    @staticmethod
    def load_params(filepath):
        with open(filepath, 'rb') as f:
            params = pickle.load(f)
        return params


    def save_weights(self, folder, epoch=None):
        if epoch is None:
            self.save_model_weights(self.encoder, os.path.join(folder, f'weights/encoder-weights.h5'))
            self.save_model_weights(self.decoder, os.path.join(folder, f'weights/decoder-weights.h5'))
        else:
            self.save_model_weights(self.encoder, os.path.join(folder, f'weights/encoder-weights_{epoch}.h5'))
            self.save_model_weights(self.decoder, os.path.join(folder, f'weights/decoder-weights_{epoch}.h5'))


    def save_model_weights(self, model, filepath):
        dpath, fname = os.path.split(filepath)
        if dpath != '' and not os.path.exists(dpath):
            os.makedirs(dpath)
        model.save_weights(filepath)


    def load_weights(self, folder, epoch=None):
        if epoch is None:
            self.encoder.load_weights(os.path.join(folder, f'weights/encoder-weights.h5'))
            self.decoder.load_weights(os.path.join(folder, f'weights/decoder-weights.h5'))
        else:
            self.encoder.load_weights(os.path.join(folder, f'weights/encoder-weights_{epoch}.h5'))
            self.decoder.load_weights(os.path.join(folder, f'weights/decoder-weights_{epoch}.h5'))


    def save_images(self, imgs, filepath):
        z_mean, z_log_var, z = self.encoder.predict(imgs)
        reconst_imgs = self.decoder.predict(z)
        txts = [ f'{p[0]:.3f}, {p[1]:.3f}' for p in z ]
        VariationalAutoEncoder.showImages(imgs, reconst_imgs, txts, 1.4, 1.4, 0.5, filepath)
      

    def compile(self, learning_rate):
        self.learning_rate = learning_rate
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
        self.model.compile(optimizer=optimizer)     # CAUTION!!!: loss(y_true, y_pred) function is not specified.
        
        
    def train_with_fit(
            self,
            x_train,
            batch_size,
            epochs,
            run_folder='run/'
    ):
        history = self.model.fit(
            x_train,
            x_train,
            batch_size = batch_size,
            shuffle=True,
            initial_epoch = self.epoch,
            epochs = epochs
        )
        if (self.epoch < epochs):
            self.epoch = epochs

        if run_folder != None:
            self.save(run_folder)
            self.save_weights(run_folder, self.epoch-1)
        
        return history


    def train_generator_with_fit(
            self,
            data_flow,
            epochs,
            run_folder='run/'
    ):
        history = self.model.fit(
            data_flow,
            initial_epoch = self.epoch,
            epochs = epochs
        )
        if (self.epoch < epochs):
            self.epoch = epochs

        if run_folder != None:
            self.save(run_folder)
            self.save_weights(run_folder, self.epoch-1)
        
        return history


    def train_tf(
            self,
            x_train,
            batch_size = 32,
            epochs = 10,
            shuffle = False,
            run_folder = 'run/',
            optimizer = None,
            save_epoch_interval = 100,
            validation_data = None
    ):
        start_time = datetime.datetime.now()
        steps = x_train.shape[0] // batch_size

        total_losses = []
        reconstruction_losses = []
        kl_losses = []

        val_total_losses = []
        val_reconstruction_losses = []
        val_kl_losses = []

        for epoch in range(self.epoch, epochs):
            epoch_loss = 0
            indices = tf.range(x_train.shape[0], dtype=tf.int32)
            if shuffle:
                indices = tf.random.shuffle(indices)
            x_ = x_train[indices]

            step_total_losses = []
            step_reconstruction_losses = []
            step_kl_losses = []
            for step in range(steps):
                start = batch_size * step
                end = start + batch_size

                total_loss, reconstruction_loss, kl_loss, grads = self.model.compute_loss_and_grads(x_[start:end])
                optimizer.apply_gradients(zip(grads, self.model.trainable_weights))
                
                step_total_losses.append(np.mean(total_loss))
                step_reconstruction_losses.append(np.mean(reconstruction_loss))
                step_kl_losses.append(np.mean(kl_loss))
            
            epoch_total_loss = np.mean(step_total_losses)
            epoch_reconstruction_loss = np.mean(step_reconstruction_losses)
            epoch_kl_loss = np.mean(step_kl_losses)

            total_losses.append(epoch_total_loss)
            reconstruction_losses.append(epoch_reconstruction_loss)
            kl_losses.append(epoch_kl_loss)

            val_str = ''
            if not validation_data is None:
                x_val = validation_data
                tl, rl, kl = self.model.loss_fn(x_val)
                val_tl = np.mean(tl)
                val_rl = np.mean(rl)
                val_kl = np.mean(kl)
                val_total_losses.append(val_tl)
                val_reconstruction_losses.append(val_rl)
                val_kl_losses.append(val_kl)
                val_str = f'val loss total {val_tl:.3f} reconstruction {val_rl:.3f} kl {val_kl:.3f} '

            if (epoch+1) % save_epoch_interval == 0 and run_folder != None:
                self.save(run_folder)
                self.save_weights(run_folder, self.epoch)

            elapsed_time = datetime.datetime.now() - start_time
            print(f'{epoch+1}/{epochs} {steps} loss: total {epoch_total_loss:.3f} reconstruction {epoch_reconstruction_loss:.3f} kl {epoch_kl_loss:.3f} {val_str}{elapsed_time}')

            self.epoch += 1

        if run_folder != None:
            self.save(run_folder)
            self.save_weights(run_folder, self.epoch-1)

        dic = { 'loss' : total_losses, 'reconstruction_loss' : reconstruction_losses, 'kl_loss' : kl_losses }
        if not validation_data is None:
            dic['val_loss'] = val_total_losses
            dic['val_reconstruction_loss'] = val_reconstruction_losses
            dic['val_kl_loss'] = val_kl_losses

        return dic
            

    def train_tf_generator(
            self,
            data_flow,
            epochs = 10,
            run_folder = 'run/',
            optimizer = None,
            save_epoch_interval = 100,
            validation_data_flow = None
    ):
        start_time = datetime.datetime.now()
        steps = len(data_flow)

        total_losses = []
        reconstruction_losses = []
        kl_losses = []

        val_total_losses = []
        val_reconstruction_losses = []
        val_kl_losses = []

        for epoch in range(self.epoch, epochs):
            epoch_loss = 0

            step_total_losses = []
            step_reconstruction_losses = []
            step_kl_losses = []

            for step in range(steps):
                x, _ = next(data_flow)

                total_loss, reconstruction_loss, kl_loss, grads = self.model.compute_loss_and_grads(x)
                optimizer.apply_gradients(zip(grads, self.model.trainable_weights))
                
                step_total_losses.append(np.mean(total_loss))
                step_reconstruction_losses.append(np.mean(reconstruction_loss))
                step_kl_losses.append(np.mean(kl_loss))
            
            epoch_total_loss = np.mean(step_total_losses)
            epoch_reconstruction_loss = np.mean(step_reconstruction_losses)
            epoch_kl_loss = np.mean(step_kl_losses)

            total_losses.append(epoch_total_loss)
            reconstruction_losses.append(epoch_reconstruction_loss)
            kl_losses.append(epoch_kl_loss)

            val_str = ''
            if not validation_data_flow is None:
                step_val_tl = []
                step_val_rl = []
                step_val_kl = []
                for i in range(len(validation_data_flow)):
                    x, _ = next(validation_data_flow)
                    tl, rl, kl = self.model.loss_fn(x)
                    step_val_tl.append(np.mean(tl))
                    step_val_rl.append(np.mean(rl))
                    step_val_kl.append(np.mean(kl))
                val_tl = np.mean(step_val_tl)
                val_rl = np.mean(step_val_rl)
                val_kl = np.mean(step_val_kl)
                val_total_losses.append(val_tl)
                val_reconstruction_losses.append(val_rl)
                val_kl_losses.append(val_kl)
                val_str = f'val loss total {val_tl:.3f} reconstruction {val_rl:.3f} kl {val_kl:.3f} '

            if (epoch+1) % save_epoch_interval == 0 and run_folder != None:
                self.save(run_folder)
                self.save_weights(run_folder, self.epoch)

            elapsed_time = datetime.datetime.now() - start_time
            print(f'{epoch+1}/{epochs} {steps} loss: total {epoch_total_loss:.3f} reconstruction {epoch_reconstruction_loss:.3f} kl {epoch_kl_loss:.3f} {val_str}{elapsed_time}')

            self.epoch += 1

        if run_folder != None:
            self.save(run_folder)
            self.save_weights(run_folder, self.epoch-1)

        dic = { 'loss' : total_losses, 'reconstruction_loss' : reconstruction_losses, 'kl_loss' : kl_losses }
        if not validation_data_flow is None:
            dic['val_loss'] = val_total_losses
            dic['val_reconstruction_loss'] = val_reconstruction_losses
            dic['val_kl_loss'] = val_kl_losses

        return dic


    @staticmethod
    def showImages(imgs1, imgs2, txts, w, h, vskip=0.5, filepath=None):
        n = len(imgs1)
        fig, ax = plt.subplots(2, n, figsize=(w * n, (2+vskip) * h))
        for i in range(n):
            if n == 1:
                axis = ax[0]
            else:
                axis = ax[0][i]
            img = imgs1[i].squeeze()
            axis.imshow(img, cmap='gray_r')
            axis.axis('off')

            axis.text(0.5, -0.35, txts[i], fontsize=10, ha='center', transform=axis.transAxes)

            if n == 1:
                axis = ax[1]
            else:
                axis = ax[1][i]
            img2 = imgs2[i].squeeze()
            axis.imshow(img2, cmap='gray_r')
            axis.axis('off')

        if not filepath is None:
            dpath, fname = os.path.split(filepath)
            if dpath != '' and not os.path.exists(dpath):
                os.makedirs(dpath)
            fig.savefig(filepath, dpi=600)
            plt.close()
        else:
            plt.show()

    @staticmethod
    def plot_history(vals, labels):
        colors = ['red', 'blue', 'green', 'orange', 'black', 'pink']
        n = len(vals)
        fig, ax = plt.subplots(1, 1, figsize=(9,4))
        for i in range(n):
            ax.plot(vals[i], c=colors[i], label=labels[i])
        ax.legend(loc='upper right')
        ax.set_xlabel('epochs')
        # ax[0].set_ylabel('loss')
        
        plt.show()
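
In VAEModel.loss_fn above, the total loss is a scaled pixel-wise reconstruction error plus the closed-form KL divergence between the encoder's Gaussian N(mu, exp(log_var)) and the standard normal prior. The following is a minimal standalone sketch of that KL term in NumPy, with hypothetical example values, purely to illustrate the formula used by the class:

import numpy as np

# KL( N(mu, sigma^2) || N(0, 1) ) summed over the latent dimensions,
# mirroring the kl_loss term in VAEModel.loss_fn above.
def kl_term(z_mean, z_log_var):
    return -0.5 * np.sum(1 + z_log_var - np.square(z_mean) - np.exp(z_log_var), axis=1)

# Hypothetical batch of two latent codes in a 2-dimensional latent space.
z_mean = np.array([[0.0, 0.0], [1.0, -1.0]])
z_log_var = np.array([[0.0, 0.0], [0.5, 0.5]])
print(kl_term(z_mean, z_log_var))   # the first entry is 0.0, since N(0, 1) matches the prior exactly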

Prepare the MNIST dataset

In [8]:
%tensorflow_version 2.x

import tensorflow as tf
import numpy as np

print(tf.__version__)
2.7.0
In [9]:
# prepare data
(x_train_raw, y_train_raw), (x_test_raw, y_test_raw) = tf.keras.datasets.mnist.load_data()
print(x_train_raw.shape)
print(y_train_raw.shape)
print(x_test_raw.shape)
print(y_test_raw.shape)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
(60000, 28, 28)
(60000,)
(10000, 28, 28)
(10000,)
In [10]:
x_train = x_train_raw.reshape(x_train_raw.shape+(1,)).astype('float32') / 255.0
x_test = x_test_raw.reshape(x_test_raw.shape+(1,)).astype('float32') / 255.0
print(x_train.shape)
print(x_test.shape)
(60000, 28, 28, 1)
(10000, 28, 28, 1)
In [11]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np

N = 10
selected_indices = np.random.choice(x_train_raw.shape[0], N)

fig, ax = plt.subplots(1, N, figsize=(2.8 * N, 2.8))
for i in range(N):
    ax[i].imshow(x_train_raw[selected_indices[i]],cmap='gray')
    ax[i].axis('off')

plt.show()

Load the previously trained neural network model

Use the downloaded VariationalAutoEncoder class and load the saved weights of the model.

In [12]:
save_path1 = '/content/drive/MyDrive/ColabRun/VAE01/'
save_path2 = '/content/drive/MyDrive/ColabRun/VAE02/'
save_path3 = '/content/drive/MyDrive/ColabRun/VAE03/'
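
Before loading, it can be useful to confirm that the run folders written by the training notebook are present. This is an optional sanity check, assuming the folder layout produced by the save() and save_weights() methods shown above (params.pkl plus a weights/ subdirectory):

import os

# Optional: confirm the run folders saved by VAE_MNIST_Train.ipynb exist on the mounted Drive.
for p in [save_path1, save_path2, save_path3]:
    if os.path.isdir(p):
        print(p, sorted(os.listdir(p)))
    else:
        print(p, 'not found')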
In [13]:
from nw.VariationalAutoEncoder import VariationalAutoEncoder
import os

vae = VariationalAutoEncoder.load(save_path3)

print(vae.epoch)
200
In [14]:
# [example] load the weights of the specified epoch.
vae_young = VariationalAutoEncoder.load(save_path3, 3)

print(vae_young.epoch)
3

Display the distribution of points in the latent space

Encode the 10000 images of x_test into 2D coordinates and draw them as points. The digit drawn in each image is indicated by the color of its point.

In [15]:
n_to_show = len(x_test)
example_idx = np.random.choice(range(len(x_test)), n_to_show)
example_images = x_test[example_idx]
example_labels = y_test_raw[example_idx]

_, _, z_points = vae.encoder.predict(example_images)

min_x = min(z_points[:, 0])
max_x = max(z_points[:, 0])
min_y = min(z_points[:, 1])
max_y = max(z_points[:, 1])
In [16]:
%matplotlib inline
import matplotlib.pyplot as plt

fig, ax = plt.subplots(1, 1, figsize=(12, 12))
map = ax.scatter(z_points[:, 0], z_points[:, 1], c=example_labels, cmap='rainbow', alpha=0.5, s=2)

plt.colorbar(map)

plt.show()
In [17]:
# Sample 30 random points in the latent space; they are plotted and decoded into images below.
import numpy as np

table_row = 10    # number of columns in the table
table_line = 3    # number of rows in the table

x = np.random.uniform(min_x, max_x, size=table_line * table_row)
y = np.random.uniform(min_y, max_y, size=table_line * table_row)
z_grid = np.array(list(zip(x,y)))    # (x, y) : 2D coordinates
reconst = vae.decoder.predict(z_grid)
In [18]:
%matplotlib inline
import matplotlib.pyplot as plt

fig, ax = plt.subplots(1, 1, figsize=(12, 12))
map = ax.scatter(z_points[:, 0], z_points[:, 1], c=example_labels, cmap='rainbow', alpha=0.5, s=2)

ax.scatter(z_grid[:,0], z_grid[:,1], c='red', alpha=1, s=20)

for i in range(len(z_grid)):
    ax.text(z_grid[i][0], z_grid[i][1], str(i))

plt.colorbar(map)

plt.show()
In [19]:
# Display the images decoded from the 30 sampled latent points.
%matplotlib inline
import matplotlib.pyplot as plt

VSKIP=0.5   # vertical space between subplots

fig, ax = plt.subplots(table_line, table_row, figsize=(2.8 * table_row, 2.8 * table_line * (1+VSKIP)))
plt.subplots_adjust(hspace = VSKIP)
                       
for y in range(table_line):
    for x in range(table_row):
        idx = table_row * y + x
        img = reconst[idx].squeeze()
        ax[y][x].imshow(img, cmap='gray')
        ax[y][x].text(0.5, -0.35, f'{idx} ({z_grid[idx][0]:.2f}, {z_grid[idx][1]:.2f})', fontsize=16, ha='center', transform=ax[y][x].transAxes)
        ax[y][x].axis('off')
        
plt.show()

Divide the latent space into a grid and examine what kind of image is generated (decoded) from each coordinate

Generate images from the points of a 20x20 grid. The generated images are displayed as a table with 20 rows and 20 columns.

In [20]:
import numpy as np

n_grid = 20

x = np.linspace(min_x, max_x, n_grid)
y = np.linspace(min_y, max_y, n_grid)

xv, yv = np.meshgrid(x, y)
xv = xv.flatten()
yv = yv.flatten()
z_grid2 = np.array(list(zip(xv, yv)))

reconst2 = vae.decoder.predict(z_grid2)
In [21]:
%matplotlib inline
import matplotlib.pyplot as plt

fig, ax = plt.subplots(n_grid, n_grid, figsize=(n_grid, n_grid))
for i in range(len(reconst2)):
    img = reconst2[i].squeeze()
    line = n_grid - 1 - i // n_grid
    row = i % n_grid
    ax[line][row].imshow(img, cmap='gray')
    ax[line][row].axis('off')
    
plt.show()

Draw the grid in the latent space.

In [22]:
%matplotlib inline
import matplotlib.pyplot as plt

fig, ax = plt.subplots(1, 1, figsize=(12, 12))

# Points obtained by encoding the 10000 test images in x_test, color-coded by their true labels.
map = ax.scatter(z_points[:, 0], z_points[:, 1], c=example_labels, cmap='rainbow', alpha=0.5, s=2)

# Display the grid points as black dots.
ax.scatter(z_grid2[:, 0], z_grid2[:, 1], c='black', alpha=1, s=20)

plt.colorbar(map)   # Note: plt.colorbar() alone raises an error here. When nothing has been drawn through plt itself, the mappable must be passed to colorbar() explicitly.
plt.show()
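
As the comment above notes, colorbar() needs an explicit mappable when nothing has been drawn through the pyplot state machine. An equivalent, slightly more explicit variant is to attach the colorbar to the Figure object; this sketch simply redraws the same plot using fig and the PathCollection returned by ax.scatter():

%matplotlib inline
import matplotlib.pyplot as plt

fig, ax = plt.subplots(1, 1, figsize=(12, 12))
sc = ax.scatter(z_points[:, 0], z_points[:, 1], c=example_labels, cmap='rainbow', alpha=0.5, s=2)
ax.scatter(z_grid2[:, 0], z_grid2[:, 1], c='black', alpha=1, s=20)
fig.colorbar(sc, ax=ax)   # attach the colorbar to the figure, using the scatter's mappable
plt.show()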