This post covers the basic usage of the map-family operators on Spark RDDs.

    1. map

    map passes each element of the RDD into the call method one at a time; whatever call returns becomes the corresponding element of the new RDD, so the record count never changes. The example below adds 10 to each number and prints the results:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

public class Map {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("spark map").setMaster("local[*]");
		JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
		JavaRDD<Integer> listRDD = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4));

		// map: call() is applied to every element, producing exactly one output per input
		JavaRDD<Integer> numRDD = listRDD.map(new Function<Integer, Integer>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer num) throws Exception {
				return num + 10;
			}
		});
		numRDD.foreach(new VoidFunction<Integer>() {
			private static final long serialVersionUID = 1L;

			@Override
			public void call(Integer num) throws Exception {
				System.out.println(num);
			}
		});
		javaSparkContext.close();
	}
}

    Execution result:

    (screenshot of console output: the numbers 11, 12, 13 and 14; print order may vary across partitions)
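The one-to-one semantics of map can also be illustrated without a Spark cluster. The sketch below uses plain Java 8 streams as an analogy (this is not Spark code; class name and variable names are my own):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapAnalogy {
	public static void main(String[] args) {
		// map: one output element per input element; the element count is unchanged
		List<Integer> result = Arrays.asList(1, 2, 3, 4).stream()
				.map(num -> num + 10)
				.collect(Collectors.toList());
		System.out.println(result); // [11, 12, 13, 14]
	}
}
```

Since Spark's `Function` has a single abstract method, on Java 8+ the anonymous class in the Spark example can likewise be shortened to a lambda: `listRDD.map(num -> num + 10)`.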

    2. flatMap

    flatMap processes elements the same way map does, passing each element of the original RDD in for computation. The difference is that its call method returns an Iterator: a single input element can expand into zero or more output elements, all of which are flattened into the new RDD.

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;

public class FlatMap {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("spark flatMap").setMaster("local[*]");
		JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
		JavaRDD<String> listRDD = javaSparkContext
				.parallelize(Arrays.asList("hello world", "hello java", "hello spark"));

		// flatMap: each line is split into words, so one input element yields several outputs
		JavaRDD<String> rdd = listRDD.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public Iterator<String> call(String input) throws Exception {
				return Arrays.asList(input.split(" ")).iterator();
			}
		});
		rdd.foreach(new VoidFunction<String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public void call(String word) throws Exception {
				System.out.println(word);
			}
		});
		javaSparkContext.close();
	}
}

    Execution result:

    (screenshot of console output: the six words hello, world, hello, java, hello, spark; print order may vary across partitions)
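The one-to-many flattening behavior can also be sketched with plain Java 8 streams, again only as an analogy to the Spark operator (class and variable names are my own):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapAnalogy {
	public static void main(String[] args) {
		// flatMap: each line expands into a stream of words, and the
		// per-element streams are flattened into a single stream
		List<String> words = Stream.of("hello world", "hello java", "hello spark")
				.flatMap(line -> Arrays.stream(line.split(" ")))
				.collect(Collectors.toList());
		System.out.println(words); // [hello, world, hello, java, hello, spark]
	}
}
```

Three input lines of two words each yield six output elements, which is exactly the "one input, many outputs" behavior described above.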

   
