Problem description
I have a very simple dataframe:
+--+------+
|Id|Amount|
+--+------+
|0 |3.47 |
|1 |-3.47 |
|2 |3.47 |
|3 |3.47 |
|4 |2.02 |
|5 |-2.01 |
|6 |-2.01 |
|7 |7.65 |
|8 |7.65 |
+--+------+
I'd like to match lines that cancel each other out within a given threshold (say 0.5). In this case, that means matching lines 0 and 1, and lines 4 and 5, and returning the unmatched lines, including 2 and 3. Several pairings are valid, so returning lines 0 and 2 instead is also fine.
The general idea is that lines should be matched two by two and the leftovers returned. If every line has a match, nothing should be returned; otherwise, all lines that couldn't be paired this way should be returned.
Any idea how to do this?
Expected result:
+--+------+
|Id|Amount|
+--+------+
|0 |3.47 |
|2 |3.47 |
|6 |-2.01 |
|7 |7.65 |
|8 |7.65 |
+--+------+
I've been thinking about using a UserDefinedAggregateFunction, but I'm not sure whether it's enough, especially because I think it can only return one value per group of lines.
Recommended answer
I went with a UDF. Writing UDFs in Java is seriously overcomplicated...
If anyone can find a way to simplify this mess, please post an answer or a comment.
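import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF1;
import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;

// THRESHOLD is a constant defined elsewhere in the enclosing class (0.5 in the question).
private UDF1<WrappedArray<Row>, Row[]> matchData() {
    return (data) -> {
        List<Data> dataList = JavaConversions.seqAsJavaList(data).stream()
                .map(Data::fromRow)
                .collect(Collectors.toList());
        Set<Data> matched = new HashSet<>();
        for (Data element : dataList) {
            if (matched.contains(element)) continue;
            // Pick the closest unmatched element of opposite sign within the threshold.
            dataList.stream()
                    .filter(e -> !matched.contains(e) && e != element)
                    .filter(e -> Math.abs(e.getAmount() + element.getAmount()) < THRESHOLD
                            && Math.signum(e.getAmount()) != Math.signum(element.getAmount()))
                    .min(Comparator.comparingDouble(e -> Math.abs(e.getAmount() + element.getAmount())))
                    .ifPresent(e -> {
                        matched.add(e);
                        matched.add(element);
                    });
        }
        // If anything is left unmatched, return the whole set; otherwise return nothing.
        if (matched.size() != dataList.size()) {
            return dataList.stream().map(Data::toRow).toArray(Row[]::new);
        } else {
            return new Row[0];
        }
    };
}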
With the Data class (using Lombok):
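import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

@AllArgsConstructor
@EqualsAndHashCode
@lombok.Data // fully qualified: the simple name Data would refer to this class itself
public final class Data {
    private String name;
    private Double amount;

    // Builds a Data instance from a Row with "name" and "amount" columns.
    public static Data fromRow(Row r) {
        return new Data(
                r.getString(r.fieldIndex("name")),
                r.getDouble(r.fieldIndex("amount")));
    }

    public Row toRow() {
        return RowFactory.create(name, amount);
    }
}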
Note that I'm returning the whole set whenever at least one element could not be matched; this is actually what I need in my case.
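For readers who want only the leftovers, as the question originally asked, the pairing step might be sketched in plain Java as below. This is a minimal illustration, not part of the answer's UDF: the `LeftoverMatcher` class, the `Entry` type, and the `sample()` helper are hypothetical names, and the greedy strategy (each line takes its closest opposite-sign partner among later lines) is just one of the valid pairings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LeftoverMatcher {
    // Hypothetical stand-in for one (Id, Amount) row of the dataframe.
    static final class Entry {
        final int id;
        final double amount;
        Entry(int id, double amount) { this.id = id; this.amount = amount; }
    }

    // Greedily pairs entries that cancel out within the threshold
    // and returns the ones that found no partner.
    static List<Entry> unmatched(List<Entry> entries, double threshold) {
        boolean[] used = new boolean[entries.size()];
        for (int i = 0; i < entries.size(); i++) {
            if (used[i]) continue;
            int best = -1;
            double bestDiff = Double.MAX_VALUE;
            // Earlier unused entries already failed to pair with i, so only look ahead.
            for (int j = i + 1; j < entries.size(); j++) {
                if (used[j]) continue;
                double a = entries.get(i).amount;
                double b = entries.get(j).amount;
                double diff = Math.abs(a + b);
                if (Math.signum(a) != Math.signum(b) && diff < threshold && diff < bestDiff) {
                    best = j;
                    bestDiff = diff;
                }
            }
            if (best >= 0) {
                used[i] = true;
                used[best] = true;
            }
        }
        List<Entry> leftovers = new ArrayList<>();
        for (int i = 0; i < entries.size(); i++) {
            if (!used[i]) leftovers.add(entries.get(i));
        }
        return leftovers;
    }

    // The sample data from the question.
    static List<Entry> sample() {
        return Arrays.asList(
                new Entry(0, 3.47), new Entry(1, -3.47), new Entry(2, 3.47),
                new Entry(3, 3.47), new Entry(4, 2.02), new Entry(5, -2.01),
                new Entry(6, -2.01), new Entry(7, 7.65), new Entry(8, 7.65));
    }

    public static void main(String[] args) {
        for (Entry e : unmatched(sample(), 0.5)) {
            System.out.println(e.id + " " + e.amount);
        }
    }
}
```

On the question's data this pairs lines 0 and 1 and lines 4 and 5, leaving lines 2, 3, 6, 7 and 8, which is equivalent to the expected result up to the choice of which 3.47 line gets paired. A greedy pass like this does not guarantee the maximum number of pairs in general.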