我正在尝试使用Spark Streaming编写一个简单的应用程序,以从Kafka中读取内容,并对从主题中读取一个单词的次数进行持久计数。我在调用非常重要的updateStateByKey方法时遇到问题,在这里我似乎遇到了泛型问题,但是我不确定这是怎么回事。
错误:
The method updateStateByKey(Function2<List<Integer>,Optional<S>,Optional<S>>)
in the type JavaPairDStream<String,Integer> is not applicable for the arguments
(Function2<List<Integer>,Optional<Integer>,Optional<Integer>>)
我的代码:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.Arrays;
import scala.Tuple2;
import scala.collection.immutable.List;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import com.google.common.base.Optional;
public class SimpleSparkApp {
static String appName = "Streaming";
static String master = "local[*]";
static String zk = "localhost:2181";
static String consumerGroupId = "sparkStreaming";
static String[] topics = {"testTopic", };
static Integer numThreads = new Integer(1);
static final Pattern SPACE = Pattern.compile(" ");
static String checkpointDir = "/tmp";
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(10000));
jsc.checkpoint(checkpointDir);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
for (String topic: topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jsc, zk, consumerGroupId, topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Arrays.asList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
Function2<List<Integer>,Optional<Integer>,Optional<Integer>> UPDATE_FUNCTION =
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
Integer newSum = state.get();
scala.collection.Iterator<Integer> i = values.iterator();
while(i.hasNext()){
newSum += i.next();
}
runningCount.addAndGet(newSum);
System.out.print("Total number of words: " + String.valueOf(runningCount.get()));
return Optional.of(newSum);
}
};
//ERROR is here
JavaPairDStream<String, Integer> runningCounts =
wordCounts.updateStateByKey(UPDATE_FUNCTION);
runningCounts.print();
jsc.start();
jsc.awaitTermination();
}
}
我认为泛型和与Scala的交互可能存在问题?当我进入updateStateByKey时,会看到一个适当的函数声明,因此我不确定在这里缺少什么:
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @tparam S State type
*/
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
: JavaPairDStream[K, S] = {
implicit val cm: ClassTag[S] = fakeClassTag
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
}
最佳答案
已解决问题-事实证明我导入了错误的List和Iterator类(我将原因归咎于Eclipse):
注释掉:
//import scala.collection.immutable.List;
//import scala.collection.Iterator;
添加在:
import java.util.Iterator;
import java.util.List;
轻微更改的更新功能:
Function2<List<Integer>,Optional<Integer>,Optional<Integer>> UPDATE_FUNCTION =
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
Integer newSum = state.get();
Iterator<Integer> i = values.iterator();
while(i.hasNext()){
newSum += i.next();
}
runningCount.addAndGet(newSum);
System.out.print("Total number of words: " + String.valueOf(runningCount.get()));
return Optional.of(newSum);
}
};