Using a StructType column in a PySpark UDF


Problem description

I have the following schema for one of the columns that I'm processing:

 |-- time_to_resolution_remainingTime: struct (nullable = true)
 |    |-- _links: struct (nullable = true)
 |    |    |-- self: string (nullable = true)
 |    |-- completedCycles: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- breached: boolean (nullable = true)
 |    |    |    |-- elapsedTime: struct (nullable = true)
 |    |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |    |-- millis: long (nullable = true)
 |    |    |    |-- goalDuration: struct (nullable = true)
 |    |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |    |-- millis: long (nullable = true)
 |    |    |    |-- remainingTime: struct (nullable = true)
 |    |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |    |-- millis: long (nullable = true)
 |    |    |    |-- startTime: struct (nullable = true)
 |    |    |    |    |-- epochMillis: long (nullable = true)
 |    |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |    |-- iso8601: string (nullable = true)
 |    |    |    |    |-- jira: string (nullable = true)
 |    |    |    |-- stopTime: struct (nullable = true)
 |    |    |    |    |-- epochMillis: long (nullable = true)
 |    |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |    |-- iso8601: string (nullable = true)
 |    |    |    |    |-- jira: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- ongoingCycle: struct (nullable = true)
 |    |    |-- breachTime: struct (nullable = true)
 |    |    |    |-- epochMillis: long (nullable = true)
 |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |-- iso8601: string (nullable = true)
 |    |    |    |-- jira: string (nullable = true)
 |    |    |-- breached: boolean (nullable = true)
 |    |    |-- elapsedTime: struct (nullable = true)
 |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |-- millis: long (nullable = true)
 |    |    |-- goalDuration: struct (nullable = true)
 |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |-- millis: long (nullable = true)
 |    |    |-- paused: boolean (nullable = true)
 |    |    |-- remainingTime: struct (nullable = true)
 |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |-- millis: long (nullable = true)
 |    |    |-- startTime: struct (nullable = true)
 |    |    |    |-- epochMillis: long (nullable = true)
 |    |    |    |-- friendly: string (nullable = true)
 |    |    |    |-- iso8601: string (nullable = true)
 |    |    |    |-- jira: string (nullable = true)
 |    |    |-- withinCalendarHours: boolean (nullable = true)

I'm interested in extracting the time fields (e.g. completedCycles[x].elapsedTime, ongoingCycle.remainingTime) based on certain conditions. The UDF I'm using is:

@udf("string")
def extract_time(s, field):
  # Return ongoing cycle field
  if has_column(s, 'ongoingCycle'):
    field = 'ongoingCycle.{}'.format(field)
    return s[field]

  # return last element of completed cycles
  s = s.get(size(s) - 1)
  return s[field]

cl = 'time_to_resolution_remainingTime'
df = df.withColumn(cl, extract_time(cl, lit("elapsedTime.friendly"))).select(cl)
display(df)

This results in the following error:

SparkException: Job aborted due to stage failure: Task 0 in stage 549.0 failed 4 times, most recent failure: Lost task 0.3 in stage 549.0 (TID 1597, 10.155.239.76, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/types.py", line 1514, in __getitem__
    idx = self.__fields__.index(item)
ValueError: 'ongoingCycle.elapsedTime.friendly' is not in list

I'm obviously doing something badly wrong here, but I can't work out what. Is it possible to convert the struct value s passed to the UDF into a Python dictionary and perform the calculations on that? Or is there a better way to do this?
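The traceback above shows that the struct's __getitem__ looks up one field name at a time, so a dotted path such as 'ongoingCycle.elapsedTime.friendly' is treated as a single literal field name and is not found. A minimal sketch of walking the path one level at a time is shown below. It uses plain Python dictionaries as a stand-in for the struct; inside a real UDF one could presumably obtain such a dictionary with s.asDict(recursive=True). The helper name get_nested and the trimmed sample record are illustrative assumptions, not part of the original question.

```python
def get_nested(record, dotted_path):
    """Follow a dotted path through nested dicts, one key per level.

    Returns None as soon as a level is missing or null, instead of raising.
    get_nested is a hypothetical helper introduced for illustration.
    """
    current = record
    for key in dotted_path.split("."):
        if not isinstance(current, dict):
            return None
        current = current.get(key)
    return current


# Trimmed copy of the sample record, as a plain dict (assumed shape).
sample = {
    "completedCycles": [],
    "ongoingCycle": {
        "elapsedTime": {"friendly": "57h 32m", "millis": 207148646},
        "remainingTime": {"friendly": "-53h 32m", "millis": -192748646},
    },
}

print(get_nested(sample, "ongoingCycle.remainingTime.friendly"))
print(get_nested({"ongoingCycle": None}, "ongoingCycle.remainingTime.friendly"))
```

Splitting the path keeps the lookup generic, so the same helper works for elapsedTime, remainingTime, or any other nested field, and a null ongoingCycle yields None rather than an exception.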

Sample data

{
   "_links":{
      "self":"https:///...."
   },
   "completedCycles":[

   ],
   "id":"630",
   "name":"Time to resolution",
   "ongoingCycle":{
      "breachTime":{
         "epochMillis":1605583651354,
         "friendly":"17/Nov/20 3:27 PM +12:00",
         "iso8601":"2020-11-17T15:27:31+1200",
         "jira":"2020-11-17T15:27:31.354+1200"
      },
      "breached":true,
      "elapsedTime":{
         "friendly":"57h 32m",
         "millis":207148646
      },
      "goalDuration":{
         "friendly":"4h",
         "millis":14400000
      },
      "paused":false,
      "remainingTime":{
         "friendly":"-53h 32m",
         "millis":-192748646
      },
      "startTime":{
         "epochMillis":1605511651354,
         "friendly":"16/Nov/20 7:27 PM +12:00",
         "iso8601":"2020-11-16T19:27:31+1200",
         "jira":"2020-11-16T19:27:31.354+1200"
      },
      "withinCalendarHours":false
   }
}

Expected output: -53h 32m

With completed cycles but no ongoing cycle:

{
   "_links":{
      "self":"https://...."
   },
   "completedCycles":[
      {
         "breached":true,
         "elapsedTime":{
            "friendly":"72h 43m",
            "millis":261818073
         },
         "goalDuration":{
            "friendly":"4h",
            "millis":14400000
         },
         "remainingTime":{
            "friendly":"-68h 43m",
            "millis":-247418073
         },
         "startTime":{
            "epochMillis":1605156449463,
            "friendly":"12/Nov/20 4:47 PM +12:00",
            "iso8601":"2020-11-12T16:47:29+1200",
            "jira":"2020-11-12T16:47:29.463+1200"
         },
         "stopTime":{
            "epochMillis":1606282267536,
            "friendly":"Today 5:31 PM +12:00",
            "iso8601":"2020-11-25T17:31:07+1200",
            "jira":"2020-11-25T17:31:07.536+1200"
         }
      }
   ],
   "id":"630",
   "name":"Time to resolution",
   "ongoingCycle": null
}

Expected output: -68h 43m

I got this code to work, but I'm not sure if it's the best way to solve this:

@udf("string")
def extract_time(s, field):
  if s is None:
    return None

  # Return ongoing cycle field
  if has_column(s, 'ongoingCycle'):
    if s['ongoingCycle'] is not None:
      return s['ongoingCycle']['remainingTime']['friendly']

  # Get the last completed cycles' remaining time
  s_completed = s['completedCycles']
  if len(s_completed) > 0:
    return s_completed[-1]['remainingTime']['friendly']
  return None
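The working UDF above ignores its field argument and would raise a TypeError on len() if completedCycles itself were null. A sketch of the same selection logic on plain dictionaries, which can be tested without a Spark session, might look like the following. The dictionary input is an assumption (for example, the result of s.asDict(recursive=True) inside the UDF), and the default field name is chosen here only to match the expected outputs.

```python
def extract_time(record, field="remainingTime"):
    """Return the 'friendly' value of `field` from the ongoing cycle,
    falling back to the last completed cycle; None if neither exists.

    `record` is assumed to be a plain dict mirroring the struct.
    """
    if record is None:
        return None

    ongoing = record.get("ongoingCycle")
    if ongoing is not None:
        cycle = ongoing
    else:
        # Treat a null completedCycles array the same as an empty one.
        completed = record.get("completedCycles") or []
        if not completed:
            return None
        cycle = completed[-1]

    time_struct = cycle.get(field) or {}
    return time_struct.get("friendly")


# Trimmed copies of the two sample records (assumed shape).
with_ongoing = {
    "completedCycles": [],
    "ongoingCycle": {"remainingTime": {"friendly": "-53h 32m"}},
}
only_completed = {
    "completedCycles": [{"remainingTime": {"friendly": "-68h 43m"}}],
    "ongoingCycle": None,
}

print(extract_time(with_ongoing))
print(extract_time(only_completed))
```

Keeping the logic in an ordinary function makes it easy to unit test; the UDF wrapper then only has to convert the incoming struct and call it.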

Recommended answer

Use the when function (or an equivalent CASE WHEN expression) to apply the same logic that you implemented in the UDF, without needing a UDF at all.

Check the following code.

df.show()

|_links         |completedCycles                                                                                                                                                                                                                                                         |id |name              |ongoingCycle                                                                                                                                                                                                                                                                            |

|[https:///....]|[]                                                                                                                                                                                                                                                                      |630|Time to resolution|[[1605583651354, 17/Nov/20 3:27 PM +12:00, 2020-11-17T15:27:31+1200, 2020-11-17T15:27:31.354+1200], true, [57h 32m, 207148646], [4h, 14400000], false, [-53h 32m, -192748646], [1605511651354, 16/Nov/20 7:27 PM +12:00, 2020-11-16T19:27:31+1200, 2020-11-16T19:27:31.354+1200], false]|
|[https://....] |[[true, [72h 43m, 261818073], [4h, 14400000], [-68h 43m, -247418073], [1605156449463, 12/Nov/20 4:47 PM +12:00, 2020-11-12T16:47:29+1200, 2020-11-12T16:47:29.463+1200], [1606282267536, Today 5:31 PM +12:00, 2020-11-25T17:31:07+1200, 2020-11-25T17:31:07.536+1200]]]|630|Time to resolution|null                                                                                                                                                                                                                                                                                    |



from pyspark.sql import functions as F

# Prefer the ongoing cycle; otherwise fall back to the last completed cycle.
df.withColumn("time_to_resolution_remainingTime",F.expr("CASE WHEN ongoingCycle IS NOT NULL THEN ongoingCycle.remainingTime.friendly WHEN size(completedCycles) > 0 THEN completedCycles[size(completedCycles)-1].remainingTime.friendly ELSE null END"))\
.select("time_to_resolution_remainingTime")\
.show(truncate=False)

+--------------------------------+
|time_to_resolution_remainingTime|
+--------------------------------+
|-53h 32m                        |
|-68h 43m                        |
+--------------------------------+


08-14 02:27