背景:
我正在使用 Spark Streaming 从 Kafka 以逗号分隔的键值对的形式流式传输事件
以下是事件如何流入我的 spark 应用程序的示例。

Key1=Value1, Key2=Value2, Key3=Value3, Key4=Value4,responseTime=200
Key1=Value5, Key2=Value6, Key3=Value7, Key4=Value8,responseTime=150
Key1=Value9, Key2=Value10, Key3=Value11, Key4=Value12,responseTime=100

输出 :

我想为给定的批处理间隔计算按流中不同键分组的不同指标(平均、计数等),例如
  • Key1, Key2 的平均响应时间(响应时间是每个事件中的键之一)
  • 按 Key1、Key2 计数

  • 我到目前为止的尝试:
    val stream = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
         ssc, kafkaParams, topicsSet)
    
    val pStream = stream.persist()
    
    val events: DStream[String] = pStream.flatMap(_._2.split(","))
    val pairs= events.map(data => data.split("=")).map(array => (array(0), array(1)))
    // pairs results in tuples of (Key1, Value1), (Key2, Value2) and so on.
    

    更新 - 03/04
    键 Key1、Key2... 可能会在传入流中无序到达。

    感谢您的输入/提示。

    最佳答案

    一种可能的解决方案是这样的:

  • 创建一个代表每条记录的案例类,所以我们没有处理元组:
    case class Record(
      key1: String, key2: String, key3: String, key4: String, rt: Double)
    
  • 使用正则表达式来解析记录并删除格式错误的条目:
    import scala.util.matching.Regex
    
    val recordPattern = new Regex(
      "^Key1=(.*?), ?Key2=(.*?), ?Key3=(.*?), ?Key4=(.*?), ?" ++
      "responseTime=(0-9+)$"
    )
    
    val records = pStream.map {
      case recordPattern(key1, key2, key3, key4, rt) =>
        Some(Record(key1, key2, key3, key4, rt.toDouble))
      case _ => None
    }.flatMap(x => x) // Drop malformed
    
  • 将数据重塑为键值对:
    val pairs = records.map(r => ((r.key1, r.key2), r.rt))
    
  • 创建一个分区器并使用 StatCounter 聚合统计信息:
    import org.apache.spark.util.StatCounter
    import org.apache.spark.HashPartitioner
    
    val paritioner: HashPartitioner = ???
    
    pairs.combineByKey[StatCounter](
      StatCounter(_), _ merge _,  _ merge _, paritioner
    )
    
  • 提取感兴趣的字段:
    stats.mapValues(s => (s.count, s.mean))
    

  • 您也可以对无序数据尝试这样的操作,尽管我强烈建议在上游修复:
    val kvPattern = "(\\w+)=(\\w+)".r
    val pairs = pStream.map(line => {
      val kvs = kvPattern.findAllMatchIn(line)
        .map(m => (m.group(1), m.group(2))).toMap
    
      // This will discard any malformed lines
      // (lack of key1, key2, lack or invalid format of responseTime)
      Try((
        (kvs("Key1"), kvs("Key2")),
        kvs("responseTime").toDouble
      ))
    
    }).flatMap(_.toOption)
    

    并像以前一样继续。

    关于scala - Spark Streaming - 从按键分组的键值对计算统计信息,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/35780331/

    10-13 02:36