pytorch で LSTM に入門したい...したくない?

この記事はなに

この記事は、PyTorch で LSTM を使ってみようという入門記事になります。
pytorch で LSTM を使おうと思った時に、英語のドキュメントは無理。 加えて、巷に転がってるチュートリアルや入門記事はいきなり言語処理の実装をしていて、ただpytorchでLSTMを使ってみたい人にとってはハードルが高い。 さらにさらに、落ちているプログラムが動かないこともザラ。

ということで、分かりやすく、サンプルプログラムが動く、かつ例がミニマムであることを心がけて本記事を書きました。
本記事ではガッツリLSTMの解説をすることはしません。期待していた人はごめんなさい。あくまで pytorch で LSTM を扱えるようにすることが主目的です。

2021.2 追記:加筆や内容の修正を行いました。

この記事で分かること、分からないこと

分かること
  • pytorch での LSTM の使い方。
分からないこと
  • LSTM や RNN などモデルの詳細。

対象

できるだけ分かりやすく書いていますが、次の知識がある人を対象にします。

  • ニューラルネットやRNN、LSTMそれ自体に関する基礎的な知識。
  • python3 の基礎文法。
  • 日本語。

実装すること

実装するのは、「sin波の予測」です。直前の何ステップかを入力することで、次のsinの値が何になるかを予想しようというものになります。
より具体的には、LSTM にノイズ混じりのsin波の値を50ステップ分(t = t_1, t_2, ... , t_50)入れて、次の値(t = t_51)を最終出力として得るようなモデルを、教師あり学習で学習します。
え、題材がつまんない? 面白いことがやりたいなら seq2seq の機械翻訳機でも作りましょう。

環境

念のため僕の実行環境を記しておきます。

  • OS:mac
  • python3.6.2
  • anaconda3-5.0.0
  • pytorch 0.4.0

2021.2 追記:古いバージョンの pytorch を使っていますが、複雑なことをしていないので最新のバージョンでも同じコードで動作すると思います。

LSTMって? ああ!

全く説明しないものあれなので、一応軽くLSTMの解説をしたいと思います。 と言っても、僕が説明するよりも分かりやすいであろう記事がいっぱい転がっているのですが。オススメはここです。図もいっぱい載っていてめちゃくちゃ分かりやすいと思います。
そんな記事読んでられねーよ! って人のために簡単に説明しますと、LSTMというのは頭のいいRNNです。
まず、RNNというのは好きな長さの時系列データをぶち込んで、時系列情報をいい感じに学習できるNNです。 しかし、その特性上、時系列データが長くなると、時系列で早くに読み込まれたデータの情報が後のデータに埋もれて消えてしまったり、誤差伝搬時に誤差(勾配)が爆発・消失してしまうという問題がありました。

それを(たぶんおそらくきっと)解決してくれるのがLSTMです。 LSTMは「(記憶)セル」と呼ばれるモジュールを持っており、このセルが「何を覚えておくべきか」=「何を忘れるべきか」、「何を新しく覚えるべきか」、「何を出力として出すべきか」なども一緒に学習してくれます。

pytorch ではそんな複雑っぽい LSTM をたった1行で実装させてくれるので、LSTM の原理は最悪分からなくても何とかなります。

早速実装

0.はじめに

一番最初に、必要なimport文を列挙しておきます。これをソースコードの先頭にコピペしておけば、「なんかimport足りないんだけど...」となることは無いと思います。
また、本記事の一番後ろに最終的なソースコードの全文を載せてあるので、ぜひそれを見ながら読んでみてください。

import torch
import torch.nn as nn
from torch.optim import SGD
import math
import numpy as np

1. データセットを作ろう

何はともあれ学習をするにはデータセットがないと始まらない。ここではsin波という簡単なデータを使うため、簡単に実装ができそうです。
ということで、次のようなDatasetを作る関数を作りましょう。

import math

def mkDataSet(data_size, data_length=50, freq=60.):
    """
    params
      data_size : データセットのサイズ
      data_length : 各データの時系列長
      freq : 周波数
    returns
      train_x : トレーニングデータ(t=1,2,...,size-1の値)
      train_t : トレーニングデータのラベル(t=sizeの値)
    """
    train_x = []
    train_t = []

    for offset in range(data_size):
        train_x.append([[math.sin(2 * math.pi * (offset + i) / freq)] for i in range(data_length)])
        train_t.append([math.sin(2 * math.pi * (offset + data_length) / freq)])

    return train_x, train_t

