nn.DataParallelを使えばデータをdeviceに送らなくていい

この投稿はrioyokotalab Advent Calendar 2020 13日目の投稿です。

adventar.org

nn.DataParallel

PyTorchでマルチGPUを使った機械学習を行いたい場合、

  • nn.DataParalllel
  • DistributedDataParallel

の二種類の方法があります。

DistributedDataParallelはマルチプロセスで動かせるので、データの読み込みやモデルのアップデートなども並列に行え、高い並列化性能を出すことが可能です。さらに、複数のコンピュータを並列で動かして学習することも可能で、大規模なマルチノード並列実行を行うことが可能です。

nn.DataParallelは、これらの利点を捨てる代わりに、特にコード変更を行うことなく、(新たにバグが生まれる可能性や、並列時の通信によるバグをふむ可能性を回避できる)マルチGPUの恩恵を受けることができるようになる機能です。なので、PyTorch的には、DistributerdDataParallelを推奨しているようですが、捨てられない機能の一つとなっています。

nn.DataParallelはシングルプロセスでデータの読み込み、デバイス間通信などを行いますが、データは適宜、それぞれのGPUに転送してくれて、それぞれのGPUでforward, backwardを行ってくれます。

バイス転送の管理を楽にする

nn.DataParallelは、

model_ = Model()
model = nn.DataParallel(model_, device_ids=[0, 1])

のように元々のmodelをwrapして使います。こうしてできたmodelはマルチGPUで動くので、

for data in train_loader:
    optimizer.zero_grad()
    data = data.to('cuda:0')
    out = model_(data)
    ...

などとして、cpuから特定のdeviceを指定して転送することができず、代わりに、

for data in train_loader:
    optimizer.zero_grad()
    out = model(data)
    ...

というふうに、dataの転送先を特に明示せず、実行することができます。実際には、modelの内部で、

  1. ミニバッチを適当にn等分して、それぞれのデバイスに転送
  2. それぞれのデバイスでのforwardを実行
  3. 出力を集約し、特定のdevice上に集める
  4. その他計算グラフに関わる計算を実行
  5. backwardパスに従って、model内部で、出力勾配を配る
  6. それぞれのdeviceで、backwardを実行

といった処理が行われているようです。

どういった場面で使いたいか

学習アルゴリズム中に、複数のモデルが出る場合などに便利です。それぞれのモデルをメモリの関係で、別々のデバイス上で学習させたいが、デバイスも複数になって、データをどのデバイスに転送しないといけないか、不透明になってきた場合、モデルが乗っているデバイスに転送するというルールの元、特に明示的に書くことなくデバイスへの転送が行えます。

例えば、

  • GANのgeneratorとdiscriminatorを分ける
  • Cross Validationで並列に複数モデルを学習させる

など。より、難しいアルゴリズムを考えれば、考えるほど、こういったデバイスの管理は煩雑になってくると思います。

まとめ

別に、楽にしたいだけで、これしか選択肢がない話でもないし、普通に、デバイスをちゃんと変数で持ってればいい話でもあるし...

普段は使ってないですが、たまに、面白半分で使ってます。

Jupyter Notebookを非インタラクティブに実行する

この投稿はrioyokotalab Advent Calendar 2020 11日目の投稿です。

adventar.org

リモートサーバーのJupyter Notebookを長時間動かしたい

Jupyter Notebookは

  • リモートサーバー上に配置したデータをダウンロードすることなく操作できる
  • インタラクティブに実行結果を確認できる

という2点において、機械学習やデータ分析のコーディングに有利なツールとなっています。しかし、これは同時に、コードの実行中はJupyter Notebookのサーバーを立てていないといけなかったり、実行中のブラウザ画面を常に表示していないと、勝手に中断されたりと、不便さも同時に引き起こします。

1日以内の実行であれば、これらの不便さは我慢できるかもしれないですが、3日を超えるコードの実行が必要な場合、なんらかの工夫をしなければ、サーバーのポートを無駄に食い潰したり、手元のPCの取り回しがめんどくさくなったりします。あとは、ノートブックだと実行が最後まで終了した後も、プロセスが終了しないので、GPUメモリが確保されたままになっていたり...

今回は、Jupyter Notebookを通常のプログラムと同じように、コマンドライン上から起動、実行、出力の集積を行う方法を紹介します。

nbconvert

Jupyter Notebookは、nbconvertコマンドを用いてスクリプト形式やHTML形式に変換することができます。変換形式にnotebookという形式を選択することが可能で、これを実行すると、選択したJupyter Notebookがすべて実行され、実行結果が保存されます。

この形式をとった方が、Jupyter Notebook固有のメソッドであるIPythonモジュールをを使えたり、インタラクティブな途中経過の確認を保存できたりして、便利です。データの操作が正しく行えているかを可視化できるとミスやデバッグの手間が減らせます。

コマンド

jupyter nbconvert --ExecutePreprocessor.timeout=-1 --to notebook --execute notebook.ipynb --output results/notebook.ipynb

ABCI1上など、スパコン上で使う場合は、次のようなシェルスクリプトrun.shを作成して投げています。

