我是Spark的新手,对此没有太多想法。我正在开发一个应用程序,其中数据在不同的2 Kafka主题上遍历,并且Spark Streaming从该主题读取数据。它是一个SpringBoot项目,我有3个Spark消费者类。这些SparkStreaming类的工作是使用来自Kafka主题的数据并将其发送到另一个主题。以下是SparkStreaming类的代码-
@Service
public class EnrichEventSparkConsumer {
Collection<String> topics = Arrays.asList("eventTopic");
public void startEnrichEventConsumer(JavaStreamingContext javaStreamingContext) {
Map<String, Object> kafkaParams = new HashedMap();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "group1");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);
JavaInputDStream<ConsumerRecord<String, String>> enrichEventRDD = KafkaUtils.createDirectStream(javaStreamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
JavaDStream<String> enrichEventDStream = enrichEventRDD.map((x) -> x.value());
JavaDStream<EnrichEventDataModel> enrichDataModelDStream = enrichEventDStream.map(convertIntoEnrichModel);
enrichDataModelDStream.foreachRDD(rdd1 -> {
saveDataToElasticSearch(rdd1.collect());
});
enrichDataModelDStream.foreachRDD(enrichDataModelRdd -> {
if(enrichDataModelRdd.count() > 0) {
if(executor != null) {
executor.executePolicy(enrichDataModelRdd.collect());
}
}
});
}
static Function convertIntoEnrichModel = new Function<String, EnrichEventDataModel>() {
@Override
public EnrichEventDataModel call(String record) throws Exception {
ObjectMapper mapper = new ObjectMapper();
EnrichEventDataModel csvDataModel = mapper.readValue(record, EnrichEventDataModel.class);
return csvDataModel;
}
};
private void saveDataToElasticSearch(List<EnrichEventDataModel> baseDataModelList) {
for (EnrichEventDataModel baseDataModel : baseDataModelList)
dataModelServiceImpl.save(baseDataModel);
}
}
我正在使用CommandLineRunner调用方法startEnrichEventConsumer()。
public class EnrichEventSparkConsumerRunner implements CommandLineRunner {
@Autowired
JavaStreamingContext javaStreamingContext;
@Autowired
EnrichEventSparkConsumer enrichEventSparkConsumer;
@Override
public void run(String... args) throws Exception {
//start Raw Event Spark Cosnumer.
JobContextImpl jobContext = new JobContextImpl(javaStreamingContext);
//start Enrich Event Spark Consumer.
enrichEventSparkConsumer.startEnrichEventConsumer(jobContext.streamingctx());
}
}
现在,我想将这三个Spark Streaming类提交到集群。我读到某个地方,我必须先创建一个Jar文件,然后才能使用Spark-submit命令,但是我心中有一些疑问-
我是否应该使用这3个Spark Streaming类创建一个不同的项目?
截至目前,我正在使用CommandLineRunner启动SparkStreaming,然后何时提交集群,是否应在这些类中创建main()方法?
请告诉我该怎么做。提前致谢。
最佳答案
无需其他项目。
您应该创建负责创建JavaStreamingContext的入口点/ main。
创建具有依赖性的jar,将依赖性放在一个jar文件中,不要忘记为所有的spark依赖性提供提供的作用域,因为您将使用集群的库。
执行组装的Spark应用程序正在使用spark-submit命令行应用程序,如下所示:
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
对于本地提交
bin/spark-submit \
--class package.Main \
--master local[2] \
path/to/jar argument1 argument2