かなーり複雑なことをしているように見えますが、全然難しくありません。
train_x, train_tはそれぞれトレーニングデータとラベルを表しています。つまり、

train_x[0] = [[t_0のときのsinの値], [t_1のときのsinの値], ..., [t_50のときのsinの値]]
train_x[1] = [[t_1のときのsinの値], [t_2のときのsinの値], ..., [t_51のときのsinの値]]
.
.
.
train_x[N] = [[t_Nのときのsinの値], [t_N+1のときのsinの値], ..., [t_N+50のときのsinの値]]
train_t = [[t_51のときのsinの値], [t_52のときのsinの値], ... ]

というデータになっているだけです。train_xは一言で言えば「時系列データ(値の配列)の配列」です。train_x[i]に対して、正解ラベルがtrain_t[i]になっているのですね。

ここで、視力の高い方は「ん?」と思われたかもしれません。なぜ、train_x や data_x の各系列の中身が配列になっているのだろうと。 何が言いたいかというと、「時刻t_iのときのsinの値」はどう考えても一つのスカラー値(0.234とか-0.761とか)なわけです。 上の式ではわざわざそれを [] で囲んでいる、つまり [[0.123], [0.315], ... ]] みたいなよくわからんことをやっているわけです。

これにはちゃんと理由があります。今回のサンプルでは各時刻における入力は1つのスカラー値(つまり、t_iにおけるsinの値)なわけですが、一般にはスカラー値が入力されるとは限りません。
例えば、単語の embedding(単語をベクトル化したもの)といった、ベクトルを入力する場合もあります。 つまり、各時刻の入力が [0.123] ではなく、[0.1, 0.2, 0.5, 0.4, ..., 0.8] のようなベクトルになり得るということです(というかむしろ一般的にはベクトルを入力します)。

pytorchのLSTMでは、入力テンソル*1 は3次元であると規定されており、具体的なサイズはSequence_Length x Batch_Size x Vector_Sizeを入れるようにとの決まりがあります。
今回の例では、Sequnece_size(入力時系列長)= 50、Vector_Size(入力ベクトルサイズ)= 1のテンソルを入力することになります(Batch_Size については後で言及します)。

とまあ、このような理由で一つのスカラー値をいちいち配列にしなくちゃいけなかったわけです。

さて、これでDatasetがどう言う風に作られているかはご理解頂けたと思います。 ただ、今のままでは綺麗なsin波が出るだけなので、少しノイズを乗せてみましょう。

import math
import numpy as np

def mkDataSet(data_size, data_length=50, freq=60., noise=0.02):
    """
    params
      data_size : データセットサイズ
      data_length : 各データの時系列長
      freq : 周波数
      noise : ノイズの振幅
    returns
      train_x : トレーニングデータ(t=1,2,...,size-1の値)
      train_t : トレーニングデータのラベル(t=sizeの値)
    """
    train_x = []
    train_t = []

    for offset in range(data_size):
        train_x.append([[math.sin(2 * math.pi * (offset + i) / freq) + np.random.normal(loc=0.0, scale=noise)] for i in range(data_length)])
        train_t.append([math.sin(2 * math.pi * (offset + data_length) / freq)])

    return train_x, train_t

ノイズは、正規分布に従って乗せました。
ノイズ無しverとノイズありverのグラフをプロットするとこんな感じです。オレンジの点は、次に予測すべき点です。

f:id:hilinker:20180622185115p:plain
ノイズなし

f:id:hilinker:20180622185159p:plain
ノイズあり

これでデータセットの用意は終わったので、次に行きませう。

2.モデルを作ろう

pytorch では自分でモデルを宣言します。 モデルの宣言は class を定義することで行います。 クラスを1つのモデルとみなして、その中でどういう層を使って、どういう順番で計算してやるかを自分で組み立てるわけです。

では、早速実装の方を見ていきましょう。

import torch
import torch.nn as nn

