Jul 1 2010

An Improved Rx Approach For Limiting Asynchronous Calls

Category: .Net Framework | Tips | Tools. Posted by AlexRobson @ 00:44

Not a day after I posted Using Reactive Extensions To Throttle Asynchronous Tasks, Josh Bush was already (kindly) saying “I think your code may have a problem”. The issue with the first example is twofold: first, it doesn’t really work as posted; second, even if it did, it would behave in a less than ideal way. Basically, it calls wait after each item. Not exactly what I was going for.

It Would Be Cool If…

You have some known or unknown quantity of total work (Y), and you want to limit the number of worker threads in process at any given time to X. What my first try actually did was make X asynchronous calls and then, for every call past that (X+N), immediately call WaitOne on that call's wait handle.

Josh To The Rescue

I fought with my example for an hour or so and realized that there wasn’t just some really simple thing for me to tack on to the existing code sample. This morning, I read Josh’s post about his approach to limiting the number of asynchronous calls using IEnumerable only. The key to his approach (and little did I know, to mine as well) was the really cool Aggregate extension method.

My Improved Solution

Action<IList<XElement>> saveAction = SaveChunk;
var loader = new BulkPostLoader(@"e:\stackoverflow\062010 so\posts.xml");
var batches = loader.BufferWithCount(5000);
var results = batches.Select(x => saveAction.BeginInvoke(x, null, null));
        
results
    .Aggregate(new HashSet<IAsyncResult>(), (set, result) =>
    {
        // drop finished calls so they don't count toward the limit
        set.RemoveWhere(x => x.IsCompleted);
        set.Add(result);
        if(set.Count > 5)
        {
            // over the limit: block on one call that is still running
            var inProcess = set.Where(x => !x.IsCompleted).FirstOrDefault();
            if(inProcess != null)
            {
                inProcess.AsyncWaitHandle.WaitOne();
            }
        }
        return set;
    })
    .Subscribe(x => {});

As you can see, the main difference is that I’m using a HashSet to aggregate calls over the stream. On every item, I first remove any calls that have completed, so that finished calls don’t count toward the limit. I then add the most recent async result to the set and, if the set is above the limit, I pick a call that hasn’t completed yet and wait on it.
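For anyone who wants to try the idea without Rx or BeginInvoke, here is a minimal sketch of the same set-based throttle over a plain IEnumerable. It is not the code from this post: it assumes Task in place of IAsyncResult, a Thread.Sleep standing in for SaveChunk, and the hypothetical names ThrottleSketch and Run. It also compares with >= so that no more than `limit` items are ever running at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ThrottleSketch
{
    // Runs every work item while keeping at most `limit` items running
    // at once. Returns the highest number of items seen running together.
    public static int Run(IEnumerable<int> work, int limit)
    {
        int inFlight = 0, peak = 0;
        var gate = new object();

        // Select is lazy: each task is created only when Aggregate pulls it.
        var tasks = work.Select(item => Task.Run(() =>
        {
            lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
            Thread.Sleep(20); // stand-in for the real work (e.g. SaveChunk)
            lock (gate) { inFlight--; }
        }));

        var remaining = tasks.Aggregate(new HashSet<Task>(), (set, task) =>
        {
            // forget finished tasks so they don't count toward the limit
            set.RemoveWhere(t => t.IsCompleted);
            set.Add(task);
            if (set.Count >= limit)
            {
                // at the limit: block until at least one task finishes
                Task.WaitAny(set.ToArray());
            }
            return set;
        });

        // wait for whatever is still running after the last item
        Task.WaitAll(remaining.ToArray());
        return peak;
    }
}
```

Calling ThrottleSketch.Run(Enumerable.Range(0, 12), 3) processes twelve items and should never report a peak above 3, because a new task is only pulled from the sequence after at least one earlier task has finished.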

Well That Wasn’t So Bad, Was It?

By now you’ve certainly realized I’m not an Rx expert; I’m just sharing what I’m trying to learn as I learn it. Hopefully it’s more helpful than it is distracting.


Jun 29 2010

Using Reactive Extensions To Throttle Asynchronous Tasks

Category: Tips | Tools. Posted by AlexRobson @ 07:58

This weekend I discovered two very valuable ways to control asynchronous processes using the fantastic, amazing and immensely useful Reactive Extensions from the Microsoft Cloud Programmability Team (not the Research team as I had previously thought).

Limiting The Number Of Asynchronous Tasks

I recently blogged about ETL with Couch, Relax and Symbiote. The source in that post batched asynchronous results and blocked until they completed, so work got queued 5 calls at a time and nothing new started until all 5 had finished. Not exactly ideal. (Thanks to Josh Bush for pointing this out.)

EDIT

This first approach has a pretty serious bug in it. It only sort of works as described. For a much better solution (thanks again to Josh for providing me with a good idea), please see this new post: An Improved Rx Approach For Limiting Asynchronous Calls

Here’s the code snippet:

Action<IList<XElement>> saveAction = SaveChunk;
var loader = new BulkPostLoader(@"e:\stackoverflow\062010 so\posts.xml");
var batches = loader.BufferWithCount(5000);
var results = batches.Select(x => saveAction.BeginInvoke(x, null, null));
        
results
    .BufferWithCount(5)
    .Subscribe(x => x.ForEach(y => y.AsyncWaitHandle.WaitOne()));

The fix here is probably quite obvious to anyone more familiar with Reactive Extensions than I am. If I want to ensure a steady flow of asynchronous tasks and only limit how many are running at a time, I can use the .DoWhile extension method and provide a predicate which will only activate the following Subscriber when the condition is met. Here’s the code block from above with the change applied:

Action<IList<XElement>> saveAction = SaveChunk;
var loader = new BulkPostLoader(@"e:\stackoverflow\062010 so\posts.xml");
var batches = loader.BufferWithCount(5000);
var results = batches.Select(x => saveAction.BeginInvoke(x, null, null));
        
results
    .DoWhile(()=> results.Count > 5)
    .Subscribe(x => x.AsyncWaitHandle.WaitOne());

Clean and simple. Have I mentioned I love the Reactive Extensions?

Throttling Execution Rate

While this next example certainly isn’t the only way one can throttle asynchronous processes, I liked this so much I wanted to share it. The challenge I was facing is that I don’t know how much work is actually available; I can only monitor how quickly work becomes available. I want to maximize throughput and minimize latency and still be kind to the processor.

First step is exposing the work load as an IObservable or IEnumerable and then using the Reactive Extensions to monitor the time intervals between each call. You can then use this interval to calculate a thread sleep time to use when there is no work available.

Here’s some very generic pseudo-code:

// First the code running inside the IObservable which
// is dequeueing messages and then notifying all the
// subscribed observers. This code is running in a
// worker thread (asynchronously) from the main thread.
while(Running)
{
    // this is where we check to see if there's any work
    var message = queue.Dequeue();
    if(message != null)
        NotifyObservers(message); // calls send on each of the
                                  // observers
    else
        Thread.Sleep(SleepFor);   // SleepFor would be a property
                                  // that we can set externally
}

// This code runs outside of the IObservable and is
// using the Reactive Extensions to react every time
// a message is processed. In this case, I'm doing the
// simplest thing and just setting sleep time to the
// last interval between messages but you can do other
// things...
observer
    .TimeInterval()
    .Subscribe(x =>
    {
        // TotalMilliseconds is the whole interval; Milliseconds
        // would only be the 0-999 component of it
        observer.SleepFor = (int)x.Interval.TotalMilliseconds;
    });

While that’s a very simplified version of what I’m actually using, I hope it demonstrates the principle. You can get insight into the time between each message and use it to throttle the loop that’s checking for work so that the code isn’t just cycling as fast as it can when there’s nothing to do and using up valuable processor cycles.
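To make the principle a little more concrete, here is a rough sketch of the same idea without Rx, using a Stopwatch in place of TimeInterval(). This is an assumption-laden stand-in and not what I'm actually running: AdaptivePoller, Poll and MaxSleep are hypothetical names, and the cap on the sleep time is my own addition so one long quiet period can't stall the loop.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;

// Hypothetical poller; the names here are illustrative only.
sealed class AdaptivePoller
{
    private readonly ConcurrentQueue<string> queue;
    private readonly Stopwatch sinceLast = Stopwatch.StartNew();

    // How long to sleep (in milliseconds) when no work is available.
    public int SleepFor { get; private set; } = 10;

    // Upper bound on SleepFor (an assumption, not part of the original idea).
    public int MaxSleep { get; set; } = 1000;

    public AdaptivePoller(ConcurrentQueue<string> queue)
    {
        this.queue = queue;
    }

    // One pass of the polling loop. Returns true if a message was handled.
    public bool Poll(Action<string> handle)
    {
        if (queue.TryDequeue(out var message))
        {
            // Plain-.NET equivalent of TimeInterval(): the gap since the
            // previous message becomes the next idle sleep time.
            SleepFor = (int)Math.Min(sinceLast.ElapsedMilliseconds, MaxSleep);
            sinceLast.Restart();
            handle(message);
            return true;
        }

        // Nothing to do: back off instead of spinning.
        Thread.Sleep(SleepFor);
        return false;
    }
}
```

A worker thread would call Poll in a loop while it is running. When messages arrive quickly, SleepFor shrinks and the loop checks for work more often; when they arrive slowly, it sleeps longer between checks.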

Suggestions? Feedback?

I’d be interested in seeing other patterns similar to these using the Reactive Extensions. It really is a great tool!


Feb 5 2010

How To Fix Broken Authentication To Gitosis

Category: Tips. Posted by AlexRobson @ 10:52

I recently had to replace a hard drive in my laptop. While this was a great opportunity to clean things up and have a fresh machine, it turned into a huge, huge time-sink :(

We’ve started using git internally and for our “primary” git repository we’re using gitosis. Normally gitosis makes things “just work” so well that the fact that the server is debian and not even on the domain is completely transparent to the developers. As you may know, git uses ssh to handle communication. In order to do that, you generate a key pair and share your public key with the git server, which uses it to authenticate you (in place of a password).

The problem is that I “lost” my key pair that was originally provided to gitosis for authentication. Even though I could still get my public key from the server, I couldn’t re-generate the same private key… The only way gitosis supports adding or changing keys is through the gitosis-admin repository. You actually configure gitosis by pulling its configuration to your local machine via git, committing your changes and pushing them back to the server. Every time you push, a post-update hook fires to do some gitosis magic.

Well, since this is a new process and I’m the only one who knows diddly about gitosis, I was the only person able to change the gitosis configuration… yeah, I’m that dumb. The problem was that no matter what I tried, I could not get gitosis to accept a new key for my account; and I tried a lot of different things.

What finally worked was directly editing the authorized_keys file on the server (which you’re not supposed to ever do) and changing the existing key to my new public key. I then changed the key in the gitosis-admin repository and was able to push that back up to the server so that it would properly reconfigure itself (and also not break in the future due to a stale public key getting pushed).

Hopefully that will help someone else out there. I lost the better part of my day : \


Aug 25 2008

Translate List<A> to List<B> With LINQ

Category: .Net Framework | Tips. Posted by AlexRobson @ 17:23

Let me just skip to the fun bit that the title promised before going off on some rant-flavored-jelly-filled-technical-jargon-journey that you really didn't pay for. If you have a generic List of type A and you want a generic List of type B, AND you know A can be cast as B then you could do the following:

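List<B> TranslateList<A, B>(List<A> listofATypeThingies)
{
    // the list has to be created before anything can be added to it
    List<B> listofBTypeThingies = new List<B>();
    
    foreach(A item in listofATypeThingies)
    {
        // with unconstrained generic types the compiler rejects (B) item,
        // so the cast goes through object; it still throws at runtime
        // if the item isn't actually a B
        listofBTypeThingies.Add((B)(object) item);
    }
 
    return listofBTypeThingies;
}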

And then call TranslateList all over the place until the end of time. I prefer the following LINQified snippet:

List<B> listofBTypeThingies = new List<B>(listofATypeThingies.Select(item => (B) item));

Yes. The appropriate response in this case would be "Oh snap!"

*** EDIT ***

Fortunately for all of us, someone smarter than me is reading this blog so that they can correct me when I post something ignorant (like just now). There's already an extension method that addresses this particular use case and can be used like so:

List<B> listofBTypeThingies = new List<B>(listofATypeThingies.Cast<B>());

Thanks to Jeff Cutsinger for catching this one. (If you ever start posting to a blog, Jeff, I'll link it :)

******
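One thing worth seeing in action is what Cast<B>() does when an item isn't really a B. The sketch below uses two made-up classes (Animal and Dog are hypothetical, as are the CastSketch helper names) to compare Cast<T>(), which throws on the first item that can't be cast, with OfType<T>(), which quietly skips such items.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for illustration only.
class Animal { }
class Dog : Animal { }

static class CastSketch
{
    // Casts every item; throws InvalidCastException as soon as it
    // reaches an item that is not a Dog.
    public static List<Dog> Downcast(List<Animal> animals)
    {
        return animals.Cast<Dog>().ToList();
    }

    // Keeps only the items that really are Dogs and skips the rest.
    public static List<Dog> OnlyDogs(List<Animal> animals)
    {
        return animals.OfType<Dog>().ToList();
    }
}
```

So if the list might hold a mix of types, OfType is the forgiving choice, while Cast is the right one when anything that isn't a B should be treated as a bug.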

If you want to know WHY you have to do this in the first place read on. Otherwise, ignorance really is bliss. I know, because I'm still pretty ignorant and consequently, moderately happy.

First you need to speak the language of computer science. No, I'm not going to regurgitate textbook definitions at you; if you want that, read a textbook. But in order to understand the why, you need to know the following definitions (someone correct me if I'm butchering this): covariance allows a type built from a derived type to be used where the same type built from a base type is expected (for example, an array of Child1 where an array of MotherOfAllClasses is expected), contravariance allows the reverse substitution, and invariance allows neither.

As of C# 3.0, delegates allow covariant return types and contravariant parameter types, and arrays of reference types are covariant. Generic type parameters are always invariant. At first I was irritable because I thought it was an unreasonable limitation, but then I started to play around with all this and now I understand why there's a genuine need for some type invariance in statically typed languages that allow side effects. Behold, I give you example code:

public class MotherOfAllClasses
{
}

public class Child1 : MotherOfAllClasses
{
}

public class Child2 : MotherOfAllClasses
{
}

public class TypeShinanigans
{
    public void ThorDestroyerOfTypeSafety()
    {
        Child1 child1 = new Child1();
        Child2 child2 = new Child2();

        MotherOfAllClasses mom1 = new MotherOfAllClasses();
        MotherOfAllClasses mom_Child1 = new Child1();
        MotherOfAllClasses mom_Child2 = new Child2();

        //This line is legal because mom_Child1 actually
        //holds a reference to an instance of Child1
        Child1 workingCast1 = (Child1)mom_Child1;

        //Here is some wonderfully breaking code
        //which upon first glance may seem legal
        //and even more disturbingly will build

        //Child1 breakingCast1 = (Child1) mom1;
        //Child2 breakingCast2 = (Child2) mom1;
        //Child1 breakingCast3 = (Child1) mom_Child2;

        //The third line here will break because we're
        //trying to sneak in an instance of the parent class
        //into an array of Child1 which as we saw earlier is
        //a runtime exception
        Child1[] chillenz1 = new Child1[5];
        MotherOfAllClasses[] mamaz = chillenz1;
        mamaz[0] = new MotherOfAllClasses();
    }
}

Hopefully that's helpful to understand WHY variance is dangerous enough to begin with but would be especially easy to break in conjunction with generic usage. If you'd like to discuss it more, I'll tell you what I can but I'm at a slight disadvantage not having taken lambda calculus (no, I'm not making it up, it's a real class).
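If you'd rather watch the array case fail than take my word for it, here is a small self-contained sketch. Base, Derived and VarianceSketch are hypothetical names used only for this illustration; the point is that the covariant array assignment compiles, the bad store is caught at runtime, and the equivalent assignment with List<T> is rejected by the compiler before it can do any damage.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration only.
class Base { }
class Derived : Base { }

static class VarianceSketch
{
    // Returns true if storing a Base into a Derived[] (viewed through a
    // Base[] reference) is rejected at runtime.
    public static bool ArrayStoreFails()
    {
        Derived[] derived = new Derived[1];
        Base[] bases = derived;      // compiles: arrays are covariant

        try
        {
            bases[0] = new Base();   // the array is really a Derived[]
            return false;
        }
        catch (ArrayTypeMismatchException)
        {
            return true;
        }
    }

    // The generic equivalent never gets that far. This line does not
    // compile, because List<T> is invariant in T:
    //
    //     List<Base> bases = new List<Derived>();
}
```

That is the trade being made: arrays let the substitution through and pay for it with a runtime check on every store, while invariant generics simply refuse the substitution at compile time.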
