处理不同的对象流

处理不同的对象流

本文介绍了使用 RxJava 处理不同的对象流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试处理对象流(通过 http JSon 请求).

I'm trying to process a stream of objects (via a http JSon request).

Observble 返回这样的项目:

The Observble returns items like this:

"2015-05-06T13:24:20Z", Foo, Foo, 1, 2, 3, Foo, Foo

第一项是时间戳,然后是要存储在数据库中的 Foo 对象,然后是代表需要从数据库中删除的 Foo 对象的 id,最后是需要更新的 Foo 对象(我将要做一个为他们更新).

The first item is a timestamp, then Foo objects to store in the db, then ids that represent Foo objects that need to be deleted from the db and finally Foo objects that need to be updated (I'm going to do an upsert for them).

我目前的实现是这样的:

My current implementation looks like this:

public void updateFoos(final CallBack callBack) {

    final String lastFooUpdateTimestamp = localStorage.getLastFooUpdateTimestamp();

    fooService.getFoos(lastFooUpdateTimestamp)
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onCompleted() {
                    callBack.onSuccess();
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Object o) {
                    if (o instanceof String) {
                        localStorage.setLastFooUpdateTimestamp((String) o);
                    }

                    if (o instanceof Foo) {
                        databaseManager.save((Foo) o);
                    }
                }
            });
}

有很多问题:

  1. instanceof 检查不是很 RxJavay,有没有更好的方法?
  2. 时间戳始终是第一个字段,无论如何要清楚地表达这一点?
  3. 我还想对数据库插入进行批处理,因此有一个单独的块来处理同时对它们进行批处理的 Foo 对象会很好.
  4. 是否有更好的设计可以按类型发出多个 Observable?但是,我如何订阅多个观察者?

推荐答案

这是一个如何使用 RxJava 来完成的示例:

Here is an example how it could be done by using RxJava:

public class MultikindSource {
    enum ValueType {
        TIMESTAMP,
        NUMBER,
        FOO
    }
    static final class Foo { }
    static Observable<Object> source(String timestamp) {
        return Observable.from(Arrays.asList(timestamp, new Foo(), new Foo(),
            1, 2, 3, new Foo()));
    }
    public static void main(String[] args) {
        Func1<Object, ValueType> keySelector = o -> {
            if (o instanceof String) {
                return ValueType.TIMESTAMP;
            } else
            if (o instanceof Foo) {
                return ValueType.FOO;
            }
            return ValueType.NUMBER;
        };
        AtomicReference<String> lastTimestamp = new AtomicReference<>(
            "2015-05-08T11:38:00.000Z");
        source(lastTimestamp.get())
        .groupBy(keySelector)
        .flatMap(g -> {
            if (g.getKey() == ValueType.TIMESTAMP) {
                g.subscribe(v -> {
                    System.out.println("Updating timestamp to " + v);
                    lastTimestamp.set((String)v);
                });
            } else
            if (g.getKey() == ValueType.FOO) {
                g.buffer(2).subscribe(v ->
                    System.out.println("Batch inserting " + v));
            } else {
                g.subscribe(v ->
                    System.out.println("Got some number: " + v));
            }
            return Observable.just(1);
        }).count().subscribe(v ->
            System.out.println("Got " + v + " kinds of events."));
    }
}

本质上,您按某个枚举对源数据进行分组,然后链接到这些组并订阅它们以执行工作.

Essentially, you group the source data by some enum, then chain onto these groups and subscribe to them to perform the work.

这篇关于使用 RxJava 处理不同的对象流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-02 04:29