我正在使用分页API。
我使用了Adam Millerchip提供的以下解决方案,效果很好。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.processors.BehaviorProcessor;
public class Pagination {
// Fetch all pages and return the items contained in those pages, using the provided page fetcher function
public static <T> Flowable<T> fetchItems(Function<Integer, Single<Page<T>>> fetchPage) {
// Processor issues page indices
BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(0);
// When an index number is issued, fetch the corresponding page
return processor.concatMap(index -> fetchPage.apply(index).toFlowable())
// when returning the page, update the processor to get the next page (or stop)
.doOnNext(page -> {
if (page.hasNext()) {
processor.onNext(page.getNextPageIndex());
} else {
processor.onComplete();
}
})
.concatMapIterable(Page::getElements);
}
public static void main(String[] args) {
fetchItems(Pagination::examplePageFetcher).subscribe(System.out::println);
}
// A function to fetch a page of our paged data
private static Single<Page<String>> examplePageFetcher(int index) {
return Single.just(pages.get(index));
}
// Create some paged data
private static ArrayList<Page<String>> pages = new ArrayList<>(3);
static {
pages.add(new Page<>(Arrays.asList("one", "two"), Optional.of(1)));
pages.add(new Page<>(Arrays.asList("three", "four"), Optional.of(2)));
pages.add(new Page<>(Arrays.asList("five"), Optional.empty()));
}
static class Page<T> {
private List<T> elements;
private Optional<Integer> nextPageIndex;
public Page(List<T> elements, Optional<Integer> nextPageIndex) {
this.elements = elements;
this.nextPageIndex = nextPageIndex;
}
public List<T> getElements() {
return elements;
}
public int getNextPageIndex() {
return nextPageIndex.get();
}
public boolean hasNext() {
return nextPageIndex.isPresent();
}
}
}
但是我有两个问题:
在此实现中,在加载所有页面时,最后处理元素(subscribe(System.out :: println))。如果收集的数据很多,这可能会导致内存问题。我希望在加载它们时立即处理它们(数据库保存)(在.doOnNext(page-> {}中)。我已经能够做到这一点,但是以一种“肮脏的方式”(在数据库中添加数据库保存代码) doOnNext)。我该怎么做?
在“页面”类的实现中,我使用了自定义的Gson解串器。而且我不知道如何处理通用数据。我不得不写“ list.add((MyGenericClass)context.deserialize(anArray.getAsJsonObject(),MyGenericClass.class));”我想要的东西像“ list.add((T)context.deserialize(anArray.getAsJsonObject(),T.class));”。我如何才能保持真正的通用性?
public static JsonDeserializer<Paginator> deserializer = new JsonDeserializer<Paginator>() {
@Override
public Paginator deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
JsonObject jsonObject = json.getAsJsonObject();
Paginator paginator = new Paginator(null, Optional.of(1));
if (jsonObject.get("data") != null && !jsonObject.get("data").isJsonNull()) {
JsonArray array = jsonObject.getAsJsonArray("data");
List<MyGenericClass> list = new ArrayList<>();
for (JsonElement anArray : array) {
list.add((MyGenericClass)context.deserialize(anArray.getAsJsonObject(), MyGenericClass.class));
}
paginator.setData(list);
}
paginator.setCurrent_page(jsonAsInt(jsonObject, "current_page",-1));
paginator.setFrom(jsonAsInt(jsonObject,"from",-1));
paginator.setTo(jsonAsInt(jsonObject,"to",-1));
paginator.setTotal(jsonAsInt(jsonObject,"total",-1));
paginator.setLast_page(jsonAsInt(jsonObject, "last_page", -1));
paginator.setNextPage(); // calculate next page
return paginator;
}
};
最佳答案
要回答您的第一个问题:
在此实现中,在加载所有页面时,最后处理元素(订阅(System.out :: println))。”
这是不正确的。反应式编程的全部目的是避免这种情况。 fetchItems()
返回一个Flowable<T>
,在某些内容订阅之前,它实际上不会获取任何项目。当您订阅某些内容时,每次项目准备就绪时都会通知订阅者。您应该调用subscribe()
并传递一个函数,该函数在每次准备就绪时都会被调用。在我的示例中,我传递了System.out::println
来打印值,但是您可以实现自己的保存到数据库的处理程序。
我希望在加载它们时立即处理它们(数据库保存)(在.doOnNext(page-> {}中)
这混淆了发布者和使用者之间的区别。发布者生产项目-在我的示例中,是一个Flowable<T>
生产类型为T
的项目。消费者消费发布者生产的商品。 doOnNext()
是发布者的功能。它说:“当您发布内容时,也会产生这种副作用”。在我的示例中,副作用是发出要提取的下一个页码。您不应该处理保存在那里的数据库,应该编写自己的回调函数(Consumer)或Subscriber来处理它,并将其提供给订阅调用。