Processing Large Datasets in Real Time

From Workingmouse Wiki

Introduction

Tom Adams, Workingmouse, December 2007

I've had the good fortune to recently complete a project at Veitch Lister Consulting (VLC), a transport planning consultancy, processing large datasets in real time. This article is a summary of the technical aspects of the project. I won't be talking about the process we used; suffice to say it was XP-like, supported by tools such as my BDD framework Instinct and web-based agile project management tools. The project ran for around 7-9 weeks. The team comprised five developers (3 full-time on the project): two Workingmouse developers, myself and Sanjiv Sahayam, and three VLC developers, Jamie Cook, Glen Maddern and Nick Partridge.

As a consultancy we usually work in corporate environments and as such are bound by the architectural constraints of the organisation. These usually include the usual "enterprise" constraints such as language (Java 1.4 is common), application servers (usually WebLogic or WebSphere) and databases (Oracle or DB2). However, this was one of those rare projects where you have no technical constraints and have the fortune to be working with great people.

A presentation based on this work is available on the Off the Grid page.

Finding a working solution

Going in, we had three basic requirements: 1) it must be callable from a Rails-based web app, 2) it must return responses to typical requests to the web app in real time (30 seconds was our target), and 3) it must query against a dataset that was initially estimated at around 45 TB, but later came down to around 100 GB. Our initial technical approach was two-fold: firstly to understand the nature of the problem and the size of the datasets, and secondly to research potential solutions to the problem.
I won't talk too much about the nature of the problem domain and type of data, except to say that the data is the output from transportation simulations and ended up being around 100 GB (of raw uncompressed text), which has since been converted into a binary format. To use SQL nomenclature, our data was divided into three tables, and our requests to the data storage layer consisted of a single query with two joins between these three tables. The system then performed further data analysis based on the results of the query. We had complete control over the data generation (this company wrote the piece that generates the data) and the storage format, something that we could exploit to provide performance far above conventional generic storage engines (i.e. our solution cannot be generalised). The system we were developing was completely read-only, which simplified our design considerably.

Our dataset was initially estimated to be around 45 TB (based on extrapolations from the data generation component), which led us to think we'd need to distribute the data across multiple machines. Our first investigations (e.g. Hadoop) were based around this. Later, as we worked out that we could batch process the majority of the data offline (as an input to the real-time system) and input only a subset, we were able to look towards conventional tools such as SQL databases.

As there were enough problems to solve already, we didn't want to write our own storage engine. We looked towards conventional SQL databases to achieve this, including:

We spent several weeks tuning the data, the queries, the indices and the databases themselves; however, we were not able to get the performance that we required out of any of the conventional SQL databases. To be fair, we didn't complete our investigations into DB2, and Oracle failed to complete its install on two separate occasions.
PostgreSQL was the worst performer, and MySQL, while good for the smaller datasets we threw at it (~10 GB), would not have been fast enough on the larger dataset even if its performance had scaled linearly. We also looked at distributed memory solutions like those provided by GigaSpaces and Terracotta; however, we no longer needed to distribute the data, so they weren't suited.

After switching away from traditional SQL databases, we looked at a number of column databases including:

However, these were either unsupported and outdated (C-Store), core-dumped when queried (two versions of MonetDB, both source and binaries) or were not feature rich enough for our needs at the time (HBase). We also briefly looked at commercial column stores such as:

In the end we ruled these out as well, as we were starting to think we could easily solve our problem using a plain vanilla filesystem and some simple parsing code, and we didn't want to deal with the sales teams at these companies (I've personally been there before and knew the process would not be pretty).

The data storage solution we came up with was laughably simple. We stored CSV files on a filesystem (ext3 on our dev boxes) and indexed into the data using directories. As we had to search linearly (the table scan equivalent) through some of the CSV files, we split these files into chunks. We were thus able to distribute the requests, by chunk, to each of our nodes. The downside of this approach was that the chunking was a manual process: we had to select the chunk size (31 initially) up front, and there was no dynamic indexing. The upside was that we didn't need to maintain an indexing scheme (using B or AVL trees) and the files could be batch updated with only a small configuration change (the number of chunks). We've found that our data storage is extremely fast, contributing only 18% of the total time it takes to process a request; the remainder of the time is spent in analysing the data and in distribution overhead.
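The chunked, directory-indexed layout described above can be sketched roughly as follows. This is a minimal illustration in Python, not the project's code (the real system was Java on GridGain). The function names, the `zone_7` key, the two-column rows and the use of a thread pool in place of grid nodes are all assumptions made for the example.

```python
import csv
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

NUM_CHUNKS = 4  # fixed up front, as in the article (which used 31); hypothetical value


def write_chunks(root, key, rows, num_chunks=NUM_CHUNKS):
    """Batch step: write the rows for one key into <root>/<key>/chunk_<i>.csv."""
    directory = Path(root) / key
    directory.mkdir(parents=True, exist_ok=True)
    for i in range(num_chunks):
        with open(directory / f"chunk_{i}.csv", "w", newline="") as handle:
            # Round-robin split so every chunk holds a similar number of rows.
            csv.writer(handle).writerows(rows[i::num_chunks])


def scan_chunk(path, predicate):
    """Linear scan (the table-scan equivalent) of a single chunk file."""
    with open(path, newline="") as handle:
        return [row for row in csv.reader(handle) if predicate(row)]


def query(root, key, predicate, workers=NUM_CHUNKS):
    """The directory name acts as the index; each chunk is scanned by one worker."""
    chunk_paths = sorted((Path(root) / key).glob("chunk_*.csv"))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Threads stand in for grid nodes here (an assumption of this sketch).
        partial_results = list(pool.map(lambda p: scan_chunk(p, predicate), chunk_paths))
    return [row for part in partial_results for row in part]


def demo():
    """Write 20 made-up rows, then select those whose second column is "0"."""
    with tempfile.TemporaryDirectory() as root:
        rows = [[str(i), str(i % 3)] for i in range(20)]
        write_chunks(root, "zone_7", rows)
        return query(root, "zone_7", lambda row: row[1] == "0")
```

Calling `demo()` returns the matching rows gathered from all chunks. Changing `NUM_CHUNKS` and re-running the batch step is the "small configuration change" trade-off: there is no index to maintain, but the degree of parallelism is capped by the number of chunks chosen in advance.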
Subsequent conversion of the CSV files to a binary format resulted in even faster processing, as we no longer need to parse out the CSV delimiters.

Our initial performance tests of the core algorithm showed that we'd need to distribute the computation (and potentially the data) in order to achieve our "real-time" goal. Based on this we looked at languages such as Erlang and Scala for their Actor concurrency model; both languages also have ways of distributing work across multiple machines. Although our algorithm (and data) was parallelisable, we didn't feel that it was amenable to a high level of concurrency, and our research showed us that we could build the system using Java-based tools. This, along with the fact that none of the developers were proficient with either Erlang or Scala, swayed us back towards Java (even considering all of its problems). Workingmouse does perform research around functional languages, including the newly developed Scalaz library; however, at the time only one of our guys was highly proficient in Scala and we were on a very tight timeline. The risk associated with four out of five in the team learning a new language and supporting tools was thought to be too great.

Now that we had a candidate language, we looked at a bunch of Java-based "grid" or parallel computing frameworks including:

These tools perform different functions: some distribute only the data and some distribute only the computation. By now we were fairly sure that we only needed to distribute the computation, so technologies such as Coherence, Terracotta and GigaSpaces were not appropriate.

During this time we'd been prototyping simple algorithms using Hadoop and GridGain. We'd found GridGain very easy to get started with, and performance tests showed that it added only 10-200 ms of overhead to calls when compared to local (in-JVM) invocations.
Hadoop was promising; however, it was also quite complicated to set up (installation and creation of readers for input) and use, and provided components that we didn't need, such as the distributed filesystem. We were also unsure how suited it was to interactive use, as it looked to be optimised for long-running batch operations. In the end, the ease of use and flexibility of GridGain made it a good candidate for the initial distribution, and it continues to be used to date.

GridGain

GridGain has what it calls SPIs (service provider interfaces), which are a fancy way of saying an API whose implementation may be provided by a third party, and which I thought had long since been relegated to the bad old days of XML parsers. These make it easy to swap in different implementations of, for example, node discovery or node-to-node communication. We used this to great effect when we discovered that Amazon's EC2 did not support IP multicast: we easily swapped this out for a JGroups implementation, which has since been swapped out for a JMS-based implementation.

For all its good points, however, there are some downsides (I'm being quite picky here; it's a really good framework to use and works quite well).
We also encountered issues with the discovery SPI, initially driven by EC2 not supporting IP multicast, and later by issues with JGroups. With our configuration of JGroups, we found that with larger grids (8 or 16 nodes) the master node often wouldn't discover every other available node. We could often rectify the situation by starting our nodes in a specific order (all processing nodes, then the master node) - but we would still occasionally miss a grid node or two. What was more worrying was that with long-running CPU-intensive tasks some of the nodes would drop off the grid (according to the master node's logs). We probably would've persisted with JGroups even with this problem, except that once a node dropped off the grid it was never rediscovered by the master node. Eventually the grid would dwindle to our single non-processing node (our master node was non-processing) and fall over.

Because of this we ended up swapping the JGroups discovery implementation out for a JMS-based one. Given we already had a JMS service running inside the application server, switching to it was fairly painless. Grid discovery with JMS seems to work just as well as IP multicast, and there does not appear to be any more overhead on our processing times.

Results

After we'd found an initial working solution it was time to tweak it. We'd actually been doing this all along; however, we now had a couple of solid weeks to spend solely on performance tuning. We had two lines of attack here: firstly we looked at tuning the JVM and its garbage collector configuration (I'd had great success with this in the past), and secondly we looked at profiling the code for CPU hotspots and memory usage. Our biggest wins turned out to be reducing the memory overhead and optimising our

Part of our algorithm called for loading 3.1 million objects into memory (and retaining them for fast lookup).
Each of these objects implemented a base class (

Our algorithm also makes heavy use of maps and sets, so our

I should note here that we didn't perform any premature optimisations. We profiled with real requests against real data, which gave us the true problems rather than the things we thought would be problems. In all we spent around a week and a half tuning and were able to decrease the total processing time from around 50 seconds to around 1 second (for a single node on the local machine). We were lucky, however, in that we always had performance at the forefront and had chosen a good architecture, one that didn't need to change following performance testing (I had thought we'd need to do some radical rework to achieve the desired performance).

As a sample of the numbers we were getting, here is a performance graph that comes from us trying to determine how the number of nodes affects processing time. These numbers were measured before we'd done the performance optimisations mentioned above; the current system is around 5 times faster than this.

Processing time in milliseconds is on the Y-axis and the number of GridGain nodes is on the X-axis; the results for 31 nodes are extrapolated. The red and blue lines show the average processing time as the number of nodes is increased (averaged across 5 and 20 requests respectively). For a single node the system performs the query in 30 seconds; for 16 nodes the system takes 7 seconds.

The green line (time delta) shows the improvement we get in going from N nodes to N + 1 nodes. For a small number of nodes this is very significant - going from 2 to 3 nodes reduces the total time by 9086 ms - however it becomes less significant as more nodes are added. Going from 8 to 11, 11 to 16 and 16 to 31 nodes only drops the processing time by 1243 ms, 1255 ms and 1219 ms respectively.
So for this test, almost doubling the number of nodes from 16 to 31 (the maximum number of nodes for our given chunk size, discussed above) only improves processing time by around one second. At some point you approach a limit where adding more nodes does not decrease (single-request) processing time significantly. It may, however, provide more resilience, so that the grid can handle more concurrent requests and cope better with failure.

The yellow line shows the theoretical performance we should be getting by increasing the number of nodes (assuming no network etc. overhead). If we average the total overhead out across all the nodes, as we increase the number of nodes this blows out from 95 milliseconds for 3 nodes to 210 milliseconds for 16 nodes. This overhead corresponds roughly to the GridGain overhead we saw in our GridGain sample project.

Deployment

After we'd developed a workable solution we needed a hosting environment. The guys we were working with were pretty keen not to have to host the grid themselves and were looking at Amazon's EC2 as a low-cost, easy ramp-up solution. The process of setting up nodes was pretty painless; we had registered, set up a node and documented the issues in about a day.

The only real issues we had with EC2 are that it doesn't support IP multicast, so we had to change the way our nodes found each other, and that IPs are not static, so management of clusters could become unwieldy. We also had issues with the difference between the IP address the machine thought it was on (the internal IP) and its external address, which meant that code that returned the address of the server (the WSDL generation) returned the wrong (internal) address to external clients.

There are issues with persistent data (there is none), so you need to store anything you want to keep somewhere else, like S3. Instances boot up from images (AMIs) so you can keep stuff there also, but it doesn't get saved when the instance goes down, and there's a limit on the amount of data you can store.
We used S3 to store our data and were planning on automating the copying of the data across to nodes on boot.

The EC2 VMs are quite nice; instances are easy to manage and work as advertised. The small instances can be a bit slow (we found them slower than our desktop machines), but the large instances are very nice, and 64-bit.

To ease deployment, we used Capistrano to deploy our code and to start and stop our nodes and the app server. Some of the guys also wrote scripts to automate the booting of our images, returning the (dynamic) IPs for SSH access.

Conclusions

Based on our work, here are a few items to consider in summary.

