pytorchでLSTMに入門したい...したくない?

この記事はなに

この記事は、PyTorchでLSTMを使ってみようという入門記事になります。
pytorchでLSTMを使おうと思った時に、英語のドキュメントは無理。加えて、巷に転がってるチュートリアルや入門記事はいきなり言語処理の実装をしていて、ただpytorchでLSTMを使ってみたい人にとってはハードルが高い。さらにさらに、落ちているプログラムが動かないこともザラ。
ということで、分かりやすく、サンプルプログラムが動く、かつ例がミニマムであることを心がけて本記事を書きました。
本記事ではガッツリLSTMの解説をすることはしません。期待していた人はごめんなさい。あくまでpytorchでLSTMを扱えるようにすることが主目的です。

この記事で分かること、分からないこと

分かること
  • pytorchでのLSTMの使い方。
分からないこと
  • LSTMやRNNの詳細。

対象

できるだけ分かりやすく書いていますが、流石にニューラルネットの基礎から教えるわけにもいかないので次の知識がある人を対象にします。

  • ニューラルネットやRNN、LSTMそれ自体に関する基礎的な知識。
  • python3の基礎文法。
  • 日本語。

実装すること

実装するのは、「sin波の予測」です。直前の何ステップかを入力することで、次のsinの値が何になるかを予想しようというものになります。
より具体的には、LSTMにノイズ混じりのsin波の値を50ステップ分(t = t_1, t_2, ... , t_50)入れて、次の値(t = t_51)を最終出力として得るようなモデルを、教師あり学習で学習します。
え、題材がつまんない? 面白いことがやりたいならseq2seqの機械翻訳機でも作りましょう。

環境

念のため僕の実行環境を記しておきます。

  • OS:mac
  • python3.6.2
  • anaconda3-5.0.0
  • pytorch 0.4.0

LSTMって? ああ!

全く説明しないものあれなので、一応軽くLSTMの解説をしたいと思います。と言っても、僕が説明するよりも分かりやすいであろう記事がいっぱい転がっているのですが。オススメはここです。図もいっぱい載っていてめちゃくちゃ分かりやすいと思います。
そんな記事読んでられねーよ! って人のために簡単に説明しますと、LSTMというのは頭のいいRNNです。
まず、RNNというのは好きな長さの時系列データをぶち込んで、時系列情報をいい感じに学習できるNNです。しかし、その特性上、時系列データが長くなると、時系列で早くに読み込まれたデータの情報が後のデータに埋もれて消えてしまったり、誤差伝搬時に誤差(勾配)が爆発・消失してしまうという問題がありました。
それを(たぶんおそらくきっと)解決してくれるのがLSTMです。LSTMは「(記憶)セル」と呼ばれるモジュールを持っており、このセルが「何を覚えておくべきか」=「何を忘れるべきか」、「何を新しく覚えるべきか」、「何を出力として出すべきか」なども一緒に学習してくれます。

pytorchではそんな複雑っぽいLSTMをたった1行で実装させてくれるので、LSTMの原理は最悪分からなくても何とかなります。

早速実装

0.はじめに

一番最初に、必要なimport文を列挙しておきます。これをソースコードの先頭にコピペしておけば、「なんかimport足りないんだけど...」となることは無いと思います。
また、本記事の一番後ろに最終的なソースコードの全文を載せてあるので、ぜひそれを見ながら読んでみてください。

import torch
import torch.nn as nn
from torch.optim import SGD
import math
import numpy as np

1. データセットを作ろう

何はともあれ学習をするにはデータセットがないと始まらない。ここではsin波という簡単なデータを使うため、簡単に実装ができそうです。
ということで、次のようなDatasetを作る関数を作りましょう。

import math

def mkDataSet(data_size, data_length=50, freq=60.):
    """
    params
      data_size : データセットのサイズ
      data_length : 各データの時系列長
      freq : 周波数
    returns
      train_x : トレーニングデータ(t=1,2,...,size-1の値)
      train_t : トレーニングデータのラベル(t=sizeの値)
    """
    train_x = []
    train_t = []

    for offset in range(data_size):
        train_x.append([[math.sin(2 * math.pi * (offset + i) / freq)] for i in range(data_length)])
        train_t.append([math.sin(2 * math.pi * (offset + data_length) / freq)])

    return train_x, train_t

