I’ve been working on Relax a lot lately. I recently added a Lucene.Net project to Symbiote, which Relax uses to provide document indexing and LINQ queries for CouchDB (more about that in another post). A very important part of getting Relax to a release candidate is understanding how it all behaves under high load.
But what’s high load? We generally target SMBs or internal applications, which aren’t going to see social-network levels of stress. Still, I like knowing the upper bounds of what I’m working on.
If you’re familiar with this type of problem and with Rx, the code samples are probably all you need to see what this is doing. This was a learning experience for me, and I was so happy with how easily Reactive Extensions let me introduce such a drastic improvement that I wanted to share it.
Finding A Good Source Of Data
I chose the Stack Overflow dataset (you’ll need to scroll down to find the ClearBits link). Though I often disagree philosophically with what Jeff Atwood and Joel Spolsky say, Stack Overflow is a good thing, and I really admire the SO team for sharing their data.
I’m only using the posts file for now, which has more than 2 million records and is roughly 2.8 GB. I think that’s plenty of data for what I need : )
The Best Way To Bulk Load In Relax / CouchDB
CouchDB provides a bulk document API that lets us store multiple documents in a single request, so we pay the overhead of the persistence call once per batch instead of once per document. Relax makes extensive use of this API behind the scenes. In this case, I want to batch several thousand documents into each save to minimize that overhead.
The other thing to note is that CouchDB handles concurrent load exceptionally well (at least in my experience), so I want the save commands to fire asynchronously as soon as each batch is ready.
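To show what a bulk save looks like on the wire, here’s a minimal sketch that builds a request for CouchDB’s raw `_bulk_docs` endpoint (a POST whose body is `{"docs": [...]}`) using `HttpClient` and `System.Text.Json`. This is not how Relax implements it internally; the `BulkDocs` helper, server URL and database name are assumptions for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Text.Json;

// Hypothetical helper, illustration only (not part of Relax or Symbiote).
public static class BulkDocs
{
    // Builds POST {server}/{database}/_bulk_docs with body {"docs": [ ... ]}.
    // One request carries the whole batch, so the per-request HTTP overhead
    // is paid once per batch rather than once per document.
    public static HttpRequestMessage CreateRequest<T>(
        Uri server, string database, IReadOnlyList<T> docs)
    {
        var body = JsonSerializer.Serialize(new { docs });
        return new HttpRequestMessage(HttpMethod.Post, new Uri(server, database + "/_bulk_docs"))
        {
            Content = new StringContent(body, Encoding.UTF8, "application/json")
        };
    }
}
```

Sending it is a single `await client.SendAsync(request)` on an `HttpClient`. CouchDB answers with one status entry per document, so individual documents can fail even when the request as a whole succeeds.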
Let The Fun Begin
The SO data is all in XML. Yay. That lets me use an XmlReader to stream through the file and create documents, which I expose through an IObservable implementation. I use a base abstract class that provides my standard IObservable code. It’s nothing magic, but here’s the source for the sake of clarity:
    using System;
    using System.Collections.Concurrent;

    public abstract class BaseObservable<TNotification>
        : IObservable<TNotification>, IDisposable
    {
        // Thread-safe collection of subscribers
        protected ConcurrentBag<IObserver<TNotification>> observers { get; set; }

        // Push a message to every current subscriber
        public virtual void Notify(TNotification notification)
        {
            foreach (var observer in observers)
                observer.OnNext(notification);
        }

        // Tell every subscriber that no more messages are coming
        public virtual void SendCompletion()
        {
            foreach (var observer in observers)
                observer.OnCompleted();
        }

        // The returned IDisposable is the observable itself, so disposing
        // any one subscription removes *all* observers.
        public virtual IDisposable Subscribe(IObserver<TNotification> observer)
        {
            observers.Add(observer);
            return this;
        }

        protected BaseObservable()
        {
            this.observers = new ConcurrentBag<IObserver<TNotification>>();
        }

        // Drain the bag, unsubscribing everyone
        public void Dispose()
        {
            IObserver<TNotification> o;
            while (observers.TryTake(out o)) { }
        }
    }
Now for the important part: the observable XmlReader:
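    using System.IO;
    using System.Xml;
    using System.Xml.Linq;

    public class PostReader
        : BaseObservable<XElement>
    {
        protected string xmlExportPath { get; set; }

        public void Start()
        {
            // Stream the file; only one <row> element is materialized at a time
            using (var stream = new FileStream(
                xmlExportPath,
                FileMode.Open,
                FileAccess.Read,
                FileShare.None,
                2048,
                true))
            using (var reader = XmlReader.Create(stream))
            {
                reader.MoveToContent();
                while (!reader.EOF)
                {
                    if (reader.NodeType == XmlNodeType.Element && reader.Name == "row")
                    {
                        // ReadFrom consumes the whole element and leaves the reader on
                        // the following node, so don't call Read() here as well or a
                        // row that immediately follows would be skipped.
                        var element = (XElement)XNode.ReadFrom(reader);
                        Notify(element);
                    }
                    else
                    {
                        reader.Read();
                    }
                }
            }
            SendCompletion();
        }

        public PostReader(string xmlExportPath)
        {
            this.xmlExportPath = xmlExportPath;
        }
    }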
Basically, all I’m doing is reading in each row element (each row represents a Post), creating an XElement from it, notifying the observer(s), and sending the completion signal once the entire file has been read.
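The Post class itself isn’t shown here, so here’s a hypothetical, trimmed-down version to illustrate how a row’s attributes can map onto a class with `XmlSerializer`. The attribute names are modeled on the posts dump (`Id`, `PostTypeId`, `Score`, `Title`) but treat them, the class and the `PostMapper` helper as assumptions. It deserializes straight from `XElement.CreateReader()` rather than going through `ToString()` and a `StringReader`.

```csharp
using System.Xml.Linq;
using System.Xml.Serialization;

// Hypothetical Post shape (assumption); the real class isn't shown in this post.
[XmlRoot("row")]
public class Post
{
    [XmlAttribute("Id")] public int Id { get; set; }
    [XmlAttribute("PostTypeId")] public int PostTypeId { get; set; }
    [XmlAttribute("Score")] public int Score { get; set; }
    [XmlAttribute("Title")] public string Title { get; set; }   // absent on answers, so null
}

// Hypothetical helper, illustration only.
public static class PostMapper
{
    // Build the serializer once; XmlSerializer is safe to share across threads.
    static readonly XmlSerializer Serializer = new XmlSerializer(typeof(Post));

    public static Post FromRow(XElement row)
    {
        // Read directly from the element instead of re-parsing its string form
        using (var reader = row.CreateReader())
            return (Post)Serializer.Deserialize(reader);
    }
}
```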
Why Bother With The Reactive Extensions?
Starting off, I didn’t know how many records I was looking at. I did know I didn’t want to deserialize everything into memory first and then save it: that wastes time and RAM, and it wouldn’t be easy to parallelize. I also knew from experience that my bottleneck would be IO. Spinning up async tasks faster than they can complete creates memory pressure and, in this case, out-of-memory exceptions.
I’m not suggesting you can’t handle all this without Reactive Extensions. I am suggesting you won’t be able to do it as elegantly or as simply without them.
Enter Reactive Extensions
The Reactive Extensions (or Rx) is a library from Microsoft DevLabs and created by Erik Meijer and his team of ninja assassin developers. Rx and now RxJS are two projects you really ought to be learning about. And yes, that dizzy feeling you’ll get is normal; the human brain isn’t meant to take in so much distilled awesome.
Rx makes it easy to program against asynchronous event streams. Take a moment to think about that and let it sink in…
Making Friends With IObservable
Get comfortable with IObservable, because it’s the core of Rx. I like to think of IObservable as a message pump. Erik Meijer likes to compare IObservable with IEnumerable, and he sums it up this way: IEnumerable is a pull mechanism and IObservable is a push mechanism. Rx helps bridge the gap in functionality and tooling between pull and push mechanisms, and in some cases lets us interchange the two.
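To make the pull/push distinction concrete, here’s a small sketch with no Rx dependency: the same values consumed by pulling from an `IEnumerable<int>` with `foreach`, and by having an `IObservable<int>` push them into an `IObserver<int>`. `ListObserver`, `RangeObservable` and `PullVersusPush` are illustrative names of my own, not Rx types.

```csharp
using System;
using System.Collections.Generic;

// Illustrative observer (not an Rx type): records whatever it is handed.
public class ListObserver<T> : IObserver<T>
{
    public List<T> Received { get; } = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) => Received.Add(value);
    public void OnError(Exception error) => throw error;
    public void OnCompleted() => Completed = true;
}

// Illustrative cold observable: pushes 0..count-1 to each subscriber on Subscribe.
public class RangeObservable : IObservable<int>
{
    private readonly int count;
    public RangeObservable(int count) => this.count = count;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (var i = 0; i < count; i++)
            observer.OnNext(i);            // the producer decides when each value arrives
        observer.OnCompleted();
        return NoOpDisposable.Instance;
    }

    private sealed class NoOpDisposable : IDisposable
    {
        public static readonly NoOpDisposable Instance = new NoOpDisposable();
        public void Dispose() { }
    }
}

public static class PullVersusPush
{
    // Pull: the consumer asks for each value (foreach calls MoveNext/Current)
    public static List<int> Pull(IEnumerable<int> source)
    {
        var result = new List<int>();
        foreach (var value in source)
            result.Add(value);
        return result;
    }

    // Push: the consumer hands over an observer and waits to be called
    public static List<int> Push(IObservable<int> source)
    {
        var observer = new ListObserver<int>();
        source.Subscribe(observer);
        return observer.Received;
    }
}
```

Both produce the same list; the difference is who drives the iteration. BaseObservable above is the push side of this picture.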
Enough Talk
I’m going to show you the rest of the code and then break it down. Each section has a header so if you’re not interested in that portion of the source, just skip ahead.
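    // BCL namespaces used below; the Symbiote and Relax namespaces are omitted
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Xml.Linq;
    using System.Xml.Serialization;

    class Program
    {
        static void Main(string[] args)
        {
            Assimilate
                .Core()
                .Daemon(x => x
                    .Arguments(args)
                    .Name("SOBulkLoader")
                    .DisplayName("Stack Overflow Bulk Loading Service")
                    .Description("Does what it says"))
                .Relax(x => x.UseDefaults().TimeOut(1000000))   // 1,000,000 ms = 1,000 s
                .AddConsoleLogger<LoadingService>(x => x.Info().MessageLayout(m => m.Date().Message().Newline()))
                .RunDaemon();
        }
    }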
    public class LoadingService
        : IDaemon
    {
        protected IDocumentRepository repository { get; set; }
        protected XmlSerializer postSerializer { get; set; }

        public void Start()
        {
            "Loading service starting"
                .ToInfo<LoadingService>();

            Action<IList<XElement>> saveAction = SaveChunk;
            var loader = new PostReader(@"e:\stackoverflow\062010 so\posts.xml");

            // Group rows into chunks of 5,000 documents per bulk save
            var batches = loader.BufferWithCount(5000);

            // Start saving each chunk on a thread-pool thread as soon as it's ready
            var results = batches.Select(x => saveAction.BeginInvoke(x, null, null));

            // Every 5 started saves, wait for all of them. This callback runs on
            // the reader's thread, so blocking here also pauses reading the file.
            results
                .BufferWithCount(5)
                .Subscribe(x =>
                {
                    foreach (var result in x)
                    {
                        result.AsyncWaitHandle.WaitOne();
                        // EndInvoke rethrows any exception from SaveChunk and
                        // releases the async call's resources
                        saveAction.EndInvoke(result);
                    }
                });

            // Nothing flows until the reader starts pumping messages
            loader.Start();
        }

        protected void SaveChunk(IList<XElement> x)
        {
            var list = x.Select(ProcessPost).ToList();
            repository.SaveAll(list);
            "Posts {0} to {1} chunked and saved"
                .ToInfo<LoadingService>(list.First().Id, list.Last().Id);
        }

        public Post ProcessPost(XElement element)
        {
            var content = element.ToString();
            return postSerializer.Deserialize(new StringReader(content)) as Post;
        }

        public void Stop()
        {
            "Loading service stopping"
                .ToInfo<LoadingService>();
        }

        public LoadingService(IDocumentRepository repository)
        {
            this.repository = repository;
            this.postSerializer = new XmlSerializer(typeof(Post));
        }
    }
Program Main (A Shameless Symbiote Plug)
If you haven’t seen it before, this is a Symbiote Assimilation call: a centralized, fluent API for configuring multiple open source frameworks. In this code, I’m initializing Symbiote with a call to .Core() (that’s always required). Next I define the service I’m creating using Daemon. Then I’m using the default configuration for Relax and changing the timeout to 1,000,000 ms, i.e. 1,000 seconds (more than I need). I’m also adding a Log4Net console logger and telling it how I want the message laid out. Lastly, I’m starting the Daemon. The great thing is that Symbiote registers everything (including configuration and all the different project dependencies) with StructureMap, which has a lot of good implications.
What’s an IDaemon?
The Daemon project builds on TopShelf and makes it super easy to create Windows services. IDaemon requires Start and Stop methods, and Symbiote handles the rest (dependency injection, etc.). While TopShelf is really meant for Windows services, I use it for all my console applications because it adds some really nice things and it’s simple to use.
IDocumentRepository
This is the primary interface for storing and retrieving documents in CouchDB. I’m taking a dependency on it, and Symbiote supplies it when it instantiates and runs the service.
Putting Rx To Work
The loader variable is an instance of the PostReader class and takes the path to the posts XML file. From it, we use Rx’s BufferWithCount extension method to produce a new IObservable<IList<XElement>>. At this point I hope you picked up on two things: 1) I haven’t called loader.Start() yet, so nothing is happening. 2) loader is an IObservable<XElement>, but calling BufferWithCount produces an IObservable<IList<XElement>>: it groups the incoming messages into lists of the requested size and publishes each list as a single message.
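If BufferWithCount feels like magic, here’s a hand-rolled observer that approximates what it does in this scenario: collect items into a list, emit the list every `count` items, and flush whatever is left when the source completes. `BufferingObserver` is a hypothetical, simplified stand-in for illustration only; the real Rx operator (later renamed `Buffer`) handles errors, disposal and concurrency more carefully.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for Rx's BufferWithCount (illustration only):
// forwards items to a downstream observer in chunks of `count`.
public class BufferingObserver<T> : IObserver<T>
{
    private readonly int count;
    private readonly IObserver<IList<T>> downstream;
    private List<T> current;

    public BufferingObserver(int count, IObserver<IList<T>> downstream)
    {
        if (count <= 0) throw new ArgumentOutOfRangeException(nameof(count));
        this.count = count;
        this.downstream = downstream;
        current = new List<T>(count);
    }

    public void OnNext(T value)
    {
        current.Add(value);
        if (current.Count == count)
        {
            var full = current;
            current = new List<T>(count);   // start a fresh chunk before handing this one off
            downstream.OnNext(full);
        }
    }

    public void OnCompleted()
    {
        if (current.Count > 0)
            downstream.OnNext(current);     // final, possibly short, chunk
        current = new List<T>(count);
        downstream.OnCompleted();
    }

    public void OnError(Exception error) => downstream.OnError(error);
}
```

With a count of 5,000 and a little over 2 million rows, that works out to roughly 400 bulk saves, the last one short.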
It’s about to get more awesomer. Now that I have an observable that produces messages containing lists of 5k elements, I want a way to queue transforming and persisting them asynchronously. Calling Select against the batches observable lets me kick off an asynchronous call to the SaveChunk method (via the delegate defined earlier). This produces a new IObservable<IAsyncResult>, so now we have an observable stream of asynchronous results. The usefulness may not seem readily apparent, but remember BufferWithCount? I can use that same call to batch the IAsyncResults, then subscribe to each batch of five, get the wait handles and block until every call in the batch has completed. Because Rx invokes the subscriber on the same thread that pushed the message (here, the PostReader loop), blocking there also pauses reading the file, so no more than five saves are ever in flight.
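For comparison, here’s a sketch of the same "start up to N saves, then wait for all of them" throttle written with Tasks and no Rx. This is a swapped-in technique, not what the service above does: `Throttle.SaveInWaves` is a hypothetical name, and `Task.Run` stands in for delegate `BeginInvoke`, which isn’t supported on .NET Core.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (illustration only): Task-based equivalent of
// BufferWithCount(5) + WaitOne from the Rx version.
public static class Throttle
{
    // Starts one save per batch, but after every `wave` starts it blocks until
    // that whole group has finished, so at most `wave` saves run at once.
    public static void SaveInWaves<T>(IEnumerable<IList<T>> batches, Action<IList<T>> save, int wave)
    {
        var inFlight = new List<Task>(wave);
        foreach (var batch in batches)
        {
            inFlight.Add(Task.Run(() => save(batch)));
            if (inFlight.Count == wave)
            {
                Task.WaitAll(inFlight.ToArray());   // rethrows failures as AggregateException
                inFlight.Clear();
            }
        }
        Task.WaitAll(inFlight.ToArray());           // the last, possibly short, group
    }
}
```

Like the Rx version, the producer (the `foreach` over `batches`) is what gets blocked, which is exactly what keeps memory flat when IO is the bottleneck.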
Once I have created the IObservables and set everything up, I then tell my loader to start. Everything up to that call is wiring up and defining how I want to handle the XElements as they’re produced. Since I don’t want any XElements getting lost, I don’t start the message pump until everything’s in place.
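The ordering matters because, once started, the reader behaves like a hot source: Notify only reaches observers that are already subscribed, so anything pushed earlier is simply gone. Here’s a tiny sketch of that behavior using a hypothetical `HotSource` type (illustration only, not thread-safe); unlike BaseObservable, its subscriptions remove just their own observer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hot source (illustration only, not thread-safe):
// Notify pushes only to observers subscribed at that moment.
public class HotSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(() => observers.Remove(observer));
    }

    public void Notify(T value)
    {
        // Snapshot so an observer can unsubscribe while being notified
        foreach (var o in observers.ToArray())
            o.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action onDispose;
        public Unsubscriber(Action onDispose) => this.onDispose = onDispose;
        public void Dispose() => onDispose();
    }
}
```

A value pushed before `Subscribe` or after the subscription is disposed never reaches the observer, which is why `loader.Start()` comes last.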
The Result
Running all this on my local development laptop (a moderate Core 2 Duo) yields 100,000 inserts per minute for 15 minutes. Memory utilization is hardly noticeable; the process is actually CPU bound, because transforming the XML into the class type is fairly expensive.
To put these metrics into perspective: the highest tweets-per-minute average this month (88k) could be imported into CouchDB via Relax in real time, without any special tuning or hardware.
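A quick back-of-the-envelope check on those figures, using the 5,000-document batch size from the loader:

$$\frac{100{,}000\ \text{docs/min}}{60\ \text{s/min}} \approx 1{,}667\ \text{docs/s}, \qquad \frac{100{,}000\ \text{docs/min}}{5{,}000\ \text{docs/batch}} = 20\ \text{bulk saves/min}$$

$$100{,}000\ \text{docs/min} - 88{,}000\ \text{tweets/min} = 12{,}000\ \text{docs/min of headroom}$$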
Tags: couchdb, symbiote, relax, reactive