Wednesday 16 August 2017

Extra concurrency utils on the JVM

I have released a new set of concurrency utils for taking advantage of multiple cores on the JVM. At 4 or 8 cores, contention is often not a big deal - but when operating on 32 or more cores, you can easily run into contention that slows you down. There are ways to scale to higher core counts, but they generally require modifications to your app structure.

At the same time, there are a number of very commonly used abstractions on the JVM, such as the OutputStream.

Wouldn't it be great if, rather than having to rewrite your application to take advantage of multiple threads, you could plug in a new library to take advantage of multiple cores?

This library aims to provide a solution to allow your code to scale:
https://github.com/jasonk000/concurrent

ParallelGZIPOutputStream - exactly what it says on the label: it allows you to perform GZIP compression on long streams of data using multiple cores. This scales near-linearly to at least 16 cores, so if you have the cores you can get 4, 8, or 16x faster compression on your files.
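
To give a flavour of the technique (this is not the library's actual code - the class name, method and block handling below are illustrative only): compress fixed-size blocks on a pool of threads, then write the compressed blocks out in submission order. A stream of concatenated gzip members is still a valid gzip file, so the output remains readable by a standard gunzip.

    import java.io.ByteArrayOutputStream;
    import java.io.OutputStream;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.zip.GZIPOutputStream;

    // Sketch only - see the repository for the real ParallelGZIPOutputStream
    public class ParallelGzipSketch {

        public static void compress(byte[] input, OutputStream out, int blockSize, int threads)
                throws Exception {
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            try {
                // compress each block independently, in parallel
                List<Future<byte[]>> blocks = new ArrayList<>();
                for (int offset = 0; offset < input.length; offset += blockSize) {
                    final int start = offset;
                    final int len = Math.min(blockSize, input.length - start);
                    blocks.add(pool.submit(() -> {
                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
                        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
                            gz.write(input, start, len);
                        }
                        return bos.toByteArray();
                    }));
                }
                // write results strictly in submission order so the output is deterministic
                for (Future<byte[]> block : blocks) {
                    out.write(block.get());
                }
            } finally {
                pool.shutdown();
            }
        }
    }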

BufferedOutputStream - the JVM BufferedOutputStream uses a synchronized{} block to ensure consistency when used from multiple threads. If you are producing a lot of data, and fast, from many threads, then synchronization overhead starts to slow you down at around 10 writers. This BufferedOutputStream implementation allows many more threads to write in without impacting performance.

AsyncOutputStreamQueue - sometimes you just want a piece of the output work to happen on another thread - writing to IO, for example. Ideally you might use something like a Disruptor for this, but for something a lot simpler that you can plug into your existing code, you can use this async module. Write to it, and the writes are queued and executed on another thread, which means the original writer can go about doing whatever it needs to do without being impacted by slow downstream activities.
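
The underlying pattern is simple enough to sketch (this is just the idea, not the library's implementation - flushing, error handling and shutdown are deliberately glossed over):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.Arrays;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch of the async write idea: writes are copied onto a queue and drained by a
    // single background thread, so the caller never blocks on slow IO.
    public class AsyncOutputStreamSketch extends OutputStream {

        private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        private final Thread writer;

        public AsyncOutputStreamSketch(OutputStream delegate) {
            this.writer = new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        delegate.write(queue.take()); // the slow IO happens here, off the caller's thread
                    }
                } catch (InterruptedException | IOException e) {
                    // simplified: a real implementation would surface this to the caller
                }
            });
            writer.setDaemon(true);
            writer.start();
        }

        @Override
        public void write(int b) {
            write(new byte[] { (byte) b }, 0, 1);
        }

        @Override
        public void write(byte[] b, int off, int len) {
            // copy, because the caller may reuse its buffer after write() returns
            queue.add(Arrays.copyOfRange(b, off, off + len));
        }
    }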

CustomBlockingMpmcQueue - if you are using a standard Java executor with small or moderately sized tasks, then contention on the queue feeding tasks into the executor becomes an issue when scaling past 20+ cores. A small tweak on Nitsan's JCTools implementations allows you to use the fast MPMC queues with an Executor, which gives much snappier processing times through the queue when scaling up.
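
The general shape is sketched below - note this is not the library's code; the class name is mine, it assumes JCTools' MpmcArrayQueue, and only the BlockingQueue methods a ThreadPoolExecutor actually relies on are implemented with any care:

    import java.util.AbstractQueue;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;

    import org.jctools.queues.MpmcArrayQueue;

    // Sketch: adapt a JCTools MPMC queue to the BlockingQueue interface a
    // ThreadPoolExecutor expects, parking briefly instead of locking.
    public class MpmcBlockingQueueSketch<E> extends AbstractQueue<E> implements BlockingQueue<E> {

        private final MpmcArrayQueue<E> delegate;

        public MpmcBlockingQueueSketch(int capacity) {
            this.delegate = new MpmcArrayQueue<>(capacity);
        }

        @Override public boolean offer(E e) { return delegate.offer(e); }
        @Override public E poll() { return delegate.poll(); }
        @Override public E peek() { return delegate.peek(); }
        @Override public int size() { return delegate.size(); }
        @Override public int remainingCapacity() { return delegate.capacity() - delegate.size(); }
        // iteration support depends on the JCTools version; the executor rarely needs it
        @Override public Iterator<E> iterator() { return delegate.iterator(); }

        @Override
        public void put(E e) throws InterruptedException {
            while (!delegate.offer(e)) {            // queue full - park briefly and retry
                if (Thread.interrupted()) throw new InterruptedException();
                LockSupport.parkNanos(1);
            }
        }

        @Override
        public E take() throws InterruptedException {
            E e;
            while ((e = delegate.poll()) == null) { // queue empty - park briefly and retry
                if (Thread.interrupted()) throw new InterruptedException();
                LockSupport.parkNanos(1);
            }
            return e;
        }

        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            while (!delegate.offer(e)) {
                if (Thread.interrupted()) throw new InterruptedException();
                if (System.nanoTime() - deadline >= 0) return false;
                LockSupport.parkNanos(1);
            }
            return true;
        }

        @Override
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            E e;
            while ((e = delegate.poll()) == null) {
                if (Thread.interrupted()) throw new InterruptedException();
                if (System.nanoTime() - deadline >= 0) return null;
                LockSupport.parkNanos(1);
            }
            return e;
        }

        @Override
        public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); }

        @Override
        public int drainTo(Collection<? super E> c, int maxElements) {
            int n = 0;
            for (E e; n < maxElements && (e = delegate.poll()) != null; n++) c.add(e);
            return n;
        }
    }

You would then hand it to a ThreadPoolExecutor as its work queue, e.g. new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new MpmcBlockingQueueSketch<>(64 * 1024)).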

PS - it would be great to get this work upstreamed somewhere, if anyone has any suggestions.

Faster GZIP compression on the JVM

JVM compression benchmarks abound. Snappy and other compression libraries are often tagged as the go-to because they are quite a lot faster.

However, GZIP (and the underlying deflate algorithm) is a very widely supported format with reasonably good performance. What if I told you you could get double the speed from gzip on Java?

Cloudflare have in fact done some excellent work tuning the open source zlib implementation; however, this code is not available on the JVM, as it is a C library. The zlib implementation provided by Java is an older version, and overriding it with LD_LIBRARY_PATH etc. is not possible as the library is bundled.

Until now ..

I have packaged together some of the existing C and Java JNI code along with a few modifications, which allows you to call directly into an imported zlib library of your choosing! Include the Cloudflare library, for example, and you will get more than 2x performance.

Benchmark results are excellent:
https://github.com/jasonk000/fastzlib

Benchmark                                Mode  Cnt   Score   Error  Units
BenchmarkCompressors.compressCloudflare  thrpt    5  43.760 ± 5.566  ops/s
BenchmarkCompressors.compressJvm         thrpt    5  19.533 ± 2.931  ops/s
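
The actual benchmark lives in the repository; a stripped-down JMH sketch of the JVM-side measurement might look something like this (the input size and content here are illustrative only - the corpus you compress matters a lot for the absolute numbers):

    import java.util.Random;
    import java.util.zip.Deflater;

    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.Setup;
    import org.openjdk.jmh.annotations.State;

    @State(Scope.Benchmark)
    public class CompressBenchmarkSketch {

        byte[] input;

        @Setup
        public void setup() {
            // illustrative input only - the real benchmark uses a fixed corpus
            input = new byte[16 * 1024 * 1024];
            new Random(42).nextBytes(input);
        }

        @Benchmark
        public int compressJvm() {
            Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
            deflater.setInput(input);
            deflater.finish();
            byte[] out = new byte[64 * 1024];
            int total = 0;
            while (!deflater.finished()) {
                total += deflater.deflate(out);
            }
            deflater.end();
            return total; // return the compressed size so the JIT cannot eliminate the work
        }
    }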

Saturday 18 June 2016

Zero copy CSV processing

Recently I've had the opportunity to work on some text parsing problems and I have a simple CSV parsing implementation that I'd like to share.

By taking a fairly lightweight approach and focusing on not creating objects during parsing unless necessary, the parser is able to achieve some very good performance results even when compared to Univocity and Jackson parsers. The API exposes an ability to get to the raw buffer and byte offsets of the parsed values, which gives a significant performance boost (see the "no String" result below).

Performance results - fastest kid on the block


CAVEAT - it does have quite a few limitations in terms of character support and similar, so it is not an apples-to-apples comparison. But the majority of text I work with is encoded in a simple character set with only a single row per line, so this is sufficient for my needs and significantly faster. Your mileage may vary. Pull requests encouraged!

Now for the performance results: First CSV-Game and then Quick-CSV-Streamer results.

If you are already using a Streams based API I would definitely recommend using QuickCSV, it's going to be easier to adapt and probably "good enough".

The results below don't really highlight the end-to-end processing benefits you get from using Zero through your entire processing chain, but they are nonetheless interesting to look at. Zero easily beats most of the general purpose parsers; however, QuickCSV is actually ahead when using parallel processing. On sequential processing, Zero can beat Quick when reading from a byte[] but not from an InputStream. Using the 'no String' API ensures Zero is faster than Quick.

CSV-Game result       CSV      Count
------------------------------------
CSVeed                4.871    -
BeanIO                1.126    -
Commons               0.848    -
Java                  0.328    -
OpenCSV               0.390    0.429
Univocity             0.272    0.313
ZeroCsv               0.141    0.161

And using the Quick-CSV test set. What's interesting in this set is that quite a bit of the total time is in the String / Double parsing rather than actual CSV work. Quick and Zero are both very quick.

Quick-CSV Test Set    Parse time  
----------------------------------------------
OpenCSV               1870 +- 86 ms/op
ZeroCsv (InputStream)  786 +- 29 ms/op
QuickCSV Sequential    622 +- 45 ms/op
ZeroCsv (byte[])       512 +- 32 ms/op
ZeroCsv (no String)    371 +- 33 ms/op

QuickCSV Parallel      232 +- 11 ms/op

A sample of the API:

// provide a filename, the temporary buffer size,
// and the number of columns we should anticipate
String filename = "hello.csv";
int BUFFER_SIZE = 512*1024;
int MAX_COLS = 15;

// create the tokenizer
CsvTokenizer csv = new CsvTokenizer(new FileInputStream(filename), BUFFER_SIZE, MAX_COLS);

// start parsing
while (csv.parseLine()) {
  // fetch column count
  System.out.println("column count = " + csv.getCount());
  // fetch a value as a string (creates garbage)
  System.out.println("3rd column value = " + csv.getAsString(2));
  // or better, perform a write to System.out (without a String)
  System.out.write(csv.getLineBuffer(), csv.getOffset(2), csv.getLength(2));
  
}


Working with Disruptor


Using this with the Disruptor is great. Instead of being materialised objects, your domain objects simply become pointers into the buffer. If you perform your CSV parsing and stream into a set of ints plus a pointer to a byte[], then the Disruptor entries become very compact and simple to work with.

// in main loop
while (csv.parseLine()) {
    ringBuffer.publishEvent((line, seq, t) -> CityZ.copyTo(t, line), csv);
}
// see the CityZ code for an example

Other helpers are available, such as getAsLong, getAsInt, and getAsDouble. This makes for a nice API to work with the existing Java APIs that take a byte array, offset, and length. If the API gets cumbersome, it is simple to switch to using Strings etc. This also allows you to skip deserialisation of columns that are not required.

If you are using an InputStream to process huge files, the Zero parser will allocate smaller buffers and rotate through them for you. As long as you eventually complete processing through the Disruptor, the buffers will be released. (This path does generate garbage.)

What's special


A significant part of the difference comes down to a few very small areas (a sketch of the int-parsing idea follows below):
- A custom double and int parsing implementation
- Surprise branching shortcuts
- Avoiding copying of buffers and instantiating objects where possible
- Taking some shortcuts with error checking
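
As a flavour of the parsing shortcut mentioned above, a zero-allocation integer parse over a byte[] slice looks roughly like this (a sketch, not the library's exact code - note how little error checking there is):

// parse an int directly from a byte[] slice - no String, no garbage
static int parseInt(byte[] buf, int offset, int length) {
    int value = 0;
    boolean negative = false;
    int i = offset;
    if (length > 0 && buf[i] == '-') {
        negative = true;
        i++;
    }
    for (; i < offset + length; i++) {
        // shortcut: assume well-formed ASCII digits, skip full validation
        value = value * 10 + (buf[i] - '0');
    }
    return negative ? -value : value;
}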

Further investigation


I might start benchmarking and profiling to compare my implementation against QuickCSV. I do notice the character processing stream is inverted. But I also think the QuickCSV implementation has a lot of additional complexity. Perhaps if the two ideas are merged ....

QuickCSV also uses a custom Double parsing implementation.

In addition, QuickCSV seems to perform much better than ZeroCSV on the InputStream-based inputs. I'd like to investigate why that is.

The code?


Code is on github at https://github.com/jasonk000/zerocsv

Monday 29 September 2014

JCTools based queues

Improving further

Following on from my previous post, in which I investigated the performance benefits of writing a custom executor compared to the out-of-the-box one - comments from both Nitsan and Rudiger suggested I should try the JCTools implementation of SPMC queues (single producer, multiple consumer).

It's as simple as importing the JCTools JAR from Maven and changing a couple of lines to swap the Queue implementation, and away we go ....

    private final Queue<Runnable> queue = new SpmcArrayQueue<>((int) Math.pow(2, 18));

Without much further ado, here are the results. I've re-executed all of the numbers so they may be slightly different.



(Updated with correct 256K elements - most notable difference is that JCTools does not suffer as much from contention at ~8 threads).

LinkedTransferQueue is still an excellent candidate

I did notice that LinkedTransferQueue is still looking very respectable, so as an OOTB JVM queue it's definitely worth considering. Based on observing the system during the run, it does have a relatively high garbage cost. But, if you have memory to burn it is a very good candidate to test. In fact, it seems like the LTQ would perform much better if it didn't cause so much GC work. This is impressive given it is MPMC capable.

Actually, what I do notice when running on the LTQ is that the GC pause times recorded by the JVM are significantly longer (up to 1 sec). This is interesting, and I suspect it's a result of the unbounded queue. A few combinations of capped queue lengths showed significantly reduced GC times in the logs (<10 ms compared to 1 sec), but no overall difference in runtime. I suspect there is more lurking in the dark here, for another day.

And custom is still better

The JCTools SPMC queue does in fact perform significantly quicker (4-16%) than the other queues tested, and as contention increased it actually performed much better. Looks like Nitsan's work has paid off. See his video here (though I'm yet to get a chance to watch it).

Perhaps it is a little like cheating, and we should pass Runnables to the Disruptor instead, but the Disruptor as written is still ~2x as fast as pushing tasks through an Executor.


Sunday 21 September 2014

Writing a non-blocking Executor

One of the blogs I follow is Rudiger's - in particular this article with a light benchmark that I find interesting, as it shows the clear performance advantage of the Disruptor, and also just how slow the out-of-the-box services can be for particular use cases.

In the article, Rudiger tests a number of different implementations of a pi calculator, and quickly finds that 1 million small tasks is a nice way to put enough pressure on the concurrency frameworks to separate out a winner. The out-of-the-box Java Executor service is actually relatively slow, coming in last.

One of the items on my to-do list has been to build a fast ExecutorService implementation, and fortunately, after a busy few months, I've had a chance to sit down and write it. For those new to concurrency on Java, the ExecutorService interface and the Executors factory methods provide an easy way to get into launching threads and handing off composable tasks to other threads for asynchronous work.

The work below borrows heavily from Rudiger's work but also the work at 1024cores.net, and of course I can't neglect to mention Nitsan Wakart's blog and Martin Thompson's numerous talks on the topic.

The target: Get a working ExecutorService that can be used in the Pi calculator and get a result that is better than out-of-the-box.

Getting a baseline

Of course, since my target is to beat the out-of-the-box frameworks, I spent some time downloading the gists and hooking up the libs in Maven. The only thing is I didn't hook up the Abstraktor tests, as I am not too familiar with the project (happy to take a pull request).

Interestingly, the out-of-the-box services perform pretty much as described in Rudiger's post, with the caveat that the Disruptor services show a much sharper degradation in performance on this run. Worth mentioning is that this is only a 2-socket, 8-core E5410 (the old Harpertown series), so not nearly as new as Rudiger's kit.

First cut

I've heard good things about the LinkedTransferQueue but hadn't had a need to test it out. As a first cut, I decided to write up a simple Executor implementation that simply puts each task onto a LinkedTransferQueue as part of the submit() call, with the worker threads take()-ing from the queue.

    private static class LtqExecutor<T> implements ExecutorService {

        private volatile boolean running;
        private volatile boolean stopped;
        private final int threadCount;
        private final BlockingQueue<Runnable> queue;
        private final Thread[] threads;

        public LtqExecutor(int threadCount) {
            this.threadCount = threadCount;
            this.queue = new LinkedTransferQueue<>();
            running = true;
            stopped = false;
            threads = new Thread[threadCount];
            for(int i = 0; i < threadCount; i++) {
                threads[i] = new Thread(new Worker());
                threads[i].start();
            }
        }

        public void execute(Runnable runnable) {
            try {
                queue.put(runnable);
            } catch (InterruptedException ie) {
                throw new RuntimeException(ie);
            }
        }

        private class Worker implements Runnable {
            public void run() {
                while(running) {
                    Runnable runnable = null;
                    try {
                        runnable = queue.take();
                    } catch (InterruptedException ie) {
                        // was interrupted - just go round the loop again,
                        // if running = false will cause it to exit
                    }
                    try {
                        if (runnable != null) { 
                            runnable.run();
                        }
                    } catch (Exception e) {
                        System.out.println("failed because of: " + e.toString());
                        e.printStackTrace();
                    }
                }
            }
        }

        // remaining ExecutorService methods (shutdown, awaitTermination, etc.) omitted
    }

I also removed the test for getQueue().size(). Performance of this was actually rather impressive on its own, achieving results in a third of the time of the out-of-the-box services, while of course coming nowhere near the Disruptor.

threads  disruptor2  threadpi  custom/LTQ
1        954         1455      1111
2        498         1473      1318
3        327         2108      802
4        251         2135      762
5        203         2195      643
6        170         2167      581
7        7502        2259      653
8        9469        2444      5238

Trying some things

ArrayBlockingQueue and SynchronousQueue

I also tried substituting ABQ and SQ in place of the LinkedTransferQueue, since my understanding is that LTQ has some additional, unnecessary overheads. Perhaps ABQ and SQ would be a little more cache friendly, since ABQ is possibly better laid out in memory, and I've seen SQ used in executor implementations before.

threads  disruptor2  threadpi  custom/LTQ  custom/SQ  custom/ABQ 1000
1        954         1455      1111        3738       2570
2        498         1473      1318        2638       2570
3        327         2108      802         3925       1875
4        251         2135      762         4035       1792
5        203         2195      643         4740       1692
6        170         2167      581         4635       1684
7        7502        2259      653         4362       1668
8        9469        2444      5238        4566       1677

Ouch. No need to mention these again.

Striping the LTQ

Clearly Disruptor's approach of limiting contended writes has a benefit. Would the LinkedTransferQueue get a little faster by striping the input queues - one per thread?

In this model, instead of having one queue into the Executor, each worker thread has its own queue, and the put() method round-robins across them.
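
Roughly sketched, the idea looks like this (this is the shape of it rather than the exact benchmark code):

    // Sketch of the striped variant: one LinkedTransferQueue per worker thread,
    // with execute() round-robining work across the queues.
    private static class StripedLtqExecutor {

        private final LinkedTransferQueue<Runnable>[] queues;
        private int nextQueue = 0; // only ever touched by the single submitting thread

        @SuppressWarnings("unchecked")
        public StripedLtqExecutor(int threadCount) {
            queues = new LinkedTransferQueue[threadCount];
            for (int i = 0; i < threadCount; i++) {
                final LinkedTransferQueue<Runnable> q = new LinkedTransferQueue<>();
                queues[i] = q;
                // each worker only ever takes from its own queue
                Thread worker = new Thread(() -> {
                    try {
                        while (true) {
                            q.take().run();
                        }
                    } catch (InterruptedException ie) {
                        // interrupted - exit the worker
                    }
                });
                worker.setDaemon(true);
                worker.start();
            }
        }

        public void execute(Runnable runnable) {
            // round-robin across the per-worker queues to spread the contention
            queues[nextQueue].add(runnable);
            nextQueue = (nextQueue + 1) % queues.length;
        }
    }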

Actually, this was a bit better for some workloads but with much faster degradation.

threads  disruptor2  threadpi  custom/LTQ  custom/striped LTQ
1        954         1455      1111        1237
2        498         1473      1318        1127
3        327         2108      802         713
4        251         2135      762         570
5        203         2195      643         498
6        170         2167      581         386
7        7502        2259      653         5264
8        9469        2444      5238        5262

Customising a queue

A ringbuffer based queue

With a baseline set, I knew from reading Nitsan's many posts and the 1024cores.net site that there were some optimisations to be had in this problem space. What about using a queue with a ringbuffer?

I introduced a custom lock-free queue and substituted it into my Executor logic. The queue keeps a Runnable[] buffer of capacity slots plus two atomic counters, nextSlotToWrite and lastConsumed. The put() looks like this.

            public void put(Runnable r) throws InterruptedException {
                while (true) {
                    long toWrite = nextSlotToWrite.get();
                    if (toWrite < (lastConsumed.get() + capacity)) {
                        buffer[(int) (toWrite % capacity)] = r;
                        nextSlotToWrite.incrementAndGet();
                        break;
                    } else {
                        // no space in queue, sleep
                        LockSupport.parkNanos(1);
                    }
                }
            }

The take() looks like this.

            public Runnable take() throws InterruptedException {
                while(!Thread.interrupted()) {
                    long lastFilled = nextSlotToWrite.get() - 1;
                    long lastConsumedCache = lastConsumed.get();
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) % capacity);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1);
                        if (casWon) {
                            return r;
                        } else {
                            // cas failed - another thread took the slot; loop again, do not sleep
                        }
                    } else {
                        // nothing available to take yet - spin round again
                    }
                }
                throw new InterruptedException();
            }

Can we go faster by SPMC?

In particular, in our case there is only ever a single writer thread issuing put()s to the queue. Can we take advantage of this by tweaking the queue into an SPMC queue (single producer, many consumers)?

            public void put(Runnable r) throws InterruptedException {
                while (true) {
                    if (toWrite < (lastConsumed.get() + capacity)) {
                        buffer[(int) (toWrite & REMAINDER_MASK)] = r;
                        toWrite += 1;
                        nextSlotToWrite.lazySet(toWrite);
                        break;
                    } else {
                        // no space in queue, sleep
                        LockSupport.parkNanos(1);
                    }
                }
            }

Note that toWrite is now a plain field owned by the single producer, published with a lazySet(), but this doesn't help much; I'm speculating that it's really the .get()s in the take() below that are causing the issues.

            public Runnable take() throws InterruptedException {
                while(!Thread.interrupted()) {
                    long lastFilled = nextSlotToWrite.get() - 1;
                    long lastConsumedCache = lastConsumed.get();
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) & REMAINDER_MASK);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1); 
                        if (casWon) {
                            return r;
                        } else {
                            // if cas failed, then we are not the owner so try again
                        }
                    } else {
                        // no values available to take, wait a while
                        LockSupport.parkNanos(1);
                    }
                }
                throw new InterruptedException();
            }

By this point I've also started to use power-of-two sized queues (following the Disruptor guidance on fast modulo via a bit mask). Actually, this didn't make a lot of difference, but I've left it in.
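
For reference, the fast-modulo trick just needs the capacity to be a power of two so the mask can replace the division (names here mirror the fields used in the snippets above; slot stands in for the long sequence counters):

            // capacity must be a power of two, e.g. 1 << 18 = 262144
            private static final int CAPACITY = 1 << 18;
            private static final int REMAINDER_MASK = CAPACITY - 1;

            // slot % CAPACITY and slot & REMAINDER_MASK give the same index,
            // but the mask avoids an integer division on every access
            int bufferslot = (int) (slot & REMAINDER_MASK);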

Caching the get()s

How can I reduce contention when things are busy? Having a look at the take() method, we can see there are two .get() calls on atomic fields, each of which causes a volatile read. On CAS failure it's not always the case that we need to re-read both variables. So what happens if we move those re-reads so they only happen in the branch that actually needs them - the "queue looks empty" case and the "CAS failed" case?

            public Runnable take() throws InterruptedException {
                long lastFilled = nextSlotToWrite.get() - 1;
                long lastConsumedCache = lastConsumed.get();
                while(!Thread.interrupted()) {
                    if (lastConsumedCache < lastFilled) {
                        // fetch the value from buffer first, to avoid a wrap race
                        int bufferslot = (int) ((lastConsumedCache + 1) & REMAINDER_MASK);
                        Runnable r = buffer[bufferslot];
                        // attempt to cas
                        boolean casWon = lastConsumed.compareAndSet(
                            lastConsumedCache, lastConsumedCache + 1);
                        if (casWon) {
                            return r;
                        } else {
                            // if cas failed, then we are not the owner so try again
                            // LockSupport.parkNanos(1);

                            // note that the lastFilled does not need to change just yet
                            // but probably the lastConsumed did
                            lastConsumedCache = lastConsumed.get();
                        }
                    } else {
                        // no values available to take, wait a while
                        LockSupport.parkNanos(1);
                        lastFilled = nextSlotToWrite.get() - 1;
                        // lastConsumedCache = lastConsumed.get();
                    }
                }
                throw new InterruptedException();
            }
        }

threads  akkapi  disruptor  disruptor2  threadpi  custom/LTQ  custom lockfree (Pp Tp padded)  custom w/ cached gets
1        2389    1300       954         1455      1111        1245                            1257
2        1477    713        498         1473      1318        992                             1007
3        1266    492        327         2108      802         676                             695
4        1629    423        251         2135      762         548                             559
5        1528    400        203         2195      643         538                             515
6        1541    398        170         2167      581         545                             510
7        1482    1085       7502        2259      653         603                             562
8        1397    1297       9469        2444      5238        925                             883

Not a lot of difference, but it does help our under-contention numbers, which is exactly what we'd expect.

The results

Actually, the results were good - I was pretty happy with them. They're a small fraction faster than the LinkedTransferQueue, with what seems like better worst-case performance. Nowhere near as fast as the Disruptor, but frankly that's not surprising, given the energy and talent working on it.


Of course, this is not production ready, and YMMV, but I would welcome comments and questions - particularly if there are any issues with correctness.

Code samples on github.

Thursday 6 March 2014

Chasing pointers with scissors .. or something

Nitsan W has a great article on the dangers of using on-heap pointers within the JVM. In fact, Gil has a classically clear post on the same topic.

Try as I might, though, I couldn't find anyone who had posted a concrete example. Especially since rumours seem to abound that it's a possibility, it seemed a little bizarre that there were plenty of examples of "how to access Unsafe" but none of "what not to do".

So I thought it might be fun to put together a demo to show just how quickly this can cause problems. It's a contrived example, of course, so in the real world I'm sure it would take longer to show up, and would probably be more random.

The code is pretty simple, and self-explanatory (I hope). Review the code and then run the example.
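
The repository has the real code, but the core of the trick looks roughly like the sketch below. The class and helper names here are mine, and it assumes a 64-bit JVM run with -XX:-UseCompressedOops so that the reference slot holds a raw address. Needless to say, don't do this anywhere near production.

    import java.lang.reflect.Field;
    import java.util.HashMap;
    import java.util.Map;

    import sun.misc.Unsafe;

    // Sketch of the demo: capture an object's current address, churn the heap so the
    // GC moves the object, then keep reading through the now-stale address.
    public class StalePointerDemo {

        static final Unsafe UNSAFE = getUnsafe();

        public static void main(String[] args) {
            int[] data = new int[1024]; // stays all-zero for the life of the program

            // smuggle the reference through an Object[] to read its raw address
            Object[] holder = new Object[] { data };
            long address = UNSAFE.getLong(holder, (long) UNSAFE.arrayBaseOffset(Object[].class));
            long contentOffset = UNSAFE.arrayBaseOffset(int[].class);

            for (int round = 0; round < 200; round++) {
                makeGarbage(); // pressure the GC so it eventually relocates 'data'
                int value = UNSAFE.getInt(address + contentOffset); // read via the captured address
                if (value != 0) {
                    System.out.println("oops! expected: 0; but found: " + value);
                    return;
                }
                System.out.println("seems ok");
            }
        }

        static void makeGarbage() {
            Map<Integer, String> map = new HashMap<>();
            for (int i = 0; i < 100_000; i++) {
                map.put(i, "value-" + i); // throwaway allocations to keep the collector busy
            }
        }

        static Unsafe getUnsafe() {
            try {
                Field f = Unsafe.class.getDeclaredField("theUnsafe");
                f.setAccessible(true);
                return (Unsafe) f.get(null);
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException(e);
            }
        }
    }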

Just so:

[jason@freebsd01 ~/code/examples/pointers]$ mvn clean compile exec:exec
<< the usual maven build things >>
[INFO][INFO] --- exec-maven-plugin:1.1:exec (default-cli) @ pointers ---
[INFO] [GC 2621K->1388K(502464K), 0.0042790 secs]
<< lots of GC while the vm starts up >>
<< and now we get down to business >>
[INFO] initialising array
[INFO] address: 34913266488
[INFO] native address: 34913266488
[INFO] checking array:
[INFO] seems ok
[INFO] seems ok
<< carries on for a while >>
[INFO] seems ok
[INFO] seems ok
[INFO] seems ok
[INFO] making garbage
[INFO] seems ok
[INFO] seems ok
[INFO] seems ok
[INFO] seems ok
[INFO] seems ok
[INFO] [GC 132255K->129303K(502464K), 0.3024900 secs]
[INFO] seems ok
[INFO] oops! expected: 0; but found: 109260096
[INFO] inserted to map -> 1013558; map size ended at -> 950748
[INFO] garbage generator finished
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14.938s
[INFO] Finished at: Thu Mar 06 22:29:41 EST 2014
[INFO] Final Memory: 10M/151M
[INFO] ------------------------------------------------------------------------


Build success!

Tuesday 25 February 2014

Updating the Disruptor sample

Quick update to Disruptor sample

A comment on the previous post asked about comparisons to the java.util.concurrent Executor implementations, so I thought it was time to refresh the examples a little and add some additional scenarios - in particular a comparison to a thread-pooled implementation (since that was the question).

The code simply does 10,000 UDP "echo" packets; the service is expected to uppercase the characters. What I want to look at here is latency, while bringing an additional test into the mix.

This is a pretty straightforward test - just looking to identify 99th-percentile latency across a couple of scenarios. Since these are simple applications, I've included a baseline blocking IO implementation as a comparison.

Total runtime tests

The initial test is a total-runtime test. This is a naive but indicative number for relative performance. The test runs a number of 10K UDP request-response loops.



A single thread shows that the blocking IO send/receive loop is quicker than the Disruptor implementation, which is quicker than the threadpool. But there's one issue: with more than one thread, the results are almost identical. What's going on here?

My initial assumption was that logging to file was capping throughput, so let's remove it.

Latency with logging disabled


As you can see, similar results apply. Disruptor is only slightly slower than a blocking loop when using the sleeping wait strategy. The threadpool is a bit further away, but still close behind. Disruptor with the busy-spin wait strategy - and the non-blocking IO in a tight loop - gives the best latency. I think this requires a bit more investigation than I have time for right now, but I would welcome comments.

Key observations

Suffice to say, similar key points come up as they usually do:
  • Single-threaded processing is often the quickest approach. Avoid threads until there is a clear reason for them. Not only that, but the code is significantly shorter and simpler (a sketch of this baseline loop follows below).
  • Disruptor is very quick at handing off between threads. In fact, the message passes between three threads nearly as quickly as the single-threaded code runs.
  • Disruptor response times are very sensitive to the choice of wait strategy. Choose the wrong strategy and you will burn 100% CPU across multiple cores while getting worse latency.
  • The threadpool is "quick enough" for many cases. Unless your architecture and design are already very efficient, you're unlikely to notice magical performance gains here; it's probable that other areas of the codebase need optimisation first.
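
For reference, the single-threaded blocking baseline is essentially just a loop like the one below - a simplified sketch rather than the exact benchmark code (the port matches the sample's 9999; the buffer size and class name are illustrative):

    import java.net.DatagramPacket;
    import java.net.DatagramSocket;
    import java.nio.charset.StandardCharsets;

    // Simplified sketch of the blocking IO echo server
    public class BlockingEchoServer {
        public static void main(String[] args) throws Exception {
            try (DatagramSocket socket = new DatagramSocket(9999)) {
                byte[] buffer = new byte[1500];
                while (true) {
                    DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                    socket.receive(packet); // blocks until a request arrives
                    String request = new String(packet.getData(), 0, packet.getLength(),
                            StandardCharsets.US_ASCII);
                    byte[] response = request.toUpperCase().getBytes(StandardCharsets.US_ASCII);
                    // reply to wherever the request came from - same thread, no handoff
                    socket.send(new DatagramPacket(response, response.length, packet.getSocketAddress()));
                }
            }
        }
    }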

The test code and bench

I've tried to write the code in a fairly "natural" style for each approach, so the comparisons are not strictly apples to apples - I think they are more indicative of real-world usage. For example, the threadpool approach creates a dedicated outbound socket, whereas the single-threaded and Disruptor code can quite easily use a single outbound UDP socket to transmit all responses. I'd be happy to take pull requests.

Beware when you are running the samples that you will need a quiet network to avoid any dropped UDP packets.

Running the client -> mvn clean package ; java -jar target/disruptor-1.0-SNAPSHOT.jar <<serveripaddress>> <<numclients>>
Running the server -> mvn exec:java -Dbio (or -Dthreadpool or -Ddisruptor), will open UDP port 9999.

The code.