#!/bin/bash
#$ -cwd
#$ -l rt_F=1
#$ -l h_rt=24:00:00
#$ -o log/o.$JOB_ID
#$ -j y


source /etc/profile.d/modules.sh
module load gcc/7.4.0
module load python/3.7/3.7.6
module load cuda/10.1/10.1.243
module load cudnn/7.6/7.6.5
module load nccl/2.5/2.5.6-1

source ./venv/bin/activate

mkdir results/$JOB_ID
cp $1 results/$JOB_ID/
jupyter nbconvert --ExecutePreprocessor.timeout=-1 --to notebook --execute $1 --output results/$JOB_ID/$1

ABCI上でジョブを投げるときに使うqsubコマンドはshコマンドのように引数を渡すことができるので、

qsub -g [group_id] run.sh notebook.ipynb

とすることで、notebookの実行が可能になります。

まとめ

もともと、Kaggle Kernelのcommitのような実行結果も含めたバージョン管理、実験管理が目的でこの方式を考えました。

インタラクティブデバッグと可視化+実験の実行と保存をうまく組み合わせることができれば、機械学習の実験の取り回しがもっと楽になると思います。

Weights & Biases と pytorch-pfn-extrasをくっつけたら最強なんじゃないかと思った

この投稿はrioyokotalab Advent Calendar 2020 9日目の投稿です。

adventar.org

実験のトラッキング

深層学習の学習コードを動かすにあたって、学習の経過を確認したり、複数の学習の結果を比較したりといったことは、学習アルゴリズムの改善を行うにあたって非常に重要となってきます。これを便利に行えるようにしてくれるツールには、mlflow1, comet.ml2などがあります。

自分は普段はWeights & Biases3を使っているので、今回は、W&Bを使った自分の実験トラッキング環境を紹介します。

Weights & Biases

www.wandb.com

Weights & Biasesというのは、深層学習の実験トラッキングツールの一つで、自前で実験ログ用のストレージを持つ必要のないのが特徴のトラッキングツールです。そのため、アカウント一つで素早く導入することができ、必要に応じて情報の共有が楽だったりします。他にも、ハイパーパラメータサーチのための機能が入っていたり、実験ログをMarkdown形式でまとめたりと、様々な機能が揃っています。(また、コンタクトを取るとレスポンスが素早いという噂も)個人使用を想定している場合は、何も不足することはないと思います。

大体の使い方はこちらを参考にしていただければと思います。

note.com

今回は、実験のログ(学習率、損失、検証スコアなど)をWeights & Biasesに送るための自前のやり方を紹介しようと思います。

pytorch-pfn-extrasとの接続

自分は、学習コードに必要な諸機能をpytorch-pfn-extras4でまかなっています。この話については、同AdCのこちらの記事を参照してください。

deoxy.hatenablog.com

pytorch-pfn-extrasには学習のログを吐く機能があるのですが、このログこそが、W&Bに送りたい情報なのです。なので、この情報をどうやって取り出すかというのが、今回の話題になります。

extensionの作成

pytorch-pfn-extrasでは、実験のログの取得や可視化など、実験コードに付け加えたい便利機能の諸々はextensionと呼ばれるオブジェクトによって、実装されています。基本的な考え方は、ログの可視化(stdoutへの出力)を行うPrintReportの出力先をW&Bに変えてあげればいいということになります。なので、 pytorch-pfn-extrasのgithubコード中のPrintReportのコードを参照し、次のようなクラスを設計しました。

class SendWandB(ppe.training.extensions.PrintReport):
    def __init__(self, entries=None, log_report='LogReport', wandb=None):
        super().__init__(entries, log_report, None)
        self.wandb = wandb

    def __call__(self, manager):
        log_report = self.get_log_report(manager)
        log = log_report.log
        log_len = self._log_len
        while len(log) > log_len:
            self.wandb.log(log[log_len], step=log_len)
            log_len += 1
        self._log_len = log_len

あとは、

manager.extend(E.PrintReport(['epoch', 'iteration', 'lr', 'train/loss', 'valid/loss', 'valid/acc', 'elapsed_time']), trigger=standard_trigger)
manager.extend(SendWandB(['epoch', 'iteration', 'lr', 'train/loss', 'valid/loss', 'valid/acc', 'elapsed_time'], wandb=wandb), trigger=standard_trigger)

といった感じで、PrintReportと同じように書いてあげるとwandbへ送ることができるようになります。

引数のwandbには

import wandb

としてimportしたwandbモジュールを渡すことで、将来的にもしpytorch-pfn-extrasに導入された場合でも依存性を回避できるかなぁとか、別ファイルにモジュール化した場合でも、import文書かなくて良くて依存性回避できるなぁとか、出力先を明示した方がわかりやすいかなぁとか、結局wandbを起動するためにはメインのコード側で一度initメソッドとか呼ばないといけないしなぁとかいろいろ考えて、モジュールを渡すってデザイン大丈夫なのかとか思いながらこれにしました。

