Sharp Learning Curve

An Improved Rx Approach for Limiting Asynchronous Calls

Not a day after I posted Using Reactive Extensions To Throttle Asynchronous Tasks, Josh Bush was already (kindly) saying “I think your code may have a problem”. The issue with the first example is twofold: one, it doesn’t really work as posted, and two, if it did, it would behave in a less-than-ideal way. Basically, it calls wait after each item. Not exactly what I was going for.

It Would Be Cool If…

You have some known or unknown quantity of total work (Y), and you want to limit the number of worker threads in process at any given time (X). What my first try actually did was make X asynchronous calls and then, for every call past that (X+N), immediately call WaitOne on the wait handle.

Josh To The Rescue

I fought with my example for an hour or so and realized that there wasn’t just some really simple thing for me to tack on to the existing code sample. This morning, I read Josh’s post about his approach to limiting the number of asynchronous calls using IEnumerable only. The key to his approach (and little did I know, to mine as well) was the really cool Aggregate extension method.
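If Aggregate is new to you: it is LINQ's fold. It walks a sequence and hands each item, along with an accumulator, to a function you supply, and whatever that function returns becomes the accumulator for the next item. Here is a tiny sketch of the idea in Python rather than C#, with functools.reduce standing in for Aggregate. The function name and the sample data are made up for illustration.

```python
from functools import reduce


def remember(seen, item):
    """One fold step: the accumulator carries state from item to item."""
    seen.add(item)
    return seen  # becomes `seen` for the next item


# Start from an empty set and fold the whole sequence into it.
distinct = reduce(remember, ["a", "b", "a", "c"], set())
```

The accumulator doesn't have to be a number. It can be any state you want to drag along the sequence, including a set of calls that are still running.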

My Improved Solution

The main difference from my first attempt is that I’m using a HashSet to aggregate calls over the stream. For each item, I first clear out any calls that have completed, so that finished calls don’t make the code behave as if the number of calls in process has reached the limit. Then I add the most recent async handle to the set and, if the set is above the limit, I take an uncompleted call and wait on it.
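For anyone who wants something runnable, here is a rough Python sketch of the steps just described. It is not the C# code from this post. functools.reduce stands in for Aggregate, concurrent.futures futures stand in for the async wait handles, and LIMIT, TOTAL_CALLS, fake_call, and throttle are hypothetical names chosen for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait
from functools import reduce

LIMIT = 3          # hypothetical threshold for the set of pending calls
TOTAL_CALLS = 10   # hypothetical amount of work (the "Y" above)

lock = threading.Lock()
running = 0        # calls executing right now
peak = 0           # highest value `running` ever reached


def fake_call(n):
    """Stand-in for an asynchronous call; records how many run at once."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.02)
    with lock:
        running -= 1
    return n


def throttle(pending, newest):
    """One fold step: prune finished calls, add the newest, maybe wait."""
    # Clear out completed calls so they don't count toward the limit.
    pending = {call for call in pending if not call.done()}
    pending.add(newest)
    if len(pending) > LIMIT:
        # Take one uncompleted call (if any is left) and block on it.
        unfinished = next((call for call in pending if not call.done()), None)
        if unfinished is not None:
            wait([unfinished])
    return pending


# The pool is sized so that it never does the limiting itself.
with ThreadPoolExecutor(max_workers=TOTAL_CALLS) as pool:
    # The generator is lazy: a call is only submitted when reduce asks for
    # the next item, so blocking inside throttle() delays the next call.
    calls = (pool.submit(fake_call, n) for n in range(TOTAL_CALLS))
    leftover = reduce(throttle, calls, set())
    wait(leftover)  # let the last few calls finish
```

In this sketch the size check runs after the newest call has been added, so up to LIMIT + 1 calls can overlap briefly. Comparing with >= instead would make the cap strict.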

Well That Wasn’t So Bad, Was It?

By now you’ve certainly realized I’m not an Rx expert; I’m just sharing what I’m trying to learn as I learn it. Hopefully it’s more helpful than it is distracting.