2018年5月19日土曜日

Pytorch Reinforcement Learning (DQN) tutorial 日本語意訳コメント

ううーーーPython初めてでいきなりDQNはチャレンジしすぎでしょうか?
先人たちのコードを見ながら四苦八苦中です。

PyTorchのDQN tutorialコードに日本語コメントを入れました。
自分用です。

第14回 深層強化学習DQN(Deep Q-Network)の解説
https://book.mynavi.jp/manatee/detail/id=89691

第15回 CartPole課題で深層強化学習DQNを実装
https://book.mynavi.jp/manatee/detail/id=89831

を主に参考にしています。
上記記事のコードは、OpenAIGymのCartPole-v0の位置や傾きといった情報を元にDQNをおこなっています。Keras-RLのCartPole-v0サンプルで同じです。PyTorchのサンプルでは、描画した画像をポールを中心に抜き出して画像から学習するようになっています。

自分がわからなかったところをコメントに残しておきましたでの、参考になれば幸いです。(Python初心者なので割と基本的なところからわかってません汗)

あと朝から調べながらやっていたのですが、後半お酒飲みながら書いてます。だんだんコメント口調崩れてきますがお気になさらずに・・。

"""
Pytorch DQN チュートリアル 日本語意訳
Reinforcement Learning (DQN) tutorial
=====================================
**Author**: `Adam Paszke <https://github.com/apaszke>`_
"""

import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as nnFunc
import torchvision.transforms as tvTrans

env = gym.make('CartPole-v0').unwrapped

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display
plt.ion()

# GPUモードが有効な場合GPUを利用する。それ以外はCPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

"""
 経験を保存するメモリクラスを定義する
 DQNではランダムにサンプリングすることで相関のないバッチを使用する。
 バッチ学習のために二つのクラスが必要です。
 -  ``Transition`` - 状態を表す名前付きタプル
 -  ``ReplayMemory`` - サイクルバッファメモリクラス
       sampleメソッドで取得したランダムなバッチでトレーニングを行います。

"""
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))

#経験を保存するメモリクラス
class ReplayMemory(object):

    #コンストラクタ
    def __init__(self, capacity):
        self.capacity = capacity    # メモリの最大長さ
        self.memory = []            # 経験を保存する変数
        self.position = 0           # 現在の保存位置

    # メモリに追加する。
    def push(self, *args):
        # メモリが満タンでないときは足す
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        #サイクリックにポジションを指定する。
        self.position = (self.position + 1) % self.capacity

    #バッチサイズ分ランダムにメモリを取り出す。
    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    #メモリに保持している長さ
    def __len__(self):
        return len(self.memory)


"""
 DQNクラス
 元英語のDQNについてはほかのページに一杯のってるから省略
 Qテーブルをニューラルネットワークで近似する
 そのためのネットワークを定義する。

 Pytorchのサンプルコードが、PyTorchのサンプルでよくある下記の形ではない。
 model = nn.Sequential()
 model.add_module('fc1', nn.Linear(10,100))

 上がDefine and run、コードの形がDefine by run.入力Xによって
"""
#DQNクラス
class DQN(nn.Module):
    #コンストラクタ
    def __init__(self):
        #ニューラルネットワーク内モデルの定義
        super(DQN, self).__init__()
        #Conv2d(inのチャンネル数、outのチャンネル数、
        #kernel_size特徴抽出フィルタ縦横サイズ
        #kernel_size特徴抽出フィルタの探査
        #たぶん、Kerasと変わらない・・。畳み込みについては下記二つがわかりやすい。
        #https://keras.io/ja/layers/convolutional/
        #https://deepage.net/deep_learning/2016/11/07/convolutional_neural_network.html#畳み込みとは
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(32)

        #最後に入力448、出力2線形入出力
        self.head = nn.Linear(448, 2)

    def forward(self, x):
        #ニューラルネットワークの定義
        x = nnFunc.relu(self.bn1(self.conv1(x)))
        x = nnFunc.relu(self.bn2(self.conv2(x)))
        x = nnFunc.relu(self.bn3(self.conv3(x)))
        return self.head(x.view(x.size(0), -1))



"""
 次のコードはレンダリングのためのユーティリティです。
 環境からの画像変換はtorchvisionを使うと簡単です的な?
 PILイメージに変換して、高さ40pixelにしたあと、PyTorch向けのTensorに変換する
 という一連の流れをresizeFとして定義した感じ。
 蛇足だけど、動的型定義だと、これ、関数だっけ?変数だっけ?とかわかりにくい。
 なんとかならんもんかなぁ・・。
"""
#レンダリングのためのリサイズ関数定義
resizeF = tvTrans.Compose([tvTrans.ToPILImage(),
                    tvTrans.Resize(40, interpolation=Image.CUBIC),
                    tvTrans.ToTensor()])

# This is based on the code from gym.
screen_width = 600

"""
 カートの位置計算関数
"""
def get_cart_location():
    world_width = env.x_threshold * 2
    scale = screen_width / world_width
    return int(env.state[0] * scale + screen_width / 2.0)  # MIDDLE OF CART

