シェルスクリプトマガジン

機械学習ことはじめ(Vol.77掲載)

筆者:川嶋 宏彰

本連載では、機械学習の基礎となるさまざまな手法の仕組みや、それらの手法のPythonでの利用方法を解説していきます。最終回となる今回は、ニューラルネットの仕組みと、基本的なモデルについて解説します。

シェルスクリプトマガジン Vol.77は以下のリンク先でご購入できます。

図3 単純パーセプトロンを学習するための関数を定義するPythonコード

import numpy as np

def h_step(x):
  """ ステップ関数: 0より大きければ1、それ以外は0 """
  return int(x > 0)
def train_perceptron(X, y, lr=0.5, max_it=20, random_state=None):
  """ パーセプトロンの学習アルゴリズム
      lr: 学習率 (learning rate)
      max_it: 最大反復回数 (maximum number of iterations)
  """
  N, d = X.shape  # データ数x次元
  X1 = np.c_[np.ones(N), X]  # 1だけの列を1列目に追加
  d1 = d + 1  # バイアス項込みの次元数
  np.random.seed(random_state)
  w = np.random.rand(d1)  # (w0, w1, w2)
  print('initial w', w)
  w_log = [w.copy()]  # 初期の重み係数
  info_log = [[-1] * 5]  # [it, i, y, o, y-o]
  for it in range(max_it):  # ① 反復 (iteration)
    print('--- iteration:', it)
    err = 0
    for i in range(N):  # ② 各データを入力
      x = X1[i, :]
      y_pred = h_step(np.dot(w, x))
      err += (y[i] - y_pred) ** 2
      print('yhat:', y_pred, 'y:', y[i])
      if y_pred != y[i]:
        w += lr * (y[i] - y_pred) * x  # ③ wを更新
      w_log.append(w.copy())
      info_log.append([it, i, y[i], y_pred, y[i] - y_pred])
    print('err:', err)
    if err == 0:
      print('## converged @ it=', it)
      break
  return w_log, info_log
def get_fourpoints(xor_flag=False):
  """4点のデータを準備
     xor_flag: 各データのラベルをどう設定するか
         True: 入力が異なる符号なら1, False: 入力が共に正なら1
  """
  X = np.array([[-1, -1], [-1, 1], [1, -1], [1, 1]])
  y = np.array([0, 1, 1, 0]) if xor_flag else np.array([0, 0, 0, 1])
  return X, y

図4 単純パーセプトロンの学習過程を可視化する関数を定義するPythonコード

import matplotlib.pyplot as plt
import seaborn as sns

plt.rcParams['font.size'] = 14
def plot_2dline_with_data(X, y, w, title='', i=-1):
  """ データと w0 + w1 x1 + w2 x2 = 0 をプロット """
  fig = plt.figure(figsize=(5, 5))
  sns.scatterplot(x=X[:, 0], y=X[:, 1], hue=y, s=150)
  xlim = [-1.5, 1.5]
  ylim = [-1.5, 1.5]
  plt.xlim(xlim)
  plt.ylim(ylim)
  plt.xticks([-1, 1])
  plt.yticks([-1, 1])
  # w0 + w1*x1 + w2*x2 = 0 のプロット
  if w[2] == 0:  # w2 == 0 のとき: x1 = -w0/w1
    x2 = np.linspace(*ylim, 2)
    plt.plot([-w[0]/w[1]] * x2.size, x2, '-', linewidth=3, color='r')
  else:  # w2 != 0 のとき: x2 = -(w0 + w1*x1)/w2
    x1 = np.linspace(*xlim, 2)
    plt.plot(x1, -(w[0] + w[1]*x1)/w[2], '-', linewidth=3, color='r')
  if i >= 0: plt.scatter(X[i, 0], X[i, 1], s=300, facecolor='none', 
                         edgecolor='r', linewidth=2)
  plt.title(title)
  return fig

図5 単純パーセプトロンの学習と結果表示をするPythonコード