ちなみにwandb.initを呼ぶタイミングはログを開始する前であれば、extensionを作成した後でも大丈夫です。

これを使えば、pytorch-pfn-extrasを用いたテンプレートコードからそこまで実装を変えることなく、ログを転送できるようになります。

(確か、PrintReportで表示しようとしたログのカラムとSendWandBで送ろうとしたログのカラムの和集合が送られて表示されちゃってた気がして、manager周りで何か共有しちゃってる気がするけれど、別に困らないから直してない。)

テンプレートコード

class Evaluator(ppe.training.extension.Extension):
    priority = ppe.training.extension.PRIORITY_WRITER

    def __init__(self, model, device, metrics, loader, prefix='valid/'):
        self.model = model
        self.device = device
        self.metrics = metrics
        self.prefix = prefix
        self.loader = loader
        
    def __call__(self, manager):
        self.model.eval()
        logs = {name: [] for name, metric in self.metrics.items()}
        with torch.no_grad():
            for data in self.loader:
                data = to(data, device)
                out = self.model(**data)
                for name, metric in self.metrics.items():
                    met = metric(**out).item()
                    logs[name].append(met)
        for name, value in logs.items():
            ppe.reporting.report({
                self.prefix + name: np.mean(value)
            })

manager = ppe.training.ExtensionsManager(
    model, optimizer, num_epochs,
    iters_per_epoch=len(train_loader),
    out_dir=out_dir
)

standard_trigger = (1, 'epoch')
manager.extend(E.observe_lr(optimizer=optimizer), trigger=standard_trigger)
manager.extend(E.LogReport(trigger=standard_trigger))
manager.extend(E.PrintReport(['epoch', 'iteration', 'lr', 'train/loss', 'valid/loss', 'valid/acc', 'elapsed_time']), trigger=standard_trigger)
manager.extend(SendWandB(['epoch', 'iteration', 'lr', 'train/loss', 'valid/loss', 'valid/acc', 'elapsed_time'], wandb=wandb), trigger=standard_trigger)
manager.extend(Evaluator(model, device, {'loss': criterion, 'acc': metric}, valid_loader, prefix='valid/'), trigger=standard_trigger)
manager.extend(E.snapshot(target=model, filename='best.pth'), trigger=ppe.training.triggers.MaxValueTrigger(key='valid/acc', trigger=standard_trigger))
manager.extend(E.snapshot(target=model, filename='model.pth'), trigger=standard_trigger)

while not manager.stop_trigger:
    for data in train_loader:
        with manager.run_iteration():
            model.train()
            optimizer.zero_grad()
            data = to(data, device)
            out = model(**data)
            loss = criterion(**out)
            ppe.reporting.report({
                'train/loss': loss.item()
            })
            loss.backward()
            optimizer.step()
            scheduler.step()

前述しましたが、pytorch-pfn-extrasの使い方の方も参照していただければと思います。

まとめ

実際、自分がやり易ければ、トラッキングの方法なんてなんでもいいと思ってる。

もちろん自分はこれがやりやすいと思っているので、おすすめです。

pytorch-pfn-extrasが便利という話

この投稿はrioyokotalab Advent Calendar 2020 7日目の投稿です。

adventar.org

実験ログをきれいに取りたい

PyTorchでコードで学習コードを書いた時の悩みの一つ。それは、ログの取り方だと思います。取りたい情報は損失、学習ステップ、学習率など、様々です。それぞれの情報が、取得するために様々な実装を必要とします。

今回は、pytorch-pfn-extrasのステマ紹介をします。

github.com

pytorch-pfn-extrasは実験のログをきれいに表示したり、ファイル出力してくれたりはもちろん。学習の進捗状況の表示や、Chainerではおなじみだった遅延評価モデルモジュールも提供してくれたりしています。

普段、自分が使わない昨日も多々あるので、全部紹介することはできないですが、わかる範囲で紹介します。

以下

import pytorch_pfn_extras as ppe

します。

Reporter

ppe.reporting.reportメソッドは記録したい値をppeモジュールに保存していきます。後々出てくるmanagerやextensionsで可視化、集計などを行えます。

ppe.reporting.report({'loss': loss.item()})

みたいな感じで使います。

学習管理

ppe.training.ExtensionsManagerクラスは学習epoch数や、現在の合計イテレーション数などを管理し、学習の停止タイミングを決めたりします。ppe.reporting.reportで吐き出した値を元に停止するタイミングを決定したりすることもできます。

また、with文で、

with manager.run_iteration():

以下にコードを書いてあげると、ppe.reporting.reportに出力する値は、LogReportで集計されるまで蓄積され、LogReportで出力するタイミングで平均によって集計されます。この機能は感覚的でない気がしていますが...。 また、学習ステップが進んでいることをppeに伝えることができ、学習イテレーションを一つ増やすことができたりします。

便利ツール

pytorch-pfn-extrasの学習便利ツール。それは、extensionsと呼ばれるものです。学習率の監視や、ログの出力、進捗バーの表示など、学習をわかりやすくするための諸々の機能が詰まっています。

  • Evaluator
  • LogReport
  • MicroAverage
  • PrintReport
  • ProgressBar
  • ParameterStatistics
  • PlotReport
  • observe_lr
  • observe_value
  • snapshot
  • VariableStatisticsPlot

