Basic Tutorial: Deep Convolutional Networks: Case Studies

1. GoogLeNet

Note: this section is not a complete GoogLeNet implementation; the auxiliary-classifier (early prediction) branches are omitted.

First, import the packages and configure the GPU:

import tensorflow as tf

print(tf.__version__)

# Enable memory growth so TensorFlow does not grab all GPU memory up front.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

Output:

2.1.0

Define the network's basic Inception block:

class Inception(tf.keras.layers.Layer):
    def __init__(self, c1, c2, c3, c4):
        super().__init__()
        # Path 1: 1x1 convolution
        self.p1_1 = tf.keras.layers.Conv2D(c1, kernel_size=1, activation='relu', padding='same')
        # Path 2: 1x1 convolution followed by 3x3 convolution
        self.p2_1 = tf.keras.layers.Conv2D(c2[0], kernel_size=1, activation='relu', padding='same')
        self.p2_2 = tf.keras.layers.Conv2D(c2[1], kernel_size=3, activation='relu', padding='same')
        # Path 3: 1x1 convolution followed by 5x5 convolution
        self.p3_1 = tf.keras.layers.Conv2D(c3[0], kernel_size=1, activation='relu', padding='same')
        self.p3_2 = tf.keras.layers.Conv2D(c3[1], kernel_size=5, activation='relu', padding='same')
        # Path 4: 3x3 max pooling followed by 1x1 convolution
        self.p4_1 = tf.keras.layers.MaxPool2D(pool_size=3, padding='same', strides=1)
        self.p4_2 = tf.keras.layers.Conv2D(c4, kernel_size=1, padding='same', activation='relu')

    def call(self, x):
        p1 = self.p1_1(x)
        p2 = self.p2_2(self.p2_1(x))
        p3 = self.p3_2(self.p3_1(x))
        p4 = self.p4_2(self.p4_1(x))
        # Concatenate the four paths along the channel axis
        return tf.concat([p1, p2, p3, p4], axis=-1)

As you can see, the block's output is formed by concatenating the outputs of the four paths along the channel dimension.
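
As a quick sanity check (a minimal sketch, not part of the original code, assuming the Inception class defined above), the output channel count of a block is simply the sum of the four paths' output channels:

blk = Inception(64, (96, 128), (16, 32), 32)
X = tf.random.uniform((1, 28, 28, 192))   # made-up input shape, just for illustration
print(blk(X).shape)                        # expected: (1, 28, 28, 256), since 64 + 128 + 32 + 32 = 256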

Next comes the network definition:

b1 = tf.keras.models.Sequential()
b1.add(tf.keras.layers.Conv2D(64,kernel_size=7,strides=2,padding='same',activation='relu'))
b1.add(tf.keras.layers.MaxPool2D(pool_size=3,strides=2,padding='same'))
b2 = tf.keras.models.Sequential()
b2.add(tf.keras.layers.Conv2D(64,kernel_size=1,padding='same',activation='relu'))
b2.add(tf.keras.layers.Conv2D(192,kernel_size=3,padding='same',activation='relu'))
b2.add(tf.keras.layers.MaxPool2D(pool_size=3,strides=2,padding='same'))
b3 = tf.keras.models.Sequential()
b3.add(Inception(64,(96,128),(16,32),32))
b3.add(Inception(128,(128,192),(32,96),64))
b3.add(tf.keras.layers.MaxPool2D(pool_size=3,strides=2,padding='same'))
b4 = tf.keras.models.Sequential()
b4.add(Inception(192,(96,288),(16,48),64))
b4.add(Inception(160,(112,224),(24,64),64))
b4.add(Inception(128,(128,256),(16,64),64))
b4.add(Inception(112,(144,288),(16,64),64))
b4.add(Inception(256,(160,320),(32,128),128))
b4.add(tf.keras.layers.MaxPool2D(pool_size=3,strides=2,padding='same'))
b5 = tf.keras.models.Sequential()
b5.add(Inception(256,(160,320),(32,128),128))
b5.add(Inception(384,(192,384),(48,128),128))
b5.add(tf.keras.layers.GlobalAvgPool2D())
net = tf.keras.models.Sequential([b1,b2,b3,b4,b5,tf.keras.layers.Dense(10)])

Note that five Sequential models are defined here only to make the stages easier to tell apart.

Let's take a look at the network structure and how the shape changes:

X = tf.random.uniform(shape=(1, 96, 96, 1))
for layer in net.layers:
    X = layer(X)
    print(layer.name, 'output shape:\t', X.shape)

The result is as follows:

sequential output shape:	 (1, 24, 24, 64)
sequential_1 output shape:	 (1, 12, 12, 192)
sequential_2 output shape:	 (1, 6, 6, 480)
sequential_3 output shape:	 (1, 3, 3, 832)
sequential_4 output shape:	 (1, 1024)
dense output shape:	 (1, 10)

Next come data preprocessing, normalization, and batch loading:

import numpy as np

class DataLoader():
    def __init__(self):
        fashion_mnist = tf.keras.datasets.fashion_mnist
        (self.train_images, self.train_labels), (self.test_images, self.test_labels) = fashion_mnist.load_data()
        # Scale pixel values to [0, 1] and add a channel dimension.
        self.test_images = np.expand_dims(self.test_images.astype(np.float32)/255., axis=-1)
        self.train_images = np.expand_dims(self.train_images.astype(np.float32)/255., axis=-1)
        self.train_labels = self.train_labels.astype(np.int32)
        self.test_labels = self.test_labels.astype(np.int32)
        self.num_train, self.num_test = self.train_images.shape[0], self.test_images.shape[0]

    def get_batch_train(self, batch_size):
        index = np.random.randint(0, np.shape(self.train_images)[0], batch_size)
        # Pad-and-resize the 28x28 images to the 224x224 input size the network expects.
        resized_images = tf.image.resize_with_pad(self.train_images[index], 224, 224)
        return resized_images.numpy(), self.train_labels[index]

    def get_batch_test(self, batch_size):
        index = np.random.randint(0, np.shape(self.test_images)[0], batch_size)
        resized_images = tf.image.resize_with_pad(self.test_images[index], 224, 224)
        return resized_images.numpy(), self.test_labels[index]

batch_size = 128
dataLoader = DataLoader()
x_batch,y_batch = dataLoader.get_batch_train(batch_size)
print("x_batch shape:",x_batch.shape,"y_batch shape:", y_batch.shape)

Output:

x_batch shape: (128, 224, 224, 1) y_batch shape: (128,)
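
The 28×28 Fashion-MNIST images are upsampled to the 224×224 input size via tf.image.resize_with_pad. A minimal sketch of what that call does to a single image (assuming the dataLoader defined above):

img = np.expand_dims(dataLoader.train_images[0], axis=0)   # shape (1, 28, 28, 1)
resized = tf.image.resize_with_pad(img, 224, 224)          # resizes while keeping the aspect ratio, padding if needed
print(img.shape, '->', resized.shape)                       # (1, 28, 28, 1) -> (1, 224, 224, 1)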

Finally, train the network. To save time, we only train for 1 epoch:

def train_googlenet():
    epoch = 1
    num_iter = dataLoader.num_train // batch_size
    for e in range(epoch):
        for n in range(num_iter):
            x_batch, y_batch = dataLoader.get_batch_train(batch_size)
            net.fit(x_batch, y_batch, verbose=2)
            if n % 20 == 0:
                net.save_weights("5.9_googlenet_weights.h5")

optimizer = tf.keras.optimizers.Adam(lr=1e-7)

net.compile(optimizer=optimizer,
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy'])

train_googlenet()

Finally, evaluate on the test set:

x_test, y_test = dataLoader.get_batch_test(2000)
net.evaluate(x_test, y_test, verbose=1)
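
Since the training loop above periodically saves weights to 5.9_googlenet_weights.h5, one way to restore them later (a sketch, not from the original post) is to rebuild the network, run a dummy forward pass so its variables are created, and then call load_weights:

net(tf.random.uniform((1, 224, 224, 1)))        # build the variables so the shapes are known
net.load_weights("5.9_googlenet_weights.h5")    # restore the checkpoint written during training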

2. ResNet

Ah, so the residual connection doesn't really skip anything after all; it effectively adds the block's output to its input and uses that sum as the output. (If you concatenated the two instead, via tf.keras.layers.concatenate([x, y], axis=-1), you'd get DenseNet.) Good to know.
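
A tiny illustration of that difference (made-up shapes, just a sketch): element-wise addition keeps the channel count, while concatenation along the channel axis grows it, which is exactly the ResNet vs. DenseNet distinction mentioned above:

x = tf.random.uniform((1, 7, 7, 64))
y = tf.random.uniform((1, 7, 7, 64))
print((x + y).shape)                                        # ResNet-style add:      (1, 7, 7, 64)
print(tf.keras.layers.concatenate([x, y], axis=-1).shape)   # DenseNet-style concat: (1, 7, 7, 128)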

First, as usual, import the packages and set up the GPU:

import tensorflow as tf
print(tf.__version__)

gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

Output:

2.1.0

Next is the basic residual unit; notice that it ultimately returns the ReLU activation of Y + X.

The reason conv3 is needed is that a 1×1 convolution is used to reshape the input X, matching its channel count (and, when strides > 1, its spatial size) to Y so that the two can be added:

class Residual(tf.keras.Model):
    def __init__(self, num_channels, use_1x1conv=False, strides=1, **kwargs):
        super(Residual, self).__init__(**kwargs)
        self.conv1 = tf.keras.layers.Conv2D(num_channels, padding='same', kernel_size=3, strides=strides)
        self.conv2 = tf.keras.layers.Conv2D(num_channels, kernel_size=3, padding='same')
        if use_1x1conv:
            # 1x1 convolution that reshapes X to match Y before the addition.
            self.conv3 = tf.keras.layers.Conv2D(num_channels, kernel_size=1, strides=strides)
        else:
            self.conv3 = None
        self.bn1 = tf.keras.layers.BatchNormalization()
        self.bn2 = tf.keras.layers.BatchNormalization()

    def call(self, X):
        Y = tf.keras.activations.relu(self.bn1(self.conv1(X)))
        Y = self.bn2(self.conv2(Y))
        if self.conv3:
            X = self.conv3(X)
        return tf.keras.activations.relu(Y + X)
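
A quick shape check (a minimal sketch, assuming the Residual class above) shows both cases, with and without the 1×1 convolution:

X = tf.random.uniform((4, 6, 6, 3))
blk = Residual(3)                                  # same channels, stride 1: X can be added as-is
print(blk(X).shape)                                # (4, 6, 6, 3)
blk2 = Residual(6, use_1x1conv=True, strides=2)    # channels and spatial size change, so conv3 reshapes X
print(blk2(X).shape)                               # (4, 3, 3, 6)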

We now combine residual units into a residual block:

class ResnetBlock(tf.keras.layers.Layer):
    def __init__(self, num_channels, num_residuals, first_block=False, **kwargs):
        super(ResnetBlock, self).__init__(**kwargs)
        self.listLayers = []
        for i in range(num_residuals):
            if i == 0 and not first_block:
                # The first residual of each block (except the very first block) halves the
                # spatial size and changes the channel count, so it needs the 1x1 conv.
                self.listLayers.append(Residual(num_channels, use_1x1conv=True, strides=2))
            else:
                self.listLayers.append(Residual(num_channels))

    def call(self, X):
        for layer in self.listLayers:
            X = layer(X)
        return X
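
As another quick check (again a sketch assuming the classes above), the first residual unit of a non-first block halves the spatial size and switches to the new channel count:

blk = ResnetBlock(128, 2)
X = tf.random.uniform((1, 56, 56, 64))
print(blk(X).shape)   # expected: (1, 28, 28, 128)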

Now define our residual network. It uses four residual blocks, and each block in turn contains num_blocks basic residual units:

class ResNet(tf.keras.Model):
    def __init__(self, num_blocks, **kwargs):
        super(ResNet, self).__init__(**kwargs)
        self.conv = tf.keras.layers.Conv2D(64, kernel_size=7, strides=2, padding='same')
        self.bn = tf.keras.layers.BatchNormalization()
        self.relu = tf.keras.layers.Activation('relu')
        self.mp = tf.keras.layers.MaxPool2D(pool_size=3, strides=2, padding='same')
        self.resnet_block1 = ResnetBlock(64, num_blocks[0], first_block=True)
        self.resnet_block2 = ResnetBlock(128, num_blocks[1])
        self.resnet_block3 = ResnetBlock(256, num_blocks[2])
        self.resnet_block4 = ResnetBlock(512, num_blocks[3])
        self.gap = tf.keras.layers.GlobalAvgPool2D()
        self.fc = tf.keras.layers.Dense(10, activation='softmax')

    def call(self, X):
        X = self.conv(X)
        X = self.bn(X)
        X = self.relu(X)
        X = self.mp(X)
        X = self.resnet_block1(X)
        X = self.resnet_block2(X)
        X = self.resnet_block3(X)
        X = self.resnet_block4(X)
        X = self.gap(X)
        X = self.fc(X)
        return X

Instantiate it. With two residual units per block ([2, 2, 2, 2]), this is the ResNet-18 configuration:

mynet = ResNet([2,2,2,2])

Print the network structure:

X = tf.random.uniform(shape=(1, 224, 224, 1))
for layer in mynet.layers:
    X = layer(X)
    print(layer.name, 'output shape:\t', X.shape)

Output:

conv2d_1 output shape:	 (1, 112, 112, 64)
batch_normalization_1 output shape:	 (1, 112, 112, 64)
activation_1 output shape:	 (1, 112, 112, 64)
max_pooling2d_1 output shape:	 (1, 56, 56, 64)
resnet_block output shape:	 (1, 56, 56, 64)
resnet_block_1 output shape:	 (1, 28, 28, 128)
resnet_block_2 output shape:	 (1, 14, 14, 256)
resnet_block_3 output shape:	 (1, 7, 7, 512)
global_average_pooling2d output shape:	 (1, 512)
dense output shape:	 (1, 10)

Train the network:

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
x_train = x_train.reshape((60000, 28, 28, 1)).astype('float32') / 255
x_test = x_test.reshape((10000, 28, 28, 1)).astype('float32') / 255

mynet.compile(loss='sparse_categorical_crossentropy',
              optimizer=tf.keras.optimizers.Adam(),
              metrics=['accuracy'])

history = mynet.fit(x_train, y_train,
                    batch_size=64,
                    epochs=5,
                    validation_split=0.2)
test_scores = mynet.evaluate(x_test, y_test, verbose=2)

Output:

Train on 48000 samples, validate on 12000 samples
Epoch 1/5
48000/48000 [==============================] - 29s 595us/sample - loss: 0.4796 - accuracy: 0.8319 - val_loss: 0.3784 - val_accuracy: 0.8611
Epoch 2/5
48000/48000 [==============================] - 24s 492us/sample - loss: 0.3222 - accuracy: 0.8814 - val_loss: 0.3301 - val_accuracy: 0.8831
Epoch 3/5
48000/48000 [==============================] - 24s 507us/sample - loss: 0.2825 - accuracy: 0.8960 - val_loss: 0.4055 - val_accuracy: 0.8632
Epoch 4/5
48000/48000 [==============================] - 23s 486us/sample - loss: 0.2627 - accuracy: 0.9033 - val_loss: 0.2666 - val_accuracy: 0.9048
Epoch 5/5
48000/48000 [==============================] - 23s 470us/sample - loss: 0.2419 - accuracy: 0.9105 - val_loss: 0.2871 - val_accuracy: 0.8990
10000/10000 - 2s - loss: 0.2869 - accuracy: 0.8958

Test the network:

mynet.evaluate(x_test,y_test)

Output:

10000/10000 [==============================] - 3s 270us/sample - loss: 0.2869 - accuracy: 0.8958
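
As a final illustration (a sketch, not in the original post), we could also look at a few individual predictions; the class names below follow the standard Fashion-MNIST label order:

import numpy as np

class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
preds = mynet.predict(x_test[:5])                       # softmax probabilities, shape (5, 10)
print([class_names[i] for i in np.argmax(preds, axis=1)])
print([class_names[i] for i in y_test[:5]])             # ground-truth labels for comparison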
