机器学习是一个执果索因的过程,人类对于自然的探索也是如此。线性是最为常见的性质,也是人能够直观感受的唯一性质。最浅显的,对一元变量的线性方程可以描述为$f(x)=wx + b$的形式。
针对更高维的线性性,只需要将前式的$x, w, b$替换成矢量形式。我们给出一个比较通用的式子,这个式子几乎可以在每一本教科书中找到:$f(\mathbf x)=\mathbf w^T \mathbf x+\mathbf b$。
线性回归,是通过已知的$\mathbf x$与$f(\mathbf x)$,来得到尽量准确的$\mathbf w,\mathbf b$的推测值。
梯度下降法
当然,线性回归是有数学上的公式的。但我们这里希望用梯度下降法来获得这个回归值。因为相比于线性回归的诸多技巧,梯度下降几乎是通用的,PyTorch也支持自动求梯度。
如果您理解势能场,例如电势、地势,那么您就能够理解梯度下降法。将一个小球放在山坡上,他会自动向着下降最快的方向运动。而这一下降的方向就是由(负)梯度 所描述的。电磁场公式中,$\mathbf E = -\nabla \varphi$ 就是直接的数学表述,电场强度是电势的负梯度。
势与运动之间,会用物体的特征值来描述。比如受到同样的电场,重的、带电少的小球移动的慢,而轻的、带电多的小球移动的快。这一特征值,在梯度下降法中被描述为学习率 。物体的特征值是不变的,但有一些算法会调整学习率,让收敛的过程更好。
小球运动起来之后,会产生一定的速度,进而加速其落入地势的最低点。有了这样的速度,或称动量 ,球就可以克服部分平缓地势,更快地到达低点。这便是带有动量的梯度下降法。
我们可以利用这一方法,通过一系列的数学运算,给出每个参量相较于样例的梯度,进而调节,将内置参量向样例靠拢。
因为梯度下降法非常常用,PyTorch也提供了一系列内置的梯度下降工具。
从例子中学习
从零开始的线性网络
首先引入相关包、数据集,和作图等辅助函数。此处我们使用的是MNIST手写数字图像数据集。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import torchimport torch.nn as nnimport torchvisionimport osrootdir = os.getenv("TORCHVISION_ROOT" ) trans = torchvision.transforms.ToTensor() MNIST_train = torchvision.datasets.MNIST(root=rootdir,transform=trans, train=True , download=True ) MNIST_test = torchvision.datasets.MNIST(root=rootdir,transform=trans, train=False , download=True ) import matplotlib.pyplot as pltfrom IPython.display import clear_outputclass Plotter () : def __init__ (self,mainkey,*args) : self.titlelist={} self.mainkey = mainkey self.titlelist[mainkey]=[] for t in args: self.titlelist[t]=[] def update (self,**kwargs) : for key,value in kwargs.items(): self.titlelist[key].append(value) graphs = [] for key,value in self.titlelist.items(): if (key == self.mainkey): continue graphs.append(plt.plot(self.titlelist[self.mainkey],value,label=key)[0 ]) clear_output(wait=True ) plt.legend(handles=graphs) plt.show()
定义一个线性模型。
1 2 3 4 5 6 7 8 9 class linear_net (nn.Module) : def __init__ (self,inputlayer,outputlayer) : super(linear_net, self).__init__() self.Layer1 = nn.Parameter(torch.randn(inputlayer, outputlayer, requires_grad=True )*0.1 ) self.Bias1 = nn.Parameter(torch.zeros(outputlayer, requires_grad=True )) def forward (self,x) : x = x @ self.Layer1 + self.Bias1 return x
这个模型需要关注的点,是__init__
函数中定义的Layer1
和Bias1
,与下面的forward
函数。对于函数$f(\mathbf x)=\mathbf w^T \mathbf x+\mathbf b$,Layer1
定义的是$\mathbf w^T$。(至于转置,请注意声明中的.t()
)。Bias1
定义的是$\mathbf b$。
nn.Parameter
函数允许你自行定义一组参与运算并需要反向传播的参量。当然,PyTorch提供了现成的模组nn.linear
可以直接使用,但这里我们将它展开以便理解。
forward
函数描述其前馈的方式。对于此线性网络,我们按图索骥完成这个线性函数即可。@
符号是矩阵乘法。
我们此处希望,传入的值不仅可以是向量,也可以是一个向量组,即矩阵。不以向量,而是以一个矩阵的形式传入,一方面是为了实现后文会提到的Mini-Batch SGD小批量梯度下降法,另一方面也是利用GPU的并行计算特性,在一个单位时间完成多次计算任务。
但是在传统的线性代数中,$\mathbf x$通常定义为$[x_1,x_2,\cdots,x_n]^T$,而一个向量组则定义为$[\mathbf x_a,\mathbf x_b,\cdots,\mathbf x_m]$。虽然是等价的,但我们如果将其转置,分别定义为$\mathbf x= [x_1,x_2,\cdots,x_n]$和
$$
\mathbf X = \left[\begin{matrix}
\
\mathbf x_a \\
\mathbf x_b \\
\cdots \\
\mathbf x_m
\end{matrix}\right]
$$
并将函数改为$f(\mathbf x)=\mathbf X \mathbf w^T+\mathbf b$,在计算机上可以被更好地描述,也省去了转置的步骤。最重要的是,如果$\mathbf X$是矩阵的话,在做加法的时候$\mathbf b$会被自动延展。
此处您可能会想到,向量有行列之分。但PyTorch中的向量会自动选择方向,不分行列。比如,您可以试试看,torch.Tensor([1,2]).t() == torch.Tensor([1,2])
的结果。
我们定义了一个TrainDaemon
类,将PyTorch提供的参数和接口进行包装。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 class TrainDaemon : def __init__ (self,network,loss,optimizer,train_dataset,test_dataset,**kwargs) : if "network_args" in kwargs: self.model = network(**kwargs["network_args" ]) else : self.model = network() if "cuda" in kwargs and kwargs["cuda" ]==True : self.model.cuda() self.enable_cuda = True else : self.enable_cuda = False if "dataloader_args" in kwargs: self.train_DataLoader = torch.utils.data.DataLoader(train_dataset,**kwargs["dataloader_args" ]) self.test_DataLoader = torch.utils.data.DataLoader(test_dataset,**kwargs["dataloader_args" ]) else : self.train_DataLoader = torch.utils.data.DataLoader(train_dataset,batch_size=1 ) self.test_DataLoader = torch.utils.data.DataLoader(test_dataset,batch_size=1 ) if "optimizer_args" in kwargs: self.optimizer = optimizer(self.model.parameters(),**kwargs["optimizer_args" ]) if "loss_args" in kwargs: self.loss = loss(**kwargs["loss_args" ]) else : self.loss = loss() if "input_transformer" in kwargs: self.inputTrans = kwargs["input_transformer" ] else : self.inputTrans = lambda x:x if "output_transformer" in kwargs: self.outputTrans =kwargs["output_transformer" ] else : self.outputTrans = lambda x:x if "accuracy_fun" in kwargs: self.accuracy = kwargs["accuracy_fun" ] else : self.accuracy = lambda x,y: 1 def _train_batch (self,train_sample,train_output) : if self.enable_cuda: train_sample = self.inputTrans(train_sample).cuda() train_output = self.outputTrans(train_output).cuda() else : train_sample = self.inputTrans(train_sample) train_output = self.outputTrans(train_output) self.optimizer.zero_grad() model_output = self.model(train_sample) train_loss = self.loss(model_output,train_output) train_loss.backward() self.optimizer.step() return train_loss, self.accuracy(model_output,train_output) def _full_test (self) : with torch.no_grad(): correct_rate = 0 test_loss = 0 total_groups = 0 for test_sample,test_output in self.test_DataLoader: total_groups += 1 if self.enable_cuda: test_sample = self.inputTrans(test_sample).cuda() test_output = self.outputTrans(test_output).cuda() else : test_sample = self.inputTrans(test_sample) test_output = self.outputTrans(test_output) model_output = self.model(test_sample) test_loss += self.loss(model_output,test_output) correct_rate += self.accuracy(model_output,test_output) return test_loss/total_groups, correct_rate/total_groups def _train_epoch (self) : correct_rate = 0 train_loss = 0 total_groups = 0 for train_sample,train_output in self.train_DataLoader: _train_loss, _correct_rate = self._train_batch(train_sample,train_output) train_loss +=_train_loss correct_rate += _correct_rate total_groups+=1 return train_loss/total_groups, correct_rate/total_groups def train (self,epoch=1 ,log=False,plot=False) : if plot: plotr = Plotter("Epoch" ,"train_loss" ,"train_accuracy" ,"test_loss" ,"test_accuracy" ) for ep in range(epoch): train_loss, train_accu = self._train_epoch() test_loss, test_accu = self._full_test() if log: print(f"Epoch{ep} :\n" "Train\tloss:{format(train_loss,'0.6f')}\taccu:{format(train_accu,'0.4f')}\n" "Test\tloss:{format(test_loss,'0.6f')}\taccu:{format(test_accu,'0.4f')}" ) if plot: plotr.update( Epoch=ep, train_loss=train_loss,train_accuracy=train_accu, test_loss=test_loss,test_accuracy=test_accu)
我们主要分析_train_batch
函数。首先我们将数据进行变形,使其符合784-10的网络形状(784=28*28,是MNIST数据集的形状)这一变形函数将在后面讨论。
首先调用self.optimizer.zero_grad()
将梯度清空,通过model_output = self.model(test_sample)
,我们将数据穿入网络,计算其误差。再对误差调用train_loss.backward()
,利用梯度的链式传递特性反演并修改原有梯度。前面我们将梯度清空,是因为backward
函数是增加而非覆盖原有的梯度。最后我们使用self.optimizer.step()
来更新梯度值。
至于验证函数,我们调用了with torch.no_grad()
,以保证验证过程不会影响我们的模型。其他的各函数将整个训练过程进行了包装。内容并不复杂,此处建议您自己写一个。
在自己写训练的函数时,常常会涉及到GPU相关运算的问题。GPU上的张量和CPU上的张量不能直接运算,要尤其注意。
接下来我们实例化一个TrainDaemon
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def inputparse (X) : return X.reshape(X.shape[0 ],-1 ) def outputparse (X) : ans_tensor = torch.zeros(len(X),10 ) for batch_loc in range(len(X)): ans_tensor[batch_loc][X[batch_loc]] = 1 return ans_tensor def accuracy (X,Y) : return ((X.argmax(dim=1 ) == Y.argmax(dim=1 )).sum()/len(X.argmax(dim=1 ))) trainer = TrainDaemon( linear_net,torch.nn.MSELoss,torch.optim.SGD, MNIST_train,MNIST_test, network_args={ "inputlayer" :len(MNIST_train[0 ][0 ].reshape(-1 )), "outputlayer" :10 }, optimizer_args={ "lr" :0.1 , "momentum" :0.9 }, dataloader_args={ "shuffle" :True , "batch_size" :256 }, cuda=True , input_transformer=inputparse, output_transformer=outputparse, accuracy_fun=accuracy,)
在这里,我们将数据进行处理变形以符合我们的网络参数。outputparse(X)
函数将MNIST提供的答案[3,4,...]
转换成网络输出的[[0,0,0,1,0,0,0,0,0,0],[0,0,0,0,1,0,0,0,0,0],...]
形式。而inputparse(X)
函数则将原本以平面格形式描述的图像组转成一个向量组。
我们定义一个精度函数accuracy(X,Y)
,这个函数比对两个输出,计数答案相同的组并求比例。
接下来是实例化TrainDaemon
。我们先向实例导入我们的网络和数据,体现在上面的linear_net,MNIST_train,MNIST_test
三个参量和network_args
参数组。我们给出DataLoader
的参数:shuffle
设为真,使其打乱数据集的顺序,batch_size
设为256,这是接下来小批量随机梯度下降的参数。
我们定义损失函数和优化方式。我们使用最熟悉的函数:二乘,即平均平方损失作为损失函数,此处的torch.nn.MSELoss
即为平均平方损失。优化方式采用随机梯度下降法。torch.optim.SGD
是随机梯度下降。optimizer_args
中,lr
标明学习率,momentum
标明动量。
至于此处的随机,原本的梯度下降需要将所有数据带入以得到一个用以下降的梯度,这样计算量较大,因而提出了随机梯度下降法。其最初是从样本空间中随机提出一个样本,作为整体的代表,计算一个梯度并更新。
这样的方法受到样本个体的特性影响很大,收敛效果不好,因此提出了小批量随机梯度下降,指的是提出一批较小的样本,而非单个样本进行训练。这种做法可以使得梯度的下降更接近整体水平,又在性能上较梯度下降有所提高。
最后我们向实例传递我们的输入、输出变形函数。
这样一个线性网络就搭建好了。我们可以运行试试看。
1 trainer.train(100 ,plot=True )
可以观察到,测试的准确度在很短的区间内就达到了大约85%的准确率。
不要重复造轮子——PyTorch
上面的net类显得很复杂,而且所有的参量都是由我们手动设置的。PyTorch提供了线性层的预制模块,其本身和我们目前的模块是相同的。
我们利用线性层来替代手动管理的参量:
1 2 3 4 5 6 7 8 class linear_smart_net (nn.Module) : def __init__ (self,inputlayer,outputlayer) : super(linear_smart_net, self).__init__() self.linearlayer = nn.Linear(inputlayer,outputlayer,bias=True ) def forward (self,x) : x = self.linearlayer(x) return x
此时将由PyTorch为我们管理线性层。注意:PyTorch的Linear
模块中的矩阵处理和我们之前提到的方法是一致的。包括PyTorch的DataLoader
设计和此都是一脉相承的,请勿乱转置您的矩阵。
另外,最初的输入处理部分
1 2 def inputparse (X) : return X.reshape(X.shape[0 ],-1 )
也可以引入一个Flatten层来处理,其功能和上述的inputparse函数一致。
答案的输出处理部分,
1 2 3 4 5 def outputparse (X) : ans_tensor = torch.zeros(len(X),10 ) for batch_loc in range(len(X)): ans_tensor[batch_loc][X[batch_loc]] = 1 return ans_tensor
也可以用既有的nn.functional.one_hot
代替。但是上面这个函数产生的值是整型,我们需要将其转为计算得到的浮点类型。
我们可以以此将网络改为:
1 2 3 4 5 6 7 8 9 10 class linear_smarter_net (nn.Module) : def __init__ (self,inputlayer,outputlayer) : super(linear_smart_net, self).__init__() self.flattenlayer = nn.Flatten() self.linearlayer = nn.Linear(inputlayer,outputlayer,bias=True ) def forward (self,x) : x = self.flattenlayer(x) x = self.linearlayer(x) return x
并将实例模型改为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 trainer = TrainDaemon( linear_net,torch.nn.MSELoss,torch.optim.SGD, MNIST_train,MNIST_test, network_args={ "inputlayer" :len(MNIST_train[0 ][0 ].reshape(-1 )), "outputlayer" :10 }, optimizer_args={ "lr" :0.1 , "momentum" :0.9 }, dataloader_args={ "shuffle" :True , "batch_size" :256 }, cuda=True , output_transformer=lambda X : nn.functional.one_hot(X,10 ).float(), accuracy_fun=accuracy,)
softmax函数
然而,这一个线性函数的可观测性并不好。我们随机选取了一个数据放进网络,得到
1 [ 0.0333 , 0.7411 , 0.0758 , 0.0243 , 0.0601 , 0.0430 , 0.0036 , 0.0748 , -0.0436 , -0.0057 ]
这样一组数据。可以观察到,式中出现了负数。这样的数据参数值描述性差。
我们此处介绍softmax函数。在以分类为目的的神经网络中,通常会使用与softmax类似的函数,来将输出转化为概率分布。softmax描述为:
$$
\mathrm{softmax}(x_i)=\frac{e^{x_i}}{\sum e^{x_j}}
$$
可以看到,softmax函数将$x$从$\mathbb R$上的分布转变为$(0,1]$间的概率分布。这样我们的参数就可以充分得到放大。
我们依照softmax函数的定义,修改我们的网络,在forward函数后部加上一层:
1 2 3 4 5 6 7 8 9 10 11 12 class softmax_linear_net (nn.Module) : def __init__ (self,inputlayer,outputlayer) : super(softmax_linear_net, self).__init__() self.flatten = nn.Flatten() self.linear = nn.Linear(inputlayer,outputlayer,bias=True ) self.softmax = nn.Softmax(dim=1 ) def forward (self,x) : x = self.flatten(x) x = self.linear(x) x = self.softmax(x) return x
其它的参数保持几乎不变,我们训练一次,作为前面不加入softmax函数的对照:
可以观察到,测试的准确度达到了大约93%,比之前的提高了一些。而且再取一组数据放进网络:
1 [9.9987e-01 , 1.6010e-12 , 1.3448e-05 , 3.1601e-06 , 8.1114e-10 , 6.1254e-05 , 3.7817e-05 , 2.8747e-06 , 1.1370e-05 , 1.6873e-07 ]
可以观察到,数据的规则性与可解释性更好了。