これらはトリガーと呼ばれるものを指定することで、学習中の望むタイミングで動作させることができます。例えば、学習エポックの変わり目や、1000イテレーション経過時点、他にも、validation損失が最小になったタイミングなどなどです。

これ以外にも、必要に応じてExtensionクラスを自作したり、関数をExtension化したりすることができます。

これらの機能からいくつかピックアップします。

observe_lr

optimizerを渡すことで、optimizerの学習率を監視してくれます。監視している学習率はデフォルトではlrという値でppeモジュール内で管理されます。

LogReport

指定ステップ数ごとに学習中の様々な値をjson形式にファイル出力する。ログを出力する時点での、経過時間や学習エポック数、イテレーション数、ovserve_lrで監視している学習率、指定すれば、ppe.reporting.reportで出力した値などを出力してくれます。

snapshot

指定したタイミングでモデルのスナップショットを取ってくれます。トリガーには、検証損失が最小になったタイミングで起動するものもあるので、それを指定することで、ベストモデルだけを保存したりすることができます。

学習テンプレートコード

普段自分はこんな感じで書いています。学習コード自体のベースはこちらの記事を参照してください。

deoxy.hatenablog.com

class Evaluator(ppe.training.extension.Extension):
    priority = ppe.training.extension.PRIORITY_WRITER

    def __init__(self, model, device, metrics, loader, prefix='valid/'):
        self.model = model
        self.device = device
        self.metrics = metrics
        self.prefix = prefix
        self.loader = loader
        
    def __call__(self, manager):
        self.model.eval()
        logs = {name: [] for name, metric in self.metrics.items()}
        with torch.no_grad():
            for data in self.loader:
                data = to(data, device)
                out = self.model(**data)
                for name, metric in self.metrics.items():
                    met = metric(**out).item()
                    logs[name].append(met)
        for name, value in logs.items():
            ppe.reporting.report({
                self.prefix + name: np.mean(value)
            })

manager = ppe.training.ExtensionsManager(
    model, optimizer, num_epochs,
    iters_per_epoch=len(train_loader),
    out_dir=out_dir
)

standard_trigger = (1, 'epoch')
manager.extend(E.observe_lr(optimizer=optimizer), trigger=standard_trigger)
manager.extend(E.LogReport(trigger=standard_trigger))
manager.extend(E.PrintReport(['epoch', 'iteration', 'lr', 'train/loss', 'valid/loss', 'valid/acc', 'elapsed_time']), trigger=standard_trigger)
manager.extend(Evaluator(model, device, {'loss': criterion, 'acc': metric}, valid_loader, prefix='valid/'), trigger=standard_trigger)
manager.extend(E.snapshot(target=model, filename='best.pth'), trigger=ppe.training.triggers.MaxValueTrigger(key='valid/acc', trigger=standard_trigger))
manager.extend(E.snapshot(target=model, filename='model.pth'), trigger=standard_trigger)

while not manager.stop_trigger:
    for data in train_loader:
        with manager.run_iteration():
            model.train()
            optimizer.zero_grad()
            data = to(data, device)
            out = model(**data)
            loss = criterion(**out)
            ppe.reporting.report({
                'train/loss': loss.item()
            })
            loss.backward()
            optimizer.step()
            scheduler.step()

実行すると

epoch       iteration   lr          train/loss  valid/loss  valid/acc   elapsed_time
1           534         7.61087e-05  1.0535      0.489182    0.825715    770.13        
2           1068        9.96594e-05  0.471393    0.372857    0.872201    1538.18       
3           1602        9.69772e-05  0.387294    0.367487    0.870103    2305.61       
4           2136        9.17624e-05  0.34103     0.360847    0.879198    3073.83       
5           2670        8.42963e-05  0.298277    0.363659    0.882929    3840.81       
6           3204        7.49812e-05  0.257328    0.377409    0.881996    4608.64       
7           3738        6.43194e-05  0.221985    0.373493    0.883085    5375.9        
8           4272        5.28857e-05  0.189842    0.415019    0.874767    6144.7        
9           4806        4.12964e-05  0.154903    0.431548    0.877332    6910.01       
10          5340        3.01763e-05  0.126962    0.471258    0.875233    7677.17       
11          5874        2.01249e-05  0.109875    0.49328     0.873134    8444.17   

こんな感じにログを表示してくれて、out_dir/logにこのログと同じ内容のjsonファイルが作成されます。

1stepで学習する内容をすべてmanager.run_iteration()内部に置くことで、学習ステップが始まる前に起動して欲しいトリガーや学習ステップが終わった後に起動して欲しいトリガーなどの起動タイミングをコントロールすることができます。

Evaluatorは自作していますが、これは、aucなどの全推論結果を加味した指標を取りたい場合、ppe.training.extensions.Evaluatorを使うと不便だからです。

トリガーや、その他extensionsの使い方はgithubにあるdocumentationsを参照してください。

