筆者:川嶋 宏彰
本連載では、機械学習の基礎となるさまざまな手法の仕組みや、それらの手法のPythonでの利用方法を解説していきます。最終回となる今回は、ニューラルネットの仕組みと、基本的なモデルについて解説します。
シェルスクリプトマガジン Vol.77は以下のリンク先でご購入できます。


図3 単純パーセプトロンを学習するための関数を定義するPythonコード
import numpy as np
def h_step(x):
""" ステップ関数: 0より大きければ1、それ以外は0 """
return int(x > 0)
def train_perceptron(X, y, lr=0.5, max_it=20, random_state=None):
""" パーセプトロンの学習アルゴリズム
lr: 学習率 (learning rate)
max_it: 最大反復回数 (maximum number of iterations)
"""
N, d = X.shape # データ数x次元
X1 = np.c_[np.ones(N), X] # 1だけの列を1列目に追加
d1 = d + 1 # バイアス項込みの次元数
np.random.seed(random_state)
w = np.random.rand(d1) # (w0, w1, w2)
print('initial w', w)
w_log = [w.copy()] # 初期の重み係数
info_log = [[-1] * 5] # [it, i, y, o, y-o]
for it in range(max_it): # ① 反復 (iteration)
print('--- iteration:', it)
err = 0
for i in range(N): # ② 各データを入力
x = X1[i, :]
y_pred = h_step(np.dot(w, x))
err += (y[i] - y_pred) ** 2
print('yhat:', y_pred, 'y:', y[i])
if y_pred != y[i]:
w += lr * (y[i] - y_pred) * x # ③ wを更新
w_log.append(w.copy())
info_log.append([it, i, y[i], y_pred, y[i] - y_pred])
print('err:', err)
if err == 0:
print('## converged @ it=', it)
break
return w_log, info_log
def get_fourpoints(xor_flag=False):
"""4点のデータを準備
xor_flag: 各データのラベルをどう設定するか
True: 入力が異なる符号なら1, False: 入力が共に正なら1
"""
X = np.array([[-1, -1], [-1, 1], [1, -1], [1, 1]])
y = np.array([0, 1, 1, 0]) if xor_flag else np.array([0, 0, 0, 1])
return X, y
図4 単純パーセプトロンの学習過程を可視化する関数を定義するPythonコード
import matplotlib.pyplot as plt
import seaborn as sns
plt.rcParams['font.size'] = 14
def plot_2dline_with_data(X, y, w, title='', i=-1):
""" データと w0 + w1 x1 + w2 x2 = 0 をプロット """
fig = plt.figure(figsize=(5, 5))
sns.scatterplot(x=X[:, 0], y=X[:, 1], hue=y, s=150)
xlim = [-1.5, 1.5]
ylim = [-1.5, 1.5]
plt.xlim(xlim)
plt.ylim(ylim)
plt.xticks([-1, 1])
plt.yticks([-1, 1])
# w0 + w1*x1 + w2*x2 = 0 のプロット
if w[2] == 0: # w2 == 0 のとき: x1 = -w0/w1
x2 = np.linspace(*ylim, 2)
plt.plot([-w[0]/w[1]] * x2.size, x2, '-', linewidth=3, color='r')
else: # w2 != 0 のとき: x2 = -(w0 + w1*x1)/w2
x1 = np.linspace(*xlim, 2)
plt.plot(x1, -(w[0] + w[1]*x1)/w[2], '-', linewidth=3, color='r')
if i >= 0: plt.scatter(X[i, 0], X[i, 1], s=300, facecolor='none',
edgecolor='r', linewidth=2)
plt.title(title)
return fig
図5 単純パーセプトロンの学習と結果表示をするPythonコード
# データの読み込みと学習
xor_flag = False # (★) True: XOR, False: AND
X, y = get_fourpoints(xor_flag) # 学習データを取得
print(f' X:\n{X},\n y: {y}')
w_log, info_log = train_perceptron(X, y, random_state=0)
# 学習過程の表示
for step in range(len(w_log)):
title = 'it: {}, i: {}, y: {}, y_pred: {}, y - y_pred:{}'\
.format(*info_log[step])
print(title)
w = w_log[step]
it = info_log[step][0]
i = info_log[step][1]
print('w:', w)
plot_flag = False if (it >= 3) and (it % 5 != 0) else True
if plot_flag:
plot_2dline_with_data(X, y, w, title, i)
plt.show()
図12 決定境界を可視化するための関数を定義するPythonコード
import seaborn as sns
import matplotlib as mpl
def plot_decision_boundary(X, y, clf, xylabels=None, palette=None, fig=None, ngrid=50):
""" 分類器 clf の決定境界を描画 """
if fig is None: fig = plt.figure(figsize=(5, 5))
else: plt.figure(fig.number)
# 2次元空間にグリッド点を準備
xmin = X.min(axis=0) # 各列の最小値
xmax = X.max(axis=0) # 各列の最大値
xstep = [(xmax[j]-xmin[j]) / ngrid for j in range(2)] # グリッドのステップ幅
xmin = [xmin[j] - 8*xstep[j] for j in range(2)] # 少し広めに
xmax = [xmax[j] + 8*xstep[j] for j in range(2)]
aranges = [np.arange(xmin[j], xmax[j] + xstep[j], xstep[j]) for j in range(2)]
x0grid, x1grid = np.meshgrid(*aranges)
# 各グリッド点でクラスを判定
y_pred = clf.predict(np.c_[x0grid.ravel(), x1grid.ravel()])
y_pred = y_pred.reshape(x0grid.shape) # 2次元へ
y_pred = np.searchsorted(np.unique(y_pred), y_pred) # 値をindexへ
clist = palette.values() if type(palette) is dict else palette
cmap = mpl.colors.ListedColormap(clist) if palette is not None else None
plt.contourf(x0grid, x1grid, y_pred, alpha=0.3, cmap=cmap)
sns.scatterplot(x=X[:, 0], y=X[:, 1], hue=y, palette=palette)
plt.legend()
plt.xlim([xmin[0], xmax[0]])
plt.ylim([xmin[1], xmax[1]])
if xylabels is not None:
plt.xlabel(xylabels[0])
plt.ylabel(xylabels[1])
return fig
図13 多層パーセプトロンによる学習をするPythonコード
from sklearn.neural_network import MLPClassifier
clf = MLPClassifier(hidden_layer_sizes=3, learning_rate_init=0.05, random_state=0)
# XORデータの準備
xor_flag = True # True: XOR, False: AND
X, y = get_fourpoints(xor_flag) # 学習データを取得
print(f' X:\n{X},\n y: {y}')
clf.fit(X, y) # 学習(誤差逆伝播法)の実行
plot_decision_boundary(X, y, clf) # 決定境界
plt.show()
図15 学習曲線や結合荷重などを表示するPythonコード
# 学習曲線の表示
plt.plot(range(1, clf.n_iter_ + 1), clf.loss_curve_)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.ylim(0,)
plt.show()
# 構造を確認
print('n_outputs:', clf.n_outputs_) # 出力層の素子数
print('n_layers (with input layer):', clf.n_layers_) # 入力層を含めた層の数
# 結合荷重
print('coefs:\n', clf.coefs_) # バイアス項以外の結合荷重(行列のリスト)
print('intercepts:\n', clf.intercepts_) # バイアス項(ベクトルのリスト)
図17 Breast Cancerデータセットの準備をするPythonコード
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.preprocessing import StandardScaler
data = load_breast_cancer() # データセット読み込み
X_orig = data['data']
y = 1 - data['target'] # 悪性を1、良性を0とする正解ラベル
X = X_orig[:, :10] # 一部の特徴量を利用する
# 訓練データとテストデータに2分割する(検証データは今回は切り分けない)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,
random_state=1)
# ROCカーブの表示用に関数を準備
def plot_roc(fpr, tpr, marker=None):
plt.figure(figsize=(5, 5))
plt.plot(fpr, tpr, marker=marker)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.grid()
図18 Breast Cancerデータセットによる学習と評価をするPythonコード
clf = MLPClassifier(hidden_layer_sizes=(5, 3), learning_rate_init=0.005, random_state=1)
with_scaling = False # (★)標準化するか否か
if with_scaling: # 標準化によるスケーリングあり
scaler = StandardScaler().fit(X_train) # 訓練データで平均と標準偏差を求める
Xscaled_train = scaler.transform(X_train) # X_train -> Xscaled_train
Xscaled_test = scaler.transform(X_test) # X_test -> Xscaled_test
clf.fit(Xscaled_train, y_train) # 学習
y_proba_train = clf.predict_proba(Xscaled_train) # 訓練データで事後確率予測
y_proba_test = clf.predict_proba(Xscaled_test) # テストデータ事後確率予測
else: # 標準化なし(そのまま)
clf.fit(X_train, y_train) # 学習
y_proba_train = clf.predict_proba(X_train) # 訓練データで事後確率予測
y_proba_test = clf.predict_proba(X_test) # テストデータ事後確率予測
# テストデータに対するROCカーブ
fpr, tpr, threshold = roc_curve(y_test, y_proba_test[:, 1])
plot_roc(fpr, tpr)
plt.show()
# 訓練・テストデータのAUC(Area Under the Curve)スコア
print('AUC (train):', roc_auc_score(y_train, y_proba_train[:, 1]))
print('AUC (test):', roc_auc_score(y_test, y_proba_test[:, 1]))
図21 PyTorchによる画像認識の準備をするPythonコード
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import datasets
from torchvision import transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
# GPUが使えるか否かでデータの転送先を指定
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
img_mean, img_std = (0.1307, 0.3081) # 画素値の標準化用
# 標準化などの前処理を設定
trans = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((img_mean,), (img_std,))
])
# データセットのダウンロード
rootdir = './data' # ダウンロード先
train_dataset = datasets.MNIST(root=rootdir, train=True,
transform=trans, download=True)
test_dataset = datasets.MNIST(root=rootdir, train=False,
transform=trans, download=True)
# データを読み込む専用のクラスDataLoaderを準備
batch_size = 32
train_loader = torch.utils.data.DataLoader(
dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(
dataset=test_dataset, batch_size=batch_size, shuffle=False)
# クラス情報
num_classes = len(train_dataset.classes)
cls_names = [f'{i}' for i in range (num_classes)]
# 画像表示用の関数を定義しておく
def tensor_imshow(img, title=None):
""" Tensorを画像として表示 """
img = img.numpy().transpose((1, 2, 0)) # (C,H,W)->(H,W,C)
img = img_std * img + img_mean
plt.imshow(np.clip(img, 0, 1))
if title is not None:
plt.title(title)
plt.show()
def test_with_examples(model, loader):
""" loaderのミニバッチの画像を予測結果と共にグリッド表示
model: 予測に用いるニューラルネット
loader: DataLoader
"""
model.eval() # 推論モード
with torch.no_grad(): # 推論のみ(勾配計算なし)
imgs, labels = next(iter(loader)) # ミニバッチ取得
imgs_in = imgs.view(-1, imgs.shape[2]*imgs.shape[3])
outputs = model(imgs_in.to(device)) # 順伝播による推論
_, pred = torch.max(outputs, 1) # 予測ラベル
grid = torchvision.utils.make_grid(imgs) # グリッド画像生成
title_str1 = '\n'.join(
[', '.join([cls_names[y] for y in x])
for x in pred.view(-1, 8).tolist()])
title_str2 = '\n'.join(
[', '.join([cls_names[y] for y in x])
for x in labels.view(-1, 8).tolist()])
tensor_imshow(grid, title='Predicted classes:\n'
+ f'{title_str1}\n\nTrue classes:\n{title_str2}\n')
# 訓練データ例の表示
print('\n--- Training data (example) ---\n')
imgs, labels = next(iter(train_loader))
grid = torchvision.utils.make_grid(imgs)
title_str = '\n'.join([', '.join([cls_names[y] for y in x])
for x in labels.view(-1, 8).tolist()])
tensor_imshow(grid, title=f'True classes:\n{title_str}\n')
# ニューラルネットの構造を定義
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.fc1 = nn.Linear(28*28, 512)
self.fc2 = nn.Linear(512, 256)
self.fc3 = nn.Linear(256, num_classes)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x) # 活性化関数なし(損失関数側でsoftmax)
return x
net = Net() # インスタンス生成
net = net.to(device) # モデルをGPUへ転送(もしGPUがあれば)
# 学習前にテストデータを入力してみる
print('\n--- Result BEFORE training ---\n')
test_with_examples(net, test_loader)
図22 PyTorchによるニューラルネットの学習と結果表示をするPythonコード
# 損失関数(softmax + cross entropy)
criterion = nn.CrossEntropyLoss()
# 最適化手法の選択
optimizer = torch.optim.SGD(net.parameters(), lr=0.01)
# 学習のループ
num_epochs = 5
train_loss_log = []
train_acc_log = []
for epoch in range(num_epochs):
print(f'Epoch {epoch + 1}/{num_epochs}\n', '-' * 10)
# 訓練フェーズ
sum_loss, sum_ok = [0.0, 0]
for inputs, labels in train_loader: # ミニバッチ入力
inputs = inputs.view(-1, 28*28).to(device)
labels = labels.to(device) # deviceに転送
outputs = net(inputs) # 順伝播
loss = criterion(outputs, labels) # 損失計算
optimizer.zero_grad() # 勾配をクリア
loss.backward() # 誤差逆伝播
optimizer.step() # パラメータ更新
# ミニバッチの評価値(1バッチ分)を計算
_, preds = torch.max(outputs, 1) # 予測ラベル
sum_loss += loss.item() * inputs.size(0) # 損失
sum_ok += torch.sum(preds == labels.data) # 正解数
# 1エポック分の評価値を計算
e_loss = sum_loss / len(train_dataset) # 平均損失
e_acc = sum_ok.cpu().double() / len(train_dataset) # 正解率
print(f'Loss: {e_loss:.4f} Acc: {e_acc:.4f}')
train_loss_log.append(e_loss)
train_acc_log.append(e_acc)
# 学習曲線をプロット
fig = plt.figure(figsize=(12, 6))
fig.add_subplot(121) # 損失
plt.plot(range(1, num_epochs + 1), train_loss_log, '.-')
plt.title('Training data')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.ylim(0)
fig.add_subplot(122) # 正解率
plt.plot(range(1, num_epochs + 1), train_acc_log, '.-')
plt.title('Training data')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.show()
# テストデータを入力してみる
print('\n--- Result AFTER training ---\n')
test_with_examples(net, test_loader)