I am trying to receive streaming data from Kafka. So far I am able to receive the streaming data and store it in a JavaPairInputDStream. Now I need to analyze this data without storing it in any database, so I want to convert this JavaPairInputDStream to a Dataset or DataFrame.


What I tried so far is:

//Streaming Working Code

这是使用 Spark 2.0 的完整工作代码.

Here is the complete working code using Spark 2.0.

public class KafkaToSparkStreaming {
    public static  void main(String arr[]) throws InterruptedException

        SparkConf conf = new SparkConf();
        conf.set("spark.app.name", "SparkReceiver"); //The name of application. This will appear in the UI and in log data.
        //conf.set("spark.ui.port", "7077");    //Port for application's dashboard, which shows memory and workload data.
        conf.set("dynamicAllocation.enabled","false");  //Which scales the number of executors registered with this application up and down based on the workload
        //conf.set("spark.cassandra.connection.host", "localhost"); //Cassandra Host Adddress/IP
        conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");  //For serializing objects that will be sent over the network or need to be cached in serialized form.
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");

        JavaSparkContext sc = new JavaSparkContext(conf);
        // Create the context with 2 seconds batch size
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Map<String, String> kafkaParams = new HashMap<String, String>();

        kafkaParams.put("zookeeper.connect", "localhost:2181"); //Make all kafka data for this cluster appear under a particular path.
        kafkaParams.put("group.id", "testgroup");   //String that uniquely identifies the group of consumer processes to which this consumer belongs
        kafkaParams.put("metadata.broker.list", "localhost:9092"); //Producer can find a one or more Brokers to determine the Leader for each topic.
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder"); //Serializer to use when preparing the message for transmission to the Broker.
        kafkaParams.put("request.required.acks", "1");  //Producer to require an acknowledgement from the Broker that the message was received.

        Set<String> topics = Collections.singleton("ny-2008.csv");

        //Create an input DStream for Receiving data from socket
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                kafkaParams, topics);

        //Create JavaDStream<String>
        JavaDStream<String> msgDataStream = directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
              return tuple2._2();
        //Create JavaRDD<Row>
        msgDataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
              public void call(JavaRDD<String> rdd) {
                  JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
                      public Row call(String msg) {
                        Row row = RowFactory.create(msg);
                        return row;
        //Create Schema
        StructType schema = DataTypes.createStructType(new StructField[] {DataTypes.createStructField("Message", DataTypes.StringType, true)});
        //Get Spark 2.0 session
        SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
        Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema);



class JavaSparkSessionSingleton {
      private static transient SparkSession instance = null;
      public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
          instance = SparkSession
        return instance;


08-24 04:39