遅延評価モジュール

少し話が変わりますが、遅延評価モジュールの紹介をします。

ユースケース

EfficientNetという大きさをb0からb7までいろいろ変更できるモデルがあります。これは大きさを変更すると、最終層の出力ベクトルサイズも変わるのですが、これをコロコロ変えたい場合、最終層の全結合層の入力サイズを毎回変えなければなりません。また、EfficientNetからSE-ResNeXtに変える場合は?その度に最終層のベクトルサイズを調べ直すのは手間ですよね。しかもハードコーディングはかっこ悪い...。

Lazy Modules

class Model(nn.Module):
    def __init__(self, backbone):
        super().__init__()
        self.backbone = backbone
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(2048, 5)
        
    def forward(self, x, **kwargs):
        out = self.backbone.extract_features(x)
        out = self.pool(out).squeeze(-1).squeeze(-1)
        out = self.fc(out)
        kwargs['out'] = out
        return kwargs

このように全結合層を書いているところを

class Model(nn.Module):
    def __init__(self, backbone):
        super().__init__()
        self.backbone = backbone
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc = ppe.nn.LazyLinear(None, 5)
        
    def forward(self, x, **kwargs):
        out = self.backbone.extract_features(x)
        out = self.pool(out).squeeze(-1).squeeze(-1)
        out = self.fc(out)
        kwargs['out'] = out
        return kwargs

としてあげるだけで先ほどの悩みは半分ほど解決です。

あとは、遅延評価モデルなので、fcの入力サイズを評価する必要があります。それを行うためには、

model = Model(backbone)
for example in train_loader:
    break
with torch.no_grad():
    out = model(**example)

と実行してあげて、一度、モデルにデータを通します。そうすることで、モデルはデータが入力された際の全結合層への入力ベクトルサイズを確認することができ、あとは、通常のモデルとして扱うことができるようになります。

まとめ

他にも、Configの管理ツールやONNXをよしなにしてくれる機能もあるらしいですが、使ったことがないので、今回は取り扱いません。

pytorch-pfn-extrasは実験のログ管理ツールの中では、かなりコンパクトで取り回しやすいものだと感じています。

ドキュメント早く書いて...

PyTorchのテンプレコードを用意してどんなデータセットにも楽々ディープラーニング

この投稿はrioyokotalab Advent Calendar 2020 5日目の投稿です。

adventar.org

PyTorchは自由すぎる

PyTorchは自動微分ライブラリとしての側面が強く、一方で中途半端に深層学習としての機能を提供しているため、コードが書く人によってまちまちになりがち(個人的見解)。かといって、コードの書き方を強要するライブラリを大量に提供すれば、万人に受け入れられるコードにはならない。PyTorchはサードパーティーライブラリを作ってもらうことによって、書き方の共通化を進めているが、結局、そこには好みが出てきて、そのライブラリを使ったことがない人にとっては可読性の低いコードとなってしまう。

今回は、PyTorchの機能のみを使って(共有性の向上)、様々なデータセット、モデル、Loss関数の深層学習コードを実装できるようにした、自己流のテンプレートコードを紹介する。

Python**オペレータの利用

テンプレートコードでは、辞書を**オペレータによってunpackingしてキーワード引数として渡す機能1を利用する。さらに、中間の変数はなるべく辞書として持つことで、テンプレートコードに引数を追加する必要性を減らし、コードの変更量を減らす。こうすることで、データの意味や、値の意味を表現したまま、少ない変更量で学習コードを実現できる。これは、コードの修正が容易になるだけでなく、コードの可読性の向上にもつながると思っている。

テンプレートコード

データセット

class TestDataset:
    
    def __init__(self, df):
        self.data = df.data.to_list()
        self.root = root
        self.preprocess = Preprocess()
        self.postprocess = Postprocess()
        self.augmentation = None
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, i):
        x = Image.open(os.path.join(self.root, self.data[i]))
        x = self.preprocess(x)
        if self.augmentation is not None:
            x = self.augmentation(x)
        x = self.postprocess(x)
        return {
            'x': x,
        }
    
class ValidDataset(TestDataset):
    
    def __init__(self, df):
        super().__init__(df)
        self.label = df.label.to_list()
        
    def __getitem__(self, i):
        ret = super().__getitem__(i)
        ret['target'] = self.label[i]
        return ret
    
class TrainDataset(ValidDataset):
    
    def __init__(self, df, augmentation):
        super().__init__(df)
        self.augmentation = augmentation

データセット部分はTestDataset -> ValidDataset -> TrainDatasetの順で継承させる。

  • TestDatasetには、推論時に行うデータの前処理、正規化やモデルに入力するためのテンソル化などの作業を行う。
  • ValidDatsetでは学習ラベルを入力データとペアにするための作業を行う。
  • TrainDatasetでは主にデータ拡張の加える。

このようにコードを構成することで、推論用にモデルをデプロイする際に、TestDatsetを写すことで、テストデータの読み出しが想定と異なる動作をすることを防ぐことができる。

データローダー

