机器学习是一个执果索因的过程,人类对于自然的探索也是如此。线性是最为常见的性质,也是人能够直观感受的唯一性质。最浅显的,对一元变量的线性方程可以描述为$f(x)=wx + b$的形式。

针对更高维的线性性,只需要将前式的$x, w, b$替换成矢量形式。我们给出一个比较通用的式子,这个式子几乎可以在每一本教科书中找到:$f(\mathbf x)=\mathbf w^T \mathbf x+\mathbf b$。

线性回归,是通过已知的$\mathbf x$与$f(\mathbf x)$,来得到尽量准确的$\mathbf w,\mathbf b$的推测值。

梯度下降法

当然,线性回归是有数学上的公式的。但我们这里希望用梯度下降法来获得这个回归值。因为相比于线性回归的诸多技巧,梯度下降几乎是通用的,PyTorch也支持自动求梯度。

如果您理解势能场,例如电势、地势,那么您就能够理解梯度下降法。将一个小球放在山坡上,他会自动向着下降最快的方向运动。而这一下降的方向就是由(负)梯度所描述的。电磁场公式中,$\mathbf E = -\nabla \varphi$ 就是直接的数学表述,电场强度是电势的负梯度。

势与运动之间,会用物体的特征值来描述。比如受到同样的电场,重的、带电少的小球移动的慢,而轻的、带电多的小球移动的快。这一特征值,在梯度下降法中被描述为学习率。物体的特征值是不变的,但有一些算法会调整学习率,让收敛的过程更好。

小球运动起来之后,会产生一定的速度,进而加速其落入地势的最低点。有了这样的速度,或称动量,球就可以克服部分平缓地势,更快地到达低点。这便是带有动量的梯度下降法。

我们可以利用这一方法,通过一系列的数学运算,给出每个参量相较于样例的梯度,进而调节,将内置参量向样例靠拢。

因为梯度下降法非常常用,PyTorch也提供了一系列内置的梯度下降工具。

从例子中学习

从零开始的线性网络

首先引入相关包、数据集,和作图等辅助函数。此处我们使用的是MNIST手写数字图像数据集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import torch
import torch.nn as nn
import torchvision
import os

# Dataset
rootdir = os.getenv("TORCHVISION_ROOT")
trans = torchvision.transforms.ToTensor()
MNIST_train = torchvision.datasets.MNIST(root=rootdir,transform=trans, train=True, download=True)
MNIST_test = torchvision.datasets.MNIST(root=rootdir,transform=trans, train=False, download=True)

# For visualization
import matplotlib.pyplot as plt
from IPython.display import clear_output

class Plotter():
def __init__(self,mainkey,*args):
self.titlelist={}
self.mainkey = mainkey
self.titlelist[mainkey]=[]
for t in args:
self.titlelist[t]=[]

def update(self,**kwargs):
for key,value in kwargs.items():
self.titlelist[key].append(value)
graphs = []
for key,value in self.titlelist.items():
if(key == self.mainkey):
continue
graphs.append(plt.plot(self.titlelist[self.mainkey],value,label=key)[0])
clear_output(wait=True)
plt.legend(handles=graphs)
plt.show()

定义一个线性模型。

1
2
3
4
5
6
7
8
9
class linear_net(nn.Module):
def __init__(self,inputlayer,outputlayer):
super(linear_net, self).__init__()
self.Layer1 = nn.Parameter(torch.randn(inputlayer, outputlayer, requires_grad=True)*0.1)
self.Bias1 = nn.Parameter(torch.zeros(outputlayer, requires_grad=True))

def forward(self,x):
x = x @ self.Layer1 + self.Bias1
return x

这个模型需要关注的点,是__init__函数中定义的Layer1Bias1,与下面的forward函数。对于函数$f(\mathbf x)=\mathbf w^T \mathbf x+\mathbf b$,Layer1定义的是$\mathbf w^T$。(至于转置,请注意声明中的.t())。Bias1定义的是$\mathbf b$。

nn.Parameter函数允许你自行定义一组参与运算并需要反向传播的参量。当然,PyTorch提供了现成的模组nn.linear可以直接使用,但这里我们将它展开以便理解。

forward函数描述其前馈的方式。对于此线性网络,我们按图索骥完成这个线性函数即可。@符号是矩阵乘法。

