More fun with Java Concurrency: BlockingQueue

Christian Nelson ·

I’ve written about Java 5 concurrency in the past and I’ve recently had the opportunity to make use of another one of the concurrency constructs: the BlockingQueue.

The Problem

There’s a problem we’ve seen a few times in the last few years. From time to time, applications must import data from external systems and massage it into a form that is useful. Sometimes these feeds are streamed over the net while other times they’re in the form of massive text files. My current project has two such feeds that are imported on a weekly basis, the larger of the two rings in at around 10M entities.

Loading the data is pretty straightforward: entities are parsed from the source, a transient entity object is instantiated and the parsed values plugged in, entities are batched up and then persisted as a batch.

The Collaborators

The Parser is responsible for loading records one at a time from the source.

public interface Parser
{
    boolean hasNextRecord();
    T nextRecord();
    void close();
}

The Persister is responsible for saving batches of entities.

public interface Persister
{
    void initializeFeed();
    void insertBatch(List entities);
    void finalizeFeed();
}

And the FeedLoader is the glue that pulls it all together, it coordinates parsing records into a batch and then triggering persisting the batches when they’re ready. It’s used like this, where 1000 is the batch size:

Parser parser = new WombatParser(inputStream);
Persister persister = new WombatPersister(dataSource);
FeedStats stats = new FeedLoader(parser, persister, 1000).loadData();

Now that the stage is set and we’ve covered the parts, we can get into what happens behind loadData().

Synchronous Parsing and Persistence

Our first cut of loadData() was a simple synchronous implementation:

public class FeedLoader
{
    private Parser parser;
    private Persister persister;
    private int batchSize;

    public FeedLoader(Parser parser, Persister persister, int batchSize)
    {
        Validate.isTrue(batchSize > 0);
        this.parser = parser;
        this.persister = persister;
        this.batchSize = batchSize;
    }

    public FeedStats loadData()
    {
        persister.initializeFeed();
        List entities = new ArrayList(batchSize);

        while (parser.hasNextRecord())
        {
            entities.add(parser.nextRecord());
            if (entities.size() >= batchSize)
            {
                persister.insertBatch(entities);
                entities.clear();
            }
        }

        // Save the stragglers that didn't make it into the last batch.
        if (!entities.isEmpty())
        {
            persister.insertBatch(entities);
        }

        parser.close();
        persister.finalizeFeed();
        return new FeedStats(...);
    }
}

This worked well… we were able to process records at a throughput of about 3125 per second. After a bit of research I realized we were spending nearly as much time parsing records as we were persisting them. I also noticed that the load on the machine was pretty low during the import process. While there is a relationship between parsing and persisting, it seemed like there should be an easy way split the processes across multiple threads while keeping the code simple and readable.

Asynchronous processing with BlockingQueue and ExecutorService

Digging through java.util.concurrent, I came across BlockingQueue which is described as “A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.” Sounds like a great construct to bridge the gap between our Parser and Persister threads. The parser can add entities to the queue while the persister is pulling them off into batches. Let’s see what it looks like:

public class FeedLoader
{
    private Parser parser;
    private Persister persister;
    private int batchSize;
    private boolean done = false;

    public FeedLoader(Parser parser, Persister persister, int batchSize)
    {
        Validate.isTrue(batchSize > 0);
        this.parser = parser;
        this.persister = persister;
        this.batchSize = batchSize;
    }

    public FeedStats loadData()
    {
        persister.initializeFeed();

        BlockingQueue blockingQueue = new ArrayBlockingQueue(batchSize * 2);
        try
        {
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            // invokeAll() blocks until both tasks have completed
            executorService.invokeAll(
                asList(new ParserTask(parser, blockingQueue),
                       new PersisterTask(persister, blockingQueue)));
            executorService.shutdown();
        }
        catch (InterruptedException e)
        {
            log.error("Failed to load feed.", e);
            throw new RuntimeException("Failed to load feed.", e);
        }

        persister.finalizeFeed();
        return new FeedStats(...);
    }

    class ParserTask implements Callable
    {
        Parser parser;
        BlockingQueue queue;

        ParserTask(Parser parser, BlockingQueue queue)
        {
            this.parser = parser;
            this.queue = queue;
        }

        public Object call()
        {
            while (parser.hasNextRecord())
            {
                try
                {
                    queue.put(parser.nextRecord());
                }
                catch (InterruptedException e)
                {
                    log.error("Failed to load feed.", e);
                    throw new RuntimeException("Failed to load feed.", e);
                }
            }
            parser.close();
            done = true; // Indicates that the parser is done.
            return null;
        }
    }

    class PersisterTask implements Callable
    {
        Persister persister;
        BlockingQueue queue;

        PersisterTask(Persister persister, BlockingQueue queue)
        {
            this.persister = persister;
            this.queue = queue;
        }

        public Object call()
        {
            List entities = new ArrayList(batchSize);

            // "done" is set to false when the parser is done, at which point
            // all remaining entities will be in the queue.
            while (!done || !queue.isEmpty())
            {
                try
                {
                    entities.add(queue.take());
                    if (entities.size() >= batchSize)
                    {
                        persister.insertBatch(entities);
                        entities.clear();
                    }
                }
                catch (InterruptedException e)
                {
                    log.error("Failed to load feed.", e);
                    throw new RuntimeException("Failed to load feed.", e);
                }
            }
            if (!entities.isEmpty())
            {
                persister.insertBatch(entities);
            }
            return null;
        }
    }
}

By allowing the parser and persister to run concurrently using two threads, the feed loaded with a throughput of 4608 entities per second, nearly a 50% improvement over the single threaded version.

There are two caveats to the code as written above: creating an ExecutorService for each loadData() isn’t ideal; it’s best to configure one for the application and resuse it, and also is must be shutdown before the application quits. I’ve skimped on error handling, which is fine if the Parser and Persister implementations don’t throw exceptions.

Conclusion

The ExecutorService and BlockingQueue provide the tools to make this improvement easy while keeping the code pretty readable. As always, we should be striving for readability, so adding unnecessary concurrency is never a good idea. And your mileage may vary depending on many things, including the hardware, network, data, server load… so do some testing to measure the real improvement in production.

Even if you don’t end up using it, it’s fun to experiment with and learn about concurrency issues. There are scenarios where the smart application of concurrency constructs can yield fantastic benefits. Check our Greg Luck’s recent blog on Ehcache performance for an example.

Updates: added caveats and call to ExecutorService.shutdown(). Fixed a typo in the PersisterTask.

Christian Nelson
Christian Nelson

Christian is a software developer, technical lead and agile coach. He's passionate about helping teams find creative ways to make work fun and productive. He's a partner at Carbon Five and serves as the Director of Engineering in the San Francisco office. When not slinging code or playing agile games, you can find him trekking in the Sierras and playing with his daughters.