"""
 スクリーンを取得する。
"""
def get_screen():
    screen = env.render(mode='rgb_array').transpose((2, 0, 1))  
    # transposeは軸変換
    # 元画像は400:600:3の配列
    # [[[R,G,B][R,G,B][R,G,B]..][[R,G,B],[R,G,B],[R,G,B]...]...]と並んでいるのを
    # [[[R,R,R...][G,G,G..]][[B,B,B..]]と並べ替えている。たぶん・・・。
    # 変換後は3,400,600の配列
    # CxHxWのプレーン画像(CHW)
    screen = screen[:, 160:320]
    view_width = 320

    #ポールを中心に320:160の画像にしている
    cart_location = get_cart_location()
    if cart_location < view_width // 2:
        slice_range = slice(view_width)
    elif cart_location > (screen_width - view_width // 2):
        slice_range = slice(-view_width, None)
    else:
        slice_range = slice(cart_location - view_width // 2,
                            cart_location + view_width // 2)
    screen = screen[:, :, slice_range]
    
    # Float変換後、1に正規化して、Torchtensorに変換
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    # Resize, and add a batch dimension (BCHW)
    # リサイズして、バッチ向けに1次元足した値(BCHW)を返却する。
    #unsqueezeが1次元足す関数
    return resizeF(screen).unsqueeze(0).to(device)


"""
 リサイズ画像のサンプル出力
 テストコードと思われる
"""
#env.reset()
#plt.figure()
#plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(), interpolation='none')
#plt.title('Example extracted screen')
#plt.show()


"""
トレーニング用のモデルとオプティマイザを生成して、ユーティリティを定義する。
select_action
 εグリーディ法でアクションを選択する。
 アクションを選択するランダム確立はEPS_STARTで始まりEPS_ENDに向けて
 指数関数的に下がります。
plot_durations
 最後の100回のエピソード(公式評価で使用された尺度)の平均と一緒に、
 エピソードの期間をプロットするヘルパーです。Google翻訳そのまま。
"""

BATCH_SIZE = 128        #ミニバッチのサイズ
GAMMA = 0.999           #時間割引率
                        #εグリーディ法でランダム確率を変化させるための
EPS_START = 0.9         #  開始値
EPS_END = 0.05          #  終了値
EPS_DECAY = 200         #  最大ステップ数と同じ値 
TARGET_UPDATE = 10      #モデルを更新する間隔


policy_net = DQN().to(device)       #学習途中の学習モデル
target_net = DQN().to(device)       #最終的な学習モデル
#(分けてる理由がわからない・・・eval/trainってしながらやるんじゃないの?)

#最終モデルと学習途中モデルを一致させる。
target_net.load_state_dict(policy_net.state_dict())

#最終的モデルは推論モードへ
target_net.eval()

#最適化(勾配法)の指定
#サンプルコードではRMSpropだけど、Adamもあるので、そちらにしてみる。
#数式いっぱいだけど、ここがきれいにまとまってる。
#https://qiita.com/tokkuman/items/1944c00415d129ca0ee9
#このあたりからΣがいっぱいのページを追いかける必要が出てきて頭いたーってなる。
#optimizer = optim.RMSprop(policy_net.parameters())     #元コードはこちら
optimizer = optim.Adam(policy_net.parameters())
memory = ReplayMemory(10000)

steps_done = 0

#アクションを選択。ステップ数が少ないとランダムで選ばれることが多い
def select_action(state):
    global steps_done
    sample = random.random()        #ランダム値0 <= r < 1
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
                                    
    steps_done += 1
    if sample > eps_threshold:      
        #閾値以上ならモデルからの生成値
        with torch.no_grad():       #no_gradは勾配計算を無効化する。推論時に使うらしい。evalと何がちがうの・・・?
            result1 = policy_net(state)        #推論して
            result2 = result1.max(1)           #max(最大化を計算する次元、https://pytorch.org/docs/master/torch.html#torch.max
            result3 = result2[1].view(1, 1)    #インデックスが行動となる。(たぶん)
            return result3
#            return policy_net(state).max(1)[1].view(1, 1)      #元コードはこちら
    else:
        #ランダムなアクション
        return torch.tensor([[random.randrange(2)]], device=device, dtype=torch.long)


#エピソード毎の結果(プロット用)
episode_durations = []

#結果プロット用コード
def plot_durations():
    plt.figure(2)
    plt.clf()
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        display.clear_output(wait=True)
        display.display(plt.gcf())



"""
最後にトレーニングするためのコード

DQNのトレーニングQ関数を計算する部分
https://book.mynavi.jp/manatee/detail/id=89691
がわかりやすい。

以下Google翻訳
ここでは、最適化の1つのステップを実行する `` optimize_model``関数を見つけることができます。
最初にバッチをサンプリングし、すべてのテンソルを1つのものに連結し、
math: `Q(s_t、a_t)`および:math: `V(s_ {t + 1})= \ max_a Q(s_ { 1}、a) `を計算し、
それらを損失に結びつける。 defitionによって、次のように設定します。math: `s`は端末状態です。
ターゲットネットワークを使用して、安定性を高めるために:math: `V(s_ {t + 1}) 'を計算します。
ターゲットネットワークはほとんどの場合固定された状態になっていますが、ポリシーネットワークの重みが更新されることがあります。
これは通常、設定されたステップ数ですが、簡単にするためにエピソードを使用します。
"""


def optimize_model():
    # メモリサイズがミニバッチより小さい間は何もしない
    if len(memory) < BATCH_SIZE:
        return

    #ミニバッチ分のデータを取り出す。
    transitions = memory.sample(BATCH_SIZE)

    #stackoverflow見ろとのこと。(see http://stackoverflow.com/a/19343/3343043 for
    # [[state, action, state_next, reward][..]...BatchSize分]
    #→
    #[[state,state...BatchSize分][action,action...BatchSize分]...]に変換
    #transposeはnumpyじゃないから使えないらしい。

    batch = Transition(*zip(*transitions))

    #CARTPOLEがDoneになってなくて、NextStateがあるかどうかをチェックする。
    #map関数は配列要素に指定の関数を通した新しい配列を生成する関数
    #lambdaは匿名関数(ラムダ式)
    #この場合sを受け取って、s is not none というbool値に変換
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.uint8)


    #次ステップがあるやつのみ取り出す。(※1)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])

    #torch.catはリストを渡すと、そのリストの値をもったTensorに変換してくれる。
    #batchはTensorのリストでTensorではない。
    #そのためPyTorchで扱えるようにTensor型(内部にTensorのリストを持つ)に変換する。
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken
    # Q(s_t, a_t)を求める
    #policy_net(state_batch)を実行すると出力二つなので[[out1, out2][out1, out2]...]な感じになる。
    #gatherは、行列から、特定の列を選択して集約する関数
    #[[5,6][7,8]]という入力に対しして1次元化するとき[1,0]という値を渡すと
    #第一要素目は1,2要素目は0を抽出して[5,7]という値を引っ張ってくる。
    #・・・・・最初全然意味わからんかった。
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # max{Q(s_t+1, a)}値を求める。
    # 次の状態がない場合は0にしておくため、0のTensorをいったん作成する。
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    
    #Tensorの[]に0/1のマスク行列を渡すと、その要素だけ引っ張ってくるらしい。
    #next_state_valuesはBatchSizeだが、next_state_values[non_final_mask]ってするとnon_final_maskが1の項目だけってなる。
    #non_final_next_statesはすでに(※1)でNextStateあるもののみ抽出
    #.max(1)[0]についてはselect_actionメソッド見てね。
    next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()

    # 教師となるQ(s_t, a_t)値を求める
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # 損失関数を計算する。smooth_l1_lossは Huber loss
    #http://s0sem0y.hatenablog.com/entry/2017/06/19/084210
    # 損失関数はここがわかりやすい
    loss = nnFunc.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

    #ネットワークの更新
    optimizer.zero_grad()       # 勾配をリセット
    loss.backward()             # 誤差逆伝播(バックプロパゲーション)を計算

    #パラメーターを-1< data < 1にする。
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()


"""
やっとメインのトレーニングループだぜ!

環境を初期化して、Tensorも初期化して、サンプリングもして、
実行し、次の画面と報酬(常に1)を観察し、
モデルを一度最適化します。
エピソードが終了でループを再開
num_episodesの初期値はテスト用に小さい値なので、大きくする必要があるよ。
"""

#num_episodes = 50
num_episodes = 1000
for i_episode in range(num_episodes):
    # 環境の初期化
    env.reset()
    last_screen = get_screen()
    current_screen = get_screen()
    state = current_screen - last_screen
    for t in count():
        # アクションを選択
        action = select_action(state)
        _, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)

        # 新しい状態を取得
        last_screen = current_screen
        current_screen = get_screen()
        if not done:
            next_state = current_screen - last_screen
        else:
            next_state = None

        # メモリに保存
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # 最適化
        optimize_model()
        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break
    # optimize_model内で使用するtarget_netを最新の学習にアップデート
    if i_episode % TARGET_UPDATE == 0:
        #モデルを読み込む
        target_net.load_state_dict(policy_net.state_dict())

print('Complete')
env.render()
env.close()
plt.ioff()
plt.show()


「MT4でFXを勝ち抜く研究をするブログ」で公開している無料インジケータは、こちらの一覧から。
インジケータ一覧

Twitterもよろしくお願いします。
https://twitter.com/mt4program
Trading View プロフィール

ブログランキングにご協力よろしくお願いします。m(._.)m
にほんブログ村 為替ブログ FX テクニカルトレード派へ
にほんブログ村

お約束ですが、本ブログは、投資に対する利益を約束する物ではありません。最終的には自己責任によるご判断よろしくお願いいたします。