かなーり複雑なことをしているように見えますが、全然難しくありません。
train_x, train_tはそれぞれトレーニングデータとラベルを表しています。つまり、

train_x[0] = [[t_0のときのsinの値], [t_1のときのsinの値], ..., [t_50のときのsinの値]]
train_x[1] = [[t_1のときのsinの値], [t_2のときのsinの値], ..., [t_51のときのsinの値]]
.
.
.
train_x[N] = [[t_Nのときのsinの値], [t_N+1のときのsinの値], ..., [t_N+50のときのsinの値]]
train_t = [[t_51のときのsinの値], [t_52のときのsinの値], ... ]

というデータになっているだけです。train_xは一言で言えば「時系列データ(値の配列)の配列」です。train_x[i]に対して、正解ラベルがtrain_t[i]になっているのですね。

ここで、視力のいい方は「ん?」と思われたかもしれません。なぜ、train_xやdata_xの各系列の中身が配列になっているのだろうと。何が言いたいかというと、「時刻t_iのときのsinの値」はどう考えても一つのスカラー値(0.234とか-0.761とか)なわけです。上の式ではわざわざそれを[]で囲んでいる、つまり[[0.123], [0.315], ... ]]みたいなよくわからんことをやっているわけです。

これにはちゃんと理由があります。今回のサンプルでは各時刻における入力は1つのスカラー値(つまり、t_iにおけるsinの値)なわけですが、一般にはスカラー値が入力されるとは限りません。
例えば、単語のembedding(単語をベクトル化したもの)といった、ベクトルを入力する場合もあるわけです。つまり、各時刻の入力が[0.123]ではなく、[0.1, 0.2, 0.5, 0.4, ..., 0.8]のようなベクトルになり得るということです(というかむしろ一般的にはベクトルを入力します)。

pytorchのLSTMでは、入力テンソル*1は3次元であると規定されており、具体的なサイズはSequence_Length x Batch_Size x Vector_Sizeを入れるようにとの決まりがあります。
今回の例では、Sequnece_size(入力時系列長)= 50、Vector_Size(入力ベクトルサイズ)= 1のテンソルを入力することになります(Batch_Sizeは後で言及します)。

とまあ、このような理由で一つのスカラー値をいちいち配列にしなくちゃいけなかったわけです。

さて、これでDatasetがどう言う風に作られているかはご理解頂けたと思います。ただ、今のままでは綺麗なsin波が出るだけなので、少しノイズを乗せてみましょう。

import math
import numpy as np

def mkDataSet(data_size, data_length=50, freq=60., noise=0.02):
    """
    params
      data_size : データセットサイズ
      data_length : 各データの時系列長
      freq : 周波数
      noise : ノイズの振幅
    returns
      train_x : トレーニングデータ(t=1,2,...,size-1の値)
      train_t : トレーニングデータのラベル(t=sizeの値)
    """
    train_x = []
    train_t = []

    for offset in range(data_size):
        train_x.append([[math.sin(2 * math.pi * (offset + i) / freq) + np.random.normal(loc=0.0, scale=noise)] for i in range(data_length)])
        train_t.append([math.sin(2 * math.pi * (offset + data_length) / freq)])

    return train_x, train_t

ノイズは、正規分布に従って乗せました。
ノイズ無しverとノイズありverのグラフをプロットするとこんな感じです。オレンジの点は、次に予測すべき点です。

f:id:hilinker:20180622185115p:plain
ノイズなし
f:id:hilinker:20180622185159p:plain
ノイズあり

これでデータセットの用意は終わったので、次に行きませう。

2.モデルを作ろう

pytorchでは自分でモデルを宣言します。どういう層を使って、どういう順番で計算してやるかを自分で組み立てるわけです。
では、早速実装の方を見ていきましょう。

import torch
import torch.nn as nn

