I’ve been working with MongoDB for the past few years and lately the guys behind the C# driver are working on some interesting things, especially around async/await support.
This year they released a complete rewrite of the driver with an async-only API. Now, since IEnumerator doesn’t support asynchronous enumeration (because MoveNext is synchronous), they used their own enumerator interface called IAsyncCursor, which looks like this:
public interface IAsyncCursor<out TDocument> : IDisposable
{
    IEnumerable<TDocument> Current { get; }
    Task<bool> MoveNextAsync(
        CancellationToken cancellationToken = default(CancellationToken));
}
To enable asynchronous iteration without having to implement it yourself, they added a subset of the LINQ operators, but only those that materialize the query (e.g. ForEachAsync, ToListAsync, etc.).
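For a sense of what those helpers save you from, draining a cursor by hand means an outer asynchronous loop over batches and an inner synchronous loop over each batch. Here is a minimal sketch of that pattern. So that it compiles without the driver, it runs against IBatchCursor, a hypothetical stand-in that copies the shape of IAsyncCursor, and InMemoryCursor, a made-up in-memory implementation. Neither type is part of the MongoDB driver, and DrainAsync is only my guess at what a ToListAsync-style helper roughly has to do.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in with the same shape as the driver's IAsyncCursor<TDocument>.
public interface IBatchCursor<out TDocument> : IDisposable
{
    IEnumerable<TDocument> Current { get; }
    Task<bool> MoveNextAsync(
        CancellationToken cancellationToken = default(CancellationToken));
}

// Made-up in-memory cursor that hands out pre-built batches (illustration only).
public sealed class InMemoryCursor<T> : IBatchCursor<T>
{
    private readonly IEnumerator<IReadOnlyList<T>> _batches;

    public InMemoryCursor(IEnumerable<IReadOnlyList<T>> batches)
    {
        _batches = batches.GetEnumerator();
    }

    public IEnumerable<T> Current => _batches.Current;

    public Task<bool> MoveNextAsync(
        CancellationToken cancellationToken = default(CancellationToken))
    {
        cancellationToken.ThrowIfCancellationRequested();
        return Task.FromResult(_batches.MoveNext());
    }

    public void Dispose() => _batches.Dispose();
}

public static class ManualCursorIteration
{
    // Awaits each batch, then walks the batch synchronously, collecting every item.
    public static async Task<List<T>> DrainAsync<T>(
        IBatchCursor<T> cursor,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        var results = new List<T>();
        using (cursor)
        {
            while (await cursor.MoveNextAsync(cancellationToken))
            {
                foreach (T item in cursor.Current)
                {
                    results.Add(item);
                }
            }
        }
        return results;
    }
}
```

Calling ManualCursorIteration.DrainAsync(new InMemoryCursor<int>(new[] { new[] { 1, 2 }, new[] { 3 } })) walks two batches and collects three items. Only the wait for the next batch is asynchronous; the items inside a batch are already in memory.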
In the latest version of the driver (v2.1) they added full “LINQ to MongoDB” support (i.e. IQueryable) that you can use to create your mongo queries. Just like with LINQ to SQL, you can use Where, Select and so forth to build a complex expression-based query and have it sent to the server only when you start enumerating the IQueryable.
The problem arises when you need an expression to be executed on the client-side and not be sent to the server. This is mainly relevant for expressions that can’t be translated into a mongo query. For example the following code comparing two of the queried document’s fields (which is unsupported in MongoDB) will throw an exception saying: “Unsupported filter: ([FirstName] == [LastName]).”
async Task LinqToMongo()
{
    IMongoCollection<Hamster> collection = GetCollection();
    IMongoQueryable<Hamster> queryable = collection.AsQueryable();
    queryable = queryable.Where(_ => _.FirstName == _.LastName); // unsupported filter
    Console.WriteLine(await queryable.CountAsync());
}
Normally you would just cast the IQueryable into an IEnumerable (with AsEnumerable) and use LINQ to Objects, which also supports deferred execution. However, since IEnumerable is synchronous, doing that defeats the whole purpose of using async/await to begin with. You could also materialize the entire collection into memory and then use client-side filters, but that can take too much memory and time.
A friend of mine (Tsach Vayness) suggested finding an existing library with async LINQ to Objects support and plugging it into the MongoDB C# driver. That enables using all the LINQ to Objects operators over MongoDB. There are a few of these libraries and the best, in my opinion, is Reactive Extensions’ Interactive Extensions (Ix-Async on nuget.org).
Hey, it’s me from the future. In C# 8.0 async streams (i.e. IAsyncEnumerables) were added to support truly asynchronous enumeration. That makes it natural to want to treat MongoDB’s IAsyncCursor as a “regular” async stream you can use with async LINQ and await foreach. So I made a small library allowing just that: MongoAsyncEnumerableAdapter. It’s much more robust and safe than the implementation below and I encourage you to use it instead. The custom implementation deep dive below is still interesting so I also encourage you to keep reading.
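To illustrate the async streams idea itself, here is a minimal sketch of how batches can be flattened with a C# 8.0 async iterator. It is not the code of MongoAsyncEnumerableAdapter. IBatchedSource is a hypothetical stand-in with the same shape as IAsyncCursor, and FakeSource, ToAsyncStream and CountLongNamesAsync are made-up names, used so the sketch compiles without the driver.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a batched cursor (same shape as IAsyncCursor).
public interface IBatchedSource<out T> : IDisposable
{
    IEnumerable<T> Current { get; }
    Task<bool> MoveNextAsync(CancellationToken cancellationToken);
}

// Made-up in-memory source, used only to exercise the sketch.
public sealed class FakeSource<T> : IBatchedSource<T>
{
    private readonly Queue<T[]> _batches;

    public FakeSource(params T[][] batches)
    {
        _batches = new Queue<T[]>(batches);
    }

    public IEnumerable<T> Current { get; private set; } = Array.Empty<T>();

    public Task<bool> MoveNextAsync(CancellationToken cancellationToken)
    {
        if (_batches.Count == 0)
        {
            return Task.FromResult(false);
        }
        Current = _batches.Dequeue();
        return Task.FromResult(true);
    }

    public void Dispose() { }
}

public static class AsyncStreamSketch
{
    // Flattens the batches into an async stream; the compiler builds the state machine.
    public static async IAsyncEnumerable<T> ToAsyncStream<T>(
        this IBatchedSource<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        using (source)
        {
            while (await source.MoveNextAsync(cancellationToken))
            {
                foreach (T item in source.Current)
                {
                    yield return item;
                }
            }
        }
    }

    // Client-side filtering with await foreach: counts names longer than one character.
    public static async Task<int> CountLongNamesAsync(IBatchedSource<string> source)
    {
        int count = 0;
        await foreach (string name in source.ToAsyncStream())
        {
            if (name.Length > 1)
            {
                count++;
            }
        }
        return count;
    }
}
```

The nested loops are the same ones a hand-written enumerator has to track with fields, except that here the iterator remembers where it is between items. An empty batch is also handled for free, because the while loop simply moves on to the next one.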
All that’s needed is an adapter from mongo’s IAsyncCursorSource and IAsyncCursor to Interactive Extensions’ IAsyncEnumerable and IAsyncEnumerator (which are already pretty similar), and then you can use all of Ix’s operators on the MongoDB cursors. Here’s the previous example, comparing two of the queried document’s fields, fixed by moving the filter to the client side:
async Task LinqToObjects()
{
    IMongoCollection<Hamster> collection = GetCollection();
    IMongoQueryable<Hamster> queryable = collection.AsQueryable();
    IAsyncEnumerable<Hamster> asyncEnumerable = queryable.ToAsyncEnumerable();
    asyncEnumerable = asyncEnumerable.Where(_ => _.FirstName == _.LastName);
    Console.WriteLine(await asyncEnumerable.Count());
}
Most of the “magic” enabling this happens in AsyncEnumeratorAdapter.MoveNext. First, you create an IAsyncCursor out of the IAsyncCursorSource in an async fashion with ToCursorAsync (which is possible because MoveNext returns a Task). Then you call (and await) MoveNextAsync on the created _asyncCursor. If it returns true, then _asyncCursor.Current contains a batch of items you can enumerate, and you call _asyncCursor.MoveNextAsync again when the batch is exhausted. You repeat that until the underlying MoveNextAsync returns false, meaning there are no more items to enumerate.
public async Task<bool> MoveNext(CancellationToken cancellationToken)
{
    if (_asyncCursor == null)
    {
        _asyncCursor = await _asyncCursorSource.ToCursorAsync(cancellationToken);
    }
    if (_batchEnumerator != null && _batchEnumerator.MoveNext())
    {
        return true;
    }
    if (_asyncCursor != null && await _asyncCursor.MoveNextAsync(cancellationToken))
    {
        _batchEnumerator?.Dispose();
        _batchEnumerator = _asyncCursor.Current.GetEnumerator();
        return _batchEnumerator.MoveNext();
    }
    return false;
}
Here is the full code for the adapters:
public static class AsyncCursorSourceExtensions
{
    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(
        this IAsyncCursorSource<T> asyncCursorSource) =>
        new AsyncEnumerableAdapter<T>(asyncCursorSource);

    // Wraps the cursor source; every GetEnumerator call starts a fresh enumeration.
    private class AsyncEnumerableAdapter<T> : IAsyncEnumerable<T>
    {
        private readonly IAsyncCursorSource<T> _asyncCursorSource;

        public AsyncEnumerableAdapter(IAsyncCursorSource<T> asyncCursorSource)
        {
            _asyncCursorSource = asyncCursorSource;
        }

        public IAsyncEnumerator<T> GetEnumerator() =>
            new AsyncEnumeratorAdapter<T>(_asyncCursorSource);
    }

    // Flattens the cursor's batches into a single item-by-item async enumerator.
    private class AsyncEnumeratorAdapter<T> : IAsyncEnumerator<T>
    {
        private readonly IAsyncCursorSource<T> _asyncCursorSource;
        private IAsyncCursor<T> _asyncCursor;
        private IEnumerator<T> _batchEnumerator;

        // Only valid after MoveNext has returned true.
        public T Current => _batchEnumerator.Current;

        public AsyncEnumeratorAdapter(IAsyncCursorSource<T> asyncCursorSource)
        {
            _asyncCursorSource = asyncCursorSource;
        }

        public async Task<bool> MoveNext(CancellationToken cancellationToken)
        {
            // Create the cursor lazily, on the first call.
            if (_asyncCursor == null)
            {
                _asyncCursor = await _asyncCursorSource.ToCursorAsync(cancellationToken);
            }
            // Keep consuming the current batch while it still has items.
            if (_batchEnumerator != null &&
                _batchEnumerator.MoveNext())
            {
                return true;
            }
            // The batch is exhausted (or there was none yet): fetch the next one.
            if (_asyncCursor != null &&
                await _asyncCursor.MoveNextAsync(cancellationToken))
            {
                _batchEnumerator?.Dispose();
                _batchEnumerator = _asyncCursor.Current.GetEnumerator();
                return _batchEnumerator.MoveNext();
            }
            return false;
        }

        public void Dispose()
        {
            // Release the in-flight batch enumerator as well as the cursor.
            _batchEnumerator?.Dispose();
            _batchEnumerator = null;
            _asyncCursor?.Dispose();
            _asyncCursor = null;
        }
    }
}