我们此处希望,传入的值不仅可以是向量,也可以是一个向量组,即矩阵。不以向量,而是以一个矩阵的形式传入,一方面是为了实现后文会提到的Mini-Batch SGD小批量梯度下降法,另一方面也是利用GPU的并行计算特性,在一个单位时间完成多次计算任务。

但是在传统的线性代数中,$\mathbf x$通常定义为$[x_1,x_2,\cdots,x_n]^T$,而一个向量组则定义为$[\mathbf x_a,\mathbf x_b,\cdots,\mathbf x_m]$。虽然是等价的,但我们如果将其转置,分别定义为$\mathbf x= [x_1,x_2,\cdots,x_n]$和 $$ \mathbf X = \left[\begin{matrix} \ \mathbf x_a \\ \mathbf x_b \\ \cdots \\ \mathbf x_m \end{matrix}\right] $$

并将函数改为$f(\mathbf x)=\mathbf X \mathbf w^T+\mathbf b$,在计算机上可以被更好地描述,也省去了转置的步骤。最重要的是,如果$\mathbf X$是矩阵的话,在做加法的时候$\mathbf b$会被自动延展。

此处您可能会想到,向量有行列之分。但PyTorch中的向量会自动选择方向,不分行列。比如,您可以试试看,torch.Tensor([1,2]).t() == torch.Tensor([1,2])的结果。

我们定义了一个TrainDaemon类,将PyTorch提供的参数和接口进行包装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class TrainDaemon:
def __init__(self,network,loss,optimizer,train_dataset,test_dataset,**kwargs):
if "network_args" in kwargs:
self.model = network(**kwargs["network_args"])
else:
self.model = network()
if "cuda" in kwargs and kwargs["cuda"]==True:
self.model.cuda()
self.enable_cuda = True
else:
self.enable_cuda = False
if "dataloader_args" in kwargs:
self.train_DataLoader = torch.utils.data.DataLoader(train_dataset,**kwargs["dataloader_args"])
self.test_DataLoader = torch.utils.data.DataLoader(test_dataset,**kwargs["dataloader_args"])
else:
self.train_DataLoader = torch.utils.data.DataLoader(train_dataset,batch_size=1)
self.test_DataLoader = torch.utils.data.DataLoader(test_dataset,batch_size=1)
if "optimizer_args" in kwargs:
self.optimizer = optimizer(self.model.parameters(),**kwargs["optimizer_args"])
if "loss_args" in kwargs:
self.loss = loss(**kwargs["loss_args"])
else:
self.loss = loss()
if "input_transformer" in kwargs:
self.inputTrans = kwargs["input_transformer"]
else:
self.inputTrans = lambda x:x
if "output_transformer" in kwargs:
self.outputTrans =kwargs["output_transformer"]
else:
self.outputTrans = lambda x:x
if "accuracy_fun" in kwargs:
self.accuracy = kwargs["accuracy_fun"]
else:
self.accuracy = lambda x,y: 1


def _train_batch(self,train_sample,train_output):

if self.enable_cuda:
train_sample = self.inputTrans(train_sample).cuda()
train_output = self.outputTrans(train_output).cuda()
else:
train_sample = self.inputTrans(train_sample)
train_output = self.outputTrans(train_output)
self.optimizer.zero_grad()
model_output = self.model(train_sample)
train_loss = self.loss(model_output,train_output)
train_loss.backward()
self.optimizer.step()
return train_loss, self.accuracy(model_output,train_output)

def _full_test(self):
with torch.no_grad():
correct_rate = 0
test_loss = 0
total_groups = 0
for test_sample,test_output in self.test_DataLoader:
total_groups += 1
if self.enable_cuda:
test_sample = self.inputTrans(test_sample).cuda()
test_output = self.outputTrans(test_output).cuda()
else:
test_sample = self.inputTrans(test_sample)
test_output = self.outputTrans(test_output)
model_output = self.model(test_sample)
test_loss += self.loss(model_output,test_output)
correct_rate += self.accuracy(model_output,test_output)
return test_loss/total_groups, correct_rate/total_groups

def _train_epoch(self):
correct_rate = 0
train_loss = 0
total_groups = 0
for train_sample,train_output in self.train_DataLoader:
_train_loss, _correct_rate = self._train_batch(train_sample,train_output)
train_loss +=_train_loss
correct_rate += _correct_rate
total_groups+=1
return train_loss/total_groups, correct_rate/total_groups

