我正在使用akka-http向http服务发出请求,该服务会发回分块响应。这是相关代码的样子:

val httpRequest: HttpRequest = //build the request
val request = Http().singleRequest(httpRequest)
request.flatMap { response =>
    response.entity.dataBytes.runForeach { chunk =>
        println("-----")
        println(chunk.utf8String)
    }
}

在命令行中产生的输出如下所示:
-----
{"data":
-----
"some text"}

-----
{"data":
-----
"this is a longer
-----
text"}

-----
{"data": "txt"}

-----
...

逻辑数据-在这种情况下,json以行符号\r\n的结尾结束,但是问题是,json并不总是适合单个http响应块,如上例所示。

我的问题是-如何将传入的分块数据连接到完整的json中,以使所得的容器类型仍保持为Source[Out,M1]Flow[In,Out,M2]?我想遵循akka-stream的思想。

更新:还值得一提的是,响应是无限的,聚合必须实时完成

最佳答案

找到了解决方案:

val request: HttpRequest = //build the request
request.flatMap { response =>
    response.entity.dataBytes.scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
        .filter(_.contains("\r\n"))
        .runForeach { json =>
            println("-----")
            println(json)
        }
}

关于scala - akka-http分块响应串联,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33278853/

10-16 10:15
查看更多