本文介绍了来自Spark Streaming的RestAPI服务调用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个用例,从卡夫卡读取消息后,我需要从Spark Streaming调用RESTAPI进行计算,然后将结果保存回HDFS和第三方应用程序.

I have a use case where I need to call RESTAPI from spark streaming after messages are read from Kafka to perform some calculation and save back the result to HDFS and third party application.

我在这里几乎没有疑问:

I have few doubts here:

  • 我们如何直接从Spark流中调用RESTAPI.
  • 如何使用流式批处理时间管理RESTAPI超时.

推荐答案

该代码无法原样编译.但这是给定用例的方法.

This code will not compile as it is. But this the approach for the given usecase.

val conf = new SparkConf().setAppName("App name").setMaster("yarn")
val ssc = new StreamingContext(conf, Seconds(1))

val dstream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

dstream.foreachRDD { rdd =>

  //Write the rdd to HDFS directly
  rdd.saveAsTextFile("hdfs/location/to/save")

  //loop through each parttion in rdd
  rdd.foreachPartition { partitionOfRecords =>

    //1. Create HttpClient object here
    //2.a POST data to API

    //Use it if you want record level control in rdd or partion
    partitionOfRecords.foreach { record =>
      //2.b Post the the date to API
      record.toString
    }
  }
  //Use 2.a or 2.b to POST data as per your req
}

ssc.start()
ssc.awaitTermination()


大多数HttpClient(用于REST调用)都支持请求超时.


Most of the HttpClients (for REST call) supports request timeout.

使用Apache HttpClient的超时示例Http POST呼叫

val CONNECTION_TIMEOUT_MS = 20000; // Timeout in millis (20 sec).

val requestConfig = RequestConfig.custom()
  .setConnectionRequestTimeout(CONNECTION_TIMEOUT_MS)
  .setConnectTimeout(CONNECTION_TIMEOUT_MS)
  .setSocketTimeout(CONNECTION_TIMEOUT_MS)
  .build();

HttpClientBuilder.create().build();

val client: CloseableHttpClient = HttpClientBuilder.create().build();

val url = "https://selfsolve.apple.com/wcResults.do"
val post = new HttpPost(url);

//Set config to post
post.setConfig(requestConfig)

post.setEntity(EntityBuilder.create.setText("some text to post to API").build())

val response: HttpResponse = client.execute(post)

这篇关于来自Spark Streaming的RestAPI服务调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-25 04:54