# データの読み込みと学習
xor_flag = False  # (★) True: XOR,  False: AND
X, y = get_fourpoints(xor_flag)  # 学習データを取得
print(f' X:\n{X},\n y: {y}')
w_log, info_log = train_perceptron(X, y, random_state=0)
# 学習過程の表示
for step in range(len(w_log)):
  title = 'it: {}, i: {}, y: {}, y_pred: {}, y - y_pred:{}'\
          .format(*info_log[step])
  print(title)
  w = w_log[step]
  it = info_log[step][0]
  i = info_log[step][1]
  print('w:', w)
  plot_flag = False if (it >= 3) and (it % 5 != 0) else True
  if plot_flag:
    plot_2dline_with_data(X, y, w, title, i)
    plt.show() 

図12 決定境界を可視化するための関数を定義するPythonコード

import seaborn as sns
import matplotlib as mpl

def plot_decision_boundary(X, y, clf, xylabels=None, palette=None, fig=None, ngrid=50):
  """ 分類器 clf の決定境界を描画 """
  if fig is None: fig = plt.figure(figsize=(5, 5))
  else: plt.figure(fig.number)
  # 2次元空間にグリッド点を準備
  xmin = X.min(axis=0)  # 各列の最小値
  xmax = X.max(axis=0)  # 各列の最大値
  xstep = [(xmax[j]-xmin[j]) / ngrid for j in range(2)]  # グリッドのステップ幅
  xmin = [xmin[j] - 8*xstep[j] for j in range(2)]  # 少し広めに
  xmax = [xmax[j] + 8*xstep[j] for j in range(2)]
  aranges = [np.arange(xmin[j], xmax[j] + xstep[j], xstep[j]) for j in range(2)]
  x0grid, x1grid = np.meshgrid(*aranges)
  # 各グリッド点でクラスを判定
  y_pred = clf.predict(np.c_[x0grid.ravel(), x1grid.ravel()])
  y_pred = y_pred.reshape(x0grid.shape)  # 2次元へ
  y_pred = np.searchsorted(np.unique(y_pred), y_pred)  # 値をindexへ
  clist = palette.values() if type(palette) is dict else palette
  cmap = mpl.colors.ListedColormap(clist) if palette is not None else None
  plt.contourf(x0grid, x1grid, y_pred, alpha=0.3, cmap=cmap)
  sns.scatterplot(x=X[:, 0], y=X[:, 1], hue=y, palette=palette)
  plt.legend()
  plt.xlim([xmin[0], xmax[0]])
  plt.ylim([xmin[1], xmax[1]])
  if xylabels is not None:
    plt.xlabel(xylabels[0])
    plt.ylabel(xylabels[1])
  return fig

図13 多層パーセプトロンによる学習をするPythonコード

from sklearn.neural_network import MLPClassifier

clf = MLPClassifier(hidden_layer_sizes=3, learning_rate_init=0.05, random_state=0)
# XORデータの準備
xor_flag = True  # True: XOR,  False: AND
X, y = get_fourpoints(xor_flag)  # 学習データを取得
print(f' X:\n{X},\n y: {y}')
clf.fit(X, y)  # 学習(誤差逆伝播法)の実行
plot_decision_boundary(X, y, clf)  # 決定境界
plt.show()

図15 学習曲線や結合荷重などを表示するPythonコード

# 学習曲線の表示
plt.plot(range(1, clf.n_iter_ + 1), clf.loss_curve_)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.ylim(0,)
plt.show()
# 構造を確認
print('n_outputs:', clf.n_outputs_)  # 出力層の素子数
print('n_layers (with input layer):', clf.n_layers_)  # 入力層を含めた層の数
# 結合荷重
print('coefs:\n', clf.coefs_)  # バイアス項以外の結合荷重(行列のリスト)
print('intercepts:\n', clf.intercepts_)  # バイアス項(ベクトルのリスト)

図17 Breast Cancerデータセットの準備をするPythonコード

from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.preprocessing import StandardScaler

