使用旧数据而不是最新数据

使用旧数据而不是最新数据

本文介绍了TPL Dataflow 使用旧数据而不是最新数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用 TPL 数据流是为了一次为每个交易品种执行一项任务.前两个 Operationtaking... 消息是正确的,但接下来的消息使用旧数据.换句话说,它使用下面截图中标有绿色的旧数据,而不是最新的数据(蓝色标的).

I'm using TPL Dataflow in order to perform one task per symbol at a time. The first two Operation taking... messages are correct but the next ones are using old data. In other words, it uses the old data marked with green on the screenshot below, instead of the newest data (the blue marked one).

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Binance.Net.Objects.Spot;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Logging;

namespace SubscribeToCandlesEventFixTest
{
    public class BinanceSocketHandler
    {
        private readonly IBinanceClient _client;
        private readonly IBinanceSocketClient _socketClient;

        public BinanceSocketHandler()
        {
            _client = new BinanceClient(new BinanceClientOptions
            {
                ApiCredentials = new ApiCredentials("not required", "not required"),
                AutoTimestamp = true,
                AutoTimestampRecalculationInterval = TimeSpan.FromMinutes(30),
#if DEBUG
                LogVerbosity = LogVerbosity.Debug
#endif
            });

            _socketClient = new BinanceSocketClient(new BinanceSocketClientOptions
            {
                ApiCredentials = new ApiCredentials("not required", "not required"),
                AutoReconnect = true,
                ReconnectInterval = TimeSpan.FromSeconds(15),
#if DEBUG
                LogVerbosity = LogVerbosity.Debug
#endif
            });
        }

        private Dictionary<string, ActionBlock<IBinanceStreamKlineData>> _ab = new();

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            var symbols = new[] { "TRXUSDT", "BTCUSDT" };
            var interval = KlineInterval.OneMinute;

            foreach (var symbol in symbols)
            {
                _ab[symbol] = new ActionBlock<IBinanceStreamKlineData>(
                    async data =>
                    {
                        Console.WriteLine($"Operation taking 10 seconds to execute... | Symbol: {data.Symbol} | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");

                        await Task.Delay(10000, cancellationToken).ConfigureAwait(false);
                    },
                    new ExecutionDataflowBlockOptions
                    {
                        MaxDegreeOfParallelism = 1
                    });

                await _socketClient.Spot.SubscribeToKlineUpdatesAsync(symbol, interval,
                    async data =>
                    {
                        if (data.Data.Final)
                        {
                            Console.WriteLine(
                                $"[{DateTime.UtcNow}] [{data.Symbol}] New final candle | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");
                        }
                        else
                        {
                            Console.WriteLine(
                                $"[{DateTime.UtcNow}] [{data.Symbol}] Candle update | Timestamp: {data.Data.OpenTime} | Price: {data.Data.Close}");

                            // TODO: Use the most up-to-date value
                            await _ab[symbol].SendAsync(data, cancellationToken).ConfigureAwait(false);
                        }
                    }).ConfigureAwait(false);
            }
        }

        public async Task StopAsync()
        {
            foreach (var symbol in _ab.Keys)
            {
                _ab[symbol].Complete();
                await _ab[symbol].Completion.ConfigureAwait(false);
            }
        }
    }

    class Program
    {
        static async Task Main(string[] args)
        {
            var test = new BinanceSocketHandler();
            await test.StartAsync(new CancellationToken()).ConfigureAwait(false);

            Console.ReadLine();
        }
    }
}

推荐答案

TPL Dataflow 将按顺序处理所有项目;这就是它要做的.您可以尝试使用 BroadcastBlock 来执行最新的方法,但是由于该块链接到另一个块,因此您可能最终会得到一个正在处理的块,一个等待处理的块,第三个是实际被覆盖的那个.

TPL Dataflow will process all items in order; that's what it's made to do. You can try to do a most-recent kind of approach by using a BroadcastBlock, but since that block is linked to another block, you'll probably end up with one in process, one waiting to be processed, and the third one being the one actually getting overwritten.

如果您希望它比这更严格(即,一个正在处理中,一个等待也被覆盖),那么我建议使用 Channels.具体来说,使用 BoundedChannelFullMode.DropOldest 的有界通道.

If you want it tighter than that (i.e., one in process and one waiting that is also overwritten), then I'd recommend Channels. Specifically, a bounded channel using BoundedChannelFullMode.DropOldest.

这篇关于TPL Dataflow 使用旧数据而不是最新数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-21 07:42