I need to port code from PySpark 1.3 to 2.3 (also only on Python 2.7), and I run the following map transformation on an RDD:
import cPickle as pickle
import base64
path = "my_filename"
my_rdd = "rdd with data" # pyspark.rdd.PipelinedRDD()
# saving RDD to a file but first encoding everything
my_rdd.map(lambda line: base64.b64encode(pickle.dumps(line))).saveAsTextFile(path)
# another my_rdd.map doing the opposite of the above, fails with the same error
my_rdd = sc.textFile(path).map(lambda line: pickle.loads(base64.b64decode(line)))
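The encode/decode logic itself is sound; a minimal driver-side sketch of the same round trip, using Python 3's pickle in place of cPickle and a sample dict as a stand-in record (no Spark assumed):

```python
import base64
import pickle  # cPickle under Python 2.7, as in the code above

# Hypothetical sample record, standing in for one RDD element
record = {"id": 1, "text": "some line"}

# Encode: pickle the record, then base64 it so it is safe for a text file
encoded = base64.b64encode(pickle.dumps(record))

# Decode: the inverse mapping, as in the second map in the question
decoded = pickle.loads(base64.b64decode(encoded))
```

Since this works outside Spark, the error must come from how Spark serializes the closure, not from the pickle/base64 pair.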
When running this part, I get the following error:
raise pickle.PicklingError(msg)
PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
It seems that this kind of operation is no longer allowed inside the map function. Any suggestions on how this part could be rewritten?

Update:
Strangely enough, simply doing:
my_rdd.saveAsTextFile(path)
also fails with the same error.
Best answer
In the end, the problem was in the function that built the transformation: since even a plain saveAsTextFile fails, the offending closure must already be part of my_rdd's lineage, not of the map shown here. In this case, rewriting that function was easier than debugging it.
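In the spirit of what SPARK-5063 suggests, the usual rewrite is to pull plain values out of driver-side objects before defining the lambda, so the closure Spark pickles contains no SparkContext, RDD, or other unserializable object. A minimal sketch of that principle with a hypothetical Holder class standing in for such an object (no Spark assumed):

```python
import pickle
import threading

class Holder:
    """Hypothetical driver-side object; the lock plays the role of an
    unpicklable resource such as a SparkContext or socket."""
    def __init__(self, factor):
        self.factor = factor
        self._lock = threading.Lock()  # cannot be pickled

h = Holder(3)

# Serializing the whole object fails because of the lock it holds,
# just as a closure capturing sc or an RDD fails in Spark:
try:
    pickle.dumps(h)
    failed = False
except TypeError:
    failed = True

# Fix: extract the plain value first; now only an int is serialized
factor = h.factor
payload = pickle.dumps(factor)
```

Applied to the question's code, this means making sure the lambdas passed to map close only over plain values (and module-level imports), never over the context or another RDD.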
A similar question about python - deserializing and encoding strings with rdd.map in PySpark can be found on Stack Overflow: https://stackoverflow.com/questions/52333043/