我正在开发一项服务,该服务负责记录发送到我们服务的请求。该服务处于脱机工作状态(正在被触发并且忘记了)。
我们根据一些输入参数(产品ID)将请求保存到其他数据库中。我们不想每次有人提出请求时都将其保存到数据库中-我们希望构建一些要插入的“批处理”并每隔InsertMany的时间(例如10秒)执行一次N。我已经开始实现它,现在我在两件事上苦苦挣扎:


我需要使用ConcurrentDictionary吗?看来我用普通字典也能达到同样的效果
如果对上述问题的回答是“否,在您的情况下ConcurrentDictionary没有任何好处”-是否有办法将我的代码重写为“正确”使用ConcurrentDictionary,所以我可以避免使用锁并确保清除批处理是否不会产生“冲突”?


让我粘贴该片段并进一步说明:

    // dictionary where key is ProductId and value is a list of items to insert to that product database
    ConcurrentDictionary<string, List<QuoteDetails>> _productDetails;
    public SaverService(StatelessServiceContext context)
        : base(context)
    {
        _productDetails = new ConcurrentDictionary<string, List<QuoteDetails>>();
    }

    // this function will be fired and forgotten by the external service
    public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
    {
        await Task.Run(() => {
            foreach (var token in requestData.ProductAccessTokens)
            {
                // this function will extract the specific product request ( one request can contain multiple products )
                var details = SplitQuoteByProduct(requestData, responseData, token);
                _productDetails.AddOrUpdate(token, new List<QuoteDetails>() { details }, (productId, list) =>
                {
                    list.Add(details);
                    return list;
                });
            }
        });
    }

    // this function will be executed by a timer every N amount of time
    public void SaveRequestsToDatabase()
    {
        lock (_productDetails)
        {
            foreach (var item in _productDetails)
            {
                // copy curent items and start a task which will process them
                SaveProductRequests(item.Key, item.Value.ToList());
                // clear curent items
                item.Value.Clear();
            }
        }
    }

    public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests)
    {
        // save received items to database
        /// ...
    }


我主要担心的是,在没有锁定的情况下会发生以下情况:


AddOrUpdate被触发-并开始处理数据
SaveRequestsToDatabase函数中调用item.Value.Clear();之前,外部服务会触发另一个SaveRequestsToDatabase函数,该函数使用相同的键执行SaveRecentRequest-这会将请求添加到集合中
AddOrUpdate正在完成,因此清除了集合-但最初由2添加的对象不在集合中,因此未处理

最佳答案

通常,并发问题是由于没有首先选择正确的数据结构而引起的。

就您而言,您有两个工作流程:


n个生产者,同时并连续地对事件进行排队
1个使用者,在给定时间对事件进行出队和处理


您的问题是,即使没有必要,您也要立即对事件进行分类。将事件作为并发部分中的简单流保留,并仅在使用者部分对它们进行排序,因为那里没有并发性。

ConcurrentQueue<(string token, QuoteDetails details)> _productDetails;

public SaverService(StatelessServiceContext context)
    : base(context)
{
    _productDetails = new ConcurrentQueue<(string, QuoteDetails)>();
}

// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
    await Task.Run(() => {
        foreach (var token in requestData.ProductAccessTokens)
        {
            // this function will extract the specific product request ( one request can contain multiple products )
            var details = SplitQuoteByProduct(requestData, responseData, token);
            _productDetails.Enqueue((token, details));
        }
    });
}

// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
    var products = new List<(string token, QuoteDetails details)>();

    while (_productDetails.TryDequeue(out var item))
    {
        products.Add(item);
    }

    foreach (var group in products.GroupBy(i => i.token, i => i.Details))
    {
        SaveProductRequests(group.Key, group);
    }
}

public async Task SaveProductRequests(string productId, IEnumerable<QuoteDetails> productRequests)
{
    // save received items to database
    /// ...
}

关于c# - C#Concurrentdictionary-锁定值,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45102034/

10-14 20:58