筆者:川嶋 宏彰
本連載では、機械学習の基礎となるさまざまな手法の仕組みや、それらの手法のPythonでの利用方法を解説していきます。最終回となる今回は、ニューラルネットの仕組みと、基本的なモデルについて解説します。
シェルスクリプトマガジン Vol.77は以下のリンク先でご購入できます。
図3 単純パーセプトロンを学習するための関数を定義するPythonコード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
import numpy as np def h_step(x): """ ステップ関数: 0より大きければ1、それ以外は0 """ return int(x > 0) def train_perceptron(X, y, lr=0.5, max_it=20, random_state=None): """ パーセプトロンの学習アルゴリズム lr: 学習率 (learning rate) max_it: 最大反復回数 (maximum number of iterations) """ N, d = X.shape # データ数x次元 X1 = np.c_[np.ones(N), X] # 1だけの列を1列目に追加 d1 = d + 1 # バイアス項込みの次元数 np.random.seed(random_state) w = np.random.rand(d1) # (w0, w1, w2) print('initial w', w) w_log = [w.copy()] # 初期の重み係数 info_log = [[-1] * 5] # [it, i, y, o, y-o] for it in range(max_it): # ① 反復 (iteration) print('--- iteration:', it) err = 0 for i in range(N): # ② 各データを入力 x = X1[i, :] y_pred = h_step(np.dot(w, x)) err += (y[i] - y_pred) ** 2 print('yhat:', y_pred, 'y:', y[i]) if y_pred != y[i]: w += lr * (y[i] - y_pred) * x # ③ wを更新 w_log.append(w.copy()) info_log.append([it, i, y[i], y_pred, y[i] - y_pred]) print('err:', err) if err == 0: print('## converged @ it=', it) break return w_log, info_log def get_fourpoints(xor_flag=False): """4点のデータを準備 xor_flag: 各データのラベルをどう設定するか True: 入力が異なる符号なら1, False: 入力が共に正なら1 """ X = np.array([[-1, -1], [-1, 1], [1, -1], [1, 1]]) y = np.array([0, 1, 1, 0]) if xor_flag else np.array([0, 0, 0, 1]) return X, y |
図4 単純パーセプトロンの学習過程を可視化する関数を定義するPythonコード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
import matplotlib.pyplot as plt import seaborn as sns plt.rcParams['font.size'] = 14 def plot_2dline_with_data(X, y, w, title='', i=-1): """ データと w0 + w1 x1 + w2 x2 = 0 をプロット """ fig = plt.figure(figsize=(5, 5)) sns.scatterplot(x=X[:, 0], y=X[:, 1], hue=y, s=150) xlim = [-1.5, 1.5] ylim = [-1.5, 1.5] plt.xlim(xlim) plt.ylim(ylim) plt.xticks([-1, 1]) plt.yticks([-1, 1]) # w0 + w1*x1 + w2*x2 = 0 のプロット if w[2] == 0: # w2 == 0 のとき: x1 = -w0/w1 x2 = np.linspace(*ylim, 2) plt.plot([-w[0]/w[1]] * x2.size, x2, '-', linewidth=3, color='r') else: # w2 != 0 のとき: x2 = -(w0 + w1*x1)/w2 x1 = np.linspace(*xlim, 2) plt.plot(x1, -(w[0] + w[1]*x1)/w[2], '-', linewidth=3, color='r') if i >= 0: plt.scatter(X[i, 0], X[i, 1], s=300, facecolor='none', edgecolor='r', linewidth=2) plt.title(title) return fig |
図5 単純パーセプトロンの学習と結果表示をするPythonコード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
# データの読み込みと学習 xor_flag = False # (★) True: XOR, False: AND X, y = get_fourpoints(xor_flag) # 学習データを取得 print(f' X:\n{X},\n y: {y}') w_log, info_log = train_perceptron(X, y, random_state=0) # 学習過程の表示 for step in range(len(w_log)): title = 'it: {}, i: {}, y: {}, y_pred: {}, y - y_pred:{}'\ .format(*info_log[step]) print(title) w = w_log[step] it = info_log[step][0] i = info_log[step][1] print('w:', w) plot_flag = False if (it >= 3) and (it % 5 != 0) else True if plot_flag: plot_2dline_with_data(X, y, w, title, i) plt.show() |
図12 決定境界を可視化するための関数を定義するPythonコード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
import seaborn as sns import matplotlib as mpl def plot_decision_boundary(X, y, clf, xylabels=None, palette=None, fig=None, ngrid=50): """ 分類器 clf の決定境界を描画 """ if fig is None: fig = plt.figure(figsize=(5, 5)) else: plt.figure(fig.number) # 2次元空間にグリッド点を準備 xmin = X.min(axis=0) # 各列の最小値 xmax = X.max(axis=0) # 各列の最大値 xstep = [(xmax[j]-xmin[j]) / ngrid for j in range(2)] # グリッドのステップ幅 xmin = [xmin[j] - 8*xstep[j] for j in range(2)] # 少し広めに xmax = [xmax[j] + 8*xstep[j] for j in range(2)] aranges = [np.arange(xmin[j], xmax[j] + xstep[j], xstep[j]) for j in range(2)] x0grid, x1grid = np.meshgrid(*aranges) # 各グリッド点でクラスを判定 y_pred = clf.predict(np.c_[x0grid.ravel(), x1grid.ravel()]) y_pred = y_pred.reshape(x0grid.shape) # 2次元へ y_pred = np.searchsorted(np.unique(y_pred), y_pred) # 値をindexへ clist = palette.values() if type(palette) is dict else palette cmap = mpl.colors.ListedColormap(clist) if palette is not None else None plt.contourf(x0grid, x1grid, y_pred, alpha=0.3, cmap=cmap) sns.scatterplot(x=X[:, 0], y=X[:, 1], hue=y, palette=palette) plt.legend() plt.xlim([xmin[0], xmax[0]]) plt.ylim([xmin[1], xmax[1]]) if xylabels is not None: plt.xlabel(xylabels[0]) plt.ylabel(xylabels[1]) return fig |
図13 多層パーセプトロンによる学習をするPythonコード
|
from sklearn.neural_network import MLPClassifier clf = MLPClassifier(hidden_layer_sizes=3, learning_rate_init=0.05, random_state=0) # XORデータの準備 xor_flag = True # True: XOR, False: AND X, y = get_fourpoints(xor_flag) # 学習データを取得 print(f' X:\n{X},\n y: {y}') clf.fit(X, y) # 学習(誤差逆伝播法)の実行 plot_decision_boundary(X, y, clf) # 決定境界 plt.show() |
図15 学習曲線や結合荷重などを表示するPythonコード
|
# 学習曲線の表示 plt.plot(range(1, clf.n_iter_ + 1), clf.loss_curve_) plt.xlabel('Epoch') plt.ylabel('Loss') plt.ylim(0,) plt.show() # 構造を確認 print('n_outputs:', clf.n_outputs_) # 出力層の素子数 print('n_layers (with input layer):', clf.n_layers_) # 入力層を含めた層の数 # 結合荷重 print('coefs:\n', clf.coefs_) # バイアス項以外の結合荷重(行列のリスト) print('intercepts:\n', clf.intercepts_) # バイアス項(ベクトルのリスト) |
図17 Breast Cancerデータセットの準備をするPythonコード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
from sklearn.datasets import load_breast_cancer from sklearn.model_selection import train_test_split from sklearn.metrics import roc_curve, roc_auc_score from sklearn.preprocessing import StandardScaler data = load_breast_cancer() # データセット読み込み X_orig = data['data'] y = 1 - data['target'] # 悪性を1、良性を0とする正解ラベル X = X_orig[:, :10] # 一部の特徴量を利用する # 訓練データとテストデータに2分割する(検証データは今回は切り分けない) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1) # ROCカーブの表示用に関数を準備 def plot_roc(fpr, tpr, marker=None): plt.figure(figsize=(5, 5)) plt.plot(fpr, tpr, marker=marker) plt.xlabel('False Positive Rate') plt.ylabel('True Positive Rate') plt.grid() |
図18 Breast Cancerデータセットによる学習と評価をするPythonコード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
clf = MLPClassifier(hidden_layer_sizes=(5, 3), learning_rate_init=0.005, random_state=1) with_scaling = False # (★)標準化するか否か if with_scaling: # 標準化によるスケーリングあり scaler = StandardScaler().fit(X_train) # 訓練データで平均と標準偏差を求める Xscaled_train = scaler.transform(X_train) # X_train -> Xscaled_train Xscaled_test = scaler.transform(X_test) # X_test -> Xscaled_test clf.fit(Xscaled_train, y_train) # 学習 y_proba_train = clf.predict_proba(Xscaled_train) # 訓練データで事後確率予測 y_proba_test = clf.predict_proba(Xscaled_test) # テストデータ事後確率予測 else: # 標準化なし(そのまま) clf.fit(X_train, y_train) # 学習 y_proba_train = clf.predict_proba(X_train) # 訓練データで事後確率予測 y_proba_test = clf.predict_proba(X_test) # テストデータ事後確率予測 # テストデータに対するROCカーブ fpr, tpr, threshold = roc_curve(y_test, y_proba_test[:, 1]) plot_roc(fpr, tpr) plt.show() # 訓練・テストデータのAUC(Area Under the Curve)スコア print('AUC (train):', roc_auc_score(y_train, y_proba_train[:, 1])) print('AUC (test):', roc_auc_score(y_test, y_proba_test[:, 1])) |
図21 PyTorchによる画像認識の準備をするPythonコード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
import torch import torch.nn as nn import torch.nn.functional as F import torchvision from torchvision import datasets from torchvision import transforms as transforms import matplotlib.pyplot as plt import numpy as np # GPUが使えるか否かでデータの転送先を指定 device = 'cuda:0' if torch.cuda.is_available() else 'cpu' img_mean, img_std = (0.1307, 0.3081) # 画素値の標準化用 # 標準化などの前処理を設定 trans = transforms.Compose([ transforms.ToTensor(), transforms.Normalize((img_mean,), (img_std,)) ]) # データセットのダウンロード rootdir = './data' # ダウンロード先 train_dataset = datasets.MNIST(root=rootdir, train=True, transform=trans, download=True) test_dataset = datasets.MNIST(root=rootdir, train=False, transform=trans, download=True) # データを読み込む専用のクラスDataLoaderを準備 batch_size = 32 train_loader = torch.utils.data.DataLoader( dataset=train_dataset, batch_size=batch_size, shuffle=True) test_loader = torch.utils.data.DataLoader( dataset=test_dataset, batch_size=batch_size, shuffle=False) # クラス情報 num_classes = len(train_dataset.classes) cls_names = [f'{i}' for i in range (num_classes)] # 画像表示用の関数を定義しておく def tensor_imshow(img, title=None): """ Tensorを画像として表示 """ img = img.numpy().transpose((1, 2, 0)) # (C,H,W)->(H,W,C) img = img_std * img + img_mean plt.imshow(np.clip(img, 0, 1)) if title is not None: plt.title(title) plt.show() def test_with_examples(model, loader): """ loaderのミニバッチの画像を予測結果と共にグリッド表示 model: 予測に用いるニューラルネット loader: DataLoader """ model.eval() # 推論モード with torch.no_grad(): # 推論のみ(勾配計算なし) imgs, labels = next(iter(loader)) # ミニバッチ取得 imgs_in = imgs.view(-1, imgs.shape[2]*imgs.shape[3]) outputs = model(imgs_in.to(device)) # 順伝播による推論 _, pred = torch.max(outputs, 1) # 予測ラベル grid = torchvision.utils.make_grid(imgs) # グリッド画像生成 title_str1 = '\n'.join( [', '.join([cls_names[y] for y in x]) for x in pred.view(-1, 8).tolist()]) title_str2 = '\n'.join( [', '.join([cls_names[y] for y in x]) for x in labels.view(-1, 8).tolist()]) tensor_imshow(grid, title='Predicted classes:\n' + f'{title_str1}\n\nTrue classes:\n{title_str2}\n') # 訓練データ例の表示 print('\n--- Training data (example) ---\n') imgs, labels = next(iter(train_loader)) grid = torchvision.utils.make_grid(imgs) title_str = '\n'.join([', '.join([cls_names[y] for y in x]) for x in labels.view(-1, 8).tolist()]) tensor_imshow(grid, title=f'True classes:\n{title_str}\n') # ニューラルネットの構造を定義 class Net(nn.Module): def __init__(self): super(Net, self).__init__() self.fc1 = nn.Linear(28*28, 512) self.fc2 = nn.Linear(512, 256) self.fc3 = nn.Linear(256, num_classes) def forward(self, x): x = F.relu(self.fc1(x)) x = F.relu(self.fc2(x)) x = self.fc3(x) # 活性化関数なし(損失関数側でsoftmax) return x net = Net() # インスタンス生成 net = net.to(device) # モデルをGPUへ転送(もしGPUがあれば) # 学習前にテストデータを入力してみる print('\n--- Result BEFORE training ---\n') test_with_examples(net, test_loader) |
図22 PyTorchによるニューラルネットの学習と結果表示をするPythonコード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
|
# 損失関数(softmax + cross entropy) criterion = nn.CrossEntropyLoss() # 最適化手法の選択 optimizer = torch.optim.SGD(net.parameters(), lr=0.01) # 学習のループ num_epochs = 5 train_loss_log = [] train_acc_log = [] for epoch in range(num_epochs): print(f'Epoch {epoch + 1}/{num_epochs}\n', '-' * 10) # 訓練フェーズ sum_loss, sum_ok = [0.0, 0] for inputs, labels in train_loader: # ミニバッチ入力 inputs = inputs.view(-1, 28*28).to(device) labels = labels.to(device) # deviceに転送 outputs = net(inputs) # 順伝播 loss = criterion(outputs, labels) # 損失計算 optimizer.zero_grad() # 勾配をクリア loss.backward() # 誤差逆伝播 optimizer.step() # パラメータ更新 # ミニバッチの評価値(1バッチ分)を計算 _, preds = torch.max(outputs, 1) # 予測ラベル sum_loss += loss.item() * inputs.size(0) # 損失 sum_ok += torch.sum(preds == labels.data) # 正解数 # 1エポック分の評価値を計算 e_loss = sum_loss / len(train_dataset) # 平均損失 e_acc = sum_ok.cpu().double() / len(train_dataset) # 正解率 print(f'Loss: {e_loss:.4f} Acc: {e_acc:.4f}') train_loss_log.append(e_loss) train_acc_log.append(e_acc) # 学習曲線をプロット fig = plt.figure(figsize=(12, 6)) fig.add_subplot(121) # 損失 plt.plot(range(1, num_epochs + 1), train_loss_log, '.-') plt.title('Training data') plt.xlabel('Epoch') plt.ylabel('Loss') plt.ylim(0) fig.add_subplot(122) # 正解率 plt.plot(range(1, num_epochs + 1), train_acc_log, '.-') plt.title('Training data') plt.xlabel('Epoch') plt.ylabel('Accuracy') plt.show() # テストデータを入力してみる print('\n--- Result AFTER training ---\n') test_with_examples(net, test_loader) |