train_dataset = TrainDataset(train_df, Augmentation())
valid_dataset = ValidDataset(valid_df)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4, pin_memory=True, drop_last=True)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False, num_workers=4, pin_memory=True, drop_last=False)

特に目立つ記述はないが、先ほどのデータセットは辞書オブジェクトを要素に持つ配列となるが、それをPyTorchのDataLoaderに通すと、なんと、辞書要素ごとにバッチ化してくれる

つまり、

[{x: データ, target: ラベル} ... {x: データ, target: ラベル}]となっているデータセットなら、データローダーから取り出されるミニバッチは{x:[データ, ..., データ], target: [ラベル, ..., ラベル]}となっている。

なぜこんな仕様になっているかは知らない。

モデル定義

class Model(nn.Module):
    def __init__(self, model):
        super().__init__()
        self.model = model
        
    def forward(self, x, **kwargs):
        out = self.model(x)
        kwargs['out'] = out
        return kwargs

後々のコードではデータローダーから取り出された辞書オブジェクトに格納されたバッチを**オペレータによってunpackingして渡す。その時、すべての辞書要素を無差別に渡すようにするので、モデルには関係ないデータ(例えば、ラベル情報)なども入力される。それを変数として認識せず、かつ、返り値に残すために**kwargs変数を使う。これは、キーワード引数で渡された、明示的に書かれていない引数を辞書として持つ機能で、kwargsという名前で、辞書オブジェクトとして扱うことができる。モデルに通した後、その後に扱いたい情報をkwargsに追加して返すことで、拡張性の高いコードとすることができる。(あまり行儀がいいとは言えないかもしれないが)

余談ではあるが、huggingface氏が提供しているtransformers2という有名な自然言語向けの深層学習ライブラリがある。 ここで提供されているモデルは返り値を辞書で返してくるので、辞書で変数をまとめて扱う、というコーディングスタイルはかなり有用であると言える。

損失関数

class Criterion(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.cross_entropy = nn.CrossEntropyLoss()
        
        
    def forward(
        self,
        out,
        target,
        **kwargs
    ):
        return self.cross_entropy(out, target)

損失関数についても、モデル定義の時と同様に辞書オブジェクトを**オペレータでunpackingして渡されることを想定して、**kwargs引数を持った状態で作成する。モデル定義の時と違うのは、損失は.backwardメソッドを呼んで、勾配を計算する必要があるので、辞書ではなく、torch.tensorを直接返すようにする。

class AccMetric(nn.Module):
    
    def __init__(self):
        super().__init__()
        
    def forward(
        self,
        out,
        target,
        **kwargs
    ):
        pred = out.argmax(dim=1)
        return (pred == target).float().mean()

その他metricsも必要に応じて設計する。

バイス転送関数

def to(x, device, *args, **kwargs):
    return {
        key: value.to(device, *args, **kwargs) for key, value in x.items()
    }

辞書オブジェクトで渡されたミニバッチはtorch.tensorではないので、.toメソッドを直接呼ぶことができない。従って、各辞書要素について、.toメソッドを呼ぶ関数を用意する。

*args, **kwargs引数は.toメソッドに渡す他の引数(例えば、non_blockingなど)のために用意しておく。

その他学習に必要なオブジェクトの用意

num_epochs = 20
model = Model(_model)
model = model.to(device)
criterion = Criterion()
metric = AccMetric()
optimizer = torch.optim.AdamW(model.parameters())
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    optimizer, 
    max_lr=0.01, 
    epochs=num_epochs, 
    steps_per_epoch=len(train_loader),
    pct_start=0.1
)

作成したクラスなどからオブジェクトを作成し、その他optimzierやschedulerなどを設定する。

学習

for i in range(num_epochs):
    for data in train_loader:
        model.train()
        optimizer.zero_grad()
        data = to(data, device)
        out = model(**data)
        loss = criterion(**out)
        loss.backward()
        optimizer.step()
        scheduler.step()
    for data in valid_loader:
        model.eval()
        with torch.no_grad():
            data = to(data, device)
            out = model(**data)
            acc = metric(**out)
            loss = criterion(**out)
            

これだけで学習ができる。

まとめ

他の記事で言及しようと思っているが、実際には自分は、Weights and Biases3やpytorch-pfn-extras4など、実験トラッキングツールを活用している。しかし、それらは個人の好みが関わってくるので、今回は言及しない。ここまで、書いたテンプレートコードをまとめると次のようになる。

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader

class TestDataset:
    
    def __init__(self, df):
        self.data = df.data.to_list()
        self.root = root
        self.preprocess = Preprocess()
        self.postprocess = Postprocess()
        self.augmentation = None
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, i):
        x = Image.open(os.path.join(self.root, self.data[i]))
        x = self.preprocess(x)
        if self.augmentation is not None:
            x = self.augmentation(x)
        x = self.postprocess(x)
        return {
            'x': x,
        }
    
class ValidDataset(TestDataset):
    
    def __init__(self, df):
        super().__init__(df)
        self.label = df.label.to_list()
        
    def __getitem__(self, i):
        ret = super().__getitem__(i)
        ret['target'] = self.label[i]
        return ret
    
