DeepLearning tutorial (3): Multilayer Perceptron (MLP) Principles and Annotated Code
@author:wepon
@blog:http://blog.csdn.net/u012162613/article/details/43221829
This article introduces the multilayer perceptron algorithm, focusing on a detailed walkthrough of its implementation in Python/Theano. The code comes from the Multilayer Perceptron tutorial. For a deeper treatment of the algorithm itself, see the UFLDL tutorial or the brief overview in Part 1 of this article.
The fully annotated code is available for download from my GitHub repository.
1. A Brief Introduction to the Multilayer Perceptron (MLP)
A multilayer perceptron (MLP), also called an artificial neural network (ANN), can have any number of hidden layers between its input and output layers. The simplest MLP has a single hidden layer, i.e. a three-layer structure, as shown in the figure below:
As the figure shows, the layers of an MLP are fully connected: every neuron in one layer is connected to every neuron in the next layer. The bottom layer of the MLP is the input layer, the middle is the hidden layer, and the last is the output layer.
The input layer needs little explanation: it simply holds whatever you feed in. For example, if the input is an n-dimensional vector, the input layer has n neurons.
How are the hidden-layer neurons computed? The hidden layer is fully connected to the input layer. If the input is a vector X, the hidden layer's output is f(W1·X + b1), where W1 is the weight matrix (also called the connection coefficients), b1 is the bias vector, and f is commonly the sigmoid or tanh function: sigmoid(x) = 1/(1 + e^(-x)), tanh(x) = (e^x - e^(-x))/(e^x + e^(-x)).
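To make the hidden-layer computation concrete, here is a minimal sketch in plain NumPy rather than Theano; the sizes 784 and 500 and the random input values are only illustrative assumptions, not part of the tutorial code:

import numpy as np

rng = np.random.RandomState(0)

n_in, n_hidden = 784, 500                             # e.g. a flattened 28*28 image, 500 hidden units
X = rng.rand(n_in)                                    # one input sample (illustrative random values)
W1 = rng.uniform(-0.1, 0.1, size=(n_in, n_hidden))    # weight matrix ("connection coefficients")
b1 = np.zeros(n_hidden)                               # bias vector

hidden_output = np.tanh(X.dot(W1) + b1)               # f(W1*X + b1) with f = tanh
print(hidden_output.shape)                            # (500,)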
Finally, the output layer. What is the relationship between the hidden layer and the output layer? The mapping from the hidden layer to the output layer can be viewed as multi-class logistic regression, i.e. softmax regression, so the output layer computes softmax(W2·X1 + b2), where X1 denotes the hidden layer's output f(W1·X + b1).
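Continuing the NumPy sketch, the output layer can be illustrated as follows; the softmax helper and the 500/10 sizes are assumptions for illustration only:

import numpy as np

def softmax(z):
    # numerically stable softmax: subtract the max before exponentiating
    e = np.exp(z - z.max())
    return e / e.sum()

rng = np.random.RandomState(0)
X1 = np.tanh(rng.rand(500))                        # stand-in for the hidden layer output f(W1*X + b1)
W2 = rng.uniform(-0.1, 0.1, size=(500, 10))        # hidden-to-output weights (10 classes)
b2 = np.zeros(10)

probs = softmax(X1.dot(W2) + b2)                   # softmax(W2*X1 + b2)
print(probs.sum())                                 # the class probabilities sum to 1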
That is the whole MLP model. The three-layer MLP described above can be summarized by the formula f(x) = G(b2 + W2·s(b1 + W1·x)), where s is the hidden-layer activation (tanh or sigmoid) and G is the softmax function.
The parameters of an MLP are therefore the connection weights and biases between the layers: W1, b1, W2, and b2. How are these parameters determined for a concrete problem? Finding the best parameters is an optimization problem, and the simplest way to solve it is (stochastic) gradient descent (SGD): randomly initialize all parameters, then train iteratively, repeatedly computing gradients and updating the parameters until some stopping condition is met (for example, the error is small enough or enough iterations have been run). This process involves a cost function, regularization, the learning rate, gradient computation, and so on; these are not discussed in detail here, and the reader can consult the two links at the top of this article. A minimal sketch of the update rule is shown below.
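The following is only a rough sketch of the SGD parameter update described above; compute_gradients is a hypothetical placeholder standing in for backpropagation of the cost, not a real implementation:

import numpy as np

learning_rate = 0.01
rng = np.random.RandomState(0)

# W1, b1, W2, b2 for a 784-500-10 network (illustrative sizes)
params = [rng.uniform(-0.1, 0.1, (784, 500)), np.zeros(500),
          rng.uniform(-0.1, 0.1, (500, 10)),  np.zeros(10)]

def compute_gradients(params):
    # placeholder: a real implementation would backpropagate the cost
    # through the network; here we just return zero "gradients"
    return [np.zeros_like(p) for p in params]

for step in range(1000):                                  # iterate until a stopping condition is met
    grads = compute_gradients(params)
    params = [p - learning_rate * g for p, g in zip(params, grads)]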
With the basic MLP model understood, let's move on to the implementation.
2. A Detailed Walkthrough of the MLP Code (Python + Theano)
"""
This tutorial introduces the multilayer perceptron using Theano. A multilayer perceptron is a logistic regressor where
instead of feeding the input to the logistic regression you insert a
intermediate layer, called the hidden layer, that has a nonlinear
activation function (usually tanh or sigmoid) . One can use many such
hidden layers making the architecture deep. The tutorial will also tackle
the problem of MNIST digit classification. .. math:: f(x) = G( b^{(2)} + W^{(2)}( s( b^{(1)} + W^{(1)} x))), References: - textbooks: "Pattern Recognition and Machine Learning" -
Christopher M. Bishop, section 5 """ from __future__ import print_function __docformat__ = 'restructedtext en' import os
import sys
import timeit import numpy import theano
import theano.tensor as T from logistic_sgd import LogisticRegression, load_data # start-snippet-1
class HiddenLayer(object):
def __init__(self, rng, input, n_in, n_out, W=None, b=None,
activation=T.tanh):
"""
Typical hidden layer of a MLP: units are fully-connected and have
sigmoidal activation function. Weight matrix W is of shape (n_in,n_out)
and the bias vector b is of shape (n_out,). NOTE : The nonlinearity used here is tanh Hidden unit activation is given by: tanh(dot(input,W) + b) :type rng: numpy.random.RandomState
:param rng: a random number generator used to initialize weights :type input: theano.tensor.dmatrix
:param input: a symbolic tensor of shape (n_examples, n_in) :type n_in: int
:param n_in: dimensionality of input :type n_out: int
:param n_out: number of hidden units :type activation: theano.Op or function
:param activation: Non linearity to be applied in the hidden
layer
"""
        '''
        Initialization function, called when HiddenLayer is instantiated. This
        layer is fully connected to the input layer and uses tanh as its
        activation function.

        Parameters:
        rng        : numpy.random.RandomState; a random number generator, used
                     here to initialize W with random values rather than zeros.
        input      : symbolic variable (T.dmatrix); the input data (here, the
                     image data x, of shape [n_examples, n_in], where
                     n_examples is the number of samples).
        n_in       : int; the length of each input sample. As in logistic
                     regression, a 28*28 image gives n_in = 784, i.e. the image
                     is flattened into a 1-D vector.
        n_out      : int; the number of hidden units (which determines the
                     length of this layer's output vector).
        activation : theano.Op or function; the nonlinear activation function
                     of the hidden layer.
        '''
        self.input = input
        # end-snippet-1

        # `W` is initialized with `W_values`, which is uniformly sampled
        # from -sqrt(6./(n_in+n_hidden)) to sqrt(6./(n_in+n_hidden))
        # for the tanh activation function.
        # The output of uniform is converted using asarray to dtype
        # theano.config.floatX so that the code is runnable on GPU.
        # Note : optimal initialization of weights is dependent on the
        # activation function used (among other things).
        # For example, results presented in [Xavier10] suggest that you
        # should use 4 times larger initial weights for sigmoid
        # compared to tanh.
        # We have no info for other functions, so we use the same as
        # tanh.
        # As described in the post above, W should be randomly initialized from a
        # uniform distribution over [-sqrt(6./(fan_in+fan_out)), sqrt(6./(fan_in+fan_out))].
        # Here fan_in is simply n_in, the length of each input sample (the number of
        # input units), and fan_out is n_out, the number of hidden units.
        # rng.uniform() produces a matrix of the given size whose elements are drawn
        # uniformly at random from [low, high].
if W is None:
W_values = numpy.asarray(
rng.uniform(
low=-numpy.sqrt(6. / (n_in + n_out)),
high=numpy.sqrt(6. / (n_in + n_out)),
size=(n_in, n_out)
),
dtype=theano.config.floatX
)
            # If the activation function is sigmoid, scale the initial weights
            # to 4 times those used for tanh.
            if activation == theano.tensor.nnet.sigmoid:
                W_values *= 4

            W = theano.shared(value=W_values, name='W', borrow=True)
        # The bias b is simply initialized to zeros; unlike W, it does not need
        # random initialization.
        if b is None:
            b_values = numpy.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=b_values, name='b', borrow=True)

        self.W = W
        self.b = b

        # Compute the linear output, i.e. the result without the activation
        # function: the basic formula W*x + b. If an activation function was
        # passed in, feed this linear output through it; here the activation is
        # the nonlinear function tanh, so the resulting output is nonlinear.
lin_output = T.dot(input, self.W) + self.b
        self.output = (
            # This is just the ternary-style conditional found in other
            # languages (condition ? a : b): if activation is None, then
            # self.output = lin_output; otherwise,
            # self.output = activation(lin_output).
            lin_output if activation is None
            else activation(lin_output)
        )
# parameters of the model
        self.params = [self.W, self.b]


# start-snippet-2
class MLP(object):
"""Multi-Layer Perceptron Class A multilayer perceptron is a feedforward artificial neural network model
that has one layer or more of hidden units and nonlinear activations.
Intermediate layers usually have as activation function tanh or the
sigmoid function (defined here by a ``HiddenLayer`` class) while the
top layer is a softmax layer (defined here by a ``LogisticRegression``
class).
"""
    '''
    A multilayer perceptron is a feedforward artificial neural network model.
    It contains one or more layers of hidden units with nonlinear activation
    functions. The intermediate layers usually use tanh or sigmoid as the
    activation function, while the top (output) layer uses softmax as the
    classifier.
    '''
def __init__(self, rng, input, n_in, n_hidden, n_out):
"""Initialize the parameters for the multilayer perceptron :type rng: numpy.random.RandomState
:param rng: a random number generator used to initialize weights :type input: theano.tensor.TensorType
:param input: symbolic variable that describes the input of the
architecture (one minibatch) :type n_in: int
:param n_in: number of input units, the dimension of the space in
which the datapoints lie :type n_hidden: int
:param n_hidden: number of hidden units :type n_out: int
:param n_out: number of output units, the dimension of the space in
which the labels lie """
        '''
        rng and input were introduced above.
        n_in     : int; the dimensionality of each input sample (here, 784 for a
                   flattened 28*28 MNIST image).
        n_hidden : int; the number of hidden units (neurons in the hidden layer).
        n_out    : int; the number of output units, i.e. the number of label
                   classes.
        '''
# Since we are dealing with a one hidden layer MLP, this will translate
# into a HiddenLayer with a tanh activation function connected to the
# LogisticRegression layer; the activation function can be replaced by
# sigmoid or any other nonlinear function
        # First define a hidden layer to connect the input layer to the hidden units.
self.hiddenLayer = HiddenLayer(
rng=rng,
input=input,
n_in=n_in,
n_out=n_hidden,
activation=T.tanh
        )

        # The logistic regression layer gets as input the hidden units
        # of the hidden layer.
        # Then define a LogisticRegression layer connecting the hidden layer to the
        # output layer; the hidden layer's output is the output layer's input.
self.logRegressionLayer = LogisticRegression(
input=self.hiddenLayer.output,
n_in=n_hidden,
n_out=n_out
)
# end-snippet-2 start-snippet-3
# L1 norm ; one regularization option is to enforce L1 norm to
# be small
        # Regularization (L1 and L2 are the common choices) is used to prevent
        # overfitting. It is easy to compute; regularization is discussed in more
        # detail further down in the article.
        # The L1 term is the sum of the absolute values of the elements of W; there
        # are two W matrices here, so the two sums are added together.
self.L1 = (
abs(self.hiddenLayer.W).sum()
+ abs(self.logRegressionLayer.W).sum()
        )

        # square of L2 norm ; one regularization option is to enforce
        # square of L2 norm to be small
        # The L2 term here is the sum of the squares of the elements of W; there are
        # two W matrices, so the two sums are added together. Note that no square
        # root is taken, i.e. this is the square of the L2 norm.
self.L2_sqr = (
(self.hiddenLayer.W ** 2).sum()
+ (self.logRegressionLayer.W ** 2).sum()
        )

        # negative log likelihood of the MLP is given by the negative
        # log likelihood of the output of the model, computed in the
        # logistic regression layer
        # As in logistic regression, the negative log-likelihood is used to
        # measure the error (loss).
self.negative_log_likelihood = (
self.logRegressionLayer.negative_log_likelihood
)
        # same holds for the function computing the number of errors
        # error rate
        self.errors = self.logRegressionLayer.errors

        # the parameters of the model are the parameters of the two layers it is
        # made out of
        self.params = self.hiddenLayer.params + self.logRegressionLayer.params
        # end-snippet-3

        # keep track of model input
        self.input = input


def test_mlp(learning_rate=0.01, L1_reg=0.00, L2_reg=0.0001, n_epochs=1000,
             dataset='mnist.pkl.gz', batch_size=20, n_hidden=500):
"""
Demonstrate stochastic gradient descent optimization for a multilayer
perceptron This is demonstrated on MNIST. :type learning_rate: float
:param learning_rate: learning rate used (factor for the stochastic
gradient :type L1_reg: float
:param L1_reg: L1-norm's weight when added to the cost (see
regularization) :type L2_reg: float
:param L2_reg: L2-norm's weight when added to the cost (see
regularization) :type n_epochs: int
:param n_epochs: maximal number of epochs to run the optimizer :type dataset: string
:param dataset: the path of the MNIST dataset file from
http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz """
"""
执行训练。学习速率为0.13,最大执行迭代次数为1000,数据集为‘mnist.pkl.gz’,样本块为600个/块
"""
    datasets = load_data(dataset)

    train_set_x, train_set_y = datasets[0]
    valid_set_x, valid_set_y = datasets[1]
    test_set_x, test_set_y = datasets[2]

    # compute number of minibatches for training, validation and testing
    # i.e. how many minibatches each dataset splits into, for the loops below
    n_train_batches = train_set_x.get_value(borrow=True).shape[0] // batch_size
    n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] // batch_size
    n_test_batches = test_set_x.get_value(borrow=True).shape[0] // batch_size

    ######################
    # BUILD ACTUAL MODEL #
    ######################
    print('... building the model')

    # allocate symbolic variables for the data
    # index is the index of the minibatch currently being used
    index = T.lscalar()  # index to a [mini]batch

    # As in the LogisticRegression tutorial, input is a TensorType variable, so we
    # define TensorType variables here as well.
    # x holds the actual sample data.
    x = T.matrix('x')  # the data is presented as rasterized images
    # Likewise y is a TensorType variable: a vector of ints, hence T.ivector
    # ('i' for int, 'vector' for vector; see the Theano tutorial for details).
    # y holds the sample labels.
    y = T.ivector('y')  # the labels are presented as 1D vector of
                        # [int] labels

    # instantiate the random number generator
    rng = numpy.random.RandomState(1234)

    # construct the MLP class
    # x is the input: a matrix of samples, hence T.matrix above.
    # n_in and n_out take the values discussed in the post above.
    # Instantiating MLP automatically calls its __init__ function, which in turn
    # constructs the HiddenLayer and LogisticRegression layers.
classifier = MLP(
rng=rng,
input=x,
n_in=28 * 28,
n_hidden=n_hidden,
n_out=10
    )

    # start-snippet-4
    # the cost we minimize during training is the negative log likelihood of
    # the model plus the regularization terms (L1 and L2); cost is expressed
    # here symbolically
    # cost is a symbolic variable, not a concrete number; it only produces a value
    # once concrete data is passed in. The regularization terms, weighted by their
    # coefficients, are added to the original cost.
cost = (
classifier.negative_log_likelihood(y)
+ L1_reg * classifier.L1
+ L2_reg * classifier.L2_sqr
    )
    # end-snippet-4

    # compiling a Theano function that computes the mistakes that are made
    # by the model on a minibatch
    # The test model needs little explanation: it computes the error rate on the
    # current minibatch. Testing does not update any parameters, so there is no
    # `updates`; but it does use `givens` to substitute concrete values for x and y
    # in the error expression. The test model uses the test set
    # (test_set_x and test_set_y).
test_model = theano.function(
inputs=[index],
outputs=classifier.errors(y),
givens={
x: test_set_x[index * batch_size:(index + 1) * batch_size],
y: test_set_y[index * batch_size:(index + 1) * batch_size]
}
)
    # The validation model differs from the test model only in the data it uses:
    # the validation set.
validate_model = theano.function(
inputs=[index],
outputs=classifier.errors(y),
givens={
x: valid_set_x[index * batch_size:(index + 1) * batch_size],
y: valid_set_y[index * batch_size:(index + 1) * batch_size]
}
    )

    # start-snippet-5
    # compute the gradient of cost with respect to theta (stored in params)
    # the resulting gradients will be stored in a list gparams
    # To differentiate cost with respect to a parameter, simply call T.grad() with
    # that parameter. Compared with plain logistic regression, the MLP has two
    # extra parameters to differentiate, so a list comprehension is used here:
    # W1, b1, W2, b2 are stored in classifier.params, and iterating over them gives
    # the partial derivatives of cost with respect to each, stored in gparams.
    gparams = [T.grad(cost, param) for param in classifier.params]

    # specify how to update the parameters of the model as a list of
    # (variable, update expression) pairs

    # given two lists of the same length, A = [a1, a2, a3, a4] and
    # B = [b1, b2, b3, b4], zip generates a list C of same size, where each
    # element is a pair formed from the two lists :
    # C = [(a1, b1), (a2, b2), (a3, b3), (a4, b4)]
    # `updates` acts as an updater: it states which parameters are updated and by
    # which formula. The code below updates each parameter to
    # (old value - learning rate * gradient). As with the gradients, a list
    # comprehension over all parameters is used.
updates = [
(param, param - learning_rate * gparam)
for param, gparam in zip(classifier.params, gparams)
    ]

    # compiling a Theano function `train_model` that returns the cost, but
    # in the same time updates the parameter of the model based on the rules
    # defined in `updates`
    # The TensorType variables above are all symbolic; they only produce values
    # once concrete data is passed in. theano.function is another Theano feature:
    # here it builds a function called train_model.
    # Its arguments are passed in through `inputs` (the minibatch index `index`),
    # its return value is given by `outputs` (the computed cost), and its updater
    # is the `updates` list defined just above.
    # `givens` is a very useful feature. Computing cost involves the symbolic
    # variables x and y (x does not appear explicitly: negative_log_likelihood uses
    # p_y_given_x, and p_y_given_x is computed from input, which is x).
    # A symbolic/shared variable keeps its own value after computation, but here we
    # do not want to compute cost from the current values of x and y; `givens` lets
    # us substitute new values into the cost expression without changing the
    # original values of x and y.
    ## A small example:
    # state = shared(0)
    # inc = T.iscalar('inc')
    # accumulator = function([inc], state, updates=[(state, state+inc)])
    # state.get_value()   # returns array(0), since the initial value is 0
    # accumulator(1)      # returns array(0), i.e. the old state, but read on:
    # state.get_value()   # returns array(1); per updates, state = state+inc = 0+1 = 1
    # accumulator(300)    # returns array(1), i.e. the old state, but read on:
    # state.get_value()   # returns array(301); per updates, state = state+inc = 1+300 = 301
    ## Now, with state = 301, continue the experiment:
    # fn_of_state = state * 2 + inc
    ## foo stands in for state in the update: fn_of_state itself is unchanged, but
    ## the value of foo is used instead of the value of state.
    # foo = T.scalar(dtype=state.dtype)
    ## skip_shared takes inc and foo as inputs and returns fn_of_state, using givens
    ## to substitute foo for state in the fn_of_state expression.
    # skip_shared = function([inc, foo], fn_of_state, givens=[(state, foo)])
    # skip_shared(1, 3)   # returns array(7): fn_of_state = foo*2 + inc = 3*2+1 = 7
    ## What is the value of state now?
    # state.get_value()   # returns array(301), not foo's value 3
    ## Hopefully this small example makes the role of givens clear.
    ## Each call must compute cost from a fresh minibatch of x and y rather than
    ## the previous one, which is why givens is used here.
train_model = theano.function(
inputs=[index],
outputs=cost,
updates=updates,
givens={
x: train_set_x[index * batch_size: (index + 1) * batch_size],
y: train_set_y[index * batch_size: (index + 1) * batch_size]
}
)
    )
    # end-snippet-5

    ###############
    # TRAIN MODEL #
    ###############
    print('... training')

    # early-stopping parameters
patience = 10000 # look as this many examples regardless
patience_increase = 2 # wait this much longer when a new best is
# found
    # An improvement is considered significant only when the new validation error
    # is below 0.995 times the previous best, i.e. patience is extended only when
    # the error shrinks by at least this relative factor.
improvement_threshold = 0.995 # a relative improvement of this much is
# considered significant
    # Setting validation_frequency this way guarantees the model is evaluated on
    # the validation set at least once per epoch.
    validation_frequency = min(n_train_batches, patience // 2)
                                  # go through this many
                                  # minibatches before checking the network
                                  # on the validation set; in this case we
                                  # check every epoch

    best_validation_loss = numpy.inf
best_iter = 0
test_score = 0.
    start_time = timeit.default_timer()

    epoch = 0
    done_looping = False

    # The training loop starts here. The while loop is controlled by epoch, the
    # number of passes over the training set. The inner for loop runs over
    # n_train_batches minibatches (total samples / batch size) per epoch.
    # Inside the for loop, iter counts the total number of minibatches trained so
    # far; whenever (iter + 1) is a multiple of validation_frequency, the model is
    # evaluated on the validation set.
    # If the validation loss this_validation_loss is lower than the previous best
    # best_validation_loss, then best_validation_loss and best_iter are updated and
    # the model is evaluated on the test set.
    # If this_validation_loss is below best_validation_loss * improvement_threshold,
    # patience is extended as well.
    # Training stops when the maximum number of epochs n_epochs is reached or when
    # patience <= iter.
    while (epoch < n_epochs) and (not done_looping):
epoch = epoch + 1
        for minibatch_index in range(n_train_batches):

            minibatch_avg_cost = train_model(minibatch_index)
            # iteration number
            iter = (epoch - 1) * n_train_batches + minibatch_index

            if (iter + 1) % validation_frequency == 0:
# compute zero-one loss on validation set
validation_losses = [validate_model(i) for i
in range(n_valid_batches)]
                this_validation_loss = numpy.mean(validation_losses)

                print(
'epoch %i, minibatch %i/%i, validation error %f %%' %
(
epoch,
minibatch_index + 1,
n_train_batches,
this_validation_loss * 100.
)
                )

                # if we got the best validation score until now
if this_validation_loss < best_validation_loss:
#improve patience if loss improvement is good enough
if (
this_validation_loss < best_validation_loss *
improvement_threshold
):
                        patience = max(patience, iter * patience_increase)

                    best_validation_loss = this_validation_loss
                    best_iter = iter

                    # test it on the test set
test_losses = [test_model(i) for i
in range(n_test_batches)]
                    test_score = numpy.mean(test_losses)

                    print(('     epoch %i, minibatch %i/%i, test error of '
                           'best model %f %%') %
                          (epoch, minibatch_index + 1, n_train_batches,
                           test_score * 100.))

            if patience <= iter:
                done_looping = True
                break

    end_time = timeit.default_timer()
print(('Optimization complete. Best validation score of %f %% '
'obtained at iteration %i, with test performance %f %%') %
(best_validation_loss * 100., best_iter + 1, test_score * 100.))
print(('The code for file ' +
os.path.split(__file__)[1] +
           ' ran for %.2fm' % ((end_time - start_time) / 60.)), file=sys.stderr)


if __name__ == '__main__':
    test_mlp()
Note: one detail in the code above deserves attention. Assignments such as self.negative_log_likelihood = self.logRegressionLayer.negative_log_likelihood and self.errors = self.logRegressionLayer.errors store bound methods in attributes, which can then be called like ordinary functions. See the example below.
class b:
    def k(self, u):
        print(u)

class f(object):
    def __init__(self):
        self.bb = b()
        self.gg = self.bb.k   # store the bound method self.bb.k in self.gg

ff = f()
ff.gg(44)   # Output: 44