def train(self,epoch=1,log=False,plot=False):
if plot:
plotr = Plotter("Epoch","train_loss","train_accuracy","test_loss","test_accuracy")
for ep in range(epoch):
train_loss, train_accu = self._train_epoch()
test_loss, test_accu = self._full_test()
if log:
print(f"Epoch{ep}:\n"
"Train\tloss:{format(train_loss,'0.6f')}\taccu:{format(train_accu,'0.4f')}\n"
"Test\tloss:{format(test_loss,'0.6f')}\taccu:{format(test_accu,'0.4f')}")
if plot:
plotr.update(
Epoch=ep,
train_loss=train_loss,train_accuracy=train_accu,
test_loss=test_loss,test_accuracy=test_accu)

我们主要分析_train_batch函数。首先我们将数据进行变形,使其符合784-10的网络形状(784=28*28,是MNIST数据集的形状)这一变形函数将在后面讨论。

首先调用self.optimizer.zero_grad()将梯度清空,通过model_output = self.model(test_sample),我们将数据穿入网络,计算其误差。再对误差调用train_loss.backward(),利用梯度的链式传递特性反演并修改原有梯度。前面我们将梯度清空,是因为backward函数是增加而非覆盖原有的梯度。最后我们使用self.optimizer.step()来更新梯度值。

至于验证函数,我们调用了with torch.no_grad(),以保证验证过程不会影响我们的模型。其他的各函数将整个训练过程进行了包装。内容并不复杂,此处建议您自己写一个。

在自己写训练的函数时,常常会涉及到GPU相关运算的问题。GPU上的张量和CPU上的张量不能直接运算,要尤其注意。

接下来我们实例化一个TrainDaemon

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def inputparse(X):
return X.reshape(X.shape[0],-1)
def outputparse(X):
ans_tensor = torch.zeros(len(X),10)
for batch_loc in range(len(X)):
ans_tensor[batch_loc][X[batch_loc]] = 1
return ans_tensor
def accuracy(X,Y):
return ((X.argmax(dim=1) == Y.argmax(dim=1)).sum()/len(X.argmax(dim=1)))

trainer = TrainDaemon(
linear_net,torch.nn.MSELoss,torch.optim.SGD,
MNIST_train,MNIST_test,
network_args={
"inputlayer":len(MNIST_train[0][0].reshape(-1)),
"outputlayer":10},
optimizer_args={
"lr":0.1,
"momentum":0.9},
dataloader_args={
"shuffle":True,
"batch_size":256},
cuda=True,
input_transformer=inputparse,
output_transformer=outputparse,
accuracy_fun=accuracy,)

在这里,我们将数据进行处理变形以符合我们的网络参数。outputparse(X)函数将MNIST提供的答案[3,4,...]转换成网络输出的[[0,0,0,1,0,0,0,0,0,0],[0,0,0,0,1,0,0,0,0,0],...]形式。而inputparse(X)函数则将原本以平面格形式描述的图像组转成一个向量组。

我们定义一个精度函数accuracy(X,Y),这个函数比对两个输出,计数答案相同的组并求比例。

接下来是实例化TrainDaemon。我们先向实例导入我们的网络和数据,体现在上面的linear_net,MNIST_train,MNIST_test三个参量和network_args参数组。我们给出DataLoader的参数:shuffle设为真,使其打乱数据集的顺序,batch_size设为256,这是接下来小批量随机梯度下降的参数。

我们定义损失函数和优化方式。我们使用最熟悉的函数:二乘,即平均平方损失作为损失函数,此处的torch.nn.MSELoss即为平均平方损失。优化方式采用随机梯度下降法。torch.optim.SGD是随机梯度下降。optimizer_args中,lr标明学习率,momentum标明动量。

至于此处的随机,原本的梯度下降需要将所有数据带入以得到一个用以下降的梯度,这样计算量较大,因而提出了随机梯度下降法。其最初是从样本空间中随机提出一个样本,作为整体的代表,计算一个梯度并更新。

这样的方法受到样本个体的特性影响很大,收敛效果不好,因此提出了小批量随机梯度下降,指的是提出一批较小的样本,而非单个样本进行训练。这种做法可以使得梯度的下降更接近整体水平,又在性能上较梯度下降有所提高。

最后我们向实例传递我们的输入、输出变形函数。

这样一个线性网络就搭建好了。我们可以运行试试看。

