步骤1:我想让microservice中的一个CompletableFuture<String> asyncFuture启动,通过saysupplyAsync运行异步任务。
第2步:然后手动完成同一个未来对象,方法是从不同的微服务B调用asyncFuture.complete(T value),这将由某个异步事件触发。
显然,microservice a和microservice b有不同的jvm。
实际上,在kubernetes中,microservice a和microservice b是同一个microservice在不同pods上运行的不同实例。
在步骤1和步骤2之间,未来的对象将存储在redis中,microservice b可以安全地检索该redis。
在快速搜索之后,我想我会尝试以下几种解决方案:
1>Hazelcast的分布式执行器服务,我可以在调用时将其作为第二个参数传入

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

裁判:
http://docs.hazelcast.org/docs/2.3/manual/html/ch09.html
2>使用来自apache ignite的共享executorservice
参考:https://apacheignite.readme.io/v1.2/docs/executor-service
不确定这两种方法是否可行?我也想知道以前有人处理过这样的事情吗?如果是这样的话,如果你能和我分享你的解决方案,我将不胜感激。

最佳答案

关于ApacheIgnite,有很多选择如何协作节点(微服务)。其中之一是连续查询[1],它允许监听缓存上发生的数据修改。
例如,在服务a上,可以创建continuousquery并在缓存中更改wait for value:

private String waitForValueChanged(IgniteCache<Integer, String> cache, Integer key) throws InterruptedException {
    ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

    qry.setInitialQuery(new ScanQuery<>((k, v) -> k == key));

    final CountDownLatch waitForValueChanged = new CountDownLatch(1);
    final AtomicReference<String> result = new AtomicReference<>();

    CacheEntryUpdatedListener<Integer, String> listener = new CacheEntryUpdatedListener<Integer, String>() {
        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> iterable) throws CacheEntryListenerException {
            for (CacheEntryEvent<? extends Integer, ? extends String> entry: iterable) {
                result.set(entry.getValue());
            }

            waitForValueChanged.countDown();
        }
    };

    qry.setLocalListener(listener);

    try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry);) {
        waitForValueChanged.await(60000, TimeUnit.MILLISECONDS);
    }

    return result.get();
}

在服务B上,您只需将值放入缓存中即可“完成未来”:
private void completeFuture(IgniteCache<Integer, String> cache, Integer key, String value) {
    cache.put(key, value);
}

下面是一个示例项目,它展示了连续查询是如何工作的[2]。
[1]https://apacheignite.readme.io/docs#section-continuous-queries
[2]https://github.com/gromtech/ignite-continuous-query-example

10-01 10:16
查看更多