I am trying to model a genetics problem we are trying to solve, building up to it in steps. I can successfully run the PiAverage example from the Spark examples. That example "throws darts" at a circle (10^6 in our case) and counts the number that "land in the circle" to estimate Pi.
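Why the count estimates Pi: the darts are uniform over the unit square, and the quarter circle $x^2 + y^2 < 1$ covers $\pi/4$ of that square. So with $h$ hits out of $n$ darts:

$$\frac{h}{n} \approx \frac{\pi}{4} \quad\Longrightarrow\quad \hat{\pi} = \frac{4h}{n}$$

This is the `4 * totalPi / (double)HOW_MANY_DARTS` line in the program, where `totalPi` is actually the hit count $h$.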
Let's say I want to repeat that process 1000 times (in parallel) and average all those estimates. I am trying to find the best approach. It seems like there will have to be two calls to parallelize? Nested calls? Is there no way to chain map or reduce calls together? I can't see it.
I'd like to know whether something like the idea below is wise. I thought of tracking the resulting estimates with an accumulator. jsc is my JavaSparkContext, and the full code of a single run is at the end of the question. Thanks for any input!
Accumulator<Double> accum = jsc.accumulator(0.0);
// make a list 1000 long to pass to parallelize (no for loops in Spark, right?)
// note: new ArrayList<>(n) only reserves capacity, so the list must be filled explicitly
List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);
for (int i = 0; i < HOW_MANY_ESTIMATES; i++) {
    numberOfEstimates.add(i);
}
// pass this "dummy list" to parallelize, which then
// calls a pieceOfPI method to produce each individual estimate,
// accumulating the estimates. pieceOfPI would contain a
// parallelize call too, with the individual test from the code at the end
jsc.parallelize(numberOfEstimates).foreach(n -> accum.add(pieceOfPI(jsc, numList, slices, HOW_MANY_ESTIMATES)));
// get the value of the total of PI estimates and print their average
double totalPi = accum.value();
// output the average of averages
System.out.println("The average of " + HOW_MANY_ESTIMATES + " estimates of Pi is " + totalPi / HOW_MANY_ESTIMATES);
Neither the matrix answers nor the other answers I've found on SO seem to address this specific question. I have done several searches, but I can't see how to do this without "parallelizing the parallelization." Is that a bad idea?
(And yes, I realize that mathematically I could just do more estimates and effectively get the same result :) I'm trying to build a structure my boss wants. Thanks again!
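The identity behind that remark is as follows. Suppose each of the $N$ estimates uses the same number of darts $n$, and estimate $j$ gets $h_j$ hits. Then the average of the estimates equals a single estimate pooled over all $Nn$ darts:

$$\bar{\pi} = \frac{1}{N}\sum_{j=1}^{N}\frac{4h_j}{n} = \frac{4\sum_{j=1}^{N}h_j}{Nn}$$

With $N = 1000$ (HOW_MANY_ESTIMATES) and $n = 10^6$ (HOW_MANY_DARTS), that is one estimate from $10^9$ darts. Its standard error is $4\sqrt{p(1-p)/(Nn)}$ with $p = \pi/4$.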
I have put my entire single-test program here in case it helps, minus an accumulator I was testing out. The core of this would become pieceOfPI():
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class PiAverage implements Serializable {

    public static void main(String[] args) {
        PiAverage pa = new PiAverage();
        pa.go();
    }

    public void go() {
        // should make a parameter like all these finals should be
        // int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
        final int SLICES = 16;
        // how many "darts" are thrown at the circle to get one single Pi estimate
        final int HOW_MANY_DARTS = 1000000;
        // how many "dartboards" to collect to average the Pi estimate, which we hope converges on the real Pi
        final int HOW_MANY_ESTIMATES = 1000;

        SparkConf sparkConf = new SparkConf().setAppName("PiAverage")
            .setMaster("local[4]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // setup "dummy" ArrayList of size HOW_MANY_DARTS -- how many darts to throw
        List<Integer> throwsList = new ArrayList<Integer>(HOW_MANY_DARTS);
        for (int i = 0; i < HOW_MANY_DARTS; i++) {
            throwsList.add(i);
        }

        // setup "dummy" ArrayList of size HOW_MANY_ESTIMATES (not used yet in this single run)
        List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);
        for (int i = 0; i < HOW_MANY_ESTIMATES; i++) {
            numberOfEstimates.add(i);
        }

        JavaRDD<Integer> dataSet = jsc.parallelize(throwsList, SLICES);

        // keep only the darts that land inside the quarter circle, then count them
        long totalPi = dataSet.filter(new Function<Integer, Boolean>() {
            public Boolean call(Integer i) {
                double x = Math.random();
                double y = Math.random();
                return x * x + y * y < 1;
            }
        }).count();

        System.out.println(
            "The average of " + HOW_MANY_DARTS + " estimates of Pi is " + 4 * totalPi / (double) HOW_MANY_DARTS);

        jsc.stop();
    }
}
Let me start with your "background question". Transformation operations like map, join, groupBy, etc. fall into two categories: those that require a shuffle of data from all the partitions as input, and those that don't. Operations like groupBy and join require a shuffle, because you need to bring together all records with the same keys from all of the RDD's partitions (think of how SQL JOIN ops work). On the other hand, map, flatMap, filter, etc. don't require shuffling, because the operation works fine on the input of the previous step's partition. They work on one record at a time, not on groups of records with matching keys, so no shuffling is necessary.
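The distinction can be sketched without Spark. The toy model below is a plain-Java illustration, not Spark code. It assumes a "dataset" is just a list of partitions, and the method names narrowTimesTen and wideGroupByParity are hypothetical. The map-like step builds each output partition from its own input partition. The groupBy-like step needs records from every partition to build each key's group, and that is the data movement a shuffle performs in a cluster. Requires Java 9+ for List.of.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (not Spark): a dataset is a list of partitions, each a List<Integer>.
public class NarrowVsWide {

    // Narrow (map-like): output partition k is computed from input partition k alone,
    // so each partition could be processed on a different machine with no data exchange.
    static List<List<Integer>> narrowTimesTen(List<List<Integer>> partitions) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> part : partitions) {
            List<Integer> mapped = new ArrayList<>();
            for (int x : part) {
                mapped.add(x * 10);
            }
            out.add(mapped);
        }
        return out;
    }

    // Wide (groupBy-like): each key's group needs records from *all* partitions,
    // which in a cluster means moving data between machines (a shuffle).
    static Map<Integer, List<Integer>> wideGroupByParity(List<List<Integer>> partitions) {
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (List<Integer> part : partitions) {
            for (int x : part) {
                groups.computeIfAbsent(x % 2, k -> new ArrayList<>()).add(x);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = List.of(List.of(1, 2), List.of(3, 4));
        System.out.println(narrowTimesTen(partitions));    // [[10, 20], [30, 40]]
        System.out.println(wideGroupByParity(partitions)); // {0=[2, 4], 1=[1, 3]}
    }
}
```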
This background is needed to understand why an "extra map" does not add significant overhead. A sequence of operations like map, flatMap, etc. is "squashed" together into a "stage" (shown when you look at the details for a job in the Spark Web console), so that only one RDD is materialized: the one at the end of the stage.
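As a rough local analogy (Java streams, not Spark), chained map and filter steps are fused so that each record flows through the whole chain before the next record starts. No intermediate collection is built between the two steps. Spark's pipelining of narrow transformations within a stage is similar in spirit. The class name PipelineSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipelineSketch {

    // Runs map -> filter over a small input and records the order in which
    // each step touches each record.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        List<Integer> result = Stream.of(1, 2, 3)
            .map(x -> { log.add("map " + x); return x * x; })
            .filter(x -> { log.add("filter " + x); return x > 1; })
            .collect(Collectors.toList());
        log.add("result " + result);
        return log;
    }

    public static void main(String[] args) {
        // Prints "map 1", "filter 1", "map 2", "filter 4", ... : one pass, interleaved
        trace().forEach(System.out::println);
    }
}
```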
On to your first question. I wouldn't use an accumulator for this. Accumulators are intended for "side-band" data, like counting how many bad lines you parsed. In this example, you might use accumulators to count how many (x,y) pairs were inside the radius of 1 vs. outside.
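To illustrate the "side-band" idea without Spark, the sketch below uses java.util.concurrent.atomic.LongAdder as a stand-in for an accumulator. The main result (the hit count) comes out of the dataflow itself. The miss count is only a side-channel tally that workers add to and the caller reads at the end. The class and method names are hypothetical. One Spark-specific caveat from the Spark programming guide: accumulator updates made inside transformations such as filter may be applied more than once if a task is re-executed; only updates made inside actions are guaranteed to be applied once. That is one more reason to keep the real answer in the dataflow.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

public class SideBandCounter {

    // Returns {inside, outside}; the Pi estimate is 4 * inside / darts.
    static long[] throwDarts(int darts) {
        LongAdder outside = new LongAdder(); // side-band tally, in the spirit of an accumulator
        long inside = IntStream.range(0, darts)
            .parallel()
            .filter(i -> {
                double x = ThreadLocalRandom.current().nextDouble();
                double y = ThreadLocalRandom.current().nextDouble();
                boolean hit = x * x + y * y < 1;
                if (!hit) {
                    outside.increment(); // side effect only; not part of the result
                }
                return hit;
            })
            .count(); // the main result comes from the dataflow itself
        return new long[] {inside, outside.sum()};
    }

    public static void main(String[] args) {
        int darts = 1_000_000;
        long[] counts = throwDarts(darts);
        System.out.println("inside=" + counts[0] + " outside=" + counts[1]
            + " pi~" + 4.0 * counts[0] / darts);
    }
}
```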
The JavaSparkPi example in the Spark distribution is about as good as it gets. You should study why it works; it's the right dataflow model for Big Data systems. You could use "aggregators": in the Javadocs, click the "index" and look at the agg, aggregate, and aggregateByKey functions. However, they are no easier to understand and aren't necessary here. They do provide greater flexibility than map then reduce, so they are worth knowing.
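Spark's JavaRDD.aggregate takes a zero value, a function that folds one element into a partial result (seqOp), and a function that merges two partial results (combOp). The JDK's three-argument Stream.reduce(identity, accumulator, combiner) has the same shape. The plain-Java sketch below uses it to average a list of Pi estimates by carrying a (sum, count) pair. It is an analogy, not Spark code, and the SumCount class is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class AggregateSketch {

    // Immutable (sum, count) pair: the zero value / partial-result type.
    static final class SumCount {
        final double sum;
        final long count;

        SumCount(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        double mean() {
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    static double mean(List<Double> estimates) {
        SumCount total = estimates.parallelStream().reduce(
            new SumCount(0.0, 0),                                       // zero value
            (acc, x) -> new SumCount(acc.sum + x, acc.count + 1),       // seqOp: fold one element in
            (a, b) -> new SumCount(a.sum + b.sum, a.count + b.count));  // combOp: merge two partials
        return total.mean();
    }

    public static void main(String[] args) {
        System.out.println(mean(Arrays.asList(3.0, 3.2, 3.1, 3.26)));
    }
}
```

Carrying the count alongside the sum means the caller does not need to know HOW_MANY_ESTIMATES separately. For a plain average, map then reduce (sum) and a division are enough.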
The problem with your code is that you are effectively trying to tell Spark what to do, rather than expressing your intent and letting Spark optimize how it does it for you.
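Expressing the intent as a single dataflow means mapping each estimate index to one independent Pi estimate, then reducing (summing) and dividing. No nested parallelize is involved. The sketch below shows that shape in plain Java with a parallel stream. In Spark the same shape would presumably be jsc.parallelize(indices, SLICES).map(...).reduce(...) using JavaRDD.map and JavaRDD.reduce. The function passed to map runs inside a task, so it cannot use jsc, because the SparkContext exists only on the driver. That is also why calling parallelize inside foreach would not work. The per-estimate seeds (1000 + i) and the smaller dart count in main are arbitrary choices for illustration.

```java
import java.util.SplittableRandom;
import java.util.stream.IntStream;

public class AverageOfEstimates {

    // One "dartboard": throw `darts` random points at the unit square and
    // return 4 * (fraction that lands inside the quarter circle).
    static double estimatePi(int darts, long seed) {
        SplittableRandom rnd = new SplittableRandom(seed);
        long inside = 0;
        for (int i = 0; i < darts; i++) {
            double x = rnd.nextDouble();
            double y = rnd.nextDouble();
            if (x * x + y * y < 1) {
                inside++;
            }
        }
        return 4.0 * inside / darts;
    }

    // "map" each estimate index to an independent estimate, then "reduce" (sum) and divide.
    static double averageOfEstimates(int estimates, int darts) {
        double total = IntStream.range(0, estimates)
            .parallel()
            .mapToDouble(i -> estimatePi(darts, 1000L + i)) // arbitrary per-estimate seed
            .sum();
        return total / estimates;
    }

    public static void main(String[] args) {
        // HOW_MANY_ESTIMATES = 1000 as in the question; fewer darts per estimate to keep it quick
        System.out.println(averageOfEstimates(1000, 10_000));
    }
}
```

The inner dart loop is an ordinary loop inside the mapped function rather than a second distributed dataset. Each estimate is a small, independent unit of work, which suits a single map.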
Finally, I suggest you buy and study O'Reilly's "Learning Spark". It does a good job explaining the internal details, like staging, and it shows lots of example code you can use, too.