Asynchronous Streams

 



Asynchronously enumerating over all the results of an API that uses parameters for paging:

async IAsyncEnumerable<string> GetValuesAsync(HttpClient client)
{
  int offset = 0;
  const int limit = 10;
  while (true)
  {
    // Get the current page of results and parse them.
    string result = await client.GetStringAsync(
        $"https://example.com/api/values?offset={offset}&limit={limit}");
    string[] valuesOnThisPage = result.Split('\n');

    // Produce the results for this page.
    foreach (string value in valuesOnThisPage)
      yield return value;

    // If this is the last page, we're done.
    if (valuesOnThisPage.Length != limit)
      break;

    // Otherwise, proceed to the next page.
    offset += limit;
  }
}

When GetValuesAsync starts, it does an asynchronous request for the first page of data, and then produces the first element. When the second element is then requested, GetValuesAsync produces it immediately, since it is also in that same first page of data. The next element is also in that page, and so on, up to 10 elements. Then, when the 11th element is requested, all the values in valuesOnThisPage will have been produced, so there are no more elements on the first page. GetValuesAsync will continue executing its while loop, proceed to the next page, do an asynchronous request for the second page of data, receive back a new batch of values, and then it’ll produce the 11th element.

Processing of each element:

IAsyncEnumerable<string> GetValuesAsync(HttpClient client);

public async Task ProcessValueAsync(HttpClient client)
{
  await foreach (string value in GetValuesAsync(client))
  {
    await Task.Delay(100); // asynchronous work
    Console.WriteLine(value);
  }
}


As an example, one of the common questions about LINQ is how to use the Where operator if the predicate for Where is asynchronous. In other words, you want to filter a sequence based on some asynchronous condition—e.g., you need to look up each element in a database or API to see if it should be included in the result sequence. Where doesn’t work with an asynchronous condition because the Where operator requires that its delegate return an immediate, synchronous answer.

Asynchronous streams have a support library that defines many useful operators. In the following example, WhereAwait is the proper choice:

IAsyncEnumerable<int> values = SlowRange().WhereAwait(
    async value =>
    {
      // Do some asynchronous work to determine
      //  if this element should be included.
      await Task.Delay(10);
      return value % 2 == 0;
    });

await foreach (int result in values)
{
  Console.WriteLine(result);
}

// Produce sequence that slows down as it progresses.
async IAsyncEnumerable<int> SlowRange()
{
  for (int i = 0; i != 10; ++i)
  {
    await Task.Delay(i * 100);
    yield return i;
  }
}

public interface IMyAsyncHttpService
{
  void DownloadString(Uri address, Action<string, Exception> callback);
}

await Task.Run(() => Parallel.ForEach(...));