我已经实现了功能性unzip()
操作,如下所示:
public static <T, U, V> Tuple2<Stream<U>, Stream<V>> unzip(
Stream<T> stream,
Function<T, Tuple2<U, V>> unzipper) {
return stream.map(unzipper)
.reduce(new Tuple2<>(Stream.<U>empty(), Stream.<V>empty()),
(unzipped, tuple) -> new Tuple2<>(
Stream.concat(unzipped.$1(), Stream.of(tuple.$1())),
Stream.concat(unzipped.$2(), Stream.of(tuple.$2()))),
(unzipped1, unzipped2) -> new Tuple2<>(
Stream.concat(unzipped1.$1(), unzipped2.$1()),
Stream.concat(unzipped1.$2(), unzipped2.$2())));
}
给定输入流中没有太多元素的情况,这可以很好地工作。这是因为访问深度串联流的元素可能会导致StackOverflowException
。根据docs of Stream.concat()
:实施说明:
从重复级联构造流时请谨慎。访问深度串联流的元素会导致深度调用链,甚至
StackOverflowException
。对于少数元素,我的
unzip
实现有效。给定一个Person
类:class Person {
public final String name;
public final int age;
Person(String name, int age) {
this.name = name;
this.age = age;
}
}
如果我有很多人:Stream<Person> people = Stream.of(
new Person("Joe", 52),
new Person("Alan", 34),
new Person("Peter", 42));
我可以通过以下方式使用unzip()
实现:Tuple2<Stream<String>, Stream<Integer>> result = StreamUtils.unzip(people,
person -> new Tuple2<>(person.name, person.age));
List<String> names = result.$1()
.collect(Collectors.toList()); // ["Joe", "Alan", "Peter"]
List<Integer> ages = result.$2()
.collect(Collectors.toList()); // [52, 34, 42]
哪个是正确的。所以我的问题是:是否可以使用
unzip()
处理许多元素(可能无限)? 注意:为了完整起见,这是我不变的
Tuple2
类:public final class Tuple2<A, B> {
private final A $1;
private final B $2;
public Tuple2(A $1, B $2) {
this.$1 = $1;
this.$2 = $2;
}
public A $1() {
return $1;
}
public B $2() {
return $2;
}
}
最佳答案
您的解决方案不仅易于潜在的StackOverflowError
,而且即使没有StackOverflowError
的风险也无法处理潜在的无限流。关键是,您正在构造一个流,但是它是连接的单个元素流的流,一个用于源流的每个元素。换句话说,在返回unzip
方法后,您将拥有一个完全物化的数据结构,该结构将比收集ArrayList
或简单的toArray()
操作的结果消耗更多的内存。
但是,当您以后要执行collect
时,支持潜在的无限流的想法无论如何都没有意义,因为收集意味着对所有元素的处理而不会发生短路。
一旦您放弃了支持无限流的想法,而将精力集中在收集操作上,就会有一个更简单的解决方案。从this solution中获取代码,用Pair
替换Tuple2
并将累加器逻辑从“有条件”更改为“两者”,我们得到:
public static <T, A1, A2, R1, R2> Collector<T, ?, Tuple2<R1,R2>> both(
Collector<T, A1, R1> first, Collector<T, A2, R2> second) {
Supplier<A1> s1=first.supplier();
Supplier<A2> s2=second.supplier();
BiConsumer<A1, T> a1=first.accumulator();
BiConsumer<A2, T> a2=second.accumulator();
BinaryOperator<A1> c1=first.combiner();
BinaryOperator<A2> c2=second.combiner();
Function<A1,R1> f1=first.finisher();
Function<A2,R2> f2=second.finisher();
return Collector.of(
()->new Tuple2<>(s1.get(), s2.get()),
(p,t)->{ a1.accept(p.$1(), t); a2.accept(p.$2(), t); },
(p1,p2)->new Tuple2<>(c1.apply(p1.$1(), p2.$1()), c2.apply(p1.$2(), p2.$2())),
p -> new Tuple2<>(f1.apply(p.$1()), f2.apply(p.$2())));
}
这可以像
Tuple2<List<String>, List<Integer>> namesAndAges=
Stream.of(new Person("Joe", 52), new Person("Alan", 34), new Person("Peter", 42))
.collect(both(
Collectors.mapping(p->p.name, Collectors.toList()),
Collectors.mapping(p->p.age, Collectors.toList())));
List<String> names = namesAndAges.$1(); // ["Joe", "Alan", "Peter"]
List<Integer> ages = namesAndAges.$2(); // [52, 34, 42]
链接答案的说明也适用于此。在收集器中,您几乎可以做所有可以表达为流操作的事情。
如果您想通过函数(从stream元素映射到
Tuple2
)更接近原始代码,则可以像上面这样包装解决方案public static <T, T1, T2, A1, A2, R1, R2> Collector<T, ?, Tuple2<R1,R2>> both(
Function<? super T, ? extends Tuple2<? extends T1, ? extends T2>> f,
Collector<T1, A1, R1> first, Collector<T2, A2, R2> second) {
return Collectors.mapping(f, both(
Collectors.mapping(Tuple2::$1, first),
Collectors.mapping(Tuple2::$2, second)));
}
并像这样使用
Tuple2<List<String>, List<Integer>> namesAndAges=
Stream.of(new Person("Joe", 52), new Person("Alan", 34), new Person("Peter", 42))
.collect(both(
p -> new Tuple2<>(p.name, p.age), Collectors.toList(), Collectors.toList()));
您可以识别函数
p -> new Tuple2<>(p.name, p.age)
,就像您传递给unzip
方法的函数一样。上面的解决方案是懒惰的,但是需要将“解压缩”之后的操作表示为收集器。如果您想要Stream
而不是接受解决方案的非延迟性质,就像您最初的unzip
操作一样,但是希望它比concat
更高效,则可以使用:public static <T, U, V> Tuple2<Stream<U>, Stream<V>> unzip(
Stream<T> stream, Function<T, Tuple2<U, V>> unzipper) {
return stream.map(unzipper)
.collect(Collector.of(()->new Tuple2<>(Stream.<U>builder(), Stream.<V>builder()),
(unzipped, tuple) -> {
unzipped.$1().accept(tuple.$1()); unzipped.$2().accept(tuple.$2());
},
(unzipped1, unzipped2) -> {
unzipped2.$1().build().forEachOrdered(unzipped1.$1());
unzipped2.$2().build().forEachOrdered(unzipped1.$2());
return unzipped1;
},
tuple -> new Tuple2<>(tuple.$1().build(), tuple.$2().build())
));
}
这可以替代基于
concat
的解决方案。它还将完全存储流元素,但是将使用Stream.Builder
,该代码针对在一次增量填充和消费一次的情况下进行了优化(在Stream
操作中)。这比收集到ArrayList
(至少在参考实现中)更为有效,因为它使用了“旋转缓冲区”,在增加容量时不需要复制。对于大小可能未知的流,这是最有效的解决方案(对于大小已知的流,toArray()
的性能会更好)。