class TrainDataset(ValidDataset):
    
    def __init__(self, df, augmentation):
        super().__init__(df)
        self.augmentation = augmentation


train_dataset = TrainDataset(train_df, Augmentation())
valid_dataset = ValidDataset(valid_df)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4, pin_memory=True, drop_last=True)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False, num_workers=4, pin_memory=True, drop_last=False)

class Model(nn.Module):
    def __init__(self, model):
        super().__init__()
        self.model = model
        
    def forward(self, x, **kwargs):
        out = self.model(x)
        kwargs['out'] = out
        return kwargs

class Criterion(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.cross_entropy = nn.CrossEntropyLoss()
        
        
    def forward(
        self,
        out,
        target,
        **kwargs
    ):
        return self.cross_entropy(out, target)

class AccMetric(nn.Module):
    
    def __init__(self):
        super().__init__()
        
    def forward(
        self,
        out,
        target,
        **kwargs
    ):
        pred = out.argmax(dim=1)
        return (pred == target).float().mean()

def to(x, device, *args, **kwargs):
    return {
        key: value.to(device, *args, **kwargs) for key, value in x.items()
    }

num_epochs = 20
model = Model(_model)
model = model.to(device)
criterion = Criterion()
metric = AccMetric()
optimizer = torch.optim.AdamW(model.parameters())
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    optimizer, 
    max_lr=0.01, 
    epochs=num_epochs, 
    steps_per_epoch=len(train_loader),
    pct_start=0.1
)


for i in range(num_epochs):
    for data in train_loader:
        model.train()
        optimizer.zero_grad()
        data = to(data, device)
        out = model(**data)
        loss = criterion(**out)
        loss.backward()
        optimizer.step()
        scheduler.step()
    for data in valid_loader:
        model.eval()
        with torch.no_grad():
            data = to(data, device)
            out = model(**data)
            acc = metric(**out)
            loss = criterion(**out)
            

別にこれが最適解だとは思っていないので、まだまだ煮詰めていきたい。

Rank Gaussという正規化手法

この投稿はrioyokotalab Advent Calendar 2020 3日目の投稿です。

adventar.org

Rank Gauss

Kaggleでは良く知られているRank Gaussという正規化手法について紹介します。RankGaussは2017年のKaggleコンペ"Porto Seguro’s Safe Driver Prediction”で提案された1正規化手法です。この正規化手法は、平均を引いて標準偏差で割るという、画像処理などでよく使われる正規化とは違い、どんなデータ分布であっても、標準正規分布に直すことができるという強力な手法となっています。

正規分布に正規化するモチベーション

機械学習モデルについて統計的な議論を行う時、汎化や収束速度などの議論をする場合、入力データ分布に正規分布を仮定することが多いです。特にニューラルネットワーク系の話題になるとその色が強くなります。つまり、ニューラルネットワークにおける、様々なアルゴリズムの恩恵を受けるためには、入力が正規分布である必要があるのです。

RankGaussを使うモチベーション

実世界のデータは必ず正規分布になっているとは限りません。しかし、正規分布という、自然界のいろいろなものがなぜか従う分布を背後にもっているデータ分布はあるかもしれません。データ分布が生成されるモデル背景とその背後にある正規分布を見つけることができれば、話は早いですが、そんなことをしなくても、生成するモデルが単調性さえ維持していれば、RankGaussで正規分布に戻せるのです。

アルゴリズム

ある、数値データを考えます。まず、数値データを順位付けし、その順位が上位どの割合にあるかという[0, 1]の値に変換します。これだけで、元のデータ分布を一様分布に押し込めることができました。同じ要領で正規分布に押し込めることができます。

  1. 順位付けする
  2. -1 ~ 1の範囲にスケーリングする
  3. 逆誤差関数:  \frac{2}{\sqrt{\pi}} \int_{0}^{x} e^{-t^{2}} d t逆関数に通す

ことで、正規化ができます。

f:id:deoxy:20201203233753p:plain

大体こんなイメージ。

実装

scikit-learnにQuantileTransformer2というメソッドがあります。このoutput_distributionという引数に'normal'を渡すとRankGaussと同じ機能になります。 正規化は学習データに基づいて統計的な操作を加えるものなので、学習データの統計情報を保持する必要があります。QurantileTransformerはscikit-learnの学習モデルインターフェースに従って、.fitメソッドと.transformメソッドがあり、.fitメソッドに学習データを渡した後、.transformに学習、検証、テストデータを渡してあげると、正規分布に直されたデータが手に入ります。

まとめ

要は最強の正規化アルゴリズムf:id:deoxy:20201203235726p:plain ...まあ、何でもかんでも正規分布に押し込めりゃいいって話ではない。

Jupyter Notebookのジョブ実行 on ABCI

この投稿はrioyokotalab Advent Calendar 2020の投稿です。

adventar.org

Jupyter NotebookをABCI上で使いたい

公式の方法はこちら

Jupyter Notebookの利用 - ABCI User Guide

