/**
* :: DeveloperApi ::
* GeneralizedLinearModel (GLM) represents a model trained using
* GeneralizedLinearAlgorithm. GLMs consist of a weight vector and
* an intercept.
*
* @param weights Weights computed for every feature.
* @param intercept Intercept computed for this model.
*/
@DeveloperApi
abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double) // primary constructor
extends Serializable {

/**
* Predict the result given a data point and the weights learned.
*
* @param dataMatrix Row vector containing the features for this data point
* @param weightMatrix Column vector containing the weights of the model
* @param intercept Intercept of the model.
*/
protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double // predict the label for a data point

/**
* Predict values for the given data set using the model trained.
*
* @param testData RDD representing data points to be predicted
* @return RDD[Double] where each entry contains the corresponding prediction
*/
def predict(testData: RDD[Vector]): RDD[Double] = {
// A small optimization to avoid serializing the entire model. Only the weightsMatrix
// and intercept is needed.
val localWeights = weights
val bcWeights = testData.context.broadcast(localWeights)
val localIntercept = intercept
testData.mapPartitions { iter =>
val w = bcWeights.value // broadcast values are read-only (similar to Hadoop's DistributedCache)
iter.map(v => predictPoint(v, w, localIntercept))
}
}

/**
* Predict values for a single data point using the model trained.
*
* @param testData array representing a single data point
* @return Double prediction from the trained model
*/
def predict(testData: Vector): Double = {
predictPoint(testData, weights, intercept)
}
}
// Use the weights learned from the training set to predict the labels of new data points.
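The broadcast in predict above follows the read-only shared-variable pattern noted in the comment. A minimal sketch of the same idiom in a hypothetical spark-shell session (sc is provided by the shell; the weights and points are made up):

// Hypothetical spark-shell sketch of the broadcast-then-mapPartitions pattern
import org.apache.spark.mllib.linalg.{Vector, Vectors}

val weights = Vectors.dense(0.5, -0.25)                      // assumed example weights
val points = sc.parallelize(Seq(Vectors.dense(1.0, 2.0), Vectors.dense(3.0, 4.0)))
val bcWeights = sc.broadcast(weights)                        // shipped to each executor once
val predictions = points.mapPartitions { iter =>
  val w = bcWeights.value                                    // read-only local copy, as in predict above
  iter.map(v => (0 until v.size).map(i => w(i) * v(i)).sum)  // plain dot product w . v
}
predictions.collect().foreach(println)                       // 0.0, 0.5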
/**
* Regression model trained using LinearRegression.
*
* @param weights Weights computed for every feature.
* @param intercept Intercept computed for this model.
*/
class LinearRegressionModel (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {

override protected def predictPoint(
dataMatrix: Vector,
weightMatrix: Vector,
intercept: Double): Double = {
weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept // dot product of two vectors: v1 = [a1, b1], v2 = [a2, b2], v1 . v2 = a1 * a2 + b1 * b2
}
}
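As a quick check of the dot-product prediction above, a hedged sketch with made-up numbers (the LinearRegressionModel constructor is public in this API):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LinearRegressionModel

// Assumed values: w = [2.0, 3.0], intercept = 0.5, x = [1.0, 4.0]
val model = new LinearRegressionModel(Vectors.dense(2.0, 3.0), 0.5)
println(model.predict(Vectors.dense(1.0, 4.0))) // w . x + intercept = 2*1 + 3*4 + 0.5 = 14.5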
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.util.NumericParser
import org.apache.spark.SparkException

/**
* Class that represents the features and labels of a data point.
*
* @param label Label for this data point.
* @param features List of features for this data point.
*/
case class LabeledPoint(label: Double, features: Vector /* primary constructor */) {
override def toString: String = {
"(%s,%s)".format(label, features)
}
}

/**
* Parser for [[org.apache.spark.mllib.regression.LabeledPoint]].
*/
object LabeledPoint {
/**
* Parses a string resulted from `LabeledPoint#toString` into
* an [[org.apache.spark.mllib.regression.LabeledPoint]].
*/
def parse(s: String): LabeledPoint = {
if (s.startsWith("(")) {
NumericParser.parse(s) match {
case Seq(label: Double, numeric: Any) =>
LabeledPoint(label, Vectors.parseNumeric(numeric))
case other =>
throw new SparkException(/* string interpolation */ s"Cannot parse $other.")
}
} else { // dense format used before v1.0
val parts = s.split(',')
val label = java.lang.Double.parseDouble(parts(0))
val features = Vectors.dense(parts(1).trim().split(' ').map(java.lang.Double.parseDouble))
LabeledPoint(label, features)
}
}
}
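A short sketch exercising both branches of parse (the strings are illustrative):

import org.apache.spark.mllib.regression.LabeledPoint

val p1 = LabeledPoint.parse("(1.0,[2.5,3.0])")  // v1.0+ format, as produced by toString
val p2 = LabeledPoint.parse("0.0,4.0 5.0 6.0")  // pre-v1.0 dense format: "label,f1 f2 ..."
println(p1)  // (1.0,[2.5,3.0])
println(p2)  // (0.0,[4.0,5.0,6.0])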
/**
* :: DeveloperApi ::
* GeneralizedLinearAlgorithm implements methods to train a Generalized Linear Model (GLM).
* This class should be extended with an Optimizer to create a new GLM.
*/
@DeveloperApi
abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
extends Logging with Serializable {

protected val validators: Seq[RDD[LabeledPoint] => Boolean] = List()

/** The optimizer to solve the problem. */
def optimizer: Optimizer

/** Whether to add intercept (default: false). */
protected var addIntercept: Boolean = false

protected var validateData: Boolean = true

/**
* Whether to perform feature scaling before model training to reduce the condition numbers
* which can significantly help the optimizer converging faster. The scaling correction will be
* translated back to resulting model weights, so it's transparent to users.
* Note: This technique is used in both libsvm and glmnet packages. Default false.
*/
private var useFeatureScaling = false

/**
* Set if the algorithm should use feature scaling to improve the convergence during optimization.
*/
private[mllib] def setFeatureScaling(useFeatureScaling: Boolean): this.type = {
this.useFeatureScaling = useFeatureScaling
this
}

/**
* Create a model given the weights and intercept
*/
protected def createModel(weights: Vector, intercept: Double): M

/**
* Set if the algorithm should add an intercept. Default false.
* We set the default to false because adding the intercept will cause memory allocation.
*/
def setIntercept(addIntercept: Boolean): this.type = {
this.addIntercept = addIntercept
this
}

/**
* Set if the algorithm should validate data before training. Default true.
*/
def setValidateData(validateData: Boolean): this.type = {
this.validateData = validateData
this
}

/**
* Run the algorithm with the configured parameters on an input
* RDD of LabeledPoint entries.
*/
def run(input: RDD[LabeledPoint]): M = {
val numFeatures: Int = input.first().features.size
val initialWeights = Vectors.dense(new Array[Double](numFeatures)) // initialized to the zero vector
run(input, initialWeights)
}

/**
* Run the algorithm with the configured parameters on an input RDD
* of LabeledPoint entries starting from the initial weights provided.
*/
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
// Check the data properties before running the optimizer
if (validateData && !validators.forall(func => func(input))) {
throw new SparkException("Input validation failed.")
}

/**
* Scaling columns to unit variance as a heuristic to reduce the condition number:
*
* During the optimization process, the convergence (rate) depends on the condition number of
* the training dataset. Scaling the variables often reduces this condition number
* heuristically, thus improving the convergence rate. Without reducing the condition number,
* some training datasets mixing the columns with different scales may not be able to converge.
*
* GLMNET and LIBSVM packages perform the scaling to reduce the condition number, and return
* the weights in the original scale.
* See page 9 in http://cran.r-project.org/web/packages/glmnet/glmnet.pdf
*
* Here, if useFeatureScaling is enabled, we will standardize the training features by dividing
* the variance of each column (without subtracting the mean), and train the model in the
* scaled space. Then we transform the coefficients from the scaled space to the original scale
* as GLMNET and LIBSVM do.
*
* Currently, it's only enabled in LogisticRegressionWithLBFGS
*/
val scaler = if (useFeatureScaling) {
(new StandardScaler).fit(input.map(x => x.features))
} else {
null
}

// Prepend an extra variable consisting of all 1.0's for the intercept.
val data = if (addIntercept) {
if (useFeatureScaling) {
input.map(labeledPoint =>
(labeledPoint.label, appendBias(scaler.transform(labeledPoint.features))))
} else {
input.map(labeledPoint => (labeledPoint.label, /* append the bias (intercept) term */ appendBias(labeledPoint.features)))
}
} else {
if (useFeatureScaling) {
input.map(labeledPoint => (labeledPoint.label, scaler.transform(labeledPoint.features)))
} else {
input.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
}
}

val initialWeightsWithIntercept = if (addIntercept) {
appendBias(initialWeights)
} else {
initialWeights
}
// Very important: the actual training happens here
val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)

val intercept = if (addIntercept) weightsWithIntercept(weightsWithIntercept.size - 1) else 0.0
var weights =
if (addIntercept) {
Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1))
} else {
weightsWithIntercept
}

/**
* The weights and intercept are trained in the scaled space; we're converting them back to
* the original scale.
*
* Math shows that if we only perform standardization without subtracting means, the intercept
* will not be changed. w_i = w_i' / v_i where w_i' is the coefficient in the scaled space, w_i
* is the coefficient in the original space, and v_i is the variance of the column i.
*/
if (useFeatureScaling) {
weights = scaler.transform(weights)
}

createModel(weights, intercept)
}
}
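To make the back-transformation concrete, a hedged numeric check with assumed numbers: if column i has scale v_i = 10 and the optimizer finds w_i' = 2.0 in the scaled space, the reported coefficient is w_i = w_i' / v_i = 0.2, and the intercept needs no correction because the means were never subtracted.

// Hedged check of w_i = w_i' / v_i (all numbers assumed)
val vI = 10.0      // scale of column i used by the scaler
val wScaled = 2.0  // coefficient learned in the scaled space
val wOrig = wScaled / vI
println(wOrig)     // 0.2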
The LinearRegressionWithSGD class takes an external dataset and algorithm parameters as input, and trains a linear regression model (LinearRegressionModel).
Its input parameters are:
input: the training dataset; the label and each feature are of type Double
numIterations: number of iterations, default 100
stepSize: step size of each iteration, default 1.0
miniBatchFraction: fraction of the samples used in each iteration, default 1.0
initialWeights: initial value of the weight vector, default the zero vector
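A hedged end-to-end sketch of these parameters in use, as a hypothetical spark-shell session (sc is provided by the shell; the toy dataset is an assumption):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

// Toy data drawn from y = 2 * x (assumed for illustration)
val data = sc.parallelize(Seq(
  LabeledPoint(2.0, Vectors.dense(1.0)),
  LabeledPoint(4.0, Vectors.dense(2.0)),
  LabeledPoint(6.0, Vectors.dense(3.0))))
// numIterations = 100, stepSize = 0.1, miniBatchFraction = 1.0
val model = LinearRegressionWithSGD.train(data, 100, 0.1, 1.0)
println(model.weights)                      // should approach [2.0]
println(model.predict(Vectors.dense(4.0)))  // should approach 8.0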
/**
* Train a linear regression model with no regularization using Stochastic Gradient Descent.
* This solves the least squares regression formulation
* f(weights) = 1/n ||A weights-y||^2
* (which is the mean squared error).
* Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with
* its corresponding right hand side label y.
* See also the documentation for the precise formulation.
*/
class LinearRegressionWithSGD private[mllib] (
private var stepSize: Double,
private var numIterations: Int,
private var miniBatchFraction: Double)
extends GeneralizedLinearAlgorithm[LinearRegressionModel] with Serializable {

private val gradient = new LeastSquaresGradient()
private val updater = new SimpleUpdater()
override val optimizer = new GradientDescent(gradient, updater)
.setStepSize(stepSize)
.setNumIterations(numIterations)
.setMiniBatchFraction(miniBatchFraction)

/**
* Construct a LinearRegression object with default parameters: {stepSize: 1.0,
* numIterations: 100, miniBatchFraction: 1.0}.
*/
def this() = this(1.0, 100, 1.0)

override protected[mllib] def createModel(weights: Vector, intercept: Double) = {
new LinearRegressionModel(weights, intercept)
}
}

/**
* Top-level methods for calling LinearRegression.
*/
object LinearRegressionWithSGD {

/**
* Train a Linear Regression model given an RDD of (label, features) pairs. We run a fixed number
* of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights used
* in gradient descent are initialized using the initial weights provided.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations Number of iterations of gradient descent to run.
* @param stepSize Step size to be used for each iteration of gradient descent.
* @param miniBatchFraction Fraction of data to be used per iteration.
* @param initialWeights Initial set of weights to be used. Array should be equal in size to
* the number of features in the data.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
initialWeights: Vector): LinearRegressionModel = {
new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
.run(input, initialWeights)
}

/**
* Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number
* of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate a stochastic gradient.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations Number of iterations of gradient descent to run.
* @param stepSize Step size to be used for each iteration of gradient descent.
* @param miniBatchFraction Fraction of data to be used per iteration.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double): LinearRegressionModel = {
new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
}

/**
* Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number
* of iterations of gradient descent using the specified step size. We use the entire data set to
* compute the true gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param stepSize Step size to be used for each iteration of Gradient Descent.
* @param numIterations Number of iterations of gradient descent to run.
* @return a LinearRegressionModel which has the weights and offset from training.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int,
stepSize: Double): LinearRegressionModel = {
train(input, numIterations, stepSize, 1.0)
}

/**
* Train a LinearRegression model given an RDD of (label, features) pairs. We run a fixed number
* of iterations of gradient descent using a step size of 1.0. We use the entire data set to
* compute the true gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs. Each pair describes a row of the data
* matrix A as well as the corresponding right hand side label y
* @param numIterations Number of iterations of gradient descent to run.
* @return a LinearRegressionModel which has the weights and offset from training.
*/
def train(
input: RDD[LabeledPoint],
numIterations: Int): LinearRegressionModel = {
train(input, numIterations, 1.0, 1.0)
}
}
Gradient (differentiate the loss function, e.g. the least-squares loss, to compute the gradient used by gradient descent)
/**
* :: DeveloperApi ::
* Class used to compute the gradient for a loss function, given a single data point.
*/
@DeveloperApi
abstract class Gradient extends Serializable {
/**
* Compute the gradient and loss given the features of a single data point.
*
* @param data features for one data point
* @param label label for this data point
* @param weights weights/coefficients corresponding to features
*
* @return (gradient: Vector, loss: Double)
*/
def compute(data: Vector, label: Double, weights: Vector): (Vector, Double)

/**
* Compute the gradient and loss given the features of a single data point,
* add the gradient to a provided vector to avoid creating new objects, and return loss.
*
* @param data features for one data point
* @param label label for this data point
* @param weights weights/coefficients corresponding to features
* @param cumGradient the computed gradient will be added to this vector
*
* @return loss
*/
def compute(data: Vector, label: Double, weights: Vector, cumGradient: Vector): Double
}

/**
* :: DeveloperApi ::
* Compute gradient and loss for a logistic loss function, as used in binary classification.
* See also the documentation for the precise formulation.
*/
@DeveloperApi
class LogisticGradient extends Gradient {
override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
val margin = -1.0 * dot(data, weights)
val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
val gradient = data.copy
scal(gradientMultiplier, gradient)
val loss =
if (label > 0) {
math.log1p(math.exp(margin)) // log1p is log(1+p) but more accurate for small p
} else {
math.log1p(math.exp(margin)) - margin
}
(gradient, loss)
}

override def compute(
data: Vector,
label: Double,
weights: Vector,
cumGradient: Vector): Double = {
val margin = -1.0 * dot(data, weights)
val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
axpy(gradientMultiplier, data, cumGradient)
if (label > 0) {
math.log1p(math.exp(margin))
} else {
math.log1p(math.exp(margin)) - margin
}
}
}
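A hedged hand-check of the two loss branches above, with assumed numbers: for x = [1.0] and w = [0.0] the margin is 0, so the multiplier is 0.5 - label and the loss is log(2) for either label.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.LogisticGradient

val g = new LogisticGradient
// margin = -(w . x) = 0, so gradientMultiplier = 1/(1 + e^0) - label = 0.5 - label
val (grad1, loss1) = g.compute(Vectors.dense(1.0), 1.0, Vectors.dense(0.0))
println(s"$grad1 $loss1")  // [-0.5] log(2) = 0.6931...
val (grad0, loss0) = g.compute(Vectors.dense(1.0), 0.0, Vectors.dense(0.0))
println(s"$grad0 $loss0")  // [0.5] log(2) - 0 = 0.6931...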
/**
* :: DeveloperApi ::
* Compute gradient and loss for a Least-squared loss function, as used in linear regression.
* This is correct for the averaged least squares loss function (mean squared error)
* L = 1/n ||A weights-y||^2
* See also the documentation for the precise formulation.
*/
@DeveloperApi
class LeastSquaresGradient extends Gradient {
override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
val diff = dot(data, weights) - label
val loss = diff * diff
val gradient = data.copy
scal(2.0 * diff, gradient)
(gradient, loss)
}

override def compute(
data: Vector,
label: Double,
weights: Vector,
cumGradient: Vector): Double = {
val diff = dot(data, weights) - label
axpy(2.0 * diff, data, cumGradient)
diff * diff
}
}
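A hedged numeric check with assumed values: for x = [1.0, 2.0], w = [0.5, 0.5], y = 1.0 we get diff = 0.5, loss = 0.25, and gradient = 2 * diff * x = [1.0, 2.0].

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.LeastSquaresGradient

val g = new LeastSquaresGradient
val (grad, loss) = g.compute(
  Vectors.dense(1.0, 2.0),   // x
  1.0,                       // y
  Vectors.dense(0.5, 0.5))   // w; w . x = 1.5, so diff = 0.5
println(grad) // [1.0, 2.0]
println(loss) // 0.25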
/**
* :: DeveloperApi ::
* Compute gradient and loss for a Hinge loss function, as used in SVM binary classification.
* See also the documentation for the precise formulation.
* NOTE: This assumes that the labels are {0,1}
*/
@DeveloperApi
class HingeGradient extends Gradient {
override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
val dotProduct = dot(data, weights)
// Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
// Therefore the gradient is -(2y - 1)*x
val labelScaled = 2 * label - 1.0
if (1.0 > labelScaled * dotProduct) {
val gradient = data.copy
scal(-labelScaled, gradient)
(gradient, 1.0 - labelScaled * dotProduct)
} else {
(Vectors.sparse(weights.size, Array.empty, Array.empty), 0.0)
}
}

override def compute(
data: Vector,
label: Double,
weights: Vector,
cumGradient: Vector): Double = {
val dotProduct = dot(data, weights)
// Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
// Therefore the gradient is -(2y - 1)*x
val labelScaled = 2 * label - 1.0
if (1.0 > labelScaled * dotProduct) {
axpy(-labelScaled, data, cumGradient)
1.0 - labelScaled * dotProduct
} else {
0.0
}
}
}
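A hedged check of both hinge cases with assumed numbers (recall labels are {0, 1}, so y = 1 gives labelScaled = 1):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.HingeGradient

val g = new HingeGradient
// Inside the margin: labelScaled = 1, w . x = 0.5 < 1 => gradient = -x, loss = 0.5
val (gradIn, lossIn) = g.compute(Vectors.dense(1.0), 1.0, Vectors.dense(0.5))
println(s"$gradIn $lossIn")    // [-1.0] 0.5
// Outside the margin: w . x = 2.0 >= 1 => zero (sparse) gradient, zero loss
val (gradOut, lossOut) = g.compute(Vectors.dense(1.0), 1.0, Vectors.dense(2.0))
println(s"$gradOut $lossOut")  // (1,[],[]) 0.0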
The Updater class performs the iterative weight updates; the built-in implementations are SimpleUpdater, L1Updater, and SquaredL2Updater.
/**
* :: DeveloperApi ::
* Class used to perform steps (weight update) using Gradient Descent methods.
*
* For general minimization problems, or for regularized problems of the form
* min L(w) + regParam * R(w),
* the compute function performs the actual update step, when given some
* (e.g. stochastic) gradient direction for the loss L(w),
* and a desired step-size (learning rate).
*
* The updater is responsible to also perform the update coming from the
* regularization term R(w) (if any regularization is used).
*/
@DeveloperApi
abstract class Updater extends Serializable {
/**
* Compute an updated value for weights given the gradient, stepSize, iteration number and
* regularization parameter. Also returns the regularization value regParam * R(w)
* computed using the *updated* weights.
*
* @param weightsOld - Column matrix of size dx1 where d is the number of features.
* @param gradient - Column matrix of size dx1 where d is the number of features.
* @param stepSize - step size across iterations
* @param iter - Iteration number
* @param regParam - Regularization parameter
*
* @return A tuple of 2 elements. The first element is a column matrix containing updated weights,
* and the second element is the regularization value computed using updated weights.
*/
def compute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double)
}

/**
* :: DeveloperApi ::
* A simple updater for gradient descent *without* any regularization.
* Uses a step-size decreasing with the square root of the number of iterations.
*/
@DeveloperApi
class SimpleUpdater extends Updater {
override def compute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double) = {
val thisIterStepSize = stepSize / math.sqrt(iter)
val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)

(Vectors.fromBreeze(brzWeights), 0)
}
}

/**
* :: DeveloperApi ::
* Updater for L1 regularized problems.
* R(w) = ||w||_1
* Uses a step-size decreasing with the square root of the number of iterations.
*
* Instead of subgradient of the regularizer, the proximal operator for the
* L1 regularization is applied after the gradient step. This is known to
* result in better sparsity of the intermediate solution.
*
* The corresponding proximal operator for the L1 norm is the soft-thresholding
* function. That is, each weight component is shrunk towards 0 by shrinkageVal.
*
* If w > shrinkageVal, set weight component to w-shrinkageVal.
* If w < -shrinkageVal, set weight component to w+shrinkageVal.
* If -shrinkageVal < w < shrinkageVal, set weight component to 0.
*
* Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal)
*/
@DeveloperApi
class L1Updater extends Updater {
override def compute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double) = {
val thisIterStepSize = stepSize / math.sqrt(iter)
// Take gradient step
val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
// Apply proximal operator (soft thresholding)
val shrinkageVal = regParam * thisIterStepSize
var i = 0
while (i < brzWeights.length) {
val wi = brzWeights(i)
brzWeights(i) = signum(wi) * max(0.0, abs(wi) - shrinkageVal)
i += 1
}

(Vectors.fromBreeze(brzWeights), brzNorm(brzWeights, 1.0) * regParam)
}
}
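A hedged check of the soft-thresholding step with assumed numbers: stepSize = 1.0, iter = 1, regParam = 0.1 give shrinkageVal = 0.1, so with a zero gradient 0.25 shrinks to 0.15 and 0.05 snaps to 0.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.L1Updater

val updater = new L1Updater
// A zero gradient isolates the proximal step; shrinkageVal = 0.1 * 1.0 / sqrt(1) = 0.1
val (w, regVal) = updater.compute(
  Vectors.dense(0.25, -0.25, 0.05),  // weightsOld
  Vectors.dense(0.0, 0.0, 0.0),      // gradient
  1.0, 1, 0.1)                       // stepSize, iter, regParam
println(w)      // [0.15, -0.15, 0.0]
println(regVal) // regParam * ||w||_1 = 0.1 * 0.3 = 0.03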
/**
* :: DeveloperApi ::
* Updater for L2 regularized problems.
* R(w) = 1/2 ||w||^2
* Uses a step-size decreasing with the square root of the number of iterations.
*/
@DeveloperApi
class SquaredL2Updater extends Updater {
override def compute(
weightsOld: Vector,
gradient: Vector,
stepSize: Double,
iter: Int,
regParam: Double): (Vector, Double) = {
// add up both updates from the gradient of the loss (= step) as well as
// the gradient of the regularizer (= regParam * weightsOld)
// w' = w - thisIterStepSize * (gradient + regParam * w)
// w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
val thisIterStepSize = stepSize / math.sqrt(iter)
val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
brzWeights :*= (1.0 - thisIterStepSize * regParam)
brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
val norm = brzNorm(brzWeights, 2.0)

(Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
}
}
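A hedged check of the multiplicative decay form above with assumed numbers: stepSize = 1.0, iter = 1, regParam = 0.1 and a zero gradient scale every weight by (1 - 1.0 * 0.1) = 0.9.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.SquaredL2Updater

val updater = new SquaredL2Updater
val (w, regVal) = updater.compute(
  Vectors.dense(1.0, -2.0),  // weightsOld
  Vectors.dense(0.0, 0.0),   // gradient (zero, to isolate the decay)
  1.0, 1, 0.1)               // stepSize, iter, regParam
println(w)      // [0.9, -1.8]
println(regVal) // 0.5 * 0.1 * ||w||^2 = 0.05 * (0.81 + 3.24) = 0.2025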
/**
* :: DeveloperApi ::
* Trait for optimization problem solvers.
*/
@DeveloperApi
trait Optimizer extends Serializable {

/**
* Solve the provided convex optimization problem.
*/
def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector
}
GradientDescent (the gradient descent algorithm)
/**
* Class used to solve an optimization problem using Gradient Descent.
* @param gradient Gradient function to be used.
* @param updater Updater to be used to update weights after every iteration.
*/
class GradientDescent private[mllib] (private var gradient: Gradient, private var updater: Updater)
extends Optimizer with Logging {

private var stepSize: Double = 1.0
private var numIterations: Int = 100
private var regParam: Double = 0.0
private var miniBatchFraction: Double = 1.0

/**
* Set the initial step size of SGD for the first step. Default 1.0.
* In subsequent steps, the step size will decrease with stepSize/sqrt(t)
*/
def setStepSize(step: Double): this.type = {
this.stepSize = step
this
}

/**
* :: Experimental ::
* Set fraction of data to be used for each SGD iteration.
* Default 1.0 (corresponding to deterministic/classical gradient descent)
*/
@Experimental
def setMiniBatchFraction(fraction: Double): this.type = {
this.miniBatchFraction = fraction
this
}

/**
* Set the number of iterations for SGD. Default 100.
*/
def setNumIterations(iters: Int): this.type = {
this.numIterations = iters
this
}

/**
* Set the regularization parameter. Default 0.0.
*/
def setRegParam(regParam: Double): this.type = {
this.regParam = regParam
this
}

/**
* Set the gradient function (of the loss function of one single data example)
* to be used for SGD.
*/
def setGradient(gradient: Gradient): this.type = {
this.gradient = gradient
this
}

/**
* Set the updater function to actually perform a gradient step in a given direction.
* The updater is responsible to perform the update from the regularization term as well,
* and therefore determines what kind of regularization is used, if any.
*/
def setUpdater(updater: Updater): this.type = {
this.updater = updater
this
}

/**
* :: DeveloperApi ::
* Runs gradient descent on the given training data.
* @param data training data
* @param initialWeights initial weights
* @return solution vector
*/
@DeveloperApi
def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
val (weights, _) = GradientDescent.runMiniBatchSGD(
data,
gradient,
updater,
stepSize,
numIterations,
regParam,
miniBatchFraction,
initialWeights)
weights
}
}

/**
* :: DeveloperApi ::
* Top-level method to run gradient descent.
*/
@DeveloperApi
object GradientDescent extends Logging {
/**
* Run stochastic gradient descent (SGD) in parallel using mini batches.
* In each iteration, we sample a subset (fraction miniBatchFraction) of the total data
* in order to compute a gradient estimate.
* Sampling, and averaging the subgradients over this subset is performed using one standard
* spark map-reduce in each iteration.
*
* @param data - Input data for SGD. RDD of the set of data examples, each of
* the form (label, [feature values]).
* @param gradient - Gradient object (used to compute the gradient of the loss function of
* one single data example)
* @param updater - Updater function to actually perform a gradient step in a given direction.
* @param stepSize - initial step size for the first step
* @param numIterations - number of iterations that SGD should be run.
* @param regParam - regularization parameter
* @param miniBatchFraction - fraction of the input data set that should be used for
* one iteration of SGD. Default value 1.0.
*
* @return A tuple containing two elements. The first element is a column matrix containing
* weights for every feature, and the second element is an array containing the
* stochastic loss computed for every iteration.
*/
def runMiniBatchSGD(
data: RDD[(Double, Vector)],
gradient: Gradient,
updater: Updater,
stepSize: Double,
numIterations: Int,
regParam: Double,
miniBatchFraction: Double,
initialWeights: Vector): (Vector, Array[Double]) = {

val stochasticLossHistory = new ArrayBuffer[Double](numIterations)

val numExamples = data.count()
val miniBatchSize = numExamples * miniBatchFraction

// if no data, return initial weights to avoid NaNs
if (numExamples == 0) {
logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
return (initialWeights, stochasticLossHistory.toArray)
}

// Initialize weights as a column vector
var weights = Vectors.dense(initialWeights.toArray)
val n = weights.size

/**
* For the first iteration, the regVal will be initialized as sum of weight squares
* if it's L2 updater; for L1 updater, the same logic is followed.
*/
var regVal = updater.compute(
weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2

for (i <- 1 to numIterations) {
val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
.treeAggregate((BDV.zeros[Double](n), 0.0))(
seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))
(grad, loss + l)
},
combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
(grad1 += grad2, loss1 + loss2)
})

/**
* NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
* and regVal is the regularization value computed in the previous iteration as well.
*/
stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
weights = update._1
regVal = update._2
}

logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
stochasticLossHistory.takeRight(10).mkString(", ")))

(weights, stochasticLossHistory.toArray)
}
}
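Finally, a hedged sketch that wires a Gradient and an Updater into runMiniBatchSGD by hand, the same machinery LinearRegressionWithSGD configures internally. A hypothetical spark-shell session (sc is provided by the shell; the toy dataset is an assumption):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{GradientDescent, LeastSquaresGradient, SimpleUpdater}

// (label, features) pairs drawn from y = 3 * x (assumed toy data)
val data = sc.parallelize(Seq(
  (3.0, Vectors.dense(1.0)),
  (6.0, Vectors.dense(2.0)),
  (9.0, Vectors.dense(3.0))))
val (weights, lossHistory) = GradientDescent.runMiniBatchSGD(
  data,
  new LeastSquaresGradient(),   // least-squares loss, as in LinearRegressionWithSGD
  new SimpleUpdater(),          // no regularization
  0.1,                          // stepSize
  100,                          // numIterations
  0.0,                          // regParam (ignored by SimpleUpdater)
  1.0,                          // miniBatchFraction = full batch
  Vectors.dense(0.0))           // initial weights
println(weights)                                  // should approach [3.0]
println(lossHistory.takeRight(3).mkString(", "))  // losses near 0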