data = load_breast_cancer()  # データセット読み込み
X_orig = data['data']
y = 1 - data['target']  # 悪性を1、良性を0とする正解ラベル
X = X_orig[:, :10]  # 一部の特徴量を利用する
# 訓練データとテストデータに2分割する(検証データは今回は切り分けない)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
                                                    random_state=1)
# ROCカーブの表示用に関数を準備
def plot_roc(fpr, tpr, marker=None):
    plt.figure(figsize=(5, 5))
    plt.plot(fpr, tpr, marker=marker)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.grid()

図18 Breast Cancerデータセットによる学習と評価をするPythonコード

clf = MLPClassifier(hidden_layer_sizes=(5, 3), learning_rate_init=0.005, random_state=1)
with_scaling = False  # (★)標準化するか否か
if with_scaling:  # 標準化によるスケーリングあり
  scaler = StandardScaler().fit(X_train)  # 訓練データで平均と標準偏差を求める
  Xscaled_train = scaler.transform(X_train)  # X_train -> Xscaled_train
  Xscaled_test = scaler.transform(X_test)  # X_test -> Xscaled_test
  clf.fit(Xscaled_train, y_train)  # 学習
  y_proba_train = clf.predict_proba(Xscaled_train)  # 訓練データで事後確率予測
  y_proba_test = clf.predict_proba(Xscaled_test)  # テストデータ事後確率予測
else:  # 標準化なし(そのまま)
  clf.fit(X_train, y_train)  # 学習
  y_proba_train = clf.predict_proba(X_train)  # 訓練データで事後確率予測
  y_proba_test = clf.predict_proba(X_test)  # テストデータ事後確率予測
# テストデータに対するROCカーブ
fpr, tpr, threshold = roc_curve(y_test, y_proba_test[:, 1])
plot_roc(fpr, tpr)
plt.show()
# 訓練・テストデータのAUC(Area Under the Curve)スコア
print('AUC (train):', roc_auc_score(y_train, y_proba_train[:, 1]))
print('AUC (test):', roc_auc_score(y_test, y_proba_test[:, 1]))

図21 PyTorchによる画像認識の準備をするPythonコード

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import datasets
from torchvision import transforms as transforms
import matplotlib.pyplot as plt
import numpy as np

# GPUが使えるか否かでデータの転送先を指定
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
img_mean, img_std = (0.1307, 0.3081)  # 画素値の標準化用
# 標準化などの前処理を設定
trans = transforms.Compose([
  transforms.ToTensor(),
  transforms.Normalize((img_mean,), (img_std,))
])
# データセットのダウンロード
rootdir = './data'  # ダウンロード先
train_dataset = datasets.MNIST(root=rootdir, train=True,
  transform=trans, download=True)
test_dataset = datasets.MNIST(root=rootdir, train=False,
  transform=trans, download=True)
