How to rename columns by group/partition in a Spark DataFrame?

Problem Description

I have some sensor data that is stored in a table by channel name, rather than sensor name (this is to avoid having very wide tables owing to the fact that many sensors are only used on a few devices - a job for sparse columns, I know, but I am simply a user of the data). Something like this:

from functools import reduce

import numpy as np
import pandas as pd

np.random.seed(0)

data_df = pd.DataFrame({
    'id': ['a']*5 + ['b']*5 + ['c']*5,
    'chan1': range(15),
    'chan2': np.random.uniform(0, 10, size=15),
    'chan3': np.random.uniform(0, 100, size=15)
})

There is a second table that tells us how to map channel names to sensor names according to the particular ID of the device:

sensor_channel_df = pd.DataFrame([
    {'id': 'a', 'channel': 'chan1', 'sensor': 'weight'},
    {'id': 'a', 'channel': 'chan2', 'sensor': 'torque'},
    {'id': 'a', 'channel': 'chan3', 'sensor': 'temp'},
    {'id': 'b', 'channel': 'chan1', 'sensor': 'weight'},
    {'id': 'b', 'channel': 'chan2', 'sensor': 'temp'},
    {'id': 'b', 'channel': 'chan3', 'sensor': 'speed'},
    {'id': 'c', 'channel': 'chan1', 'sensor': 'temp'},
    {'id': 'c', 'channel': 'chan2', 'sensor': 'weight'},
    {'id': 'c', 'channel': 'chan3', 'sensor': 'acceleration'},
])

I can create a renaming dictionary like so:

channel_rename_dict = sensor_channel_df.groupby('id')\
                                       .apply(lambda grp: dict(zip(grp['channel'], grp['sensor'])))\
                                       .to_dict()

Then rename all the columns with a further groupby/apply:

data_df.groupby('id')\
       .apply(lambda group: group.rename(columns=channel_rename_dict[group.name]))\
       .reset_index(level=0, drop=True)

We get a result like this:

    acceleration id      speed       temp    torque    weight
0            NaN  a        NaN   8.712930  5.488135  0.000000
1            NaN  a        NaN   2.021840  7.151894  1.000000
2            NaN  a        NaN  83.261985  6.027634  2.000000
3            NaN  a        NaN  77.815675  5.448832  3.000000
4            NaN  a        NaN  87.001215  4.236548  4.000000
5            NaN  b  97.861834   6.458941       NaN  5.000000
6            NaN  b  79.915856   4.375872       NaN  6.000000
7            NaN  b  46.147936   8.917730       NaN  7.000000
8            NaN  b  78.052918   9.636628       NaN  8.000000
9            NaN  b  11.827443   3.834415       NaN  9.000000
10     63.992102  c        NaN  10.000000       NaN  7.917250
11     14.335329  c        NaN  11.000000       NaN  5.288949
12     94.466892  c        NaN  12.000000       NaN  5.680446
13     52.184832  c        NaN  13.000000       NaN  9.255966
14     41.466194  c        NaN  14.000000       NaN  0.710361

This is all fine (though I would be unsurprised to learn that there is a better way of doing it in pandas), and I used it to demonstrate the logic of this process to some colleagues.
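
As an aside, one arguably tidier pure-pandas route (a sketch of my own, not part of the original question) is to melt to long format, join the channel-to-sensor mapping, and unstack back to wide:

# Hypothetical pandas alternative: reshape instead of renaming per group.
long_df = (data_df
    .reset_index()  # keep a row key so we can reshape back to one row per reading
    .melt(id_vars=["index", "id"], var_name="channel", value_name="value"))

wide_df = (long_df
    .merge(sensor_channel_df, on=["id", "channel"])  # map channel -> sensor per id
    .set_index(["index", "id", "sensor"])["value"]
    .unstack("sensor")                               # sensor names become columns
    .reset_index("id")
    .rename_axis(index=None, columns=None))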

However, for the project architecture, it was decided that we would be using Spark. Is there a way I can achieve this same behavior in Spark DataFrames?

My initial thought was to first cache the full data_df, then break up the dataframe on id with filter. E.g., assuming data_df is now a Spark DataFrame:

data_df.cache()
unique_ids = data_df.select('id').distinct().rdd.map(lambda row: row[0]).collect()
split_dfs = {id: data_df.filter(data_df['id'] == id) for id in unique_ids}

Then, if we have the column rename dictionary as before, we can perform something along the lines of:

dfs_paired_with_rename_tuple_lists = [
    (split_dfs[id], list(channel_rename_dict[id].items()))
    for id in unique_ids
]

new_dfs = [
    reduce(lambda df_i, rename_tuple: df_i.withColumnRenamed(*rename_tuple), rename_tuple_list, df)
    for df, rename_tuple_list in dfs_paired_with_rename_tuple_lists
]

I could then perform a reduce with a union() on this list of Spark DataFrames after ensuring they have common columns, along the lines of the sketch below.
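
A rough sketch of that union step (my own illustration, not code from the original question) could look like this:

from functools import reduce
from pyspark.sql.functions import col, lit

# Align every renamed frame on a common, sorted column list, padding missing
# sensor columns with null doubles so that union() lines the schemas up.
all_cols = sorted(set().union(*[set(new_df.columns) for new_df in new_dfs]))

aligned_dfs = [
    new_df.select([
        col(c) if c in new_df.columns else lit(None).cast("double").alias(c)
        for c in all_cols
    ])
    for new_df in new_dfs
]

combined = reduce(lambda left, right: left.union(right), aligned_dfs)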

My feeling is that this would be extraordinarily slow, and that there is likely a much better way to go about this.

Recommended Answer

First, let's redefine the mapping to group by id and return a MapType Column (toolz is convenient here, but can be replaced with itertools.chain)*:

from toolz import concat, interleave
from pyspark.sql.functions import col, create_map, lit, struct

# Create literal column from id to sensor -> channel map
channel_map = create_map(*concat((lit(k), v) for k, v in sensor_channel_df
    .groupby("id")
    # Create map Column from literal label to channel
    .apply(lambda grp: create_map(*interleave([
        map(lit, grp["sensor"]),
        map(col, grp["channel"])])))
    .to_dict()
    .items()))

Next, get a list of sensors:

sensors = sorted(sensor_channel_df["sensor"].unique().tolist())

and combine the data columns:

df = spark.createDataFrame(data_df)
data_cols = struct(*[c for c in df.columns if c != "id"])

Components defined above can be combined:

cols = [channel_map[col("id")][sensor].alias(sensor) for sensor in sensors]

df.select(["id"] + cols).show()
+---+------------------+------------------+------------------+------------------+------------------+
| id|      acceleration|             speed|              temp|            torque|            weight|
+---+------------------+------------------+------------------+------------------+------------------+
|  a|              null|              null| 8.712929970154072|5.4881350392732475|               0.0|
|  a|              null|              null| 2.021839744032572| 7.151893663724195|               1.0|
|  a|              null|              null|  83.2619845547938| 6.027633760716439|               2.0|
|  a|              null|              null| 77.81567509498505| 5.448831829968968|               3.0|
|  a|              null|              null| 87.00121482468191| 4.236547993389047|               4.0|
|  b|              null|  97.8618342232764| 6.458941130666561|              null|               5.0|
|  b|              null| 79.91585642167236| 4.375872112626925|              null|               6.0|
|  b|              null|46.147936225293186| 8.917730007820797|              null|               7.0|
|  b|              null| 78.05291762864555| 9.636627605010293|              null|               8.0|
|  b|              null|11.827442586893323|3.8344151882577773|              null|               9.0|
|  c| 63.99210213275238|              null|              10.0|              null| 7.917250380826646|
|  c| 14.33532874090464|              null|              11.0|              null| 5.288949197529044|
|  c| 94.46689170495839|              null|              12.0|              null| 5.680445610939323|
|  c|52.184832175007166|              null|              13.0|              null|  9.25596638292661|
|  c| 41.46619399905236|              null|              14.0|              null|0.7103605819788694|
+---+------------------+------------------+------------------+------------------+------------------+

It is also possible, although less efficient, to use udf:

from toolz import unique
from pyspark.sql.types import *
from pyspark.sql.functions import udf

channel_dict = (sensor_channel_df
    .groupby("id")
    .apply(lambda grp: dict(zip(grp["sensor"], grp["channel"])))
    .to_dict())

def remap(d):
    fields = sorted(unique(concat(_.keys() for _ in d.values())))
    schema = StructType([StructField(f, DoubleType()) for f in fields])
    def _(row, id):
        return tuple(float(row[d[id].get(f)]) if d[id].get(f) is not None
                     else None for f in fields)
    return udf(_, schema)

(df
    .withColumn("vals", remap(channel_dict)(data_cols, "id"))
    .select("id", "vals.*")
    .show())
+---+------------------+------------------+------------------+------------------+------------------+
| id|      acceleration|             speed|              temp|            torque|            weight|
+---+------------------+------------------+------------------+------------------+------------------+
|  a|              null|              null| 8.712929970154072|5.4881350392732475|               0.0|
|  a|              null|              null| 2.021839744032572| 7.151893663724195|               1.0|
|  a|              null|              null|  83.2619845547938| 6.027633760716439|               2.0|
|  a|              null|              null| 77.81567509498505| 5.448831829968968|               3.0|
|  a|              null|              null| 87.00121482468191| 4.236547993389047|               4.0|
|  b|              null|  97.8618342232764| 6.458941130666561|              null|               5.0|
|  b|              null| 79.91585642167236| 4.375872112626925|              null|               6.0|
|  b|              null|46.147936225293186| 8.917730007820797|              null|               7.0|
|  b|              null| 78.05291762864555| 9.636627605010293|              null|               8.0|
|  b|              null|11.827442586893323|3.8344151882577773|              null|               9.0|
|  c| 63.99210213275238|              null|              10.0|              null| 7.917250380826646|
|  c| 14.33532874090464|              null|              11.0|              null| 5.288949197529044|
|  c| 94.46689170495839|              null|              12.0|              null| 5.680445610939323|
|  c|52.184832175007166|              null|              13.0|              null|  9.25596638292661|
|  c| 41.46619399905236|              null|              14.0|              null|0.7103605819788694|
+---+------------------+------------------+------------------+------------------+------------------+

In Spark 2.3 or later you can apply your current code with a vectorized UDF.
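
For illustration, a minimal sketch of that grouped-map approach (my own assumption of how it could look, reusing channel_rename_dict and sensors from above; not code from the original answer):

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

out_schema = StructType(
    [StructField("id", StringType())] +
    [StructField(s, DoubleType()) for s in sensors])

@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def remap_group(pdf):
    # pdf holds the pandas rows for a single id: rename its channels,
    # add any missing sensor columns as NaN, and fix the column order.
    out = pdf.rename(columns=channel_rename_dict[pdf["id"].iloc[0]])
    out = out.reindex(columns=["id"] + sensors)
    out[sensors] = out[sensors].astype("float64")
    return out

df.groupby("id").apply(remap_group)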

* To understand what is going on here, let's take a look at a single group as processed by apply:

grp = sensor_channel_df.groupby("id").get_group("a")

First we convert the sensor column to a sequence of Spark literal Columns (think of them as constant values):

keys = list(map(lit, grp["sensor"]))
keys
[Column<b'weight'>, Column<b'torque'>, Column<b'temp'>]

and the channel column to a sequence of Spark Columns (think of them as pointers to the data):

values = list(map(col, grp["channel"]))
values
[Column<b'chan1'>, Column<b'chan2'>, Column<b'chan3'>]

When evaluated in the context of a DataFrame, the former results in constant output:

df_ = df.drop_duplicates(subset=["id"])

df_.select(keys).show()
+------+------+----+
|weight|torque|temp|
+------+------+----+
|weight|torque|temp|
|weight|torque|temp|
|weight|torque|temp|
+------+------+----+

while the latter reflects the underlying data:

df_.select(values).show(3)
+-----+------------------+-----------------+
|chan1|             chan2|            chan3|
+-----+------------------+-----------------+
|   10| 7.917250380826646|63.99210213275238|
|    5| 6.458941130666561| 97.8618342232764|
|    0|5.4881350392732475|8.712929970154072|
+-----+------------------+-----------------+

Next we interleave these two and combine into a MapType column:

mapping = create_map(*interleave([keys, values]))
mapping
Column<b'map(weight, chan1, torque, chan2, temp, chan3)'>

This gives us a mapping from a metric name to the data column (think of a Python dict), and when evaluated:

df_.select(mapping).show(3, False)
+---------------------------------------------------------------------------+
|map(weight, chan1, torque, chan2, temp, chan3)                             |
+---------------------------------------------------------------------------+
|Map(weight -> 10.0, torque -> 7.917250380826646, temp -> 63.99210213275238)|
|Map(weight -> 5.0, torque -> 6.458941130666561, temp -> 97.8618342232764)  |
|Map(weight -> 0.0, torque -> 5.4881350392732475, temp -> 8.712929970154072)|
+---------------------------------------------------------------------------+

Finally, the outer comprehension repeats this for all groups, so channel_map is a Column:

Column<b'map(a, map(weight, chan1, torque, chan2, temp, chan3), b, map(weight, chan1, temp, chan2, speed, chan3), c, map(temp, chan1, weight, chan2, acceleration, chan3))'>

which, when evaluated, gives the following structure:

df_.select(channel_map.alias("channel_map")).show(3, False)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Map(a -> Map(weight -> 10.0, torque -> 7.917250380826646, temp -> 63.99210213275238), b -> Map(weight -> 10.0, temp -> 7.917250380826646, speed -> 63.99210213275238), c -> Map(temp -> 10.0, weight -> 7.917250380826646, acceleration -> 63.99210213275238))|
|Map(a -> Map(weight -> 5.0, torque -> 6.458941130666561, temp -> 97.8618342232764), b -> Map(weight -> 5.0, temp -> 6.458941130666561, speed -> 97.8618342232764), c -> Map(temp -> 5.0, weight -> 6.458941130666561, acceleration -> 97.8618342232764))      |
|Map(a -> Map(weight -> 0.0, torque -> 5.4881350392732475, temp -> 8.712929970154072), b -> Map(weight -> 0.0, temp -> 5.4881350392732475, speed -> 8.712929970154072), c -> Map(temp -> 0.0, weight -> 5.4881350392732475, acceleration -> 8.712929970154072))|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Finally, we use the id column to select the map of interest:

df_.select(channel_map[col("id")].alias("data_mapping")).show(3, False)
+---------------------------------------------------------------------------------+
|data_mapping                                                                     |
+---------------------------------------------------------------------------------+
|Map(temp -> 10.0, weight -> 7.917250380826646, acceleration -> 63.99210213275238)|
|Map(weight -> 5.0, temp -> 6.458941130666561, speed -> 97.8618342232764)         |
|Map(weight -> 0.0, torque -> 5.4881350392732475, temp -> 8.712929970154072)      |
+---------------------------------------------------------------------------------+

and column names to extract values from the map:

df_.select(channel_map[col("id")]["weight"].alias("weight")).show(3, False)
+-----------------+
|weight           |
+-----------------+
|7.917250380826646|
|5.0              |
|0.0              |
+-----------------+

At the end of the day, this is just a bunch of simple transformations on data structures containing symbolic expressions.
