Efficiency driven developer. Ex-Microsoft (ADX, AATP, ATA). Ex-Aorato (Acquired by Microsoft).

© Bar Arnon
Async LINQ To Objects Over MongoDB
December 16, 2015 | MongoDB, Async/Await, Rx, Ix, AsyncEnumerable

    I’ve been working with MongoDB for the past few years, and lately the guys behind the C# driver have been working on some interesting things, especially around async/await support.

    This year they released a complete rewrite of the driver with an async-only API. Since IEnumerator doesn’t support asynchronous enumeration (its MoveNext is synchronous), they used their own enumerator interface, IAsyncCursor, which looks like this:

    public interface IAsyncCursor<out TDocument> : IDisposable
    {
        IEnumerable<TDocument> Current { get; }
        Task<bool> MoveNextAsync(
            CancellationToken cancellationToken = default(CancellationToken));
    }
    

    To enable asynchronous iteration without having to implement it yourself, they added a subset of the LINQ operators, but only those that materialize the query (e.g. ForEachAsync, ToListAsync).
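Conceptually, a materializing operator like ToListAsync is a two-level loop: await the next batch, then walk that batch's items synchronously. The sketch below shows that shape under stated assumptions. To run without a server it uses IBatchCursor<T>, a hypothetical stand-in that only mirrors the IAsyncCursor<T> interface above, and InMemoryCursor<T>, a hypothetical cursor that serves pre-built batches. ToListSketchAsync is an illustrative name and is not the driver's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in mirroring the shape of the driver's IAsyncCursor<T>.
public interface IBatchCursor<out T> : IDisposable
{
    IEnumerable<T> Current { get; }
    Task<bool> MoveNextAsync(CancellationToken cancellationToken = default);
}

// Hypothetical in-memory cursor that hands out pre-built batches.
public sealed class InMemoryCursor<T> : IBatchCursor<T>
{
    private readonly Queue<T[]> _batches;
    public InMemoryCursor(params T[][] batches) => _batches = new Queue<T[]>(batches);
    public IEnumerable<T> Current { get; private set; } = Array.Empty<T>();

    public async Task<bool> MoveNextAsync(CancellationToken cancellationToken = default)
    {
        await Task.Yield(); // stands in for a network round trip
        cancellationToken.ThrowIfCancellationRequested();
        if (_batches.Count == 0) return false;
        Current = _batches.Dequeue();
        return true;
    }

    public void Dispose() { }
}

public static class CursorExtensions
{
    // Outer loop: one awaited fetch per batch. Inner work: copy that batch's items.
    public static async Task<List<T>> ToListSketchAsync<T>(
        this IBatchCursor<T> cursor, CancellationToken cancellationToken = default)
    {
        var list = new List<T>();
        using (cursor)
        {
            while (await cursor.MoveNextAsync(cancellationToken))
            {
                list.AddRange(cursor.Current);
            }
        }
        return list;
    }
}
```

For example, a cursor over the batches { 1, 2 }, { } and { 3 } materializes into a list of three items.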

    In the latest version of the driver (v2.1) they added full “LINQ to MongoDB” support (i.e. IQueryable) that you can use to build your mongo queries. Just like with LINQ to SQL, you can use Where, Select and so forth to build a complex expression-based query that is sent to the server only when you start enumerating the IQueryable.

    The problem arises when you need an expression to be executed on the client side instead of being sent to the server. This is mainly relevant for expressions that can’t be translated into a mongo query. For example, the following code, which compares two of the queried document’s fields (unsupported in MongoDB), will throw an exception saying: “Unsupported filter: ([FirstName] == [LastName]).”

    async Task LinqToMongo()
    {
        IMongoCollection<Hamster> collection = GetCollection();
        IMongoQueryable<Hamster> queryable = collection.AsQueryable();
        queryable = queryable.Where(_ => _.FirstName == _.LastName); // unsupported filter
        Console.WriteLine(await queryable.CountAsync());
    }
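Why does that throw? A LINQ provider receives the predicate only as an expression tree, and it has to map each node to a server-side construct. The toy translator below is hypothetical and far simpler than the driver's; it is not the driver's code. It accepts a field compared to a constant and rejects anything else, such as a field compared to another field, with a similar message.

```csharp
using System;
using System.Linq.Expressions;

public class Hamster
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
}

// Toy translator (hypothetical, NOT the MongoDB driver's): it can only turn
// "field == constant" into a filter document; anything else is rejected.
public static class ToyFilterTranslator
{
    public static string Translate<T>(Expression<Func<T, bool>> predicate)
    {
        if (predicate.Body is BinaryExpression binary &&
            binary.NodeType == ExpressionType.Equal &&
            binary.Left is MemberExpression field &&
            binary.Right is ConstantExpression constant)
        {
            // A document field on the left, a client-side value on the right.
            return $"{{ {field.Member.Name}: \"{constant.Value}\" }}";
        }

        // e.g. h => h.FirstName == h.LastName: the right side is another field,
        // which this toy has no server-side equivalent for.
        throw new NotSupportedException($"Unsupported filter: {predicate.Body}.");
    }
}
```

Translating h => h.FirstName == "Bob" succeeds, while h => h.FirstName == h.LastName raises NotSupportedException. Only the client-side approach below can evaluate the second predicate.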
    

    Normally you would just cast the IQueryable into an IEnumerable (with AsEnumerable) and use LINQ to Objects, which also supports deferred execution. However, since IEnumerable is synchronous, doing that defeats the whole purpose of using async/await to begin with: every round trip to the server would have to block the calling thread. You could also materialize the entire collection into memory and then apply the filters on the client side, but that can take too much memory and time.

    A friend of mine (Tsach Vayness) suggested finding an existing library with async LINQ to Objects support and plugging it into the MongoDB C# driver, which enables using all the LINQ to Objects operators over MongoDB. There are a few of these libraries, and the best, in my opinion, is Reactive Extensions’ Interactive Extensions (Ix-Async on nuget.org).


    Edit (2021-12):

    Hey, it’s me from the future. C# 8.0 added async streams (i.e. IAsyncEnumerable) to support truly asynchronous enumeration. That makes it even more appealing to treat MongoDB’s IAsyncCursor as a “regular” async stream that you can use with async LINQ and await foreach. So I made a small library that allows just that: MongoAsyncEnumerableAdapter. It’s much more robust and safe than the implementation below, and I encourage you to use it instead. The custom implementation deep dive below is still interesting, so I also encourage you to keep reading.
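With C# 8.0 (and .NET Core 3.0 or later), such an adapter can be written as an async iterator. The compiler generates the IAsyncEnumerator state machine, and the using block disposes the cursor even when an await foreach loop exits early. This is a minimal sketch and not the MongoAsyncEnumerableAdapter library's code. IBatchCursor<T> and InMemoryCursor<T> are hypothetical stand-ins for the driver's cursor, so the example runs without MongoDB.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in mirroring the shape of the driver's IAsyncCursor<T>.
public interface IBatchCursor<out T> : IDisposable
{
    IEnumerable<T> Current { get; }
    Task<bool> MoveNextAsync(CancellationToken cancellationToken = default);
}

// Hypothetical in-memory cursor that hands out pre-built batches.
public sealed class InMemoryCursor<T> : IBatchCursor<T>
{
    private readonly Queue<T[]> _batches;
    public InMemoryCursor(params T[][] batches) => _batches = new Queue<T[]>(batches);
    public IEnumerable<T> Current { get; private set; } = Array.Empty<T>();

    public async Task<bool> MoveNextAsync(CancellationToken cancellationToken = default)
    {
        await Task.Yield(); // stands in for a network round trip
        cancellationToken.ThrowIfCancellationRequested();
        if (_batches.Count == 0) return false;
        Current = _batches.Dequeue();
        return true;
    }

    public void Dispose() { }
}

public static class CursorAsyncStreamExtensions
{
    // Fetch a batch asynchronously, then yield its items one by one.
    // Disposing the enumerator (e.g. breaking out of await foreach) runs the using block.
    public static async IAsyncEnumerable<T> ToAsyncStream<T>(
        this IBatchCursor<T> cursor,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        using (cursor)
        {
            while (await cursor.MoveNextAsync(cancellationToken))
            {
                foreach (var item in cursor.Current)
                {
                    yield return item;
                }
            }
        }
    }
}

public static class Demo
{
    // Client-side filter, in the spirit of the post's FirstName == LastName example.
    public static async Task<int> CountSameNamesAsync(
        IBatchCursor<(string First, string Last)> cursor)
    {
        var count = 0;
        await foreach (var name in cursor.ToAsyncStream())
        {
            if (name.First == name.Last) count++;
        }
        return count;
    }
}
```

CountSameNamesAsync applies the comparison on the client one item at a time. The only real asynchronous waits happen when a new batch is fetched.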


    All that’s needed is an adapter from MongoDB’s IAsyncCursorSource and IAsyncCursor to Interactive Extensions’ IAsyncEnumerable and IAsyncEnumerator (which are already pretty similar), and then you can use all of Ix’s operators on MongoDB cursors. Here’s the previous example, which compares two of the queried document’s fields, fixed by moving the filter to the client side:

    async Task LinqToObjects()
    {
        IMongoCollection<Hamster> collection = GetCollection();
        IMongoQueryable<Hamster> queryable = collection.AsQueryable();
        IAsyncEnumerable<Hamster> asyncEnumerable = queryable.ToAsyncEnumerable();
        asyncEnumerable = asyncEnumerable.Where(_ => _.FirstName == _.LastName);
        Console.WriteLine(await asyncEnumerable.Count());
    }
    

    Most of the “magic” that enables this happens in AsyncEnumeratorAdapter.MoveNext. First, you create an IAsyncCursor out of the IAsyncCursorSource asynchronously with ToCursorAsync (which is possible because MoveNext returns a Task). Then you call (and await) MoveNextAsync on the created _asyncCursor. If it returns true, _asyncCursor.Current contains a batch of items you can enumerate, and you call _asyncCursor.MoveNextAsync again when the batch is exhausted. You repeat that until the underlying MoveNextAsync returns false, meaning there are no more items to enumerate.

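    public async Task<bool> MoveNext(CancellationToken cancellationToken)
    {
        // Lazily open the cursor on the first call (possible because MoveNext is async).
        if (_asyncCursor == null)
        {
            _asyncCursor = await _asyncCursorSource.ToCursorAsync(cancellationToken);
        }
        // Loop so that an empty batch triggers another fetch instead of ending enumeration.
        while (true)
        {
            // Serve the next item from the batch already in memory.
            if (_batchEnumerator != null && _batchEnumerator.MoveNext())
            {
                return true;
            }
            // Batch exhausted: asynchronously fetch the next one (false means no more results).
            if (!await _asyncCursor.MoveNextAsync(cancellationToken))
            {
                return false;
            }
            _batchEnumerator?.Dispose();
            _batchEnumerator = _asyncCursor.Current.GetEnumerator();
        }
    }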
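The batch-then-item flow can be checked without a MongoDB server. In this sketch, CountingCursor<T> is a hypothetical stand-in that serves fixed batches and counts every awaited MoveNextAsync as one round trip. BatchingEnumerator<T> reproduces the adapter's logic as a loop, so an empty batch (should a cursor ever return one) leads to another fetch instead of ending the enumeration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the driver's cursor: serves fixed batches and
// counts how many times MoveNextAsync is awaited (one "round trip" each).
public sealed class CountingCursor<T> : IDisposable
{
    private readonly Queue<T[]> _batches;
    public CountingCursor(params T[][] batches) => _batches = new Queue<T[]>(batches);
    public int RoundTrips { get; private set; }
    public IEnumerable<T> Current { get; private set; } = Array.Empty<T>();

    public async Task<bool> MoveNextAsync(CancellationToken cancellationToken)
    {
        RoundTrips++;
        await Task.Yield(); // stands in for network I/O
        if (_batches.Count == 0) return false;
        Current = _batches.Dequeue();
        return true;
    }

    public void Dispose() { }
}

// Same batch-then-item logic as the adapter's MoveNext, written as a loop.
public sealed class BatchingEnumerator<T> : IDisposable
{
    private readonly CountingCursor<T> _cursor;
    private IEnumerator<T> _batchEnumerator;

    public BatchingEnumerator(CountingCursor<T> cursor) => _cursor = cursor;
    public T Current => _batchEnumerator.Current;

    public async Task<bool> MoveNext(CancellationToken cancellationToken)
    {
        while (true)
        {
            // Cheap path: the next item is already in memory.
            if (_batchEnumerator != null && _batchEnumerator.MoveNext()) return true;
            // Batch exhausted (or none yet): one asynchronous round trip.
            if (!await _cursor.MoveNextAsync(cancellationToken)) return false;
            _batchEnumerator?.Dispose();
            _batchEnumerator = _cursor.Current.GetEnumerator();
        }
    }

    public void Dispose()
    {
        _batchEnumerator?.Dispose();
        _cursor.Dispose();
    }
}

public static class BatchingDemo
{
    public static async Task<(List<int> Items, int RoundTrips)> RunAsync()
    {
        var cursor = new CountingCursor<int>(new[] { 1, 2 }, new int[0], new[] { 3, 4, 5 });
        var items = new List<int>();
        using (var e = new BatchingEnumerator<int>(cursor))
        {
            while (await e.MoveNext(CancellationToken.None)) items.Add(e.Current);
        }
        return (items, cursor.RoundTrips);
    }
}
```

With the batches { 1, 2 }, { } and { 3, 4, 5 }, all five items come back, and the cursor is asked for a batch four times: three successful fetches plus the final one that returns false.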
    

    Here is the full code for the adapters:

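    using System.Collections.Generic; // IAsyncEnumerable<T>, IAsyncEnumerator<T> (Ix-Async)
    using System.Threading;
    using System.Threading.Tasks;
    using MongoDB.Driver; // IAsyncCursorSource<T>, IAsyncCursor<T>
    
    public static class AsyncCursorSourceExtensions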
    {
        public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(
            this IAsyncCursorSource<T> asyncCursorSource) => 
            new AsyncEnumerableAdapter<T>(asyncCursorSource);
    
        private class AsyncEnumerableAdapter<T> : IAsyncEnumerable<T>
        {
            private readonly IAsyncCursorSource<T> _asyncCursorSource;
    
            public AsyncEnumerableAdapter(IAsyncCursorSource<T> asyncCursorSource)
            {
                _asyncCursorSource = asyncCursorSource;
            }
    
            public IAsyncEnumerator<T> GetEnumerator() => 
                new AsyncEnumeratorAdapter<T>(_asyncCursorSource);
        }
    
        private class AsyncEnumeratorAdapter<T> : IAsyncEnumerator<T>
        {
            private readonly IAsyncCursorSource<T> _asyncCursorSource;
            private IAsyncCursor<T> _asyncCursor;
            private IEnumerator<T> _batchEnumerator;
    
            public T Current => _batchEnumerator.Current;
    
            public AsyncEnumeratorAdapter(IAsyncCursorSource<T> asyncCursorSource)
            {
                _asyncCursorSource = asyncCursorSource;
            }
    
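            public async Task<bool> MoveNext(CancellationToken cancellationToken)
            {
                // Lazily open the cursor on the first call.
                if (_asyncCursor == null)
                {
                    _asyncCursor = await _asyncCursorSource.ToCursorAsync(cancellationToken);
                }
                
                // Loop so that an empty batch triggers another fetch
                // instead of ending the enumeration early.
                while (true)
                {
                    // Serve the next item from the batch already in memory.
                    if (_batchEnumerator != null &&
                        _batchEnumerator.MoveNext())
                    {
                        return true;
                    }
                    
                    // Batch exhausted: asynchronously fetch the next one.
                    if (!await _asyncCursor.MoveNextAsync(cancellationToken))
                    {
                        return false;
                    }
                    
                    _batchEnumerator?.Dispose();
                    _batchEnumerator = _asyncCursor.Current.GetEnumerator();
                }
            }
    
            public void Dispose()
            {
                // Release the in-memory batch enumerator as well as the server-side cursor.
                _batchEnumerator?.Dispose();
                _batchEnumerator = null;
                _asyncCursor?.Dispose();
                _asyncCursor = null;
            }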
        }
    }