我很难实现一些看起来应该很容易的东西:
我的目标是使用第二个 RDD/数据帧作为查找表或翻译字典在 RDD/数据帧中进行翻译。我想在多列中进行这些翻译。
解释问题的最简单方法是举例。假设我输入了以下两个 RDD:
Route SourceCityID DestinationCityID
A 1 2
B 1 3
C 2 1
和
CityID CityName
1 London
2 Paris
3 Tokyo
我想要的输出 RDD 是:
Route SourceCity DestinationCity
A London Paris
B London Tokyo
C Paris London
我应该如何去生产它?
这是 SQL 中的一个简单问题,但我不知道 Spark 中 RDD 的明显解决方案。 join、cogroup 等方法似乎不太适合多列 RDD,并且不允许指定要加入的列。
有任何想法吗? SQLContext 是答案吗?
最佳答案
rdd方式:
routes = sc.parallelize([("A", 1, 2),("B", 1, 3), ("C", 2, 1) ])
cities = sc.parallelize([(1, "London"),(2, "Paris"), (3, "Tokyo")])
print routes.map(lambda x: (x[1], (x[0], x[2]))).join(cities) \
.map(lambda x: (x[1][0][1], (x[1][0][0], x[1][1]))).join(cities). \
map(lambda x: (x[1][0][0], x[1][0][1], x[1][1])).collect()
哪个打印:
[('C', 'Paris', 'London'), ('A', 'London', 'Paris'), ('B', 'London', 'Tokyo')]
和 SQLContext 方式:
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
df_routes = sqlContext.createDataFrame(\
routes, ["Route", "SourceCityID", "DestinationCityID"])
df_cities = sqlContext.createDataFrame(\
cities, ["CityID", "CityName"])
temp = df_routes.join(df_cities, df_routes.SourceCityID == df_cities.CityID) \
.select("Route", "DestinationCityID", "CityName")
.withColumnRenamed("CityName", "SourceCity")
print temp.join(df_cities, temp.DestinationCityID == df_cities.CityID) \
.select("Route", "SourceCity", "CityName")
.withColumnRenamed("CityName", "DestinationCity").collect()
哪个打印:
[Row(Route=u'C', SourceCity=u'Paris', DestinationCity=u'London'),
Row(Route=u'A', SourceCity=u'London', DestinationCity=u'Paris'),
Row(Route=u'B', SourceCity=u'London', DestinationCity=u'Tokyo')]
关于apache-spark - 使用另一个 RDD/df 在 Spark RDD 或数据帧中执行查找/转换,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33092723/