1
trainer.train(100,plot=True)

可以观察到,测试的准确度在很短的区间内就达到了大约85%的准确率。

不要重复造轮子——PyTorch

上面的net类显得很复杂,而且所有的参量都是由我们手动设置的。PyTorch提供了线性层的预制模块,其本身和我们目前的模块是相同的。

我们利用线性层来替代手动管理的参量:

1
2
3
4
5
6
7
8
class linear_smart_net(nn.Module):
def __init__(self,inputlayer,outputlayer):
super(linear_smart_net, self).__init__()
self.linearlayer = nn.Linear(inputlayer,outputlayer,bias=True)

def forward(self,x):
x = self.linearlayer(x)
return x

此时将由PyTorch为我们管理线性层。注意:PyTorch的Linear模块中的矩阵处理和我们之前提到的方法是一致的。包括PyTorch的DataLoader设计和此都是一脉相承的,请勿乱转置您的矩阵。

另外,最初的输入处理部分

1
2
def inputparse(X):
return X.reshape(X.shape[0],-1)

也可以引入一个Flatten层来处理,其功能和上述的inputparse函数一致。

答案的输出处理部分,

1
2
3
4
5
def outputparse(X):
ans_tensor = torch.zeros(len(X),10)
for batch_loc in range(len(X)):
ans_tensor[batch_loc][X[batch_loc]] = 1
return ans_tensor

也可以用既有的nn.functional.one_hot代替。但是上面这个函数产生的值是整型,我们需要将其转为计算得到的浮点类型。

我们可以以此将网络改为:

1
2
3
4
5
6
7
8
9
10
class linear_smarter_net(nn.Module):
def __init__(self,inputlayer,outputlayer):
super(linear_smart_net, self).__init__()
self.flattenlayer = nn.Flatten()
self.linearlayer = nn.Linear(inputlayer,outputlayer,bias=True)

def forward(self,x):
x = self.flattenlayer(x)
x = self.linearlayer(x)
return x

并将实例模型改为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
trainer = TrainDaemon(
linear_net,torch.nn.MSELoss,torch.optim.SGD,
MNIST_train,MNIST_test,
network_args={
"inputlayer":len(MNIST_train[0][0].reshape(-1)),
"outputlayer":10},
optimizer_args={
"lr":0.1,
"momentum":0.9},
dataloader_args={
"shuffle":True,
"batch_size":256},
cuda=True,
output_transformer=lambda X : nn.functional.one_hot(X,10).float(),
accuracy_fun=accuracy,)

softmax函数

然而,这一个线性函数的可观测性并不好。我们随机选取了一个数据放进网络,得到

1
[ 0.0333,  0.7411,  0.0758,  0.0243,  0.0601,  0.0430,  0.0036,  0.0748,  -0.0436, -0.0057]

这样一组数据。可以观察到,式中出现了负数。这样的数据参数值描述性差。

我们此处介绍softmax函数。在以分类为目的的神经网络中,通常会使用与softmax类似的函数,来将输出转化为概率分布。softmax描述为: $$ \mathrm{softmax}(x_i)=\frac{e^{x_i}}{\sum e^{x_j}} $$ 可以看到,softmax函数将$x$从$\mathbb R$上的分布转变为$(0,1]$间的概率分布。这样我们的参数就可以充分得到放大。

我们依照softmax函数的定义,修改我们的网络,在forward函数后部加上一层:

1
2
3
4
5
6
7
8
9
10
11
12
class softmax_linear_net(nn.Module):
def __init__(self,inputlayer,outputlayer):
super(softmax_linear_net, self).__init__()
self.flatten = nn.Flatten()
self.linear = nn.Linear(inputlayer,outputlayer,bias=True)
self.softmax = nn.Softmax(dim=1)

def forward(self,x):
x = self.flatten(x)
x = self.linear(x)
x = self.softmax(x)
return x

其它的参数保持几乎不变,我们训练一次,作为前面不加入softmax函数的对照:

3

可以观察到,测试的准确度达到了大约93%,比之前的提高了一些。而且再取一组数据放进网络:

1
[9.9987e-01, 1.6010e-12, 1.3448e-05, 3.1601e-06, 8.1114e-10, 6.1254e-05, 3.7817e-05, 2.8747e-06, 1.1370e-05, 1.6873e-07]

可以观察到,数据的规则性与可解释性更好了。