本文介绍了Playframework和Twitter Streaming API的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何从Twitter Streaming API中读取响应数据-POST状态/过滤器?我已经建立了连接,并且收到200状态代码,但是我不知道如何阅读推文.我只想打印即将发布的推文.

How to read response data from Twitter Streaming API - POST statuses/filter ?I have established connection and I receive 200 status code, but I don't know how to read tweets. I just want to println tweets as they coming.

ws.url(url)
.sign(OAuthCalculator(consumerKey, requestToken))
.withMethod("POST")
.stream()
.map { response =>
  if(response.headers.status == 200)
    println(response.body)
}

我找到了这个解决方案

ws.url(url)
.sign(OAuthCalculator(consumerKey, requestToken))
.withMethod("POST")
.stream()
.map { response =>
  if(response.headers.status == 200){
    response.body
      .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
      .filter(_.contains("\r\n"))
      .map(json => Try(parse(json).extract[Tweet]))
      .runForeach {
        case Success(tweet) =>
          println("-----")
          println(tweet.text)
        case Failure(e) =>
          println("-----")
          println(e.getStackTrace)
      }
  }
}

推荐答案

流式WS请求的响应主体是Akka流 Source 个字节.由于Twitter Api响应通常以换行符分隔,因此您可以使用 Framing.delimiter 将其拆分为字节块,将这些块解析为JSON,然后对它们进行所需的处理.这样的事情应该起作用:

The body of the response for a streaming WS request is an Akka Streams Source of bytes. Since Twitter Api responses are newline delimited (usually) you can use Framing.delimiter to split them up into byte chunks, parse the chunks to JSON, and do what you want with them. Something like this should work:

import akka.stream.scaladsl.Framing
import scala.util.{Success, Try}
import akka.util.ByteString
import play.api.libs.json.{JsSuccess, Json, Reads}
import play.api.libs.oauth.{ConsumerKey, OAuthCalculator, RequestToken}

case class Tweet(id: Long, text: String)
object Tweet {
  implicit val reads: Reads[Tweet] = Json.reads[Tweet]
}

def twitter = Action.async { implicit request =>
  ws.url("https://stream.twitter.com/1.1/statuses/filter.json?track=Rio2016")
      .sign(OAuthCalculator(consumerKey, requestToken))
      .withMethod("POST")
      .stream().flatMap { response =>
    response.body
      // Split up the byte stream into delimited chunks. Note
      // that the chunks are quite big
      .via(Framing.delimiter(ByteString.fromString("\n"), 20000))
      // Parse the chunks into JSON, and then to a Tweet.
      // A better parsing strategy would be to account for all
      // the different possible responses, but here we just
      // collect those that match a Tweet.
      .map(bytes => Try(Json.parse(bytes.toArray).validate[Tweet]))
      .collect {
        case Success(JsSuccess(tweet, _)) => tweet.text
      }
      // Print out each chunk
      .runForeach(println).map { _ =>
        Ok("done")
    }
  }
}

注意:要实现流,您需要将隐式Materializer注入到控制器中.

Note: to materialize the stream you'll need to inject an implicit Materializer into your controller.

这篇关于Playframework和Twitter Streaming API的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-22 21:25