class Predictor(nn.Module):
    def __init__(self, inputDim, hiddenDim, outputDim):
        super(Predictor, self).__init__()

        self.rnn = nn.LSTM(input_size = inputDim,
                            hidden_size = hiddenDim,
                            batch_first = True)
        self.output_layer = nn.Linear(hiddenDim, outputDim)
    
    def forward(self, inputs, hidden0=None):
        output, (hidden, cell) = self.rnn(inputs, hidden0) #LSTM層
        output = self.output_layer(output[:, -1, :]) #全結合層

        return output

Predictor と言っているのが定義するモデルです。 この名前はなんでも構いません。君だけのオリジナルクラス名を付けましょう。

まず、__init__の中で層を宣言しています。 Pytorch では基本的に __init__ 内でモデルの持つ層を宣言します。 このモデルは2つの層を持っており、nn.LSTMnn.Linear を持っていますね。 nn.LSTM は LSTM の層を、nn.Linear は全結合層を表します。

汎用性のためにモデルに引数を渡すまどろっこしい書き方をしていますが、実際は inputDim=1outputDim=1 です(各時刻の入力はsinの値ただ1つ、出力はsinの値ただ1つなので)。
LSTMの input_sizehidden_size は分かりにくいのですが、input_size は各時刻における入力ベクトルのサイズ、hidden_size は LSTM の隠れ層ベクトルのサイズを表します。
こうして、パラメータを渡すことで層を定義するんですね。 渡すパラメータは、Pytorch の公式ドキュメントに詳しく書かれているので、英語はつらいですが頑張って雰囲気を読み取りましょう。

パラメータの batch_first=True というオプションは結構大事です。 これは、入力するデータ系列の次元が batch_size が頭に来るよということを指定しています。 日本語だけだと分かりにくいので、実際に次元とサイズの数式を見てみましょう。
本来 LSTM の入力テンソルの形式は、Sequence_Length x Batch_Size x Vector_Size になっていなければなりません。 しかし、batch_first=True とすると、Batch_Size x Sequence_Length x Vector_Sizeの形式で入力することができるようになります。 batch の次元を真ん中にするか頭に持ってくるかは完全に好みです。 自分がより分かりやすい方にしましょう。 この辺りは結構ややこしく、次元を合わせないと動かないので注意してください。

次に、foward メソッドを見ていきます。pytorchでは、モデル内のfowardメソッドにどのように計算を伝搬していくかを記述します。 backwardforward の記述を読み取って、pytorchがいい感じにやってくれるので記述する必要はありません(ここがpytorchの最高なところ)。
forward は入力である inputs および、初期状態を受け取ります ((一応、forward は任意の引数を受け取れるのですが、基本的には入力や初期状態を渡すものととらえてしまって良いでしょう。))。 ここで初期状態は、(hid0, cell0)というタプルです。 つまりは隠れ層とセルのそれぞれの初期値を受け取るわけですね。 ここでは面倒くさいので、まとめて hidden0 としてしまっていますが、初期値をちゃんと指定したい場合は隠れ層とセルの初期値をタプルの形で渡す必要があるので注意が必要です。

さて、まず、inputs は時系列データ全体を丸ごと入れます。 つまり、こちらでタイムステップ単位で for ループを回したりする必要がありません。 ここは Chainer などとの違いです。 pytorch が一気に処理をしてくれるわけですね。 もちろん、for文で時系列データを一時刻ずつ入れることもできますが、今回のように予め入力する時系列データが決まっている場合はまとめて入れてしまうのがオススメです。
また、hidden0 は上述したとおり、隠れ層とセルの初期値です。 pytorch では LSTM に None を渡すと、ゼロベクトルが渡されたものとして処理してくれます。

そうして、__init__ で宣言したLSTM(self.rnn)に inputshidden0 を入れると、返り値が2つ返ってきます。 これは、「LSTM の各時刻の出力の系列」、「最後の隠れ層の状態・最後のセルの状態のタプル」です。
そのうち欲しいのは、output の一番最後の値だけなので、output[:, -1, :] で時系列の最後の値(ここではベクトル)を取り出します。 これを Linear(self.output_layer) にぶち込むことで、サイズ1のベクトル(要するに [0.123] みたいなやつ)を得ます。

...…と、ここまで一息に説明しましたが、ベクトルの流れについて改めて丁寧に次元を追ってみましょう。

input (batch_size x sequence_size x 1)  

--LSTM-->  

output (batch_size x sequence_size x hidden_size)  

--(一部を取り出す)-->

outout[:, -1, :] (batch_size x 1 x hidden_size)  

