我正在使用RXJS,并提出了分页数据游标的实现。在没有花太多时间进行反应性函数编程时,我想知道我的实现是否本着打算使用该库的精神。
我想要一个可以从端点加载页面的类。如果您订阅它,您将收到被查询的最后一页。第一次订阅会导致自动查询第一页。调用“ getPage”应该为所有订阅触发一个onNext。多个订阅不应引起多个请求。
我写了一个满足此要求的基本示例,并对我的思考过程进行了评论:https://jsfiddle.net/gfmn708g/1/
我的问题是:
这是RXJS的精神所在吗?对我来说,同时使用ReplySubject和shareReplay感觉很不对劲,但是我发现没有其他方法可以实现我想要的行为。我读到,使用Subjects是“不好的”,违反了范式的原则。
在所有机上请求完成并处理之后,第63行是否会取消订阅/完成所有item $订阅(第82和89行)?
处理错误的正确方法是什么,以便将错误传播到订户,但它们不会破坏流并阻止我发送更多请求?
(这是根据SO的问题指南列出的代码)
const logDiv = $("#log");
function log(message, cls) {
logDiv.append($("<li>").text(message).addClass(cls));
}
/* interface IRequest {
url: string;
page: number:
refresh?: boolean
}
interface IEndpoint {
get(request: IRequest): [];
} */
// Class that represents a cursor into paginated data
function PagedData(endpoint, url) {
this._endpoint = endpoint;
this._url = url;
// Our request queue is an observable of structurs of type IRequest
// We use a reply subject so that the last URL requested is in the stream when the first subscriber subscribes.
this._requestQueue = new Rx.ReplaySubject(1);
// This is our data observable, subscribe to it to
// A) receive the last page that this cursor has produced
// B) receive all future pages
this.items$ = this._requestQueue
// Don't re-query unless the "refresh" boolean is true
.distinctUntilChanged(req => req, (left, right) => right.refresh ? false : left.page == right.page)
// Make the request...
.flatMapLatest(request => Rx.Observable.of(request).zip(this._endpoint.get(request)))
// Wrap data returned with an envelope with data such as which page was requested
.map(data => {
const request = data[0];
const response = data[1];
return {
page: request.page,
url: request.url,
items: response
};
})
// Replay last page worth of data on each subscription
.shareReplay(1);
// Queue up the first page to be retrieved on first subscription
this.getPage(1);
}
PagedData.prototype.getPage = function(page, refresh) {
refresh = refresh || false;
// Fire off the workflow
this._requestQueue.onNext({
url: this._url,
refresh: refresh,
page: page
});
}
PagedData.prototype.dispose = function() {
// Question: this should unsubscribe ALL of the subscriptions to this.items$, right?
this._requestQueue.completed();
}
// -----------------
// EXAMPLE USAGE
var dummyEndpoint = {
get(request) {
log(`GET: ${request.url} at page ${request.page}`, "service");
return Rx.Observable.range(request.page * 10, 10)
.delay(1000)
.map(i => ({id: i, title: `Track ${i}`}))
.toArray();
}
};
const tracks = new PagedData(dummyEndpoint, "/api/tracks");
// This results in getting the first page
tracks.items$.subscribe(data => {
log(`On page ${data.page}, ${data.items.map(i => i.title).join(",")}`, "first")
});
// Wait one second after getting the first page
window.setTimeout(() => {
// Subscribe again, we will receive the first page with no re-query
tracks.items$.subscribe(data => log(`Got page ${data.page} after delay`, "second"));
// Get the second page
tracks.getPage(2);
// Wait another second after getting the second page
window.setTimeout(() => {
log("Getting second page (without refresh)");
// This shouldn't result in anything, since "refresh" is false/undefined
tracks.getPage(2);
// Wait one more second...
window.setTimeout(() => {
log("Getting second page (with refresh)");
// This should result in getting the second page, refresh is true
tracks.getPage(2, true);
// Should get rid of all subscriptions after the last in-flight request?
tracks.dispose();
}, 1000);
}, 2000);
}, 2000);
最佳答案
Subjects
并不是很坏,因为它们倾向于成为新用户的拐杖,因此他们不必实际使用该范式(一个Observable和Observer以一个价格出售,我怎么负担不起不使用它?)。
认真地说,尽管我认为您的直觉是正确的,但使用ReplaySubject
+ shareReplay
是一种代码味道。可能有帮助的是尝试考虑您的数据实际来自何处。在大多数情况下,功能本身并不存在,它们实际上是由其他东西触发的。
您需要找到其他内容,然后继续进行操作,直到找到根源为止。在大多数情况下,此源将是您可以使用fromEvent
或fromPromise
打包的用户或网络事件。一旦有了该起点,只需要将该源连接到您要执行的操作即可。
因此,我将重构将端点调用为Observable
扩展名的业务逻辑:
Rx.Observable.prototype.paginate = function(endpoint, url) {
return this
.startWith({
page: 1,
refresh: false
})
.map(req =>
({page: req.page,url: url,refresh: req.refresh}))
.distinctUntilChanged(req => req,
(left, right) => right.refresh ? false :
left.page == right.page)
.flatMapLatest(request => endpoint.get(request),
(request, response) => ({
page: request.page,
url: request.url,
items: response
}))
.shareReplay(1)
}
上面的内容将等待第一个订阅,然后在该订阅发生时自动发出第一个请求。之后,每个后续订户将从分页接收最新值。
从那里开始,这将取决于您的来源,但我想您可能会做类似的事情:
var trigger = Rx.Observable.fromEvent($nextPageButton, 'click')
.scan((current, _) => current + 1, 1)
.paginate(endpoint, url);
trigger.subscribe(/*Handle result*/);
在这种情况下,您可能要等到需要卸载页面后才取消订阅,而是只在加载时挂接管道,其余的将由它来处理。一直订阅
trigger
总是会为您提供最新数据。我使用重构现有代码添加了一个工作示例。
const logDiv = $("#log");
function log(message, cls) {
logDiv.append($("<li>").text(message).addClass(cls));
}
/* interface IRequest {
url: string;
page: number:
refresh?: boolean
}
interface IEndpoint {
get(request: IRequest): [];
} */
Rx.Observable.prototype.paginate = function(endpoint, url) {
return this
.startWith({
page: 1,
refresh: false
})
.map(req =>
({page: req.page,url: url,refresh: req.refresh}))
.distinctUntilChanged(req => req,
(left, right) => right.refresh ? false :
left.page == right.page)
.flatMapLatest(request => endpoint.get(request),
(request, response) => ({
page: request.page,
url: request.url,
items: response
}))
.shareReplay(1)
}
// -----------------
// EXAMPLE USAGE
var dummyEndpoint = {
get(request) {
log(`GET: ${request.url} at page ${request.page} with${request.refresh ? "" : "out"} refresh`, "service");
return Rx.Observable.range(request.page * 10, 10)
.delay(1000)
.map(i => ({
id: i,
title: `Track ${i}`
}))
.toArray();
}
};
var trigger = Rx.Observable.concat(
Rx.Observable.just({
page: 2
}).delay(2000),
Rx.Observable.just({
page: 2
}).delay(2000),
Rx.Observable.just({
page: 2,
refresh: true
}).delay(1000)
);
const tracks = trigger.paginate(dummyEndpoint, "/api/tracks");
tracks.delaySubscription(2000).subscribe(data => log(`Got page ${data.page} after delay`, "second"));
// This results in getting the first page
tracks.subscribe(data => {
log(`On page ${data.page}, ${data.items.map(i => i.title).join(",")}`, "first")
});
#log li.first {
color: green;
}
#log li.second {
color: blue;
}
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.js"></script>
<ol id="log">
</ol>