Class ProdsTransformer:

    def __init__(self):
      self.products_lookup_hmap = {}
      self.broadcast_products_lookup_map = None

    def create_broadcast_variables(self):
      self.broadcast_products_lookup_map = sc.broadcast(self.products_lookup_hmap)

    def create_lookup_maps(self):
    // The code here builds the hashmap that maps Prod_ID to another space.

pt = ProdsTransformer ()
pt.create_broadcast_variables()

pairs = distinct_users_projected.map(lambda x: (x.user_id,
                         pt.broadcast_products_lookup_map.value[x.Prod_ID]))

我收到以下错误:



如何处理广播变量的任何帮助将是巨大的!

最佳答案

通过在map lambda中引用包含广播变量的对象,Spark将尝试序列化整个对象并将其发送给worker。由于对象包含对SparkContext的引用,因此会出现错误。代替这个:

pairs = distinct_users_projected.map(lambda x: (x.user_id, pt.broadcast_products_lookup_map.value[x.Prod_ID]))

试试这个:
bcast = pt.broadcast_products_lookup_map
pairs = distinct_users_projected.map(lambda x: (x.user_id, bcast.value[x.Prod_ID]))

后者避免了对对象的引用(pt),因此Spark只需要发送广播变量。

关于python - Spark : Broadcast variables: It appears that you are attempting to reference SparkContext from a broadcast variable,操作或转换,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/31508689/

10-12 16:30