将IAsyncEnumerable与Dapper一起使用

将IAsyncEnumerable与Dapper一起使用

本文介绍了将IAsyncEnumerable与Dapper一起使用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们最近将使用 Dapper 的ASP.NET Core API迁移到了.NET Core 3.1.迁移后,我们觉得有机会将C#8 中的最新 IAsyncEnumerable 功能用于我们的一个端点.

We have recently migrated our ASP.NET Core API which uses Dapper to .NET Core 3.1. After the migration, we felt there was an opportunity to use the latest IAsyncEnumerable feature from C# 8 for one of our endpoints.

这是更改之前的伪代码:

Here is the pseudocode before the changes:

public async Task<IEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       {
           Id = id
       });

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
       return null;
    }

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
}

private IEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
        foreach (var item in items)
        {
            yield return item;
        }
    }
}

IAsyncEnumerable 代码更改后:

// Import Nuget pacakage: System.Linq.Async

public async Task<IAsyncEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       {
           Id = id
       });

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
        return null;
    }

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
}

private IAsyncEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
       await foreach (var item in items.ToAsyncEnumerable())
       {
           yield return item;
       }
    }
 }

上述方法是使用 ToAsyncEnumerable 的灵感来自这篇文章,但是我不确定100%是否在正确的位置/上下文中使用它.

The above approach is to use ToAsyncEnumerable is loosely inspired from this post, but I'm not 100% sure if I'm using it in the right place/ context.

问题:

  • dapper库仅返回 IEnumerable ,但我们可以使用 ToAsyncEnumerable 将其转换为 IAsyncEnumerable 以便用于 async stream ?
  • The dapper library only returns IEnumerable but can we use ToAsyncEnumerable to convert it into IAsyncEnumerable for async stream like above?

注意:此问题与,但我认为这并不能回答我的问题.

Note: This question looks similar to What happens with returning IEnumerable if used with async/await (streaming data from SQL Server with Dapper)? but I do not think that answers my question.

推荐答案

更新:第一次编写此答案时,我并不知道异步迭代器.感谢Theodor Zoulias指出了这一点.有鉴于此,一种更简单的方法是可能的:

Update: I wasn't aware of async iterators when I first wrote this answer. Thanks to Theodor Zoulias for pointing it out. In light of that, a much simpler approach is possible:

using var reader = await connection.ExecuteReaderAsync(query, parameters);
var rowParser = reader.GetRowParser<T>();

while (await reader.ReadAsync()) {
    yield return rowParser(reader);
}

原始答案:

这是我写的 IAsyncEnumerable 包装器,它可以帮助那些希望使用async/await流式传输非缓冲数据并且还希望使用Dapper的类型映射功能的人:

Here's an IAsyncEnumerable wrapper I wrote that may help those who want to stream unbuffered data using async/await and also want the power of Dapper's type mapping:

public class ReaderParser<T> : IAsyncEnumerable<T> {
    public ReaderParser(SqlDataReader reader) {
        Reader = reader;
    }
    private SqlDataReader Reader { get; }
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) {
        return new ReaderParserEnumerator<T>(Reader);
    }
}
public class ReaderParserEnumerator<T> : IAsyncEnumerator<T> {
    public ReaderParserEnumerator(SqlDataReader reader) {
        Reader = reader;
        RowParser = reader.GetRowParser<T>();
    }
    public T Current => Reader.FieldCount == 0 ? default(T) : RowParser(Reader);
    private SqlDataReader Reader { get; }
    private Func<IDataReader, T> RowParser { get; }
    public async ValueTask DisposeAsync() {
        await Reader.DisposeAsync();
    }
    public async ValueTask<bool> MoveNextAsync() {
        return await Reader.ReadAsync();
    }
}

用法:

var reader = await command.ExecuteReaderAsync();
return new ReaderParser<T>(reader);

然后,包 System.Linq.Async 基本上添加了您知道并喜欢的所有漂亮的 IEnumerable 扩展,例如在我的用法中:

And then, package System.Linq.Async adds basically all the nice IEnumerable extensions you know and love, e.g. in my usage:

var streamData = await repo.GetDataStream();
var buffer = await streamData.Take(BATCH_SIZE).ToListAsync();

这篇关于将IAsyncEnumerable与Dapper一起使用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!