Jupyter Notebookは便利なもので[要出典]、ABCI上に立てれば、強力な計算資源の恩恵(4x Tesla V100 + 80 Thread + 384GBメモリ)を受けながらインタラクティブなプログラムの実行が行えます。

この利点は、データの読み込みに時間がかかるが、デバッグを細かく行いたい場面などで活かされます。また、実行内容の可視化を素早く行うことができ、実験結果の共有や、実行内容の正常性を確認することもできます。

これらの利点は、まさに機械学習と相性が良く、機械学習のためのスパコンであるABCIでこそJupyter Notebookの利便性を確保することは重要です。

しかし、公式の方法は手順を理解するためには重要な内容ですが、毎回その手順を行うのはコマンドが多く、大変です。今回は、Jupyter Notebookの起動を簡潔に行う方法を紹介します。

Jupyter Notebookの起動コマンドのジョブ実行

ABCI上でジョブ実行するためには、qsubコマンドを使います。qsubコマンドでスクリプトを実行するためには、ジョブスクリプトを用意する必要があります。

仮想環境の作成

公式の起動方法にもあるように、起動する前に、Pythonの仮想環境を作成する必要があります。

仮想環境の作成は、

python -m venv venv

と実行することで、手元に、venvという名前の仮想環境が作成されます。 Jupyter Notebookを起動するためには、Jupyterをpipでインストールする必要があるので、

source ./venv/bin/activate

と実行し、仮想環境を起動した後、

pip install jupyter

と実行しましょう。その他、必要なライブラリもこのタイミングでインストールしましょう。

Jupyter Notebookはアクセスする際にパスワードかトークンを求められます。ジョブ実行した場合、トークンを確認するのが面倒なので、予めパスワードを設定しておきましょう。パスワードの設定は

Running a notebook server — Jupyter Notebook 6.1.5 documentation

が参考になります。

jupyter notebook --generate-config
jupyter notebook password

の後、passwordを入力することで、パスワードの設定が可能です。

Jupyter Notebookの起動

自分が使っているJupyter Notebook起動のためのスクリプトjupyter.shは次の通りです。

#!/bin/bash
#$ -cwd
#$ -l rt_F=1
#$ -l h_rt=24:00:00
#$ -o log/o.$JOB_ID
#$ -j y


source /etc/profile.d/modules.sh
module load gcc/7.4.0
module load python/3.7/3.7.6
module load cuda/10.1/10.1.243
module load cudnn/7.6/7.6.5
module load nccl/2.5/2.5.6-1
unset JOB_ID

source ./venv/bin/activate
jupyter notebook --ip=`hostname` --port=8888 --no-browser --notebook-dir=./

このコマンドを用意して、

qsub -g [group_id] jupyter.sh

と実行すると、rt_Fノード上でJupyter Notebookの起動が行えます。

Jupyter Notebookへの接続

Jupyter Notebookに接続するためには、起動したJupyter NotebookがABCI上のどのノードで起動したかを特定する必要があります。

qstat

を実行し、queueの項目の下にある、gpu@g0000などとある部分を確認しましょう。

g0000の部分をメモしたら、自分のPCのローカルのターミナル上で、

ssh -N -L 8888:g0000:8888 [abciへsshするために叩いているHostName]

を実行しましょう。自分の環境の場合は、

ssh -N -L 8888:g0010:8888 abci

といった感じになります。

これによって、ローカルPCから、ABCI上のJupyter Notebookまでのトンネルが確保され、ローカルPCのlocalhost:8888からJupyter Notebookへアクセスできます。

Jupyter Notebookをジョブ実行によって起動するメリットと注意点

メリット

  • インタラクティブノードの確保する際の制限である、12時間を超えてJupyter Notebookを起動できる。
  • インタラクティブノードが切断されることによって、実行途中の計算結果などが消えてしまうことがない。(ターミナルなどが閉じてしまっても、トンネルを通しなおせば、実行途中のノートブックが開けます。)
  • jupyter.sh内に環境を書くことができる。(インタラクティブノードで、毎回、必要なモジュールをmodule loadするのはミスが起こりやすい)(機械学習において、環境の差は再現性を壊す最大の原因の一つ)
  • コマンドが少なくて済むので、楽

注意点

  • Jupyter Notebookを起動したジョブは自動的に終了してくれないので、ポイントの無駄遣いになる可能性がある。

Jupyter Notebookの終了方法

Jupyter Notebookのジョブを終了する方法は3種類ほどあります。

方法1

f:id:deoxy:20201201213922p:plain ブラウザ上で、Quitボタンを押す

方法2

ABCI上で、

qdel jupyter.sh

を実行する

方法3

ABCI上で、qstatコマンドで、Jupyter Notebookを起動したジョブIDを調べて、

qdel [ジョブID]

を実行する

まとめ

ABCI上でJupyter Notebookを

  1. qsub -g group_id jupyter.sh
  2. ssh -N -L 8888:g0000:8888 abci
  3. localhost:8888へアクセス

の3ステップで起動できます。Jupyter Notebookを楽々起動して、楽しい機械学習ライフを!