本文介绍了Spark - 嵌套的 RDD 操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个 RDD 说

   rdd1 = 
id            | created     | destroyed | price   
1            | 1            | 2            | 10        
2            | 1            | 5            | 11       
3            | 2            | 3            | 11        
4            | 3            | 4            | 12        
5            | 3            | 5            | 11       

rdd2 =

[1,2,3,4,5] # lets call these value as timestamps (ts)

rdd2 基本上是使用 range(intial_value, end_value, interval) 生成的.这里的参数可能会有所不同.大小可以与 rdd1 相同或不同.这个想法是使用过滤标准根据 rdd2 的值将记录从 rdd1 提取到 rdd2(来自 rdd1 的记录可以在提取时重复,正如您在输出中看到的那样)

rdd2 is basically generated using range(intial_value, end_value, interval). The params here can vary. The size can be same or different to rdd1. The idea is to fetch records from rdd1 into rdd2 based on the values of rdd2 using a filtering criertia(records from rdd1 can repeat while fetching as you can see in output)

过滤条件 rdd1.created

filtering criteria rdd1.created <= ts < rdd1.destroyed)

预期输出:

ts             | prices  
1              | 10,11       # i.e. for ids 1,2 of rdd1      
2              | 11,11       # ids 2,3
3              | 11,12,11    # ids 2,4,5 
4              | 11,11       # ids 2,5

现在我想根据一些条件过滤 RDD1,该条件使用 RDD2 的键.(如上所述) 并返回连接RDD2的key和RDD1的过滤结果的结果

Now I want to filter RDD1 based on some condition which uses the keys of RDD2. (described above) And returns the results which joins the keys of RDD2 and filtered results of RDD1

所以我这样做:

rdd2.map(lambda x : somefilterfunction(x, rdd1))  

def somefilterfunction(x, rdd1):
    filtered_rdd1 = rdd1.filter(rdd1[1] <= x).filter(rdd1[2] > x)
    prices = filtered_rdd1.map(lambda x : x[3])
    res = prices.collect()
    return (x, list(res))

我得到:

异常:您似乎正在尝试广播 RDD 或从动作或转换中引用 RDD.RDD 转换和动作只能由驱动程序调用,不能在其他内部调用转换;例如,rdd1.map(lambda x: rdd2.values.count() *x) 无效,因为值转换和计数操作不能在 rdd1.map 转换内执行.更多信息,请参阅 SPARK-5063.

我尝试使用 groupBy ,但由于这里 rdd1 的元素可以一次又一次地重复,而我理解的分组会将 rdd1 的每个元素在某个特定的插槽中只进行一次.

I tried using groupBy , but since here elements of rdd1 can be repeated again and again as compared to grouping which I understand would club each element of rdd1 in some particular slot just once.

现在唯一的方法是使用普通的 for 循环并进行过滤并最终加入所有内容.

The only way is now to use a normal for loop and do the filtering and join everything in the end.

有什么建议吗?

推荐答案

由于您使用常规范围,因此根本没有理由创建第二个 RDD.您可以简单地为每条记录生成特定范围内的值:

Since you use regular range there is no reason to create a second RDD at all. You can simply generate values in a specific range for each record:

from __future__ import division # Required only for Python 2.x
from math import ceil
from itertools import takewhile

rdd1 = sc.parallelize([
    (1, 1, 2, 10),        
    (2, 1, 5, 11),       
    (3, 2, 3, 11),        
    (4, 3, 4, 12),        
    (5, 3, 5, 11),  
])


def generate(start, end, step):
    def _generate(id, created, destroyed, price):
        # Smallest ts >= created
        start_for_record = int(ceil((created - start) / step) * step + start)
        rng = takewhile(
            lambda x: created <= x < destroyed,
            xrange(start_for_record, end, step)) # In Python 3.x use range
        for i in rng:
            yield i, price

    return _generate

result = rdd1.flatMap(lambda x: generate(1, 6, 1)(*x)).groupByKey()

结果:

result.mapValues(list).collect()

## [(1, [10, 11]), (2, [11, 11]), (3, [11, 12, 11]), (4, [11, 11])]

这篇关于Spark - 嵌套的 RDD 操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-23 02:45