--Linear-->  

output (batch_size x 1 x 1)  

こうしてみると、少しやっていることが分かりやすいのではないでしょうか。

ということで、モデルと計算手順の宣言がこれで完了しました。

3. traningしよう

ようやく学習を始めます。 まずは、上記で作った関数を使って、Dataset作りとmodelの宣言をしましょう。

from torch.optim import SGD

def main():
    training_size = 100 #traning dataのデータ数
    epochs_num = 1000 #traningのepoch回数
    hidden_size = 5 #LSTMの隠れ層の次元数

    train_x, train_t = mkDataSet(training_size) #Datasetの作成

    model = Predictor(1, hidden_size, 1) #modelの宣言

    criterion = nn.MSELoss() #評価関数の宣言
    optimizer = SGD(model.parameters(), lr=0.01) #最適化関数の宣言

if __name__ == '__main__':
    main()

pytorchでは、評価関数(Lossを計算する関数)を選べたり、最適化関数(重みの最適化を計算する関数)を選べます。 それぞれここでは、MSELossSGDを使っています。 optimizerには、model.parameters() で、model のもつ更新(学習)しなければならないパラメータを参照として渡しています。

次に実際にtrainingを行う部分を見ていきます。

    for epoch in range(epochs_num):
        # training
        running_loss = 0.0
        training_accuracy = 0.0
        for i in range(training_size):
            optimizer.zero_grad()
            data = torch.tensor([train_x[i]])
            label = torch.tensor([train_t[i]])

            output = model(data)

            loss = criterion(output, label)
            loss.backward()
            optimizer.step()

            running_loss += loss.data[0]
            training_accuracy += np.sum(np.abs((output.data - label.data).numpy()) < 0.1) #outputとlabelの誤差が0.1以内なら正しいとみなす。
        traning_accuracy /= traning_size
        print('%d loss: %.3f, training_accuracy: %.5f' % (epoch + 1, running_loss, training_accuracy))

ここはかなり分かりやすいと思います。外側の for 文では epoch 数だけ回すようにしており、実際の training はもう1つ内側の for 文で行います。 内側の for 文は、Dataset のデータ数だけ回しています。
全てのデータを1巡したら1epoch ですね。

各エポックの処理を見ていきましょう。

まず optimizer.zero_grad() を呼ぶことで、optimizer に蓄積された勾配を全てゼロにします。 誤差逆伝播法では計算のために勾配を記憶するため、これをしないと前回の勾配がどんどん積み重なっていき、全然何を学習しているのか分からなくなってしまいます。

次に、training dataを、torch.tensor() で pytorch が扱えるようにテンソルしています。 [train_x[i]]と配列にしているのは、train_x[i]の次元は今2次元(sequence_length x 1)なので、batch_size(今は1)分の次元を足してやる必要があるからです。 今は batch_size が 1 なので意味がわからんことになっていますが、後ほどバッチ化するときに分かると思います。

そして、data を model に入力して出力 output を得ます。これを、先ほど宣言した criterion に label と一緒に入れてやると、output と教師である label との違いをロスとして計算してくれます。 あとは、そのロスについて loss.backward() とすると誤差を backward することができ、optimizer.step()で optmizer で指定した方法でパラメータの最適化を行います。 以下これを繰り返すことで学習が進みます。

試しにプログラムを動かしてみましょう。正しく実装できていれば、loss が減少して accuracy が上昇していくのが分かるはずです(割と遅いですが)。

4.バッチ化しよう

普通にやったんじゃ遅すぎるのでバッチ化しましょう。 バッチ化するとCPUでも多少速くなります。GPUだと爆速になります。
ということで、Datasetからランダムに抽出してバッチを返す関数を用意しましょう。

def mkRandomBatch(train_x, train_t, batch_size=10):
    """
    train_x, train_tを受け取ってbatch_x, batch_tを返す。
    """
    batch_x = []
    batch_t = []

    for _ in range(batch_size):
        idx = np.random.randint(0, len(train_x) - 1)
        batch_x.append(train_x[idx])
        batch_t.append(train_t[idx])
    
    return torch.tensor(batch_x), torch.tensor(batch_t)

Datasetを受け取って適当に選んだbatchを返すだけです。めっちゃシンプル。