class Predictor(nn.Module):
    def __init__(self, inputDim, hiddenDim, outputDim):
        super(Predictor, self).__init__()

        self.rnn = nn.LSTM(input_size = inputDim,
                            hidden_size = hiddenDim,
                            batch_first = True)
        self.output_layer = nn.Linear(hiddenDim, outputDim)
    
    def forward(self, inputs, hidden0=None):
        output, (hidden, cell) = self.rnn(inputs, hidden0) #LSTM層
        output = self.output_layer(output[:, -1, :]) #全結合層

        return output

まず、__init__の中で、層を宣言しています。nn.LSTMというモジュールに適当なパラメータを入れてLSTM層を宣言し、LSTMの出力をnn.Linear(全結合層)で適切なoutputDimサイズにまとめています。 こんなまどろっこしい書き方をしていますが、ここでは、inputDim=1outputDim=1です(各時刻の入力はsinの値ただ1つ、出力はsinの値ただ1つなので)。
LSTMのinput_sizehidden_sizeは分かりにくいのですが、input_sizeは各時刻における入力ベクトルのサイズ、hidden_sizeはLSTMの隠れ層ベクトルのサイズを表します。

また、batch_first=Trueというオプションは結構大事です。これは、入力するデータ系列の次元がbatch_sizeが頭に来るよということを指定しています。日本語だけだと分かりにくいので実際に次元とサイズの数式を見れば分かりやすいでしょう。
本来LSTMの入力は先述したように、Sequence_Length x Batch_Size x Vector_Sizeの形式になっていなければなりません。しかし、batch_first=Trueとすると、Batch_Size x Sequence_Length x Vector_Sizeの形式で入力することができるようになります。この辺りは結構ややこしく、次元を合わせないと動かないので注意しましょう。

次に、fowardメソッドを見ていきます。pytorchでは、モデル内のfowardメソッドにどのように計算を伝搬していくかを記述します。backwardforwardの記述を読み取って、pytorchがいい感じにやってくれるので記述する必要はありません(ここがpytorchの最高なところ)。
forwardは入力であるinputsおよび、初期状態を受け取ります。ここで初期状態は、(hidden0, cell0)というタプルです。つまりは隠れ層とセルの初期値を受け取るわけですね。ここでは面倒くさいので、まとめてhidden0としてしまっていますが、初期値をちゃんと指定したい場合は隠れ層とセルの初期値をタプルの形で渡す必要があるので注意が必要です。
さて、まず、inputsは時系列データ全体を丸ごと入れます。つまり、こちらでタイムステップ単位でforループを回したりする必要がありません。ここはChainerなどとの違いです。pytorchが一気に処理をしてくれるわけですね。もちろん、for文で時系列データを一時刻ずつ入れることもできますが、今回のように予め入力する時系列データが決まっている場合はまとめて入れてしまうのがオススメです。
また、hidden0は上述したとおり、隠れ層とセルの初期値です。pytorchではLSTMにNoneを渡すと、ゼロベクトルが渡されたものとして処理してくれます。
そうして、__init__で宣言したLSTM(self.rnn)にinputshidden0を入れると、出力が2つ返ってきます。これは、LSTMの最後の層の出力、隠れ層の状態・セルの状態を時系列に並べたもののタプルになります。
そのうち欲しいのは、outputの一番最後の値だけなので、output[:, -1, :]で時系列の最後の値(ここではベクトル)を取り出します。これをLinear(self.output_layer)にぶち込むことで、サイズ1のベクトル(要するに[0.123]みたいなやつ)を得ます。

...…と、ここまで一息に説明しましたが、ベクトルの流れについて改めて丁寧に次元を追ってみましょう。

input (batch_size x sequence_size x 1)
--LSTM-->
output (batch_size x sequence_size x hidden_size)
--(一部を取り出す)--> outout[:, -1, :] (batch_size x 1 x hidden_size)
--Linear-->
output (batch_size x 1 x 1)

こうしてみると、少しやっていることが分かりやすいのではないでしょうか。

ということで、モデルと計算手順の宣言がこれで完了しました。

3. traningしよう

ようやく学習を始めます。
まずは、上記で作った関数を使って、Dataset作りとmodelの宣言をしましょう。

