Implement the 18-layer deep residual network ResNet18 and train and test it on the CIFAR10 image dataset. The standard ResNet18 accepts 224 × 224 input images; we adjust it slightly so that it takes 32 × 32 inputs and produces a 10-dimensional output. The adapted ResNet18 network structure is shown in the figure. (Figure: adapted ResNet18 architecture.)
def preprocess(x, y):
    # scale pixel values to the range [-1, 1]
    x = 2 * tf.cast(x, dtype=tf.float32) / 255. - 1
    y = tf.cast(y, dtype=tf.int32)  # type conversion
    return x, y

(x, y), (x_test, y_test) = load.load_data()  # load the dataset
y = tf.squeeze(y, axis=1)  # drop the redundant dimension
y_test = tf.squeeze(y_test, axis=1)  # drop the redundant dimension
print(x.shape, y.shape, x_test.shape, y_test.shape)

train_db = tf.data.Dataset.from_tensor_slices((x, y))  # build the training set
# shuffle, preprocess, batch
train_db = train_db.shuffle(1000).map(preprocess).batch(512)
test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test))  # build the test set
# preprocess, batch
test_db = test_db.map(preprocess).batch(512)
# draw one sample batch
sample = next(iter(train_db))
The dataset handling is the same as before. Because downloading the dataset online did not work, a custom function is used to load the data from disk. Once loaded, the training and test sets are built with the appropriate type conversions; the training set is shuffled in addition to being batched. A sketch of such a loader is shown below.
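For reference, here is a minimal sketch of what a local loader like load.load_data() might look like. It assumes the CIFAR-10 "python version" archive has been extracted to ./cifar-10-batches-py; the path and the pickle-based format handling follow the official CIFAR-10 release, not the author's actual load.py.

import pickle
import numpy as np

def load_data(root='cifar-10-batches-py'):
    # hypothetical loader: reads the extracted CIFAR-10 python-version files
    def read_batch(path):
        with open(path, 'rb') as f:
            d = pickle.load(f, encoding='bytes')
        # each batch stores 10000 flat rows of 3072 bytes in CHW order
        x = d[b'data'].reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)  # -> NHWC
        y = np.array(d[b'labels'], dtype=np.int32).reshape(-1, 1)    # (N, 1) like keras cifar10
        return x, y

    xs, ys = zip(*[read_batch(f'{root}/data_batch_{i}') for i in range(1, 6)])
    x, y = np.concatenate(xs), np.concatenate(ys)
    x_test, y_test = read_batch(f'{root}/test_batch')
    return (x, y), (x_test, y_test)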
II. Building the Network Model
1. BasicBlock
First, implement the residual module consisting of the two middle convolutional layers plus the 1x1 convolutional layer on the skip connection:
class BasicBlock(layers.Layer):
    # residual module
    def __init__(self, filter_num, stride=1):
        super(BasicBlock, self).__init__()
        # first convolutional unit
        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')
        # second convolutional unit
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()
        # padding is 'same', so input and output shapes match only when stride=1;
        # otherwise height and width shrink to 1/stride of the original
        if stride != 1:  # match shapes via a 1x1 convolution
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
        else:  # shapes already match, identity shortcut
            self.downsample = lambda x: x

    def call(self, inputs, training=None):
        # [b, h, w, c], through the first convolutional unit
        out = self.conv1(inputs)
        out = self.bn1(out)
        out = self.relu(out)
        # through the second convolutional unit
        out = self.conv2(out)
        out = self.bn2(out)
        # identity (shortcut) branch
        identity = self.downsample(inputs)
        # add the two paths element-wise
        output = layers.add([out, identity])
        output = tf.nn.relu(output)  # activation
        return output
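A quick shape check (a sketch using the class above): a stride-2 block halves the spatial dimensions, while the 1x1 shortcut convolution keeps the two branches addable.

block = BasicBlock(filter_num=128, stride=2)
x = tf.random.normal([4, 32, 32, 64])
print(block(x).shape)  # expected: (4, 16, 16, 128)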
With the helper function build_resblock, several residual modules can be created at once:
def build_resblock(self, filter_num, blocks, stride=1):
    # helper: stack `blocks` BasicBlocks with `filter_num` channels each
    res_blocks = Sequential()
    # only the first BasicBlock may have a stride other than 1, performing the downsampling
    res_blocks.add(BasicBlock(filter_num, stride))
    for _ in range(1, blocks):  # all remaining BasicBlocks use stride 1
        res_blocks.add(BasicBlock(filter_num, stride=1))
    return res_blocks
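The helper simply wraps the blocks in a Sequential, so each stage behaves like a single layer. Built by hand (an illustrative sketch), a stride-2 stage with two blocks looks like this:

stage = Sequential([BasicBlock(128, stride=2), BasicBlock(128, stride=1)])
print(stage(tf.random.normal([2, 16, 16, 64])).shape)  # (2, 8, 8, 128)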
2. ResNet
When designing deep convolutional neural networks, a common rule of thumb is to gradually reduce the feature-map height and width h/w while gradually increasing the number of channels c. High-level features can be extracted by stacking Res Blocks with increasing channel counts. Below we implement the generic ResNet model.
# generic ResNet implementation
class ResNet(keras.Model):
    def __init__(self, layer_dims, num_classes=10):
        # layer_dims: e.g. [2, 2, 2, 2] means 4 ResBlocks with 2 BasicBlocks each;
        # num_classes: number of output classes
        super(ResNet, self).__init__()
        # stem (root) network for preprocessing
        self.stem = Sequential([layers.Conv2D(64, (3, 3), strides=(1, 1)),
                                layers.BatchNormalization(),
                                layers.Activation('relu'),
                                layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')
                                ])
        # stack 4 ResBlocks with different strides; following the rule of thumb,
        # height/width shrink while the channel count grows
        self.layer1 = self.build_resblock(64, layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)
        # output: [b, h, w, 512], with h and w not known in advance
        # global pooling reduces height and width to 1x1, i.e. it flattens the features
        self.avgpool = layers.GlobalAveragePooling2D()
        # final fully connected layer for classification
        self.fc = layers.Dense(num_classes)

    # forward pass
    def call(self, inputs, training=None):
        # through the stem network
        x = self.stem(inputs)
        # through the 4 ResBlocks in turn
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        # through the pooling layer
        x = self.avgpool(x)
        # through the fully connected layer
        x = self.fc(x)
        return x
Different ResNets are produced by adjusting the number of stacked Res Blocks and their channel counts. For example, the channel configuration 64-64-128-128-256-256-512-512, i.e. 8 BasicBlocks in total, yields the ResNet18 model. Each BasicBlock contains 2 main convolutional layers, so the blocks contribute 8 ∙ 2 = 16 convolutional layers; together with the initial convolution in the stem and the fully connected layer at the end, the network has 18 layers.
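As a sanity check, here is a stage-by-stage shape trace for a 32 × 32 input (a sketch assuming the classes above; note the stem convolution uses the default 'valid' padding, so 32 × 32 first shrinks to 30 × 30):

net = ResNet([2, 2, 2, 2])
x = tf.random.normal([1, 32, 32, 3])
x = net.stem(x)
print('stem  ', x.shape)  # (1, 30, 30, 64)
for name in ['layer1', 'layer2', 'layer3', 'layer4']:
    x = getattr(net, name)(x)
    print(name, x.shape)  # (1, 30, 30, 64) -> (1, 15, 15, 128) -> (1, 8, 8, 256) -> (1, 4, 4, 512)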
Note that fully connected layers are usually added after the convolutional layers to vectorize the features, but they have a serious weakness: a very large parameter count, especially in the layer connected to the last convolutional layer. On one hand this increases the computation in both training and testing and slows things down; on the other hand, the large parameter count makes overfitting easy, and techniques such as dropout do not fully solve the problem.
A fully connected layer first flattens the convolutional feature maps into a vector and then classifies; Global Average Pooling accomplishes both steps directly.
Therefore, GlobalAveragePooling2D is applied after the convolutional layers here, before the fully connected layer. Global average pooling is a layer frequently used in deep networks; it maps a tensor of shape [B, H, W, C] to shape [B, C], as the following check illustrates.
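A minimal shape check for GlobalAveragePooling2D:

gap = layers.GlobalAveragePooling2D()
print(gap(tf.random.normal([8, 4, 4, 512])).shape)  # (8, 512)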
Create the ResNet18 and ResNet34 network models:
def resnet18():
    # different ResNets come from adjusting the number and configuration of internal BasicBlocks
    return ResNet([2, 2, 2, 2])

def resnet34():
    # same idea with a deeper configuration
    return ResNet([3, 4, 6, 3])
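As a quick sanity check (a sketch), both variants can be built and their parameter counts compared:

for name, net in [('resnet18', resnet18()), ('resnet34', resnet34())]:
    net.build(input_shape=(None, 32, 32, 3))
    print(name, net.count_params())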
III. Assembling the Network
After the model is built, the optimizer, the loss function, the evaluation metrics, and other settings must be specified; this step is called assembling the network.
model = resnet18()  # ResNet18 network
model.build(input_shape=(None, 32, 32, 3))
model.summary()  # print the parameter statistics
optimizer = optimizers.Adam(learning_rate=1e-4)  # build the optimizer
The optimizer is used mainly through its apply_gradients method, which takes pairs of gradients and variables and iteratively updates the given variables. For example:
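A minimal illustration of apply_gradients on a single variable (a sketch independent of the model above):

w = tf.Variable(1.0)
with tf.GradientTape() as tape:
    loss = (w - 3.0) ** 2
optimizer.apply_gradients(zip(tape.gradient(loss, [w]), [w]))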
IV. Computing Gradients and the Loss, and Updating Parameters
To compute gradients with automatic differentiation, the forward pass must be placed inside a tf.GradientTape() context. The parameter gradients are then obtained with the tape's gradient() method, and the optimizer object updates the parameters.
with tf.GradientTape() as tape:
    # [b, 32, 32, 3] => [b, 10], forward pass
    # (training=True so BatchNorm uses batch statistics during training)
    logits = model(x, training=True)
    # [b] => [b, 10], one-hot encoding
    y_onehot = tf.one_hot(y, depth=10)
    # compute the cross-entropy loss
    loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
    loss = tf.reduce_mean(loss)
# compute the gradients
grads = tape.gradient(loss, model.trainable_variables)
# update the network parameters
optimizer.apply_gradients(zip(grads, model.trainable_variables))
if step % 50 == 0:
    print(epoch, step, 'loss:', float(loss))
V. Testing
In the testing phase, gradients do not need to be recorded, so the code generally does not go inside a with tf.GradientTape() as tape context. Passing the forward-pass output through the softmax function gives the probability P(y = i | x) that the network assigns input x to class i, i ∈ [0, 9]. The argmax function selects the index of the largest probability as the predicted class of x; this is compared with the ground-truth label y, the number of True entries in the comparison is summed to count the correctly predicted samples, and dividing by the total number of samples gives the test accuracy.
total_num = 0
total_correct = 0
for x, y in test_db:
    logits = model(x, training=False)  # inference mode for BatchNorm
    prob = tf.nn.softmax(logits, axis=1)  # logits => probabilities
    pred = tf.argmax(prob, axis=1)  # index of the most probable class
    pred = tf.cast(pred, dtype=tf.int32)
    correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
    correct = tf.reduce_sum(correct)  # correct predictions in this batch
    total_num += x.shape[0]
    total_correct += int(correct)
acc = total_correct / total_num
print(epoch, 'acc:', acc)
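The same bookkeeping can also be done with a built-in Keras metric (an equivalent sketch; SparseCategoricalAccuracy takes the argmax of the predictions internally, so logits can be passed directly):

acc_meter = tf.keras.metrics.SparseCategoricalAccuracy()
for x, y in test_db:
    acc_meter.update_state(y, model(x, training=False))
print('acc:', float(acc_meter.result()))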
Test results:
After only 2 epochs the accuracy is still fairly low; a complete run should reach about 80%.
Complete program: resnet.py
# -*- coding: utf-8 -*-
# @Time : 13:21
# @Author : Paranipd
# @File : resnet.py
# @Software : PyCharm
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Sequential


class BasicBlock(layers.Layer):
    # residual module
    def __init__(self, filter_num, stride=1):
        super(BasicBlock, self).__init__()
        # first convolutional unit
        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')
        # second convolutional unit
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()
        # padding is 'same', so input and output shapes match only when stride=1;
        # otherwise height and width shrink to 1/stride of the original
        if stride != 1:  # match shapes via a 1x1 convolution
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
        else:  # shapes already match, identity shortcut
            self.downsample = lambda x: x

    def call(self, inputs, training=None):
        # [b, h, w, c], through the first convolutional unit
        out = self.conv1(inputs)
        out = self.bn1(out)
        out = self.relu(out)
        # through the second convolutional unit
        out = self.conv2(out)
        out = self.bn2(out)
        # identity (shortcut) branch
        identity = self.downsample(inputs)
        # add the two paths element-wise
        output = layers.add([out, identity])
        output = tf.nn.relu(output)  # activation
        return output


# generic ResNet implementation
class ResNet(keras.Model):
    def __init__(self, layer_dims, num_classes=10):
        # layer_dims: e.g. [2, 2, 2, 2] means 4 ResBlocks with 2 BasicBlocks each;
        # num_classes: number of output classes
        super(ResNet, self).__init__()
        # stem (root) network for preprocessing
        self.stem = Sequential([layers.Conv2D(64, (3, 3), strides=(1, 1)),
                                layers.BatchNormalization(),
                                layers.Activation('relu'),
                                layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')
                                ])
        # stack 4 ResBlocks with different strides; height/width shrink
        # while the channel count grows
        self.layer1 = self.build_resblock(64, layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)
        # output: [b, h, w, 512], with h and w not known in advance
        # global pooling reduces height and width to 1x1, i.e. it flattens the features
        self.avgpool = layers.GlobalAveragePooling2D()
        # final fully connected layer for classification
        self.fc = layers.Dense(num_classes)

    # forward pass
    def call(self, inputs, training=None):
        # through the stem network
        x = self.stem(inputs)
        # through the 4 ResBlocks in turn
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        # through the pooling layer
        x = self.avgpool(x)
        # through the fully connected layer
        x = self.fc(x)
        return x

    def build_resblock(self, filter_num, blocks, stride=1):
        # helper: stack `blocks` BasicBlocks with `filter_num` channels each
        res_blocks = Sequential()
        # only the first BasicBlock may have a stride other than 1 (downsampling)
        res_blocks.add(BasicBlock(filter_num, stride))
        for _ in range(1, blocks):  # all remaining BasicBlocks use stride 1
            res_blocks.add(BasicBlock(filter_num, stride=1))
        return res_blocks


def resnet18():
    # different ResNets come from adjusting the number and configuration of internal BasicBlocks
    return ResNet([2, 2, 2, 2])


def resnet34():
    # same idea with a deeper configuration
    return ResNet([3, 4, 6, 3])
main.py
# -*- coding: utf-8 -*-
# @Time : 14:22
# @Author : Paranipd
# @File : resnet18_train.py
# @Software : PyCharm
import tensorflow as tf
from tensorflow.keras import layers, optimizers, datasets, Sequential
import os
from resnet import resnet18
import load

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
tf.random.set_seed(2345)


def preprocess(x, y):
    # scale pixel values to the range [-1, 1]
    x = 2 * tf.cast(x, dtype=tf.float32) / 255. - 1
    y = tf.cast(y, dtype=tf.int32)  # type conversion
    return x, y


(x, y), (x_test, y_test) = load.load_data()  # load the dataset
y = tf.squeeze(y, axis=1)  # drop the redundant dimension
y_test = tf.squeeze(y_test, axis=1)  # drop the redundant dimension
print(x.shape, y.shape, x_test.shape, y_test.shape)

train_db = tf.data.Dataset.from_tensor_slices((x, y))  # build the training set
# shuffle, preprocess, batch
train_db = train_db.shuffle(1000).map(preprocess).batch(512)
test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test))  # build the test set
# preprocess, batch
test_db = test_db.map(preprocess).batch(512)
# draw one sample batch
sample = next(iter(train_db))


def main():
    # [b, 32, 32, 3] => [b, 10]
    model = resnet18()  # ResNet18 network
    model.build(input_shape=(None, 32, 32, 3))
    model.summary()  # print the parameter statistics
    optimizer = optimizers.Adam(learning_rate=1e-4)  # build the optimizer

    for epoch in range(100):  # training epochs
        for step, (x, y) in enumerate(train_db):
            with tf.GradientTape() as tape:
                # [b, 32, 32, 3] => [b, 10], forward pass in training mode
                logits = model(x, training=True)
                # [b] => [b, 10], one-hot encoding
                y_onehot = tf.one_hot(y, depth=10)
                # compute the cross-entropy loss
                loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
                loss = tf.reduce_mean(loss)
            # compute the gradients
            grads = tape.gradient(loss, model.trainable_variables)
            # update the network parameters
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            if step % 50 == 0:
                print(epoch, step, 'loss:', float(loss))

        # evaluate on the test set after each epoch
        total_num = 0
        total_correct = 0
        for x, y in test_db:
            logits = model(x, training=False)  # inference mode for BatchNorm
            prob = tf.nn.softmax(logits, axis=1)
            pred = tf.argmax(prob, axis=1)
            pred = tf.cast(pred, dtype=tf.int32)
            correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
            correct = tf.reduce_sum(correct)
            total_num += x.shape[0]
            total_correct += int(correct)
        acc = total_correct / total_num
        print(epoch, 'acc:', acc)


if __name__ == '__main__':
    main()