# データを読み込む専用のクラスDataLoaderを準備
batch_size = 32
train_loader = torch.utils.data.DataLoader(
  dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(
  dataset=test_dataset, batch_size=batch_size, shuffle=False)
# クラス情報
num_classes = len(train_dataset.classes)
cls_names = [f'{i}' for i in range (num_classes)]
# 画像表示用の関数を定義しておく
def tensor_imshow(img, title=None):
  """ Tensorを画像として表示 """
  img = img.numpy().transpose((1, 2, 0))  # (C,H,W)->(H,W,C)
  img = img_std * img + img_mean
  plt.imshow(np.clip(img, 0, 1))
  if title is not None:
    plt.title(title)
  plt.show()
def test_with_examples(model, loader):
  """ loaderのミニバッチの画像を予測結果と共にグリッド表示
    model: 予測に用いるニューラルネット
    loader: DataLoader
  """
  model.eval()  # 推論モード
  with torch.no_grad():  # 推論のみ(勾配計算なし)
    imgs, labels = next(iter(loader))  # ミニバッチ取得
    imgs_in = imgs.view(-1, imgs.shape[2]*imgs.shape[3])
    outputs = model(imgs_in.to(device))  # 順伝播による推論
  _, pred = torch.max(outputs, 1)  # 予測ラベル
  grid = torchvision.utils.make_grid(imgs)  # グリッド画像生成
  title_str1 = '\n'.join(
    [', '.join([cls_names[y] for y in x]) 
      for x in pred.view(-1, 8).tolist()])
  title_str2 = '\n'.join(
    [', '.join([cls_names[y] for y in x]) 
      for x in labels.view(-1, 8).tolist()])
  tensor_imshow(grid, title='Predicted classes:\n'
    + f'{title_str1}\n\nTrue classes:\n{title_str2}\n')
# 訓練データ例の表示
print('\n--- Training data (example) ---\n')
imgs, labels = next(iter(train_loader))
grid = torchvision.utils.make_grid(imgs)
title_str = '\n'.join([', '.join([cls_names[y] for y in x])
  for x in labels.view(-1, 8).tolist()])
tensor_imshow(grid, title=f'True classes:\n{title_str}\n')
# ニューラルネットの構造を定義
class Net(nn.Module):
  def __init__(self):
    super(Net, self).__init__()
    self.fc1 = nn.Linear(28*28, 512)
    self.fc2 = nn.Linear(512, 256)
    self.fc3 = nn.Linear(256, num_classes)
  def forward(self, x):
    x = F.relu(self.fc1(x))
    x = F.relu(self.fc2(x))
    x = self.fc3(x)  # 活性化関数なし(損失関数側でsoftmax)
    return x
net = Net()  # インスタンス生成
net = net.to(device)  # モデルをGPUへ転送(もしGPUがあれば)
# 学習前にテストデータを入力してみる
print('\n--- Result BEFORE training ---\n')
test_with_examples(net, test_loader)

図22 PyTorchによるニューラルネットの学習と結果表示をするPythonコード

# 損失関数(softmax + cross entropy)
criterion = nn.CrossEntropyLoss()
# 最適化手法の選択
optimizer = torch.optim.SGD(net.parameters(), lr=0.01)
# 学習のループ
num_epochs = 5
train_loss_log = []
train_acc_log = []
for epoch in range(num_epochs):
  print(f'Epoch {epoch + 1}/{num_epochs}\n', '-' * 10)
  # 訓練フェーズ
  sum_loss, sum_ok = [0.0, 0]
  for inputs, labels in train_loader:  # ミニバッチ入力
    inputs = inputs.view(-1, 28*28).to(device)
    labels = labels.to(device)  # deviceに転送
    outputs = net(inputs)  # 順伝播
    loss = criterion(outputs, labels)  # 損失計算
    optimizer.zero_grad()  # 勾配をクリア
    loss.backward()  # 誤差逆伝播
    optimizer.step()  # パラメータ更新
    # ミニバッチの評価値(1バッチ分)を計算
    _, preds = torch.max(outputs, 1)  # 予測ラベル
    sum_loss += loss.item() * inputs.size(0)  # 損失
    sum_ok += torch.sum(preds == labels.data)  # 正解数
  # 1エポック分の評価値を計算
  e_loss = sum_loss / len(train_dataset)  # 平均損失
  e_acc = sum_ok.cpu().double() / len(train_dataset)  # 正解率
  print(f'Loss: {e_loss:.4f} Acc: {e_acc:.4f}')
  train_loss_log.append(e_loss)
  train_acc_log.append(e_acc)
# 学習曲線をプロット
fig = plt.figure(figsize=(12, 6))
fig.add_subplot(121)  # 損失
plt.plot(range(1, num_epochs + 1), train_loss_log, '.-')
plt.title('Training data')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.ylim(0)
fig.add_subplot(122)  # 正解率
plt.plot(range(1, num_epochs + 1), train_acc_log, '.-')
plt.title('Training data')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.show()
# テストデータを入力してみる
print('\n--- Result AFTER training ---\n')
test_with_examples(net, test_loader)