from torch.optim import SGD

def main():
    training_size = 100 #traning dataのデータ数
    epochs_num = 1000 #traningのepoch回数
    hidden_size = 5 #LSTMの隠れ層の次元数

    train_x, train_t = mkDataSet(training_size) #Datasetの作成

    model = Predictor(1, hidden_size, 1) #modelの宣言

    criterion = nn.MSELoss() #評価関数の宣言
    optimizer = SGD(model.parameters(), lr=0.01) #最適化関数の宣言

if __name__ == '__main__':
    main()

pytorchでは、評価関数(Lossを計算する関数)を選べたり、最適化関数(重みの最適化を計算する関数)を選べます。それぞれここでは、MSELossSGDを使っています。optimizerには、model.parameters()で、modelのもつ更新(学習)しなければならないパラメータを参照として渡しています。

次に実際にtrainingを行う部分を見ていきます。

    for epoch in range(epochs_num):
        # training
        running_loss = 0.0
        training_accuracy = 0.0
        for i in range(training_size):
            optimizer.zero_grad()
            data = torch.tensor([train_x[i]])
            label = torch.tensor([train_t[i]])

            output = model(data)

            loss = criterion(output, label)
            loss.backward()
            optimizer.step()

            running_loss += loss.data[0]
            training_accuracy += np.sum(np.abs((output.data - label.data).numpy()) < 0.1) #outputとlabelの誤差が0.1以内なら正しいとみなす。
        traning_accuracy /= traning_size
        print('%d loss: %.3f, training_accuracy: %.5f' % (epoch + 1, running_loss, training_accuracy))

ここはかなり分かりやすいと思います。外側のfor文ではepoch数だけ回すようにしており、実際のtrainingはもう1つ内側のfor文で行います。内側のfor文は、Datasetのサイズだけ回しています。
まずoptimizer.zero_grad()を呼ぶことで、optimizerに蓄積された勾配を全てゼロにします。誤差逆伝播法では計算のために勾配を記憶するため、これをしないと前回の勾配がどんどん積み重なっていき、全然何を学習しているのか分からなくなってしまいます。

次に、training dataを、torch.tensor()でpytorchが扱えるようにテンソルしています。[train_x[i]]と配列にしているのは、train_x[i]の次元は今2次元(sequence_length x 1)なので、batch_size(今は1)分の次元を足してやる必要があるからです。今はbatch_sizeが1なので意味がわからんことになっていますが、後ほどバッチ化するときに分かると思います。

そして、dataをmodelに入力して出力outputを得ます。これを、先ほど宣言したcriterionにlabelと一緒に入れてやると、outputと教師であるlabelとの違いをLossとして計算してくれます。あとは、そのLossについてloss.backward()とすると誤差をbackwardすることができ、optimizer.step()でoptmizerで指定して方法でパラメータの最適化を行います。以下これを繰り返すことで学習が進みます。

試しにプログラムを動かしてみましょう。正しく実装できていれば、lossが減少してaccuracyが上昇していくのが分かるはずです(割と遅いですが)。

4.バッチ化しよう

普通にやったんじゃ遅すぎるのでバッチ化しましょう。バッチ化するとCPUでも多少速くなります。GPUだと爆速になります。
ということで、Datasetからランダムに抽出してバッチを返す関数を用意しましょう。

def mkRandomBatch(train_x, train_t, batch_size=10):
    """
    train_x, train_tを受け取ってbatch_x, batch_tを返す。
    """
    batch_x = []
    batch_t = []

    for _ in range(batch_size):
        idx = np.random.randint(0, len(train_x) - 1)
        batch_x.append(train_x[idx])
        batch_t.append(train_t[idx])
    
    return torch.tensor(batch_x), torch.tensor(batch_t)

Datasetを受け取って適当に選んだbatchを返すだけです。めっちゃシンプル。

