我有概念上的问题,了解如何在具有不同返回类型的stream \ Observable之间进行组合。
这是我尝试编写的草稿方法:
public void findSeat() {
rx.Observable<GameObject> userObs = context.getUser();
rx.Observable<ActiveGame> gameObs = context.findGame();
rx.Observable.zip(userObs, gameObs, (userObj, game) -> {
User user = ...;
final List<Object> results = new ArrayList<Object>(3);
if(userObj.getStatus() != ErrorCodes.STATUS_OK) {
results.add(-1);
return results;
}
...
...
//***********************************
// THE PROBLEM IS HERE:
// "context.getActiveGameManager().updateGame(game)" returns Observable<GameOBject> and not List<Object> like .zip() expects.
// because of that I cannot do:
// "return context.getActiveGameManager().updateGame(game);"
// How can I do this convertion from Observable<GameObject> to List<Object>
//************************************
context.getActiveGameManager().updateGame(game)
.map((gameObj) -> {
if(gameObj.getStatus() != ErrorCodes.STATUS_OK) {
results.add(-2);
return (Observable<? extends Object>) results;
}
results.add(ErrorCodes.STATUS_OK);
results.add(user);
results.add(gameObj);
return gameObs;
});
return Observable.empty();
}).subscribe((results) -> {
int status = (int) results.get(0);
User user = (User) results.get(1);
ActiveGame game = (ActiveGame) results.get(2);
replyObj.reply(new JsonObject()
.putString("action", CommandActions.FIND_SEAT)
.putNumber("status", status);
.putNumber("game_id", game.getGameId())
);
});
}
流程如下:
1.使用.zip方法发出2可观察到的。
2.对流的返回值执行一些逻辑,如果它导致错误代码->将其放在列表中并返回,则“订阅”可以将错误返回给用户。
3.如果没有错误,请使用flatMap()发出另一个“更新”方法-这就是我遇到的问题。
4.最终,所有结果应在“订阅”中处理,因为这是我承认用户有关其请求的要点。
希望足够清楚...
顺便说一句,我正在尝试学习rxJava,但是很难找到足够的\好的资源-有人可以向我推荐学习它的最佳方法吗?我尝试查看Youtube,Wikipedia,Github上的教程...他们中的大多数人都使用Scala和其他脚本语言教书-在Java中找不到任何东西。
感谢您为尝试理解它所做的一切!!
最佳答案
我认为您已经快到了,但是请尝试将.zip lambda中的代码分解为较小的Rx操作。例如:
rx.Observable
.zip(userObs, gameObs, (userObj, game) -> {
// Combine the user & game objects and pass them to the
// next Rx operation.
return new UserAndActiveGame(userObj, game);
})
.filter(userAndActiveGame -> {
// Remove this filter if you want errors to make it to the subscriber.
return userAndActiveGame.getUserObj().getStatus() == ErrorCodes.STATUS_OK;
})
.flatMap(userAndActiveGame -> {
// Remove this check if you filter errors above.
if (userAndActiveGame.getUserObj().getStatus() != ErrorCodes.STATUS_OK) {
return Observable.just(new FindSeatResult(-1));
}
return context.getActiveGameManager().updateGame(userAndActiveGame.getGame())
.map(gameObj -> {
if (gameObj.getStatus() != ErrorCodes.STATUS_OK) {
return new FindSeatResult(-2);
}
User user =...; // Whatever you are doing to get this in your example code.
return new FindSeatResult(ErrorCodes.STATUS_OK, user, gameObj);
});
})
以下类用于传递中间结果和最终结果:
private class UserAndActiveGame {
private final GameObject userObj;
private final ActiveGame game;
public UserAndActiveGame(GameObject userObj, ActiveGame game) {
this.userObj = userObj;
this.game = game;
}
public GameObject getUserObj() {
return userObj;
}
public ActiveGame getGame() {
return game;
}
}
private class FindSeatResult {
private final int status;
private final User user;
private final ActiveGame game;
public FindSeatResult(int status) {
this(status, null, null);
}
public FindSeatResult(int status, User user, ActiveGame game) {
this.status = status;
this.user = user;
this.game = game;
}
public User getUser() {
return user;
}
public int getStatus() {
return status;
}
public ActiveGame getGame() {
return game;
}
}
然后,您的订户使用打包的结果类似于您已经在做的事情。
.subscribe((results) -> {
// You don't need this if you filter errors above.
if (findSeatResult.getStatus() == -1) {
return;
}
int status = findSeatResult.getStatus();
User user = findSeatResult.getUser();
ActiveGame game = findSeatResult.getGame();
replyObj.reply(new JsonObject()
.putString("action", CommandActions.FIND_SEAT)
.putNumber("status", status);
.putNumber("game_id", game.getGameId())
);
});
通过使用中间结果类和最终结果类,而不是在
List<Object>
中传递结果,您的代码将更能容忍更改,并且编译器将为您键入检查所有内容。关于java - 如何正确组成流\可观察对象,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/26552253/