This weekend I discovered two very valuable ways to control asynchronous processes using the fantastic, amazing and immensely useful Reactive Extensions from the Microsoft Cloud Programmability Team (not the Research team, as I had previously thought).
Limiting The Number Of Asynchronous Tasks
I recently blogged about ETL with Couch, Relax and Symbiote. The code in that post batched the asynchronous calls and blocked until each batch completed, so work was queued five calls at a time and nothing new started until all five had finished. Not exactly ideal. (Thanks to Josh Bush for pointing this out.)
EDIT
This first approach has a pretty serious bug in it and only sort of works as described. For a much better solution (thanks again to Josh for the good idea), please see the follow-up post: An Improved Rx Approach For Limiting Asynchronous Calls
Here’s the original code snippet:
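// Wrap SaveChunk in a delegate so it can be invoked asynchronously
Action<IList<XElement>> saveAction = SaveChunk;
var loader = new BulkPostLoader(@"e:\stackoverflow\062010 so\posts.xml");
// Group the incoming posts into chunks of 5000 elements
var batches = loader.BufferWithCount(5000);
// Start an asynchronous save for each chunk as it arrives
var results = batches.Select(x => saveAction.BeginInvoke(x, null, null));
// Collect the pending calls five at a time and block until
// all five have completed before handling the next five
results
    .BufferWithCount(5)
    .Subscribe(x => x.ForEach(y => y.AsyncWaitHandle.WaitOne()));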
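To see why batch-then-wait wastes capacity, here is a minimal sketch of the same pattern using plain Tasks instead of Rx (it assumes .NET 6 or later for top-level statements and Enumerable.Chunk). SaveChunkStubAsync and the durations array are hypothetical stand-ins for SaveChunk and real chunk save times; only the timing behavior matters here.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical workload: how long (ms) each of ten chunk saves takes.
int[] durations = { 50, 50, 50, 50, 400, 50, 50, 50, 50, 400 };
const int BatchSize = 5;

var gate = new object();
int running = 0, maxRunning = 0;

// Hypothetical stand-in for SaveChunk: tracks concurrency, then "saves" by waiting.
async Task SaveChunkStubAsync(int durationMs)
{
    lock (gate) { running++; maxRunning = Math.Max(maxRunning, running); }
    await Task.Delay(durationMs);
    lock (gate) { running--; }
}

var clock = Stopwatch.StartNew();
// Batch-then-wait: start five saves, then wait for all five before starting the next five.
foreach (var batch in durations.Chunk(BatchSize))
{
    await Task.WhenAll(batch.Select(d => SaveChunkStubAsync(d)));
}
clock.Stop();

// Each batch lasts as long as its slowest save (roughly 400 + 400 = 800 ms here),
// while the four fast saves in each batch leave their slots idle after ~50 ms.
long batchedMs = clock.ElapsedMilliseconds;
Console.WriteLine($"batch-then-wait: {batchedMs} ms, max concurrent: {maxRunning}");
```

The total is bounded below by the sum of each batch's slowest call, no matter how quickly the other calls finish.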
The fix here is probably quite obvious to anyone more familiar with the Reactive Extensions than I am. If I want to ensure a steady flow of asynchronous tasks and only limit how many are running at a time, I can use the .DoWhile extension method and provide a predicate which will only activate the following subscriber when the condition is met. Here’s the code block from above with the change applied:
Action<IList<XElement>> saveAction = SaveChunk;
var loader = new BulkPostLoader(@"e:\stackoverflow\062010 so\posts.xml");
var batches = loader.BufferWithCount(5000);
var results = batches.Select(x => saveAction.BeginInvoke(x, null, null));
results
    .DoWhile(() => results.Count > 5)
    .Subscribe(x => x.AsyncWaitHandle.WaitOne());
Clean and simple. Have I mentioned I love the Reactive Extensions?
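Since the DoWhile version turned out to be buggy (see the EDIT above), here is a minimal sketch of the sliding-window behavior described there: at most five saves in flight, with a new one starting as soon as any slot frees up. It swaps in a different technique, plain Tasks plus a SemaphoreSlim rather than Rx, and is not necessarily what the follow-up post does. SaveChunkStubAsync, the durations and MaxConcurrent are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical workload: how long (ms) each of ten chunk saves takes.
int[] durations = { 50, 50, 50, 50, 400, 50, 50, 50, 50, 400 };
const int MaxConcurrent = 5;

var throttle = new SemaphoreSlim(MaxConcurrent, MaxConcurrent);
var gate = new object();
int running = 0, maxRunning = 0;

// Hypothetical stand-in for SaveChunk: tracks concurrency, then "saves" by waiting.
async Task SaveChunkStubAsync(int durationMs)
{
    lock (gate) { running++; maxRunning = Math.Max(maxRunning, running); }
    try
    {
        await Task.Delay(durationMs);
    }
    finally
    {
        lock (gate) { running--; }
        throttle.Release(); // free this slot for the next chunk
    }
}

var clock = Stopwatch.StartNew();
var pending = new List<Task>();
foreach (var d in durations)
{
    await throttle.WaitAsync(); // wait only until *some* slot is free
    pending.Add(SaveChunkStubAsync(d));
}
await Task.WhenAll(pending);
clock.Stop();

// The second 400 ms save starts as soon as a slot frees up (~100 ms in),
// so the total is roughly 500 ms instead of ~800 ms for batch-then-wait.
long slidingMs = clock.ElapsedMilliseconds;
Console.WriteLine($"sliding window: {slidingMs} ms, max concurrent: {maxRunning}");
```

Because the slot is released only after the save finishes, the semaphore count is a hard cap on concurrency, while the producer loop never waits for a whole batch.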
Throttling Execution Rate
While this next example certainly isn’t the only way to throttle asynchronous processes, I liked it so much I wanted to share it. The challenge I was facing is that I don’t know how much work is actually available; I can only monitor how quickly work becomes available. I want to maximize throughput and minimize latency while still being kind to the processor.
The first step is to expose the workload as an IObservable or IEnumerable and then use the Reactive Extensions to measure the time interval between messages. You can then use that interval to calculate how long the thread should sleep when there is no work available.
Here’s some very generic pseudo-code:
// First, the code running inside the IObservable, which
// dequeues messages and then notifies all of the
// subscribed observers. This loop runs on a worker
// thread, asynchronously from the main thread.
while (Running)
{
    // Check to see if there's any work (assumes a queue
    // whose Dequeue returns null when it is empty)
    var message = queue.Dequeue();
    if (message != null)
        NotifyObservers(message); // calls OnNext on each
                                  // subscribed observer
    else
        Thread.Sleep(SleepFor);   // SleepFor is a property (in ms)
                                  // that we can set externally
}
// This code runs outside of the IObservable and uses
// the Reactive Extensions to react every time a
// message is processed. In this case, I'm doing the
// simplest thing and just setting the sleep time to
// the last interval between messages, but you can do
// other things...
observable
    .TimeInterval()
    .Subscribe(x =>
    {
        // TotalMilliseconds covers the whole interval;
        // Milliseconds is only the 0-999 ms component
        observable.SleepFor = (int)x.Interval.TotalMilliseconds;
    });
That’s a very simplified version of what I’m actually using, but I hope it demonstrates the principle: by measuring the time between messages, you can throttle the loop that checks for work so the code isn’t cycling as fast as it can, burning valuable processor cycles, when there’s nothing to do.
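For something you can run without Rx, here is a minimal sketch of the same principle using only the base class library: a Stopwatch stands in for TimeInterval(), a ConcurrentQueue<int> stands in for the work queue, and a CancellationTokenSource plays the role of the Running flag. The clamping bounds (MinSleepMs, MaxSleepMs), the 20 ms producer rate and MessageCount are hypothetical values chosen for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;

// Hypothetical bounds so the loop never sleeps 0 ms (spinning) or stalls too long.
const int MinSleepMs = 1, MaxSleepMs = 1000;
const int MessageCount = 25;

var queue = new ConcurrentQueue<int>();
using var stop = new CancellationTokenSource();
int processed = 0, emptyPolls = 0, sleepFor = MinSleepMs;

// Worker loop: the equivalent of the while (Running) loop in the pseudo-code.
var worker = new Thread(() =>
{
    var sinceLast = Stopwatch.StartNew();
    // Keep going until asked to stop, then drain whatever is left.
    while (!stop.IsCancellationRequested || !queue.IsEmpty)
    {
        if (queue.TryDequeue(out _))
        {
            processed++; // a real handler would call NotifyObservers(message) here
            // Stand-in for TimeInterval(): time since the previous message.
            int intervalMs = (int)sinceLast.Elapsed.TotalMilliseconds;
            sinceLast.Restart();
            sleepFor = Math.Clamp(intervalMs, MinSleepMs, MaxSleepMs);
        }
        else
        {
            emptyPolls++;
            Thread.Sleep(sleepFor); // back off when there's no work
        }
    }
});
worker.Start();

// Producer: hypothetical work arriving roughly every 20 ms.
for (int i = 0; i < MessageCount; i++)
{
    queue.Enqueue(i);
    Thread.Sleep(20);
}
stop.Cancel();
worker.Join();

Console.WriteLine($"processed {processed}, empty polls {emptyPolls}, last sleep {sleepFor} ms");
```

The clamp matters: when two queued messages are processed back to back, the measured interval is close to zero, and without a minimum the loop would call Thread.Sleep(0) and go right back to spinning.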
Suggestions? Feedback?
I’d be interested in seeing other patterns like these built with the Reactive Extensions; it really is a great tool!
Tags: Reactive Extensions, Threading