Implementing a BP Neural Network in NumPy (with Dropout, BN, and Other Training Tricks)
BP Neural Network
Introduction
This post trains fully connected neural networks on the MNIST dataset, comparing runs with and without input normalization, with and without Dropout, and with and without Batch Normalization, and uses visualizations to analyze why these common deep-network training tricks work and what effect they have.
網(wǎng)絡(luò)結(jié)構(gòu)
網(wǎng)絡(luò)結(jié)構(gòu)類似下圖,輸入層定為784(輸入圖片特征數(shù)),隱藏層1有512個(gè)神經(jīng)元(tanh激活),隱藏層2有512個(gè)神經(jīng)元(tanh激活),輸出層有10個(gè)神經(jīng)元(softmax激活,得到10個(gè)類別的概率分布)。
訓(xùn)練Pipeline
輸入數(shù)據(jù)
將輸入的一個(gè)batch的數(shù)據(jù)的x處理為[b, 784],y通過onehot編碼處理為[b, 10],
Forward propagation
Run the forward computation to obtain each layer's input and output.
Backward propagation
Compute the gradient of every parameter according to the BP rules.
參數(shù)優(yōu)化
按照Adam算法,對參數(shù)進(jìn)行更新。
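The `optimizers` module is not reproduced here; a minimal NumPy sketch of a single Adam step (with the usual default hyperparameters, which are not necessarily the ones used in the repo):

```python
import numpy as np

def adam_step(w, grad, m, v, t, lr=1e-3, beta1=0.9, beta2=0.999, eps=1e-8):
    """One Adam update for a single parameter array; returns the new (w, m, v)."""
    m = beta1 * m + (1 - beta1) * grad       # first-moment (mean) estimate
    v = beta2 * v + (1 - beta2) * grad ** 2  # second-moment (uncentered variance) estimate
    m_hat = m / (1 - beta1 ** t)             # bias correction at step t (1-indexed)
    v_hat = v / (1 - beta2 ** t)
    w = w - lr * m_hat / (np.sqrt(v_hat) + eps)
    return w, m, v
```

The repo's `Adam` class presumably keeps this state (`m`, `v`, and the step counter `t`) per weight matrix.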
Source Code
Since the project has quite a few modules, only the core model source is given here; the other modules can be found on the GitHub linked at the end of the post.
""" Author: Zhou Chen Date: 2019/12/4 Desc: 構(gòu)建模型 """ import numpy as np from initializers import xavier, zero from utils import onehot from activations import tanh, softmax, softmax_gradient, tanh_gradient from losses import cross_entropy from optimizers import SGD, Adamdef dropout(x, p):"""以概率p丟棄神經(jīng)元連接,為了處理方便采用反向Dropout思路,該方法無需修改測試網(wǎng)絡(luò)"""keep_prob = 1 - p# z這里寫的時(shí)候犯了一個(gè)錯(cuò)誤,就是不應(yīng)該批量生成概率矩陣,而是生成的概率矩陣批量重復(fù)d_temp = np.random.binomial(1, keep_prob, size=x.shape[1:]) / keep_probd_temp = d_temp.reshape(-1)x_dropout = x * d_tempreturn x_dropout, d_tempclass Model(object):def __init__(self, num_layers, units_list=None, initializer=None, optimizer='adam'):self.weight_num = num_layers - 1# 根據(jù)傳入的初始化方法初始化參數(shù),本次實(shí)驗(yàn)只實(shí)現(xiàn)xavier和全0初始化self.params = xavier(num_layers, units_list) if initializer == 'xavier' else zero(num_layers, units_list)self.optimizer = Adam(weights=self.params, weight_num=self.weight_num) if optimizer == 'adam' else SGD()self.bn_param = {}def forward(self, x, dropout_prob=None):"""前向傳播,針對一個(gè)mini-batch處理"""net_inputs = [] # 各層的輸入net_outputs = [] # 各層激活后的輸出net_d = []# 為了層號對應(yīng),將輸入層直接添加net_inputs.append(x)net_outputs.append(x)net_d.append(np.ones(x.shape[1:])) # 輸入層無丟棄概率for i in range(1, self.weight_num): # 參數(shù)數(shù)量比層數(shù)少1x = x @ self.params['w'+str(i)].Tnet_inputs.append(x)x = tanh(x)if dropout_prob:# 訓(xùn)練階段丟棄x, d_temp = dropout(x, dropout_prob)net_d.append(d_temp)net_outputs.append(x)out = x @ self.params['w'+str(self.weight_num)].Tnet_inputs.append(out)out = softmax(out)net_outputs.append(out)return {'net_inputs': net_inputs, 'net_outputs': net_outputs, 'd': net_d}, outdef backward(self, nets, y, pred, dropout_prob=None):"""dz[out] = out - ydw[out] = dz[out] @ outputs[out-1].Tdb[out] = dz[out]dz[i] = W[i+1]dz[i+1] * grad(z[i])dw[i] = dz[i] @ outputs[i-1]db[i] = dz[i]sa"""grads = dict()grads['dz'+str(self.weight_num)] = (pred - y) # [b, 10]grads['dw'+str(self.weight_num)] = grads['dz'+str(self.weight_num)].T @ nets['net_outputs'][self.weight_num-1] #[10, 512]for i in reversed(range(1, self.weight_num)):temp = grads['dz' + str(i + 1)] @ self.params['w' + str(i + 1)] * tanh_gradient(nets['net_inputs'][i])if dropout_prob:temp = temp * nets['d'][i] / (1-dropout_prob)grads['dz'+str(i)] = temp # [b, 128]grads['dw'+str(i)] = grads['dz'+str(i)].T @ nets['net_outputs'][i-1]return gradsdef train(self, data_loader, valid_loader, epochs, learning_rate, dropout_prob=None):losses_train = []losses_valid = []for epoch in range(epochs):print("epoch", epoch)# 訓(xùn)練部分epoch_loss_train = 0for step, (x, y) in enumerate(data_loader):# x:[b, 28, 28] -> [b, 784] , y:[b, 1] -> [b, 10]x = x.reshape(-1, 28 * 28)y = onehot(y, 10)nets, pred = self.forward(x, dropout_prob)loss = cross_entropy(y, pred)epoch_loss_train += lossgrads = self.backward(nets, y, pred, dropout_prob)# SGD更新參數(shù)# self.params = optimizer.optimize(self.weight_num, self.params, grads, y.shape[0])self.params = self.optimizer.optimize(self.weight_num, self.params, grads, y.shape[0])if step % 100 == 0:print("epoch {} training step {} loss {:.4f}".format(epoch, step, loss))losses_train.append(epoch_loss_train)print(epoch_loss_train)data_loader.restart()# 驗(yàn)證部分,只進(jìn)行前向傳播epoch_loss_valid = 0for step, (x, y) in enumerate(valid_loader):x = x.reshape(-1, 28 * 28)y = onehot(y, 10)nets, pred = self.forward(x, dropout_prob)loss = cross_entropy(y, pred)epoch_loss_valid += lossif step % 100 == 0:print("epoch {} validation step {} loss {:.4f}".format(epoch, step, 
loss))losses_valid.append(epoch_loss_valid)valid_loader.restart()his = {'train_loss': losses_train, 'valid_loss': losses_valid}return hisdef batch_norm(self, x, layer_index, mode):epsilon = 1e-6momentum = 0.9N, D = x.shapeglobal_mean = self.bn_param.get('global_mean' + str(layer_index), np.zeros(D, dtype=x.dtype))global_var = self.bn_param.get('global_var' + str(layer_index), np.zeros(D, dtype=x.dtype))cache = Noneif mode == 'train':# 計(jì)算當(dāng)前batch的均值和方差sample_mean = np.mean(x, axis=0)sample_var = np.var(x, axis=0)x_hat = (x - sample_mean) / np.sqrt(sample_var + epsilon)out = self.params['gamma' + str(layer_index)] * x_hat + self.params['beta' + str(layer_index)] # bn結(jié)束global_mean = momentum * global_mean + (1 - momentum) * sample_meanglobal_var = momentum * global_var + (1 - momentum) * sample_varcache = {'x': x, 'x_hat': x_hat, 'sample_mean': sample_mean, 'sample_var': sample_var}else:# 測試模式,使用全局均值和方差標(biāo)準(zhǔn)化x_hat = (x - global_mean) / np.sqrt(global_var + epsilon)out = self.params['gamma' + str(layer_index)] * x_hat + self.params['beta' + str(layer_index)]self.bn_param['global_mean' + str(layer_index)] = global_meanself.bn_param['global_var' + str(layer_index)] = global_varreturn out, cachedef forward_bn(self, x, bn_mode='train'):"""帶BN層的前向傳播"""net_inputs = []net_outputs = []caches = []net_inputs.append(x)net_outputs.append(x)caches.append(x)for i in range(1, self.weight_num):# 所有隱層的輸入都進(jìn)行BN,輸入層和輸出層不進(jìn)行BNx = x = x @ self.params['w'+str(i)].Tnet_inputs.append(x)x, cache = self.batch_norm(x, i, bn_mode) # 可以將BN理解為加在隱藏層神經(jīng)元輸入和輸出間可訓(xùn)練的一層caches.append(cache)x = tanh(x)net_outputs.append(x)out = x @ self.params['w' + str(self.weight_num)].Tnet_inputs.append(out)out = softmax(out)net_outputs.append(out)return {'net_inputs': net_inputs, 'net_outputs': net_outputs, 'cache': caches}, outdef backward_bn(self, nets, y, pred):"""加入BN層的反向傳播"""epsilon = 1e-6momentum = 0.9grads = dict()# 求解輸出層梯度,依據(jù)鏈?zhǔn)椒▌t,無BNgrads['dz' + str(self.weight_num)] = (pred - y)grads['dw' + str(self.weight_num)] = grads['dz' + str(self.weight_num)].T @ nets['net_outputs'][self.weight_num - 1]for i in reversed(range(1, self.weight_num)):N = nets['cache'][i]['x'].shape[0]grads['dz'+str(i)] = grads['dz' + str(i + 1)] @ self.params['w' + str(i + 1)]grads['dgamma'+str(i)] = np.sum(grads['dz'+str(i)] * nets['cache'][i]['x_hat'])grads['dbeta'+str(i)] = np.sum(grads['dz'+str(i)], axis=0)dx_hat = grads['dz'+str(i)] * self.params['gamma'+str(i)]dsigma = -0.5 * np.sum(dx_hat * (nets['cache'][i]['x'] - nets['cache'][i]['sample_mean']), axis=0) * np.power(nets['cache'][i]['sample_var'][i] + epsilon, -1.5)dmu = -np.sum(dx_hat / np.sqrt(nets['cache'][i]['sample_var'] + epsilon), axis=0) - 2 * dsigma * np.sum(nets['cache'][i]['x'] - nets['cache'][i]['sample_mean'], axis=0) / Ndx = dx_hat / np.sqrt(nets['cache'][i]['sample_var'] + epsilon) + 2.0 * dsigma * (nets['cache'][i]['x'] - nets['cache'][i]['sample_mean']) / N + dmu / Ntemp = dx * tanh_gradient(nets['net_inputs'][i])grads['dw'+str(i)] = temp.T @ nets['net_outputs'][i-1]return gradsdef train_bn(self, data_loader, valid_loader, epochs, learning_rate):losses_train = []losses_valid = []for epoch in range(epochs):print("epoch", epoch)epoch_loss_train = 0# 重置全局均值和方差# 批量訓(xùn)練for step, (x, y) in enumerate(data_loader):# x:[b, 28, 28] -> [b, 784] , y:[b, 1] -> [b, 10]x = x.reshape(-1, 28 * 28)y = onehot(y, 10)nets, pred = self.forward_bn(x, bn_mode='train')grads = self.backward_bn(nets, y, pred)self.optimizer.optimize(self.weight_num, self.params, 
grads, y.shape[0])loss = cross_entropy(y, pred)epoch_loss_train += lossif step % 100 == 0:print("epoch {} step {} loss {:.4f}".format(epoch, step, loss))losses_train.append(epoch_loss_train)data_loader.restart()print(epoch_loss_train)# 驗(yàn)證集測試epoch_loss_valid = 0for step, (x, y) in enumerate(valid_loader):x = x.reshape(-1, 28 * 28)y = onehot(y, 10)nets, pred = self.forward_bn(x, bn_mode='test')loss = cross_entropy(y, pred)epoch_loss_valid += lossif step % 100 == 0:print("epoch {} step {} loss {:.4f}".format(epoch, step, loss))losses_valid.append(epoch_loss_valid)valid_loader.restart()his = {'train_loss': losses_train, 'valid_loss': losses_valid}return hisdef predict(self, data_loader, bn=False):labels = []pred = []losses = 0for (x, y) in data_loader:x = x.reshape(-1, 28 * 28)y = onehot(y, 10)if bn:_, out = self.forward_bn(x, 'test')else:_, out = self.forward(x)loss = cross_entropy(y, out)losses += lossout = list(np.argmax(out, axis=-1).flatten())y = list(np.argmax(y, axis=1).flatten())labels += ypred += outreturn np.array(pred).astype('int'), np.array(labels).astype('int')訓(xùn)練效果
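To show how the pieces fit together, here is a hedged usage sketch. The `DataLoader` objects and their `restart()`-based iteration come from the repo's other modules, so the loader names below are assumptions:

```python
# train_loader, valid_loader, and test_loader are assumed to be DataLoader
# objects from the repo's data module (their construction is not shown here)
model = Model(num_layers=4, units_list=[784, 512, 512, 10],
              initializer='xavier', optimizer='adam')
history = model.train(train_loader, valid_loader,
                      epochs=10, learning_rate=1e-3, dropout_prob=0.2)
preds, labels = model.predict(test_loader)
print('test accuracy: {:.4f}'.format((preds == labels).mean()))
```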
Training Results
With vs. Without Input Normalization
Main purpose
Rescales the input values so that a single learning rate can be used for every layer when optimizing with gradient descent (otherwise the input layer would need a much lower learning rate).
Effect
Faster convergence, i.e., the loss drops faster.
訓(xùn)練集
驗(yàn)證集
測試集
With vs. Without Dropout
Main purpose
Randomly switching off neurons reduces the co-adaptation between them, forcing the network to learn more robust representations and effectively suppressing overfitting.
Effect
Training converges more slowly, but test performance improves.
訓(xùn)練集
驗(yàn)證集
測試集
With vs. Without Batch Normalization
Main purpose
Normalizes each hidden layer's inputs toward a standard normal distribution, which speeds up learning and effectively alleviates related training problems such as internal covariate shift.
Effect
Training is accelerated.
訓(xùn)練集
驗(yàn)證集
測試集
補(bǔ)充說明
本案例均使用Numpy手寫神經(jīng)網(wǎng)絡(luò)的訓(xùn)練,如有疏漏之處歡迎之處。源碼開源于我的Github,歡迎star和fork。
超強(qiáng)干貨來襲 云風(fēng)專訪:近40年碼齡,通宵達(dá)旦的技術(shù)人生總結(jié)
以上是生活随笔為你收集整理的Numpy实现BP神经网络(包含Dropout、BN等训练技巧)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 机器学习-Kmeans聚类
- 下一篇: 版本控制可视化工具-Gource教程