3. Spark MLlib Deep Learning: Convolution Neural Network (Deep Learning - Convolutional Neural Network) 3.2

http://blog.csdn.net/sunbow0

Chapter 3 Convolution Neural Network (Convolutional Neural Networks)

2 Fundamentals and Source Code Analysis

2.1 Convolution Neural Network Fundamentals

1) Fundamentals:

Search Google or Baidu yourself; there is plenty of introductory material, and a quick read is enough, although much of it does not explain the details clearly.

For treatments that do make the details clear, refer to the two articles below; the precondition is that you already have a basic understanding.

2) Key references:

http://www.cnblogs.com/fengfenggirl/p/cnn_implement.html

http://www.cnblogs.com/tornadomeet/archive/2013/05/05/3061457.html

2.2 Deep Learning CNN Source Code Analysis

2.2.1 CNN Code Structure

The CNN source code consists mainly of two classes, CNN and CNNModel. The source code structure is as follows:

[Figure: CNN source code structure]

CNN structure:

[Figure: CNN class structure]

CNNModel structure:

[Figure: CNNModel class structure]

2.2.2 CNN Training Process

[Figure: CNN training process flow]

2.2.3 CNN Analysis

(1) CNNLayers

/**
 * types: layer type ("c" = convolution layer, "s" = subsampling/pooling layer)
 * outputmaps: number of output feature maps
 * kernelsize: convolution kernel size k
 * k: convolution kernels
 * b: biases
 * dk: partial derivatives of the convolution kernels
 * db: partial derivatives of the biases
 * scale: pooling size
 */
// BDM is the Breeze DenseMatrix alias (breeze.linalg.DenseMatrix) used throughout this code.
case class CNNLayers(
  types: String,
  outputmaps: Double,
  kernelsize: Double,
  scale: Double,
  k: Array[Array[BDM[Double]]],
  b: Array[Double],
  dk: Array[Array[BDM[Double]]],
  db: Array[Double]) extends Serializable

CNNLayers is a custom data type that stores the parameter information of each network layer.
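For illustration, a convolution layer with 1 input map, 6 output maps, and 5x5 kernels could be instantiated as follows. This is a minimal sketch: the concrete values (6 maps, 5x5 kernels, scale left at 1.0 because the forward pass does not use it for "c" layers) are illustrative, not taken from the code above.

import breeze.linalg.{DenseMatrix => BDM}

// One kernel per (input map, output map) pair; dk/db start out like k/b, as CnnSetup does.
val convKernels: Array[Array[BDM[Double]]] = Array(Array.fill(6)(BDM.rand[Double](5, 5)))
val convLayer = CNNLayers(
  types = "c",
  outputmaps = 6.0,
  kernelsize = 5.0,
  scale = 1.0,
  k = convKernels,
  b = Array.fill(6)(0.0),
  dk = convKernels,
  db = Array.fill(6)(0.0))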

(2) CnnSetup

Convolutional neural network parameter initialization: the CNN is built layer by layer from the configuration parameters.

/** CNN layer parameter initialization. */
def CnnSetup: (Array[CNNLayers], BDM[Double], BDM[Double], Double) = {
  var inputmaps1 = 1.0
  var mapsize1 = mapsize
  var confinit = ArrayBuffer[CNNLayers]()
  for (l <- 0 to types.length - 1) { // layer
    val type1 = types(l)
    val outputmap1 = outputmaps(l)
    val kernelsize1 = kernelsize(l)
    val scale1 = scale(l)
    val layersconf = if (type1 == "s") { // initialize the parameters of each layer
      mapsize1 = mapsize1 / scale1
      val b1 = Array.fill(inputmaps1.toInt)(0.0)
      val ki = Array(Array(BDM.zeros[Double](1, 1)))
      new CNNLayers(type1, outputmap1, kernelsize1, scale1, ki, b1, ki, b1)
    } else if (type1 == "c") {
      mapsize1 = mapsize1 - kernelsize1 + 1.0
      // fan-in / fan-out of the layer, used to scale the random kernel initialization
      val fan_out = outputmap1 * math.pow(kernelsize1, 2)
      val fan_in = inputmaps1 * math.pow(kernelsize1, 2)
      val ki = ArrayBuffer[Array[BDM[Double]]]()
      for (i <- 0 to inputmaps1.toInt - 1) { // input map
        val kj = ArrayBuffer[BDM[Double]]()
        for (j <- 0 to outputmap1.toInt - 1) { // output map
          val kk = (BDM.rand[Double](kernelsize1.toInt, kernelsize1.toInt) - 0.5) * 2.0 *
            sqrt(6.0 / (fan_in + fan_out))
          kj += kk
        }
        ki += kj.toArray
      }
      val b1 = Array.fill(outputmap1.toInt)(0.0)
      inputmaps1 = outputmap1
      new CNNLayers(type1, outputmap1, kernelsize1, scale1, ki.toArray, b1, ki.toArray, b1)
    } else {
      val ki = Array(Array(BDM.zeros[Double](1, 1)))
      val b1 = Array(0.0)
      new CNNLayers(type1, outputmap1, kernelsize1, scale1, ki, b1, ki, b1)
    }
    confinit += layersconf
  }
  // length of the flattened feature vector fed to the output layer
  val fvnum = mapsize1(0, 0) * mapsize1(0, 1) * inputmaps1
  val ffb = BDM.zeros[Double](onum, 1)
  val ffW = (BDM.rand[Double](onum, fvnum.toInt) - 0.5) * 2.0 *
    sqrt(6.0 / (onum + fvnum))
  (confinit.toArray, ffb, ffW, alpha)
}
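The kernel entries above are drawn uniformly from [-sqrt(6 / (fan_in + fan_out)), +sqrt(6 / (fan_in + fan_out))], a normalized (Xavier-style) initialization. Below is a minimal standalone sketch of that draw for a single convolution layer; the concrete sizes (1 input map, 6 output maps, 5x5 kernels) are assumptions for illustration only.

import breeze.linalg.{DenseMatrix => BDM}

object KernelInitSketch {
  // Draws a kernelsize x kernelsize matrix with entries uniform in
  // [-sqrt(6 / (fanIn + fanOut)), +sqrt(6 / (fanIn + fanOut))], as in CnnSetup.
  def initKernel(kernelsize: Int, fanIn: Double, fanOut: Double): BDM[Double] =
    (BDM.rand[Double](kernelsize, kernelsize) - 0.5) * 2.0 * math.sqrt(6.0 / (fanIn + fanOut))

  def main(args: Array[String]): Unit = {
    val kernelsize = 5
    val inputMaps = 1
    val outputMaps = 6
    val fanIn = inputMaps * kernelsize * kernelsize
    val fanOut = outputMaps * kernelsize * kernelsize
    // One kernel per (input map, output map) pair, as in CnnSetup's nested loops.
    val kernels = Array.fill(inputMaps, outputMaps)(initKernel(kernelsize, fanIn, fanOut))
    println(kernels(0)(0))
  }
}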

(3) expand

The Kronecker-product expansion method (used in CNNbp to upsample the pooling-layer deltas).

/**
 * Kronecker-product expansion.
 */
def expand(a: BDM[Double], s: Array[Int]): BDM[Double] = {
  // val a = BDM((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
  // val s = Array(3, 2)
  val sa = Array(a.rows, a.cols)
  var tt = new Array[Array[Int]](sa.length)
  for (ii <- sa.length - 1 to 0 by -1) {
    // build an index vector that repeats each source index s(ii) times
    var h = BDV.zeros[Int](sa(ii) * s(ii))
    h(0 to sa(ii) * s(ii) - 1 by s(ii)) := 1
    tt(ii) = Accumulate(h).data
  }
  var b = BDM.zeros[Double](tt(0).length, tt(1).length)
  for (j1 <- 0 to b.rows - 1) {
    for (j2 <- 0 to b.cols - 1) {
      b(j1, j2) = a(tt(0)(j1) - 1, tt(1)(j2) - 1)
    }
  }
  b
}
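expand therefore replicates each entry of a into an s(0) x s(1) block, which is exactly a Kronecker product with a matrix of ones. A quick standalone check (Breeze only) for the commented example, using breeze.linalg.kron as the reference:

import breeze.linalg.{DenseMatrix => BDM, kron}

object ExpandSketch {
  def main(args: Array[String]): Unit = {
    val a = BDM((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) // 3 x 2
    val s = Array(3, 2)                             // replicate each entry into a 3 x 2 block
    // expand(a, s) should equal kron(a, ones(s(0), s(1))): a 9 x 4 matrix whose
    // top-left 3 x 2 block is all 1.0, the 3 x 2 block to its right all 2.0, etc.
    val expanded = kron(a, BDM.ones[Double](s(0), s(1)))
    println(expanded.rows + " x " + expanded.cols) // 9 x 4
    println(expanded)
  }
}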

(4) convn

The convolution computation method.

/**
 * convn: convolution computation ("valid" or "full").
 */
def convn(m0: BDM[Double], k0: BDM[Double], shape: String): BDM[Double] = {
  // val m0 = BDM((1.0, 1.0, 1.0, 1.0), (0.0, 0.0, 1.0, 1.0), (0.0, 1.0, 1.0, 0.0), (0.0, 1.0, 1.0, 0.0))
  // val k0 = BDM((1.0, 1.0), (0.0, 1.0))
  // val m0 = BDM((1.0, 1.0, 1.0), (1.0, 1.0, 1.0), (1.0, 1.0, 1.0))
  // val k0 = BDM((1.0, 2.0, 3.0), (4.0, 5.0, 6.0), (7.0, 8.0, 9.0))
  val out1 = shape match {
    case "valid" =>
      val m1 = m0
      val k1 = k0.t
      val row1 = m1.rows - k1.rows + 1
      val col1 = m1.cols - k1.cols + 1
      var m2 = BDM.zeros[Double](row1, col1)
      for (i <- 0 to row1 - 1) {
        for (j <- 0 to col1 - 1) {
          val r1 = i
          val r2 = r1 + k1.rows - 1
          val c1 = j
          val c2 = c1 + k1.cols - 1
          val mi = m1(r1 to r2, c1 to c2)
          m2(i, j) = (mi :* k1).sum
        }
      }
      m2
    case "full" =>
      // zero-pad m0 by (kernel size - 1) on every side
      var m1 = BDM.zeros[Double](m0.rows + 2 * (k0.rows - 1), m0.cols + 2 * (k0.cols - 1))
      for (i <- 0 to m0.rows - 1) {
        for (j <- 0 to m0.cols - 1) {
          m1((k0.rows - 1) + i, (k0.cols - 1) + j) = m0(i, j)
        }
      }
      val k1 = Rot90(Rot90(k0))
      val row1 = m1.rows - k1.rows + 1
      val col1 = m1.cols - k1.cols + 1
      var m2 = BDM.zeros[Double](row1, col1)
      for (i <- 0 to row1 - 1) {
        for (j <- 0 to col1 - 1) {
          val r1 = i
          val r2 = r1 + k1.rows - 1
          val c1 = j
          val c2 = c1 + k1.cols - 1
          val mi = m1(r1 to r2, c1 to c2)
          m2(i, j) = (mi :* k1).sum
        }
      }
      m2
  }
  out1
}
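With "valid" the output shrinks to (rows - k.rows + 1) x (cols - k.cols + 1); with "full" the input is zero-padded first, so the output grows to (rows + k.rows - 1) x (cols + k.cols - 1). The standalone sketch below illustrates the "valid" sliding-window computation on the first commented example (Breeze only, using the same :* element-wise product operator as the code above; it omits the kernel transpose the method above applies):

import breeze.linalg.{DenseMatrix => BDM, sum}

object ValidConvSketch {
  // "valid" sliding-window sum of element-wise products.
  def valid2d(m: BDM[Double], k: BDM[Double]): BDM[Double] = {
    val out = BDM.zeros[Double](m.rows - k.rows + 1, m.cols - k.cols + 1)
    for (i <- 0 until out.rows; j <- 0 until out.cols) {
      val window = m(i to i + k.rows - 1, j to j + k.cols - 1)
      out(i, j) = sum(window :* k)
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val m0 = BDM((1.0, 1.0, 1.0, 1.0), (0.0, 0.0, 1.0, 1.0), (0.0, 1.0, 1.0, 0.0), (0.0, 1.0, 1.0, 0.0))
    val k0 = BDM((1.0, 1.0), (0.0, 1.0))
    println(valid2d(m0, k0)) // 3 x 3 output
  }
}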

(5) CNNtrain

Trains the convolutional neural network.

Input parameters: train_d, the training data RDD; opts, the training options.

Output: CNNModel, the trained model.

/**
 * Runs the convolutional neural network training algorithm.
 */
def CNNtrain(train_d: RDD[(BDM[Double], BDM[Double])], opts: Array[Double]): CNNModel = {
  val sc = train_d.sparkContext
  var initStartTime = System.currentTimeMillis()
  var initEndTime = System.currentTimeMillis()
  // parameter initialization
  var (cnn_layers, cnn_ffb, cnn_ffW, cnn_alpha) = CnnSetup
  // split the samples into training data and cross-validation data
  val validation = opts(2)
  val splitW1 = Array(1.0 - validation, validation)
  val train_split1 = train_d.randomSplit(splitW1, System.nanoTime())
  val train_t = train_split1(0)
  val train_v = train_split1(1)
  // m: number of training samples
  val m = train_t.count
  // number of batches
  val batchsize = opts(0).toInt
  val numepochs = opts(1).toInt
  val numbatches = (m / batchsize).toInt
  var rL = Array.fill(numepochs * numbatches.toInt)(0.0)
  var n = 0
  // numepochs is the number of passes over the data
  for (i <- 1 to numepochs) {
    initStartTime = System.currentTimeMillis()
    val splitW2 = Array.fill(numbatches)(1.0 / numbatches)
    // randomly split the samples of each batch according to the split weights
    for (l <- 1 to numbatches) {
      // broadcast the current weights
      val bc_cnn_layers = sc.broadcast(cnn_layers)
      val bc_cnn_ffb = sc.broadcast(cnn_ffb)
      val bc_cnn_ffW = sc.broadcast(cnn_ffW)
      // sample split
      val train_split2 = train_t.randomSplit(splitW2, System.nanoTime())
      val batch_xy1 = train_split2(l - 1)
      // CNNff: forward propagation
      // net = cnnff(net, batch_x);
      val train_cnnff = CNN.CNNff(batch_xy1, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW)
      // CNNbp: back propagation
      // net = cnnbp(net, batch_y);
      val train_cnnbp = CNN.CNNbp(train_cnnff, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW)
      // weight update
      // net = cnnapplygrads(net, opts);
      val train_nnapplygrads = CNN.CNNapplygrads(train_cnnbp, bc_cnn_ffb, bc_cnn_ffW, cnn_alpha)
      cnn_ffW = train_nnapplygrads._1
      cnn_ffb = train_nnapplygrads._2
      cnn_layers = train_nnapplygrads._3
      // error and loss
      // output error computation
      // net.L = 1/2 * sum(net.e(:) .^ 2) / size(net.e, 2);
      val rdd_loss1 = train_cnnbp._1.map(f => f._5)
      val (loss2, counte) = rdd_loss1.treeAggregate((0.0, 0L))(
        seqOp = (c, v) => {
          // c: (e, count), v: (m)
          val e1 = c._1
          val e2 = (v :* v).sum
          val esum = e1 + e2
          (esum, c._2 + 1)
        },
        combOp = (c1, c2) => {
          // c: (e, count)
          val e1 = c1._1
          val e2 = c2._1
          val esum = e1 + e2
          (esum, c1._2 + c2._2)
        })
      val Loss = (loss2 / counte.toDouble) * 0.5
      if (n == 0) {
        rL(n) = Loss
      } else {
        rL(n) = 0.99 * rL(n - 1) + 0.01 * Loss
      }
      n = n + 1
    }
    initEndTime = System.currentTimeMillis()
    // print the batch result
    printf("epoch: numepochs = %d , Took = %d seconds; batch train mse = %f.\n", i,
      scala.math.ceil((initEndTime - initStartTime).toDouble / 1000).toLong, rL(n - 1))
  }
  // compute the training error and the cross-validation error
  // Full-batch train mse
  var loss_train_e = 0.0
  var loss_val_e = 0.0
  loss_train_e = CNN.CNNeval(train_t, sc.broadcast(cnn_layers), sc.broadcast(cnn_ffb), sc.broadcast(cnn_ffW))
  if (validation > 0) loss_val_e = CNN.CNNeval(train_v, sc.broadcast(cnn_layers), sc.broadcast(cnn_ffb), sc.broadcast(cnn_ffW))
  printf("epoch: Full-batch train mse = %f, val mse = %f.\n", loss_train_e, loss_val_e)
  new CNNModel(cnn_layers, cnn_ffW, cnn_ffb)
}
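A usage sketch of the trainer: build an RDD of (label, feature) Breeze-matrix pairs and hand it to CNNtrain together with the options array. The input shapes (28 x 28 features, 10 x 1 one-hot labels), the local SparkContext, and the ordering of the opts entries are assumptions for illustration; the CNN configuration API (layer types, map counts, kernel sizes) is not shown in the excerpt above, so the actual training call is left commented out.

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CNNTrainUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CNNtrain sketch").setMaster("local[2]"))
    // Each element matches CNNtrain's RDD[(BDM[Double], BDM[Double])]: (label, feature).
    val train_d: RDD[(BDM[Double], BDM[Double])] = sc.parallelize(0 until 100).map { _ =>
      val feature = BDM.rand[Double](28, 28)          // one input map
      val label = BDM.zeros[Double](10, 1)            // one-hot label column
      label(scala.util.Random.nextInt(10), 0) = 1.0
      (label, feature)
    }
    // Training options (assumed ordering: batch size, number of epochs, validation fraction).
    val opts = Array(50.0, 5.0, 0.2)
    // With a configured CNN instance `cnn`, training would be:
    // val model: CNNModel = cnn.CNNtrain(train_d, opts)
    sc.stop()
  }
}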

(6) CNNff

Forward propagation computes the output of every layer, from the input layer through the hidden layers to the output layer, i.e. the output value of every node in each layer.

Input parameters:

batch_xy1: the sample data

bc_cnn_layers: the per-layer parameters

bc_cnn_ffb: the bias parameters

bc_cnn_ffW: the weight parameters

Output:

the computation results of each layer.

/**
 * CNNff performs forward propagation:
 * it computes the output value of every node in the network.
 */
def CNNff(
  batch_xy1: RDD[(BDM[Double], BDM[Double])],
  bc_cnn_layers: org.apache.spark.broadcast.Broadcast[Array[CNNLayers]],
  bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]],
  bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]]): RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double])] = {
  // layer 1: a(1) = [x]
  val train_data1 = batch_xy1.map { f =>
    val lable = f._1
    val features = f._2
    val nna1 = Array(features)
    val nna = ArrayBuffer[Array[BDM[Double]]]()
    nna += nna1
    (lable, nna)
  }
  // computation for layers 2 to n
  val train_data2 = train_data1.map { f =>
    val lable = f._1
    val nn_a = f._2
    var inputmaps1 = 1.0
    val n = bc_cnn_layers.value.length
    // for each layer
    for (l <- 1 to n - 1) {
      val type1 = bc_cnn_layers.value(l).types
      val outputmap1 = bc_cnn_layers.value(l).outputmaps
      val kernelsize1 = bc_cnn_layers.value(l).kernelsize
      val scale1 = bc_cnn_layers.value(l).scale
      val k1 = bc_cnn_layers.value(l).k
      val b1 = bc_cnn_layers.value(l).b
      val nna1 = ArrayBuffer[BDM[Double]]()
      if (type1 == "c") {
        for (j <- 0 to outputmap1.toInt - 1) { // output map
          // create temp output map
          var z = BDM.zeros[Double](nn_a(l - 1)(0).rows - kernelsize1.toInt + 1,
            nn_a(l - 1)(0).cols - kernelsize1.toInt + 1)
          for (i <- 0 to inputmaps1.toInt - 1) { // input map
            // convolve with corresponding kernel and add to temp output map
            // z = z + convn(net.layers{l - 1}.a{i}, net.layers{l}.k{i}{j}, 'valid');
            z = z + CNN.convn(nn_a(l - 1)(i), k1(i)(j), "valid")
          }
          // add bias, pass through nonlinearity
          // net.layers{l}.a{j} = sigm(z + net.layers{l}.b{j})
          val nna0 = sigm(z + b1(j))
          nna1 += nna0
        }
        nn_a += nna1.toArray
        inputmaps1 = outputmap1
      } else if (type1 == "s") {
        for (j <- 0 to inputmaps1.toInt - 1) {
          // z = convn(net.layers{l - 1}.a{j}, ones(net.layers{l}.scale) / (net.layers{l}.scale ^ 2), 'valid');
          // net.layers{l}.a{j} = z(1 : net.layers{l}.scale : end, 1 : net.layers{l}.scale : end, :);
          val z = CNN.convn(nn_a(l - 1)(j),
            BDM.ones[Double](scale1.toInt, scale1.toInt) / (scale1 * scale1), "valid")
          // keep every scale-th row and column (mean pooling with stride = scale)
          val zs1 = z(::, 0 to -1 by scale1.toInt).t + 0.0
          val zs2 = zs1(::, 0 to -1 by scale1.toInt).t + 0.0
          val nna0 = zs2
          nna1 += nna0
        }
        nn_a += nna1.toArray
      }
    }
    // concatenate all end layer feature maps into vector
    val nn_fv1 = ArrayBuffer[Double]()
    for (j <- 0 to nn_a(n - 1).length - 1) {
      nn_fv1 ++= nn_a(n - 1)(j).data
    }
    val nn_fv = new BDM(nn_fv1.length, 1, nn_fv1.toArray)
    // feedforward into output perceptrons
    // net.o = sigm(net.ffW * net.fv + repmat(net.ffb, 1, size(net.fv, 2)));
    val nn_o = sigm(bc_cnn_ffW.value * nn_fv + bc_cnn_ffb.value)
    (lable, nn_a.toArray, nn_fv, nn_o)
  }
  train_data2
}
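The "s" branch above implements mean pooling: each map is convolved with a scale x scale matrix of ones divided by scale squared, and every scale-th row and column of the result is kept. The standalone sketch below computes the same downsampling directly with a window loop (Breeze only; the sizes are illustrative):

import breeze.linalg.{DenseMatrix => BDM}

object MeanPoolSketch {
  // Mean pooling with a `scale` x `scale` window and stride `scale`,
  // mirroring the effect of the "s" branch of CNNff.
  def meanPool(a: BDM[Double], scale: Int): BDM[Double] = {
    val out = BDM.zeros[Double](a.rows / scale, a.cols / scale)
    for (i <- 0 until out.rows; j <- 0 until out.cols) {
      var s = 0.0
      for (di <- 0 until scale; dj <- 0 until scale)
        s += a(i * scale + di, j * scale + dj)
      out(i, j) = s / (scale * scale)
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val a = BDM.rand[Double](24, 24)
    val pooled = meanPool(a, 2)
    println(pooled.rows + " x " + pooled.cols) // 12 x 12
  }
}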

(7) CNNbp

Back propagation computes the derivatives layer by layer, from the output layer through the hidden layers back to the input layer, i.e. the partial derivative at every node (error back propagation).

Input parameters:

train_cnnff: the forward-propagation results

bc_cnn_layers: the per-layer parameters

bc_cnn_ffb: the bias parameters

bc_cnn_ffW: the weight parameters

Output:

the partial-derivative results for each layer.

/**
 * CNNbp performs back propagation:
 * it computes the average partial derivatives of the weights.
 */
def CNNbp(
  train_cnnff: RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double])],
  bc_cnn_layers: org.apache.spark.broadcast.Broadcast[Array[CNNLayers]],
  bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]],
  bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]]): (RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double], BDM[Double], BDM[Double], BDM[Double], Array[Array[BDM[Double]]])], BDM[Double], BDM[Double], Array[CNNLayers]) = {
  // error : net.e = net.o - y
  val n = bc_cnn_layers.value.length
  val train_data3 = train_cnnff.map { f =>
    val nn_e = f._4 - f._1
    (f._1, f._2, f._3, f._4, nn_e)
  }
  // backprop deltas
  // output-layer sensitivities (residuals)
  // net.od = net.e .* (net.o .* (1 - net.o))
  // net.fvd = (net.ffW' * net.od)
  val train_data4 = train_data3.map { f =>
    val nn_e = f._5
    val nn_o = f._4
    val nn_fv = f._3
    val nn_od = nn_e :* (nn_o :* (1.0 - nn_o))
    val nn_fvd = if (bc_cnn_layers.value(n - 1).types == "c") {
      // net.fvd = net.fvd .* (net.fv .* (1 - net.fv));
      val nn_fvd1 = bc_cnn_ffW.value.t * nn_od
      val nn_fvd2 = nn_fvd1 :* (nn_fv :* (1.0 - nn_fv))
      nn_fvd2
    } else {
      val nn_fvd1 = bc_cnn_ffW.value.t * nn_od
      nn_fvd1
    }
    (f._1, f._2, f._3, f._4, f._5, nn_od, nn_fvd)
  }
  // reshape feature vector deltas into output map style
  val sa1 = train_data4.map(f => f._2(n - 1)(0)).take(1)(0).rows
  val sa2 = train_data4.map(f => f._2(n - 1)(0)).take(1)(0).cols
  val fvnum = sa1 * sa2
  val train_data5 = train_data4.map { f =>
    val nn_a = f._2
    val nn_fvd = f._7
    val nn_od = f._6
    val nn_fv = f._3
    var nnd = new Array[Array[BDM[Double]]](n)
    val nnd1 = ArrayBuffer[BDM[Double]]()
    for (j <- 0 to nn_a(n - 1).length - 1) {
      val tmp1 = nn_fvd((j * fvnum) to ((j + 1) * fvnum - 1), 0)
      val tmp2 = new BDM(sa1, sa2, tmp1.data)
      nnd1 += tmp2
    }
    nnd(n - 1) = nnd1.toArray
    for (l <- (n - 2) to 0 by -1) {
      val type1 = bc_cnn_layers.value(l).types
      var nnd2 = ArrayBuffer[BDM[Double]]()
      if (type1 == "c") {
        for (j <- 0 to nn_a(l).length - 1) {
          val tmp_a = nn_a(l)(j)
          val tmp_d = nnd(l + 1)(j)
          val tmp_scale = bc_cnn_layers.value(l + 1).scale.toInt
          val tmp1 = tmp_a :* (1.0 - tmp_a)
          val tmp2 = expand(tmp_d, Array(tmp_scale, tmp_scale)) /
            (tmp_scale.toDouble * tmp_scale)
          nnd2 += (tmp1 :* tmp2)
        }
      } else if (type1 == "s") {
        for (i <- 0 to nn_a(l).length - 1) {
          var z = BDM.zeros[Double](nn_a(l)(0).rows, nn_a(l)(0).cols)
          for (j <- 0 to nn_a(l + 1).length - 1) {
            // z = z + convn(net.layers{l + 1}.d{j}, rot180(net.layers{l + 1}.k{i}{j}), 'full');
            z = z + CNN.convn(nnd(l + 1)(j), Rot90(Rot90(bc_cnn_layers.value(l + 1).k(i)(j))), "full")
          }
          nnd2 += z
        }
      }
      nnd(l) = nnd2.toArray
    }
    (f._1, f._2, f._3, f._4, f._5, f._6, f._7, nnd)
  }
  // dk, db: calc gradients
  var cnn_layers = bc_cnn_layers.value
  for (l <- 1 to n - 1) {
    val type1 = bc_cnn_layers.value(l).types
    val lena1 = train_data5.map(f => f._2(l).length).take(1)(0)
    val lena2 = train_data5.map(f => f._2(l - 1).length).take(1)(0)
    if (type1 == "c") {
      for (j <- 0 to lena1 - 1) {
        for (i <- 0 to lena2 - 1) {
          val rdd_dk_ij = train_data5.map { f =>
            val nn_a = f._2
            val nn_d = f._8
            val tmp_d = nn_d(l)(j)
            val tmp_a = nn_a(l - 1)(i)
            convn(Rot90(Rot90(tmp_a)), tmp_d, "valid")
          }
          val initdk = BDM.zeros[Double](rdd_dk_ij.take(1)(0).rows, rdd_dk_ij.take(1)(0).cols)
          val (dk_ij, count_dk) = rdd_dk_ij.treeAggregate((initdk, 0L))(
            seqOp = (c, v) => {
              // c: (m, count), v: (m)
              val m1 = c._1
              val m2 = m1 + v
              (m2, c._2 + 1)
            },
            combOp = (c1, c2) => {
              // c: (m, count)
              val m1 = c1._1
              val m2 = c2._1
              val m3 = m1 + m2
              (m3, c1._2 + c2._2)
            })
          val dk = dk_ij / count_dk.toDouble
          cnn_layers(l).dk(i)(j) = dk
        }
        val rdd_db_j = train_data5.map { f =>
          val nn_d = f._8
          val tmp_d = nn_d(l)(j)
          Bsum(tmp_d)
        }
        val db_j = rdd_db_j.reduce(_ + _)
        val count_db = rdd_db_j.count
        val db = db_j / count_db.toDouble
        cnn_layers(l).db(j) = db
      }
    }
  }
  // net.dffW = net.od * (net.fv)' / size(net.od, 2);
  // net.dffb = mean(net.od, 2);
  val train_data6 = train_data5.map { f =>
    val nn_od = f._6
    val nn_fv = f._3
    nn_od * nn_fv.t
  }
  val train_data7 = train_data5.map { f =>
    val nn_od = f._6
    nn_od
  }
  val initffW = BDM.zeros[Double](bc_cnn_ffW.value.rows, bc_cnn_ffW.value.cols)
  val (ffw2, countfffw2) = train_data6.treeAggregate((initffW, 0L))(
    seqOp = (c, v) => {
      // c: (m, count), v: (m)
      val m1 = c._1
      val m2 = m1 + v
      (m2, c._2 + 1)
    },
    combOp = (c1, c2) => {
      // c: (m, count)
      val m1 = c1._1
      val m2 = c2._1
      val m3 = m1 + m2
      (m3, c1._2 + c2._2)
    })
  val cnn_dffw = ffw2 / countfffw2.toDouble
  val initffb = BDM.zeros[Double](bc_cnn_ffb.value.rows, bc_cnn_ffb.value.cols)
  val (ffb2, countfffb2) = train_data7.treeAggregate((initffb, 0L))(
    seqOp = (c, v) => {
      // c: (m, count), v: (m)
      val m1 = c._1
      val m2 = m1 + v
      (m2, c._2 + 1)
    },
    combOp = (c1, c2) => {
      // c: (m, count)
      val m1 = c1._1
      val m2 = c2._1
      val m3 = m1 + m2
      (m3, c1._2 + c2._2)
    })
  val cnn_dffb = ffb2 / countfffb2.toDouble
  (train_data5, cnn_dffw, cnn_dffb, cnn_layers)
}
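At the output layer the loss is the squared error of a sigmoid output, so the delta is od = e :* o :* (1 - o) with e = o - y, and the fully connected gradients are dffW = od * fv' and dffb = od, averaged over the batch by the treeAggregate calls above. A single-example standalone sketch of that output-layer step (Breeze only; the sizes 192 and 10 are illustrative):

import breeze.linalg.{DenseMatrix => BDM, sum}
import breeze.numerics.sigmoid

object OutputLayerGradSketch {
  def main(args: Array[String]): Unit = {
    val fv = BDM.rand[Double](192, 1)           // flattened final feature maps (fv)
    val ffW = BDM.rand[Double](10, 192) * 0.01  // output-layer weights
    val ffb = BDM.zeros[Double](10, 1)          // output-layer biases
    val y = BDM.zeros[Double](10, 1); y(3, 0) = 1.0

    val o = sigmoid(ffW * fv + ffb)             // net.o
    val e = o - y                               // net.e = net.o - y
    val od = e :* (o :* (1.0 - o))              // net.od: output-layer delta
    val fvd = ffW.t * od                        // net.fvd: delta of the flattened feature vector
    val dffW = od * fv.t                        // per-example net.dffW (batch-averaged in CNNbp)
    val dffb = od                               // per-example net.dffb
    println("loss = " + 0.5 * sum(e :* e))
  }
}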

(8) CNNapplygrads

Weight update.

Input parameters:

train_cnnbp: the output of CNNbp

bc_cnn_ffb: the network bias parameters

bc_cnn_ffW: the network weight parameters

alpha: the learning rate for the update

Output: (cnn_ffW, cnn_ffb, cnn_layers), the updated weight parameters.

/**
 * CNNapplygrads performs the weight update.
 */
def CNNapplygrads(
  train_cnnbp: (RDD[(BDM[Double], Array[Array[BDM[Double]]], BDM[Double], BDM[Double], BDM[Double], BDM[Double], BDM[Double], Array[Array[BDM[Double]]])], BDM[Double], BDM[Double], Array[CNNLayers]),
  bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]],
  bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]],
  alpha: Double): (BDM[Double], BDM[Double], Array[CNNLayers]) = {
  val train_data5 = train_cnnbp._1
  val cnn_dffw = train_cnnbp._2
  val cnn_dffb = train_cnnbp._3
  var cnn_layers = train_cnnbp._4
  var cnn_ffb = bc_cnn_ffb.value
  var cnn_ffW = bc_cnn_ffW.value
  val n = cnn_layers.length
  for (l <- 1 to n - 1) {
    val type1 = cnn_layers(l).types
    val lena1 = train_data5.map(f => f._2(l).length).take(1)(0)
    val lena2 = train_data5.map(f => f._2(l - 1).length).take(1)(0)
    if (type1 == "c") {
      for (j <- 0 to lena1 - 1) {
        for (ii <- 0 to lena2 - 1) {
          // gradient-descent step on the kernels: k := k - alpha * dk
          cnn_layers(l).k(ii)(j) = cnn_layers(l).k(ii)(j) - cnn_layers(l).dk(ii)(j) * alpha
        }
        // gradient-descent step on the biases: b := b - alpha * db
        cnn_layers(l).b(j) = cnn_layers(l).b(j) - cnn_layers(l).db(j) * alpha
      }
    }
  }
  // gradient-descent step on the output-layer parameters
  cnn_ffW = cnn_ffW - cnn_dffw * alpha
  cnn_ffb = cnn_ffb - cnn_dffb * alpha
  (cnn_ffW, cnn_ffb, cnn_layers)
}
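The update itself is plain batch gradient descent: every kernel, bias, and output weight moves against its averaged gradient, scaled by the learning rate alpha. A minimal standalone sketch of that step (Breeze only; shapes and values are illustrative, and the gradients would come from CNNbp):

import breeze.linalg.{DenseMatrix => BDM}

object GradientStepSketch {
  // One gradient-descent step: parameter := parameter - alpha * gradient.
  def step(w: BDM[Double], dw: BDM[Double], alpha: Double): BDM[Double] =
    w - dw * alpha

  def main(args: Array[String]): Unit = {
    val alpha = 1.0
    var kernel = BDM.rand[Double](5, 5)
    val dk = BDM.rand[Double](5, 5) * 0.1       // averaged kernel gradient (illustrative)
    kernel = step(kernel, dk, alpha)

    var ffW = BDM.rand[Double](10, 192)
    val dffW = BDM.rand[Double](10, 192) * 0.01 // averaged output-weight gradient (illustrative)
    ffW = step(ffW, dffW, alpha)
    println(kernel(0, 0))
  }
}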

(9) CNNeval

Error computation.

/**
 * CNNeval runs forward propagation and computes the output error:
 * it computes the output value of every node and the mean error.
 */
def CNNeval(
  batch_xy1: RDD[(BDM[Double], BDM[Double])],
  bc_cnn_layers: org.apache.spark.broadcast.Broadcast[Array[CNNLayers]],
  bc_cnn_ffb: org.apache.spark.broadcast.Broadcast[BDM[Double]],
  bc_cnn_ffW: org.apache.spark.broadcast.Broadcast[BDM[Double]]): Double = {
  // CNNff: forward propagation
  val train_cnnff = CNN.CNNff(batch_xy1, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW)
  // error and loss
  // output error computation
  val rdd_loss1 = train_cnnff.map { f =>
    val nn_e = f._4 - f._1
    nn_e
  }
  val (loss2, counte) = rdd_loss1.treeAggregate((0.0, 0L))(
    seqOp = (c, v) => {
      // c: (e, count), v: (m)
      val e1 = c._1
      val e2 = (v :* v).sum
      val esum = e1 + e2
      (esum, c._2 + 1)
    },
    combOp = (c1, c2) => {
      // c: (e, count)
      val e1 = c1._1
      val e2 = c2._1
      val esum = e1 + e2
      (esum, c1._2 + c2._2)
    })
  val Loss = (loss2 / counte.toDouble) * 0.5
  Loss
}
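CNNeval therefore returns the mean of 0.5 * ||o - y||^2 over the evaluated examples. A tiny standalone sketch computing the same quantity for a local collection of error matrices (Breeze only; the numbers are illustrative):

import breeze.linalg.{DenseMatrix => BDM, sum}

object MseSketch {
  // Loss = 0.5 * (sum of squared errors) / (number of examples),
  // matching the treeAggregate in CNNeval.
  def mse(errors: Seq[BDM[Double]]): Double =
    0.5 * errors.map(e => sum(e :* e)).sum / errors.length

  def main(args: Array[String]): Unit = {
    val errors = Seq(BDM((0.1, -0.2)).t, BDM((0.0, 0.3)).t) // two 2 x 1 error columns
    println(mse(errors)) // 0.5 * ((0.01 + 0.04) + (0.0 + 0.09)) / 2 = 0.035
  }
}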

2.2.4 CNNModel Analysis

(1) CNNModel

CNNModel stores the CNN network parameters: cnn_layers, the configuration parameters of each layer; cnn_ffW, the weights; and cnn_ffb, the biases.

class CNNModel(
  val cnn_layers: Array[CNNLayers],
  val cnn_ffW: BDM[Double],
  val cnn_ffb: BDM[Double]) extends Serializable {
}

(2) predict

predict computes predictions from the trained model.

/**
 * Returns the prediction results.
 * Result format: (label, feature, predict_label, error)
 */
def predict(dataMatrix: RDD[(BDM[Double], BDM[Double])]): RDD[PredictCNNLabel] = {
  val sc = dataMatrix.sparkContext
  val bc_cnn_layers = sc.broadcast(cnn_layers)
  val bc_cnn_ffW = sc.broadcast(cnn_ffW)
  val bc_cnn_ffb = sc.broadcast(cnn_ffb)
  // CNNff: forward propagation
  val train_cnnff = CNN.CNNff(dataMatrix, bc_cnn_layers, bc_cnn_ffb, bc_cnn_ffW)
  val rdd_predict = train_cnnff.map { f =>
    val label = f._1
    val nna1 = f._2(0)(0)
    val nnan = f._4
    val error = f._4 - f._1
    PredictCNNLabel(label, nna1, nnan, error)
  }
  rdd_predict
}

(3) Loss

Loss computes the error from the prediction results.

/**
 * Computes the output error
 * (mean error over the predictions).
 */
def Loss(predict: RDD[PredictCNNLabel]): Double = {
  val predict1 = predict.map(f => f.error)
  // error and loss
  // output error computation
  val loss1 = predict1
  val (loss2, counte) = loss1.treeAggregate((0.0, 0L))(
    seqOp = (c, v) => {
      // c: (e, count), v: (m)
      val e1 = c._1
      val e2 = (v :* v).sum
      val esum = e1 + e2
      (esum, c._2 + 1)
    },
    combOp = (c1, c2) => {
      // c: (e, count)
      val e1 = c1._1
      val e2 = c2._1
      val esum = e1 + e2
      (esum, c1._2 + c2._2)
    })
  val Loss = (loss2 / counte.toDouble) * 0.5
  Loss
}

Please credit the source when reposting:

http://blog.csdn.net/sunbow0