これを使ったtraningの実装が次のようになります。

    batch_size = 10

    for epoch in range(epochs_num):
        # training
        running_loss = 0.0
        training_accuracy = 0.0
        for i in range(int(training_size / batch_size)):
            optimizer.zero_grad()

            data, label = mkRandomBatch(train_x, train_t, batch_size)

            output = model(data)

            loss = criterion(output, label)
            loss.backward()
            optimizer.step()

            running_loss += loss.data[0]
            training_accuracy += np.sum(np.abs((output.data - label.data).numpy()) < 0.1) #outputとlabelの誤差が0.1以内なら正しいとみなす。
        traning_accuracy /= traning_size
        print('%d loss: %.3f, training_accuracy: %.5f' % (epoch + 1, running_loss, training_accuracy))

以上で、実装はほぼ終わりです。

最後に

以下に、validationも書き加えたソースコードの全体を載せます。恐らくこれをそのままコピペすれば動くはずなので、ぜひ参考にしてください。

import torch
import torch.nn as nn
from torch.optim import SGD
import math
import numpy as np

class Predictor(nn.Module):
    def __init__(self, inputDim, hiddenDim, outputDim):
        super(Predictor, self).__init__()

        self.rnn = nn.LSTM(input_size = inputDim,
                            hidden_size = hiddenDim,
                            batch_first = True)
        self.output_layer = nn.Linear(hiddenDim, outputDim)
    
    def forward(self, inputs, hidden0=None):
        output, (hidden, cell) = self.rnn(inputs, hidden0)
        output = self.output_layer(output[:, -1, :])

        return output

def mkDataSet(data_size, data_length=50, freq=60., noise=0.00):
    """
    params\n
    data_size : データセットサイズ\n
    data_length : 各データの時系列長\n
    freq : 周波数\n
    noise : ノイズの振幅\n
    returns\n
    train_x : トレーニングデータ(t=1,2,...,size-1の値)\n
    train_t : トレーニングデータのラベル(t=sizeの値)\n
    """
    train_x = []
    train_t = []

    for offset in range(data_size):
        train_x.append([[math.sin(2 * math.pi * (offset + i) / freq) + np.random.normal(loc=0.0, scale=noise)] for i in range(data_length)])
        train_t.append([math.sin(2 * math.pi * (offset + data_length) / freq)])

    return train_x, train_t

def mkRandomBatch(train_x, train_t, batch_size=10):
    """
    train_x, train_tを受け取ってbatch_x, batch_tを返す。
    """
    batch_x = []
    batch_t = []

    for _ in range(batch_size):
        idx = np.random.randint(0, len(train_x) - 1)
        batch_x.append(train_x[idx])
        batch_t.append(train_t[idx])
    
    return torch.tensor(batch_x), torch.tensor(batch_t)

def main():
    training_size = 10000
    test_size = 1000
    epochs_num = 1000
    hidden_size = 5
    batch_size = 100

    train_x, train_t = mkDataSet(training_size)
    test_x, test_t = mkDataSet(test_size)

    model = Predictor(1, hidden_size, 1)
    criterion = nn.MSELoss()
    optimizer = SGD(model.parameters(), lr=0.01)

    for epoch in range(epochs_num):
        # training
        running_loss = 0.0
        training_accuracy = 0.0
        for i in range(int(training_size / batch_size)):
            optimizer.zero_grad()

            data, label = mkRandomBatch(train_x, train_t, batch_size)

            output = model(data)

            loss = criterion(output, label)
            loss.backward()
            optimizer.step()

            running_loss += loss.data[0]
            training_accuracy += np.sum(np.abs((output.data - label.data).numpy()) < 0.1)

        #test
        test_accuracy = 0.0
        for i in range(int(test_size / batch_size)):
            offset = i * batch_size
            data, label = torch.tensor(test_x[offset:offset+batch_size]), torch.tensor(test_t[offset:offset+batch_size])
            output = model(data, None)

            test_accuracy += np.sum(np.abs((output.data - label.data).numpy()) < 0.1)
        
        training_accuracy /= training_size
        test_accuracy /= test_size

        print('%d loss: %.3f, training_accuracy: %.5f, test_accuracy: %.5f' % (
            epoch + 1, running_loss, training_accuracy, test_accuracy))


if __name__ == '__main__':
    main()

*1:テンソルとは、高次元の配列っぽいものです(こんなことを言ったら怒られそう)