我想获取IdStream的最新值,并在命令Execute操作中使用它。

public IObservable<Option<Guid>> IdStream { get; }

IdStream = documentStream.OfType<DocumentOpened().Select(x => x.Document.Id.Some())
.Merge(documentStream.OfType<DocumentClosed().Select(x => Option<Guid>.None()));

var saveCommand = ReactiveCommand.Create(() => Save(id), CanExecute);


我曾尝试使用答案https://stackoverflow.com/a/31168822/7779560并得到如下信息:

var saveCommand = ReactiveCommand.Create(() => { }, CanExecute);
saveCommand.WithLatestFrom(IdStream, (_, id) => id)
            .Subscribe(id => Save(id));


它确实有效,但是在这种情况下,我无法使用IsExecuting和ThrownExceptions命令的功能(它们仅对在命令创建过程中作为Execute传递的空操作起作用)。

UPD:

执行顺序:


IdStream创建
命令创建
documentStream处理DocumentOpened事件(获取一些ID值-我检查了它)
saveCommand执行


我该如何实现?

UPD 2:我还需要等待命令主体中的方法(例如SaveAsync)。

最佳答案

这对您有用吗?重播将保留发布的最新值。执行该命令时,它将获取最新值,之后Take(1)取消订阅,因为您只需要一个值,然后将其推送到Save;

    [Test]
    public void ExecuteUsingLastProducedValue()
    {
        Subject<string> producer = new Subject<string>();
        IObservable<bool> CanExecute = Observable.Return(true);
        IObservable<string> IdStream = producer;
        string SaveCalledWith = String.Empty;

        Func<string, Task> SaveAsync = (id) =>
        {
            SaveCalledWith = id;
            return Task.Delay(0);
        };

        // IdStream creating
         var connectedIdStream =
            IdStream
            .Replay(1);

        connectedIdStream
            .Connect();

        //Command creating
        var command = ReactiveCommand.CreateFromObservable(() =>
        {
            return connectedIdStream
                .Take(1)
                .Do(async id =>
                {
                    await SaveAsync(id);
                });
        }
        , CanExecute);


        //Alternate way
        //Command creating
        //var command = ReactiveCommand.CreateFromObservable(() =>
        //{
        //    return connectedIdStream
        //        .Take(1)
        //        .SelectMany(id => SaveAsync(id).ToObservable());
        //}
        //, CanExecute);


        //documentStream processes DocumentOpened event (get some Id value - I checked it)
        producer.OnNext("something random");
        producer.OnNext("working");

        //At this point Save still hasen't been called so just verifiyng it's still empty
        Assert.AreEqual(String.Empty, SaveCalledWith);

        //trigger execution of command
        command.Execute(Unit.Default).Subscribe();

        //Verified Saved Called With is called
        Assert.AreEqual(SaveCalledWith, "working");
    }

关于c# - RX:如何将最新值从可观察值传递到ReactiveCommand,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43072060/

10-11 14:21