我正在开发一项服务,该服务负责记录发送到我们服务的请求。该服务处于脱机工作状态(正在被触发并且忘记了)。
我们根据一些输入参数(产品ID)将请求保存到其他数据库中。我们不想每次有人提出请求时都将其保存到数据库中-我们希望构建一些要插入的“批处理”并每隔InsertMany
的时间(例如10秒)执行一次N
。我已经开始实现它,现在我在两件事上苦苦挣扎:
我需要使用ConcurrentDictionary
吗?看来我用普通字典也能达到同样的效果
如果对上述问题的回答是“否,在您的情况下ConcurrentDictionary
没有任何好处”-是否有办法将我的代码重写为“正确”使用ConcurrentDictionary
,所以我可以避免使用锁并确保清除批处理是否不会产生“冲突”?
让我粘贴该片段并进一步说明:
// dictionary where key is ProductId and value is a list of items to insert to that product database
ConcurrentDictionary<string, List<QuoteDetails>> _productDetails;
public SaverService(StatelessServiceContext context)
: base(context)
{
_productDetails = new ConcurrentDictionary<string, List<QuoteDetails>>();
}
// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
await Task.Run(() => {
foreach (var token in requestData.ProductAccessTokens)
{
// this function will extract the specific product request ( one request can contain multiple products )
var details = SplitQuoteByProduct(requestData, responseData, token);
_productDetails.AddOrUpdate(token, new List<QuoteDetails>() { details }, (productId, list) =>
{
list.Add(details);
return list;
});
}
});
}
// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
lock (_productDetails)
{
foreach (var item in _productDetails)
{
// copy curent items and start a task which will process them
SaveProductRequests(item.Key, item.Value.ToList());
// clear curent items
item.Value.Clear();
}
}
}
public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests)
{
// save received items to database
/// ...
}
我主要担心的是,在没有锁定的情况下会发生以下情况:
AddOrUpdate
被触发-并开始处理数据在
SaveRequestsToDatabase
函数中调用item.Value.Clear();
之前,外部服务会触发另一个SaveRequestsToDatabase
函数,该函数使用相同的键执行SaveRecentRequest
-这会将请求添加到集合中AddOrUpdate
正在完成,因此清除了集合-但最初由2添加的对象不在集合中,因此未处理 最佳答案
通常,并发问题是由于没有首先选择正确的数据结构而引起的。
就您而言,您有两个工作流程:
n个生产者,同时并连续地对事件进行排队
1个使用者,在给定时间对事件进行出队和处理
您的问题是,即使没有必要,您也要立即对事件进行分类。将事件作为并发部分中的简单流保留,并仅在使用者部分对它们进行排序,因为那里没有并发性。
ConcurrentQueue<(string token, QuoteDetails details)> _productDetails;
public SaverService(StatelessServiceContext context)
: base(context)
{
_productDetails = new ConcurrentQueue<(string, QuoteDetails)>();
}
// this function will be fired and forgotten by the external service
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData)
{
await Task.Run(() => {
foreach (var token in requestData.ProductAccessTokens)
{
// this function will extract the specific product request ( one request can contain multiple products )
var details = SplitQuoteByProduct(requestData, responseData, token);
_productDetails.Enqueue((token, details));
}
});
}
// this function will be executed by a timer every N amount of time
public void SaveRequestsToDatabase()
{
var products = new List<(string token, QuoteDetails details)>();
while (_productDetails.TryDequeue(out var item))
{
products.Add(item);
}
foreach (var group in products.GroupBy(i => i.token, i => i.Details))
{
SaveProductRequests(group.Key, group);
}
}
public async Task SaveProductRequests(string productId, IEnumerable<QuoteDetails> productRequests)
{
// save received items to database
/// ...
}
关于c# - C#Concurrentdictionary-锁定值,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45102034/