これを使ったtraningの実装が次のようになります。

    batch_size = 10

    for epoch in range(epochs_num):
        # training
        running_loss = 0.0
        training_accuracy = 0.0
        for i in range(int(training_size / batch_size)):
            optimizer.zero_grad()

            data, label = mkRandomBatch(train_x, train_t, batch_size)

            output = model(data)

            loss = criterion(output, label)
            loss.backward()
            optimizer.step()

            running_loss += loss.data[0]
            training_accuracy += np.sum(np.abs((output.data - label.data).numpy()) < 0.1) #outputとlabelの誤差が0.1以内なら正しいとみなす。
        traning_accuracy /= traning_size
        print('%d loss: %.3f, training_accuracy: %.5f' % (epoch + 1, running_loss, training_accuracy))

以上で、実装はほぼ終わりです。

最後に

以下に、validationも書き加えたソースコードの全体を載せます。恐らくこれをそのままコピペすれば動くはずなので、ぜひ参考にしてください。

import torch
import torch.nn as nn
from torch.optim import SGD
import math
import numpy as np

class Predictor(nn.Module):
    def __init__(self, inputDim, hiddenDim, outputDim):
        super(Predictor, self).__init__()

        self.rnn = nn.LSTM(input_size = inputDim,
                            hidden_size = hiddenDim,
                            batch_first = True)
        self.output_layer = nn.Linear(hiddenDim, outputDim)
    
    def forward(self, inputs, hidden0=None):
        output, (hidden, cell) = self.rnn(inputs, hidden0)
        output = self.output_layer(output[:, -1, :])

        return output

def mkDataSet(data_size, data_length=50, freq=60., noise=0.00):
    """
    params\n
    data_size : データセットサイズ\n
    data_length : 各データの時系列長\n
    freq : 周波数\n
    noise : ノイズの振幅\n
    returns\n
    train_x : トレーニングデータ(t=1,2,...,size-1の値)\n
    train_t : トレーニングデータのラベル(t=sizeの値)\n
    """
    train_x = []
    train_t = []

    for offset in range(data_size):
        train_x.append([[math.sin(2 * math.pi * (offset + i) / freq) + np.random.normal(loc=0.0, scale=noise)] for i in range(data_length)])
        train_t.append([math.sin(2 * math.pi * (offset + data_length) / freq)])

    return train_x, train_t

def mkRandomBatch(train_x, train_t, batch_size=10):
    """
    train_x, train_tを受け取ってbatch_x, batch_tを返す。
    """
    batch_x = []
    batch_t = []

    for _ in range(batch_size):
        idx = np.random.randint(0, len(train_x) - 1)
        batch_x.append(train_x[idx])
        batch_t.append(train_t[idx])
    
    return torch.tensor(batch_x), torch.tensor(batch_t)

def main():
    training_size = 10000
    test_size = 1000
    epochs_num = 1000
    hidden_size = 5
    batch_size = 100

    train_x, train_t = mkDataSet(training_size)
    test_x, test_t = mkDataSet(test_size)

    model = Predictor(1, hidden_size, 1)
    criterion = nn.MSELoss()
    optimizer = SGD(model.parameters(), lr=0.01)

    for epoch in range(epochs_num):
        # training
        running_loss = 0.0
        training_accuracy = 0.0
        for i in range(int(training_size / batch_size)):
            optimizer.zero_grad()

            data, label = mkRandomBatch(train_x, train_t, batch_size)

            output = model(data)

            loss = criterion(output, label)
            loss.backward()
            optimizer.step()

            running_loss += loss.data[0]
            training_accuracy += np.sum(np.abs((output.data - label.data).numpy()) < 0.1)

        #test
        test_accuracy = 0.0
        for i in range(int(test_size / batch_size)):
            offset = i * batch_size
            data, label = torch.tensor(test_x[offset:offset+batch_size]), torch.tensor(test_t[offset:offset+batch_size])
            output = model(data, None)

            test_accuracy += np.sum(np.abs((output.data - label.data).numpy()) < 0.1)
        
        training_accuracy /= training_size
        test_accuracy /= test_size

        print('%d loss: %.3f, training_accuracy: %.5f, test_accuracy: %.5f' % (
            epoch + 1, running_loss, training_accuracy, test_accuracy))


if __name__ == '__main__':
    main()

*1:テンソルとは、高次元の配列っぽいものです(こんなことを言ったら怒られそう)