任何人都请帮助我,如何从现有的RDD创建DStream。
我的代码是:
JavaSparkContext ctx = new JavaSparkContext(conf);
JavaRDD<String> rddd = ctx.parallelize(arraylist);
现在,我需要将这些 rddd 用作 JavaStreamingContext 的输入。
最佳答案
试试queueStream API。
将RDD队列作为流,每个推入队列的RDD将被视为DStream中的一批数据,并像流一样进行处理。
public <T> InputDStream<T> queueStream(scala.collection.mutable.Queue<RDD<T>> queue,
boolean oneAtATime,
scala.reflect.ClassTag<T> evidence$15)
Create an input stream from a queue of RDDs. In each batch, it will process either one or all of the RDDs returned by the queue.
NOTE: Arbitrary RDDs can be added to queueStream, there is no way to recover data of those RDDs, so queueStream doesn't support checkpointing.