Problem description
I have code similar to the following:
    val fileContent = sc.textFile("file:///myfile")
    val dataset = fileContent.map(row => {
      val explodedRow = row.split(",").map(s => s.toDouble)
      new LabeledPoint(explodedRow(13), Vectors.dense(
        Array(explodedRow(10), explodedRow(11), explodedRow(12))
      ))
    })
    val algo = new LassoWithSGD().setIntercept(true)
    val lambda = 0.0
    algo.optimizer.setRegParam(lambda)
    algo.optimizer.setNumIterations(100)
    algo.optimizer.setStepSize(1.0)
    val model = algo.run(dataset)
I run this on a virtual server in the cloud with 20 cores. The file is a local file (i.e. not on HDFS) with a few million rows. I run it in local mode, with sbt run (i.e. I don't use a cluster and I don't use spark-submit).

I would have expected this to get faster and faster as I increase the spark.master=local[*] setting from local[8] to local[40]. Instead, it takes the same amount of time no matter which setting I use (although I can see from the Spark UI that, at any given time, my executor has a maximum number of active tasks equal to the expected amount, i.e. ~8 for local[8], ~40 for local[40], and so on -- so the parallelization does seem to work).

By default the number of partitions of my dataset RDD is 4. I tried forcing the number of partitions to 20, without success -- in fact it slows the Lasso algorithm down even more...

Is my expectation of how this should scale incorrect? Can somebody help me troubleshoot this?
Well, kind of. I hope you don't mind I use a little bit of Python to prove my point.
Let's be generous and say a few million rows is actually ten million. With 50 000 000 values (intercept + 3 features + label per row) that gives around 380 MB of data (a Java Double is a double-precision 64-bit IEEE 754 floating point). Let's create some dummy data:

    import numpy as np

    n = 10 * 1000**2
    X = np.random.uniform(size=(n, 4))      # Features
    y = np.random.uniform(size=(n, 1))      # Labels
    theta = np.random.uniform(size=(4, 1))  # Estimated parameters
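As a quick sanity check of that size estimate (this expression is mine, not part of the original experiment):

    X.nbytes + y.nbytes
    ## 400000000 bytes, i.e. roughly 380 MB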
Each step of gradient descent (since the default miniBatchFraction for LassoWithSGD is 1.0, it is not really stochastic), ignoring regularization, requires an operation like this:

    def step(X, y, theta):
        return ((X.dot(theta) - y) * X).sum(0)
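For context, a plain batch gradient-descent loop built on top of this step function could look like the sketch below; the 1/n scaling and the concrete step size are illustrative choices of mine, not something stated in the answer:

    step_size = 1.0
    for _ in range(100):
        # step() returns a gradient of shape (4,), so reshape it to match theta
        theta = theta - step_size * step(X, y, theta).reshape(4, 1) / n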
So let's see how long it takes locally on our data:

    %timeit -n 15 step(X, y, theta)
    ## 15 loops, best of 3: 743 ms per loop
Less than a second per step, without any additional optimizations. Intuitively it is pretty fast and it won't be easy to match. Just for fun, let's see how long it takes to get a closed-form solution for data like this:

    %timeit -n 15 np.linalg.inv(X.transpose().dot(X)).dot(X.transpose()).dot(y)
    ## 15 loops, best of 3: 1.33 s per loop
Now let's go back to Spark. Residuals for a single point can be computed in parallel, so this is the part which scales linearly as you increase the number of partitions that are processed in parallel.

The problem is that you have to aggregate the data locally, serialize it, transfer it to the driver, deserialize it and reduce locally to get the final result after each step. Then you have to compute the new theta, serialize it, send it back and so on.
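To make that round trip concrete, here is a heavily simplified sketch of one distributed gradient step. It is my own illustration of the data flow, not how LassoWithSGD is actually implemented, and names like distributed_step and partial_gradient are made up:

    import numpy as np

    def distributed_step(rdd, theta, step_size, n):
        # rdd holds (features, label) pairs, features as 1-D NumPy arrays
        bc_theta = rdd.context.broadcast(theta)  # ship the current theta to the workers

        def partial_gradient(rows):
            grad = np.zeros_like(bc_theta.value)
            for x, label in rows:
                grad += (x.dot(bc_theta.value) - label) * x
            yield grad  # one partial gradient per partition

        # Partial gradients are serialized, sent to the driver and reduced there
        grad = rdd.mapPartitions(partial_gradient).reduce(lambda a, b: a + b)
        return theta - step_size * grad / n

Every call triggers a full Spark job: theta goes out to the executors, partial gradients come back, and the update itself happens serially on the driver, which is exactly the per-step cost described above.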
All of that can be improved by a proper usage of mini batches and some further optimizations, but at the end of the day you are limited by the latency of the whole system. It is worth noting that when you increase parallelism on the worker side you also increase the amount of work that has to be performed sequentially on the driver, and the other way round. One way or another, Amdahl's law will bite you.
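As a back-of-the-envelope illustration of that point (the 10% serial fraction below is an arbitrary assumption, not a measured value):

    serial_fraction = 0.1  # assumed share of per-step work stuck on the driver
    for cores in [8, 20, 40]:
        print(cores, round(1.0 / (serial_fraction + (1.0 - serial_fraction) / cores), 2))
    ## 8 4.71
    ## 20 6.9
    ## 40 8.16

Even with 40 cores the theoretical speedup tops out around 8x under that assumption, and the serial share only grows once communication costs are taken into account.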
Also, all of the above ignores the actual implementation.
Now let's perform another experiment. First some dummy data:

    nCores = 8  # Number of cores on the local machine I use for tests
    rdd = sc.parallelize([], nCores)
and benchmark:

    %timeit -n 40 rdd.mapPartitions(lambda x: x).count()
    ## 40 loops, best of 3: 82.3 ms per loop
It means that with 8 cores, without any real processing or network traffic, we get to a point where we cannot do much better by increasing parallelism in Spark (743 ms / 8 = 92.875 ms per partition, assuming linear scalability of the parallelized part).
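Putting the two measurements together gives a rough sense of the per-step overhead; this reading assumes the empty-job time simply adds to the ideal per-partition compute time:

    compute_per_partition = 743.0 / 8  # ~92.9 ms of useful work per step with 8 partitions
    fixed_overhead = 82.3              # ms for a job that does essentially nothing
    fixed_overhead / (fixed_overhead + compute_per_partition)
    ## ~0.47 -- close to half of each step would be scheduling and communication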
Just to summarize the above:
- if data can be easily processed locally with a closed-form solution, using gradient descent is just a waste of time. If you want to increase parallelism / reduce latency you can use good linear algebra libraries (a minimal closed-form sketch follows the side notes below)
- Spark is designed to handle large amounts of data, not to reduce latency. If your data fits in the memory of a few-years-old smartphone, it is a good sign that it is not the right tool
- if computations are cheap then constant costs become a limiting factor
Side notes:
- a relatively large number of cores per machine is, generally speaking, not the best choice unless you can match it with I/O throughput
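As a concrete example of the first summary point, here is a closed-form fit using a library routine that is numerically more robust than forming the explicit inverse shown earlier. np.linalg.lstsq is a standard NumPy call, but reaching for it here is my suggestion rather than part of the original answer:

    # Solve the least-squares problem in one shot instead of iterating;
    # rcond=None uses NumPy's recommended cutoff for small singular values.
    theta_hat, residuals, rank, singular_values = np.linalg.lstsq(X, y, rcond=None)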