Thursday, December 26, 2013

How big should a buffer be?


Following on from a previous post on buffer bloat, a good question is: how big should the socket buffer size be? Using my simple JStringServer code, I ran some tests, plotted some graphs and found a sweet spot (for me; YMMV).
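
For context, the buffer in question is the receive buffer on the server's listening socket, which is generally set before the socket is bound so that it applies to accepted connections. Here's a minimal NIO sketch of setting it - not necessarily how JStringServer does it, and the port number is arbitrary:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

public class BufferSizeExample {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        // Set SO_RCVBUF before bind() so it applies to accepted connections.
        // The OS treats the value as a hint and may round it.
        server.setOption(StandardSocketOptions.SO_RCVBUF, 5000);
        server.bind(new InetSocketAddress(8888));
        System.out.println("Actual SO_RCVBUF: " + server.getOption(StandardSocketOptions.SO_RCVBUF));
    }
}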

As a bit of an aside, the R-language was used to generate the graphs, a task for which it is very well suited. I'm no R expert, so this can probably be done better. But this is what it looks like:

# Load the data from a CSV file with headers
mydata <- read.table("~/Documents/Docs/bufferBloat.txt", header=TRUE, sep=",")

# Aggregate the data and calculate the mean and standard deviation.
# Note: the do.call is to make the data into the right type

ag <- aggregate(. ~ SO_RCVBU, mydata, function(x) c(mean = mean(x), sd = sd(x)))
attach(do.call(data.frame, ag), warn.conflicts=FALSE)

# Plot the graph calls-per-second (cps) against the server-side SO_RCVBUF 
# Note that the x-axis (SO_RCVBUF) uses a logarithmic scale
plot(SO_RCVBU, cps.mean, log="x", ylim=c(3000, 5000), ylab="calls per second")

# add the title
title(main="Mac Book Pro client calls/second vs. server-side SO_RCVBU")

# Add the standard deviations using simple lines
# see http://stat.ethz.ch/R-manual/R-devel/library/graphics/html/segments.html
segments (SO_RCVBU, cps.mean - cps.sd, SO_RCVBU, cps.mean + cps.sd)

# copy the screen to disk (don't forget to close the file handle)
# see http://stackoverflow.com/questions/7144118/how-to-save-a-plot-as-image-on-the-disk
dev.copy(jpeg,filename="~/Documents/Docs/bufferBloat_cps_vs_SO_RCVBU.jpg")
dev.off()

# Now much the same for call duration vs. SO_RCVBUF
plot(SO_RCVBU, duration.mean, log="x", ylab="call duration (ms)")
title(main="Mac Book Pro client call time vs. server-side SO_RCVBUF")
dev.copy(jpeg,filename="~/Documents/Docs/bufferBloat_duration_vs_SO_RCVBU.jpg")
dev.off()

[Figure: Call times - call time (ms) vs. SO_RCVBUF value]

[Figure: Throughput - number of calls per second vs. SO_RCVBUF]

The results were taken from an (old-ish) Mac Book Pro while a (new-ish) Mac Book Air was also stressing the (Linux) server.

The results show that the optimal size of SO_RCVBUF for this application is about 5000 bytes. A buffer that is too small cripples throughput, but throughput peaks quite quickly and increasing the buffer further does not seem to help.

Note: significantly increasing the buffer size does not hurt performance terribly, but I noticed that the client would occasionally throw this nasty exception:

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
at sun.nio.ch.IOUtil.read(IOUtil.java:169)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
at com.google.code.jstringserver.client.WritingConnector.read(WritingConnector.java:88)
at com.google.code.jstringserver.client.WritingConnector.connected(WritingConnector.java:55)
at com.google.code.jstringserver.client.ConstantWritingConnector.connected(ConstantWritingConnector.java:74)
at com.google.code.jstringserver.client.Connector.doCall(Connector.java:39)
at com.google.code.jstringserver.client.ConstantWritingConnector.doCall(ConstantWritingConnector.java:58)
at com.google.code.jstringserver.client.Networker.run(Networker.java:18)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)

(Cause to be determined).

This only happened for large buffer sizes.

Further R reading

R-Statistics blog.
StatMethods

Saturday, December 21, 2013

FastLocks in the JVM - part 2

I was wondering why AtomicIntegerArrays etc. were slow. Alternatively, why are other methods (notably using synchronized) so fast? Now, I'm not a machine code guru nor a JVM coder, so please take some of this post with a pinch of salt. Here are my findings.

Let's write some Java code that plugs into the same simple framework I mentioned in the previous post by extending our AbstractReaderRunnable class, but this time let the reader access a plain old int[] array in a synchronized method (and similarly for the writer).
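
A sketch of what such a reader subclass might look like (the class and field names here are my guesses, not the original code, and I've synchronized on the shared array so that reader and writer use the same monitor):

class SynchronizedReaderRunnable extends AbstractReaderRunnable {

    private final int[] array;

    SynchronizedReaderRunnable(int[] array) {
        this.array = array;
    }

    @Override
    protected int elementAtIndex() {
        // Lock the shared array so both threads contend on the same monitor
        synchronized (array) {
            return array[index];
        }
    }
}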

There's nothing clever in the Java code, but this is how the machine code generated by HotSpot looks:

.
.
08a    andq    R10, #7 # long
08e    cmpq    R10, #5
092    jne     B20  P=0.000001 C=-1.000000
.
.

(where R10 points to memory in the stack).

Here, we're checking the 3 lowest order bits (#7) of R10. I believe the most significant bit refers to whether biased locking is enabled for an object of this class and "as long as an object is unlocked, the last two bits have the value 01." [1] "These bits are commonly referred to as the object sync bits." [3].
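
In other words, the JITed code is performing a test something like this (an illustration of the bit check only, not HotSpot source):

public class MarkWordCheck {
    static boolean biasableAndUnlocked(long markWord) {
        long lowBits = markWord & 0x7;  // andq R10, #7 - keep the three lowest bits
        return lowBits == 0x5;          // cmpq R10, #5 - 0b101: bias bit set, lock bits 01 (unlocked)
    }
}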

This analysis seems to be confirmed by Dave Dice, who wrote much of this, when he describes "the operation of stack-locking[:] The inlined enter code emitted by the JIT will first fetch and examine the mark word. (That code first checks to see if the object is biased)... If the mark word is neutral as determined by the low-order bits of the mark, we'll try to use stack locking. First, in anticipation of a successful compare-and-swap (CAS), the code will store the just-fetched mark value into the on-frame word that was allocated by the JIT and is associated with that particular bytecode offset. Next, the inline enter code will attempt to CAS the address of that on-frame location over the mark word. If successful, we've locked the object." [6]
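
Paraphrasing that description as a toy model (this is emphatically not HotSpot code; an AtomicLong stands in for the object's mark word and a long parameter for the address of the on-frame lock record):

import java.util.concurrent.atomic.AtomicLong;

public class StackLockSketch {

    private final AtomicLong markWord = new AtomicLong(0x1L); // 0b001 = neutral: unlocked, unbiased

    boolean tryStackLock(long onFrameBoxAddress) {
        long mark = markWord.get();         // fetch and examine the mark word
        if ((mark & 0x7) == 0x1) {          // neutral, as determined by the low-order bits
            // (the real code first stores 'mark' into the on-frame word here)
            // then attempts to CAS the address of that on-frame word over the mark word
            return markWord.compareAndSet(mark, onFrameBoxAddress);
        }
        return false;                       // biased or already locked: take another path
    }
}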

So, we're checking whether the object allows biased locking and is unlocked. If so, we jump to:

1af   B20: #     B7 B21 <- B5 B19  Freq: 0.351352
1af    leaq    R11, [rsp + #48] # box lock
1b4    fastlock RBP,R11,RAX,R10
205    je     B7  P=0.999999 C=-1.000000

Fastlock is not an x86 instruction but inlined JVM code that performs the locking [2]. I've not been able to find exactly what this refers to in the OpenJDK source code, so I had to resort to Googling.

"Synchronization on Java objects is done using a fast lock mechanism using light-weight lock records (referred to as fast locks) in most cases and only done using a real lock mechanism (referred to as inflated locks) when needed. The premise behind this implementation is that contention on Java locks are rare. Hence, there is no need to associate an inflated lock with the object until contention occurs. Using an inflated lock tends to be slower than just using a fast lock.

"When a thread attempts to lock the object, it uses atomic operations to check and set the sync bits as well as the header word. If the sync bits are CVM_LOCKSTATE_UNLOCKED, then there is no contention on this object yet. Within the same atomic operation, the header word will be replaced with a pointer to a fast lock record and the sync bits are set to CVM_LOCKSTATE_LOCKED ... If the same thread attempts to re-enter the lock on this object, it will find that the sync bits are already set to CVM_LOCKSTATE_LOCKED, and check to see if it is the owner of the fast lock. Since the current thread does own this fast lock, it simply increments the reentry count in the fast lock record and proceed with code execution." [3]

So, no expensive lock instructions in the uncontended case - unlike in AtomicIntegerArrays.

"If a different thread attempts to acquire the lock on this object, it will check and see that it is not the owner of the fast lock record. This is considered a contention case which will trigger the inflation of the lock." [3] "Deflation occurs at all stop-the-world safepoints" which occur frequently. There are 3 safepoints in the JITed read method alone.

In the inflated fast-path, "there's no need to block or wake threads (which requires 1 atomic for an enter-exit pair)." [6] ("The slow-path implementation is in native C++ code while the fast-path is emitted by the JITs.") The ObjectMonitor class (in objectMonitor.hpp)  has a linked list of ObjectWaiter objects which "serves as a "proxy" or surrogate thread" and SpinCallbackArguments (in objectMonitor.cpp) that allow Adaptive Spinning [7] which uses a spin-then-block strategy based on measured success. This is currently the limit of my knowledge on how the JVM manages high throughput in this area.
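
Still, the basic spin-then-block idea is simple enough to sketch. This is a generic illustration only - not HotSpot's adaptive version, which tunes the spin count per monitor from recent success - and a real implementation would queue and unpark waiters rather than rely on a timed park:

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

public class SpinThenBlock {

    private final AtomicBoolean locked = new AtomicBoolean();

    public void lock(int maxSpins) {
        int spins = maxSpins;                  // HotSpot adapts this value over time
        while (!locked.compareAndSet(false, true)) {
            if (spins-- <= 0) {
                LockSupport.parkNanos(1000);   // give up spinning and block (the expensive path)
                spins = maxSpins;
            }
        }
    }

    public void unlock() {
        locked.set(false);
    }
}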

So, why care?

Dice describes the JVM's locking as "considerably better (lower latency and better scalability) than native pthreads synchronization implementations." [6] So, why isn't it used everywhere?

Well, as ever, there is a trade-off. It's described as "optimized for system-wide throughput at the expense of short-term thread-specific fairness" [4] and "HotSpot favors throughput [and] favor[s] recently-run threads" [5].

We can see this quite easily. Given a writer class that looks a little like this:

public abstract class AbstractWriterRunnable implements Runnable {
    
    protected int index = 0;
    protected int lap = 1;
    private long duration;

    @Override
    public void run() {
        long start = System.currentTimeMillis();
        while (index < ARRAY_SIZE) {
            setElementAndIncrementCounters();
        }
        duration = System.currentTimeMillis() - start;
    }

    private void setElementAndIncrementCounters() {
        setElement();
        index++;
    }
    
    protected abstract void setElement();
}

that runs in one thread and a reader class that looks like this:

public abstract class AbstractReaderRunnable implements Runnable, Detailed {
    
    protected int index = 0;
    protected int lap = 0;
    private long duration;
    private int doNotCompileMeAway;

    @Override
    public void run() {
        long start = System.currentTimeMillis();
        while (index < ARRAY_SIZE) {
            lap = elementAtIndex();
            while (lap == 0) {
                lap = elementAtIndex();
                doNotCompileMeAway |= lap;
            }
            index++;
        }
        duration = System.currentTimeMillis() - start;
    }
    
    protected abstract int elementAtIndex();
.
.

that runs in another thread, we can implement setElement and elementAtIndex in subclasses, one pair to read and write to an array in synchronized methods and another to get and set on an AtomicIntegerArray.
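
For example, the two writer subclasses might look something like this (a sketch only; the names and constructors are my assumptions):

import java.util.concurrent.atomic.AtomicIntegerArray;

class SynchronizedWriterRunnable extends AbstractWriterRunnable {

    private final int[] array;

    SynchronizedWriterRunnable(int[] array) {
        this.array = array;
    }

    @Override
    protected void setElement() {
        synchronized (array) {      // the same monitor the synchronized reader uses
            array[index] = lap;
        }
    }
}

class AtomicWriterRunnable extends AbstractWriterRunnable {

    private final AtomicIntegerArray array;

    AtomicWriterRunnable(AtomicIntegerArray array) {
        this.array = array;
    }

    @Override
    protected void setElement() {
        array.set(index, lap);      // a volatile-style write
    }
}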

No Free Meal

The results are very interesting. After one run that threw away the results (as ever, to let the JVM warm up), they consistently look something like this:

Synchronized Array Access
read thread took 9550ms, writer thread took 6516ms

AtomicIntegerArray Access
read thread took 74693ms, writer thread took 74693ms

Synchronized array access is consistently better over all my runs, but notice the difference between the times for the read and write threads - the synchronized reader is 3 seconds behind its writer while the AtomicIntegerArray reader is hot on its writer's heels.

Presumably, this is an example of synchronized access sacrificing "short-term thread-specific fairness" for increased throughput.


[1] Synchronization and Object Locking.
[2] "Deep dive into assembly code from Java" -Kohsuke Kawaguchi's blog.
[3] CDC HotSpot Implementation.
[4] Synchronization in Java SE 6 (HotSpot).
[5] Synchronization.
[6] "Lets say you're interested in using HotSpot as a vehicle for synchronization research" - Dave Dice's blog.
[7] Java SE6 Performance White Paper.


Friday, December 20, 2013

FastLocks in the JVM

I mentioned how AtomicIntegerArray and its kin can be slow. I'm now a little closer to finding out why.

I decompiled my code that has one thread writing to and one thread reading from an AtomicIntegerArray using the JVM arguments:

-Xmx32g -server -XX:+PrintOptoAssembly -XX:CompileCommand=print,*Runnable -XX:CompileThreshold=10000

Using Java version:

$ /usr/java/jdk1.7.0_10/bin/java -version
java version "1.7.0_10"
Java(TM) SE Runtime Environment (build 1.7.0_10-b18)
Java HotSpot(TM) 64-Bit Server VM (build 23.6-b04, mixed mode)

My code is testing reading and writing to an array structure. Initially, all values are 0. 

The writer thread goes through the array setting the values to 1. 

The reader thread follows the writer as best it can. If an element is 0 (that is, uninitialized) it spins waiting for the value to change to 1.

The Java code for reading looks something like this in the super class:

public abstract class AbstractReaderRunnable implements Runnable {    
    protected int index = 0;
    protected int lap = 0;

    @Override
    public void run() {

        while (index < ARRAY_SIZE) {
            lap = elementAtIndex();
            while (lap == 0) {
                lap = elementAtIndex();
            }
            index++;
        }
    }

    protected abstract int elementAtIndex();
}


and in my particular subclass (that is, a class for which the array structure is an AtomicIntegerArray) looks like this:


class AtomicReaderRunnable extends AbstractReaderRunnable {

    private final AtomicIntegerArray array;

    protected int elementAtIndex() {
        return array.get(index);
    }
.
.

Fairly simple. Now comes the hard part: the machine code.

Note: "In Intel syntax the first operand is the destination, and the second operand is the source whereas in AT&T syntax the first operand is the source and the second operand is the destination." [1]

HotSpot by default appears to print the assembly in Intel format, although you can change this [2].

069     movl    R10, [R13 + #24 (8-bit)]        # int ! Field com/phenry/concurrent/lockless/AbstractReaderRunnable.index
06d     movq    R8, [R11 + #16 (8-bit)] # ptr ! Field java/util/concurrent/atomic/AtomicIntegerArray.array
.
.
So, the R10 register is our index field and R8 points to the int[] array in the Java AtomicIntegerArray class.
.
.
094     movl    R11, [R8 + #24 + R10 << #2]     # int
099     MEMBAR-acquire ! (empty encoding)
099
099     movl    [R13 + #28 (8-bit)], R11        # int ! Field com/phenry/concurrent/lockless/AbstractReaderRunnable.lap

In English: here we multiply R10 by 4 by shifting it twice to the left (<< #2) - because ints are 4 bytes wide - and apparently add this to the start of the array to find our memory address. The contents of this address we put into the R11 register. In turn, this is put in the memory address where we store our lap field in the Java world.
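
For example, if index (R10) held 10, the load would come from [R8 + #24 + (10 << 2)] = [R8 + 64]; the #24 is presumably the offset of the first element past the int[] object's header on this JVM.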

Note that HotSpot appears to have inlined methods [6]. The code generated by Hotspot is not necessarily isomorphic to the original Java code. Also note that it might recompile it to machine code more than once, apparently depending on the usage profile.

Now, the thing of interest is the MEMBAR-acquire ! (empty encoding) line. This is not an x86 instruction and doesn't even take an address (the left-most column). So, why is it there?

"Each CPU has its own peculiar memory-barrier instructions, which can make portability a challenge" [3] The x86 hardware has a NoOp for most membar operations:

void LIR_Assembler::membar() {
  // QQQ sparc TSO uses this,
  __ membar( Assembler::Membar_mask_bits(Assembler::StoreLoad));
}

void LIR_Assembler::membar_acquire() {
  // No x86 machines currently require load fences
  // __ load_fence();
}

void LIR_Assembler::membar_release() {
  // No x86 machines currently require store fences
  // __ store_fence();
}

void LIR_Assembler::membar_loadload() {
  // no-op
  //__ membar(Assembler::Membar_mask_bits(Assembler::loadload));
}

void LIR_Assembler::membar_storestore() {
  // no-op
  //__ membar(Assembler::Membar_mask_bits(Assembler::storestore));
}

void LIR_Assembler::membar_loadstore() {
  // no-op
  //__ membar(Assembler::Membar_mask_bits(Assembler::loadstore));
}

void LIR_Assembler::membar_storeload() {
  __ membar(Assembler::Membar_mask_bits(Assembler::StoreLoad));
}

(from Hotspot's source code that can be found in hotspot/src/cpu/x86/vm/c1_LIRAssembler_x86.cpp).

The membar method itself looks like this:

  // Serializes memory and blows flags
  void membar(Membar_mask_bits order_constraint) {
    if (os::is_MP()) {
      // We only have to handle StoreLoad
      if (order_constraint & StoreLoad) {
        // All usable chips support "locked" instructions which suffice
        // as barriers, and are much faster than the alternative of
        // using cpuid instruction. We use here a locked add [esp],0.
        // This is conveniently otherwise a no-op except for blowing
        // flags.
        // Any change to this code may need to revisit other places in
        // the code where this idiom is used, in particular the
        // orderAccess code.
        lock();
        addl(Address(rsp, 0), 0);// Assert the lock# signal here
      }
    }
  }

(from assembler_x86.hpp in the HotSpot source code).

And true to the comment, we do see lines output like:

     lock addl [rsp + #0], 0 ! membar_volatile

but it's in the writer code! The writer code looks like this:

070     movl    R8, [RCX + #28 (8-bit)] # int ! Field com/phenry/concurrent/lockless/AbstractWriterRunnable.lap
.
.
078     movl    R11, [RCX + #24 (8-bit)]        # int ! Field com/phenry/concurrent/lockless/AbstractWriterRunnable.index
07c     movq    R10, [R10 + #16 (8-bit)]        # ptr ! Field java/util/concurrent/atomic/AtomicIntegerArray.array
.
.
Similar to before, we have a register that holds our index field (R11) and one that points to the int[] array in the Java AtomicIntegerArray class (R10). Also as before, we calculate the address that this index points to in the array, but this time we populate it with the contents of R8 (our lap field in the Java code).
.
.
08e   B8: #     B4 B9 <- B7  Freq: 287336
08e     MEMBAR-release ! (empty encoding)
08e
08e     movslq  R11, R11        # i2l
091     movl    [R10 + #24 + R11 << #2], R8     # int
096     lock addl [rsp + #0], 0 ! membar_volatile

As before, we have the (redundant) MEMBAR pseudo-instruction, but we also have the very significant lock prefix.

The reader code does not need to worry since the membar has flushed the store buffers, which "enable our fast processors to run without blocking while data is transferred to and from the cache sub-system" [7]. (These fences are more related to instruction ordering - another side effect of the synchronized semantics - than to the cache subsystem.) Furthermore:

"If a core wants to read some memory, and it does not have it ... then it must make a read on the ring bus.  It will then either be read from main-memory if not in the cache sub-systems, or read from L3 if clean, or snooped from another core if Modified.  In any case the read will never return a stale copy from the cache sub-system, it is guaranteed to be coherent." [7]

This seems to be a standard way of synchronizing outside the JVM [5]. So, why is it so slow? The JVM has some very clever code to avoid this heavy-weight approach that I will go into in a future post.

[1] http://www.ibiblio.org/gferg/ldp/GCC-Inline-Assembly-HOWTO.html
[2] http://stackoverflow.com/questions/9337670/hotspot7-hsdis-printassembly-intel-syntax
[5] StackOverflow.
[6] "Deep dive into assembly code from Java" -Kohsuke Kawaguchi's blog.
[7] "CPU Cache Flushing Fallacy" - Martin Thompson's blog.


Sunday, December 8, 2013

Synching Sites

We have a particular data centre of 40 nodes that must also send all its data to a secondary, backup site. We use Oracle Coherence's Push Replication pattern to keep these two sites synchronised. How does it do this and how do we write tests for it?

Push Rep uses Coherence*Extend, which "allows you to access remote Coherence caches using standard Coherence APIs" [1]. It does this over TCP, not the TCMP (an asynchronous protocol that uses UDP for data transfer) that Coherence normally uses.

Typically, you should "configure Coherence*Extend proxy servers on a dedicated set of machines that serve no other purpose. This will allow you to scale your proxy servers independently from the storage-enabled members of the cluster... Coherence*Extend proxy servers are full-blown TCMP members, so they should have fast and reliable communication with the rest of the cluster." [1]

This is neither possible nor desirable for our LittleGrid testing since we're just testing functionality. We will, however, need some of the production-like config. On the Extend server side, we need:
    <caching-schemes>
        <proxy-scheme>
            <service-name>ExtendTcpProxyService</service-name>
            <thread-count>5</thread-count>

            <acceptor-config>
                <tcp-acceptor>
                    <local-address>
                        <address>localhost</address>
                        <port>32199</port>
                    </local-address>
                </tcp-acceptor>
[Note: I reduced the thread count for my test. This value is 50 in production].

On the side of the cluster that is a client of this Extend server (the production site in our case), the config looks like:

   <caching-schemes>
        <remote-invocation-scheme>
            <scheme-name>remote-contingency-invocation</scheme-name>
            <service-name>RemoteContingencyInvocationService</service-name>
            <initiator-config>
                <tcp-initiator>
                    <remote-addresses>
                        <socket-address>
                            <address>localhost</address>
                            <port>32199</port>
                        </socket-address>
                    </remote-addresses>
                    <connect-timeout>30s</connect-timeout>
                </tcp-initiator>

Instead of the usual method of executing code against the data using EntryProcessors, PushRep executes an Invocable against an InvocationService. (Your implementation of Invocable can execute the EntryProcessors you know and love on the remote InvocationService, but note that InvocationService.query does not take an affined key as executing EntryProcessors does.)
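
A minimal sketch of how that hangs together (hedged: the class and cache name are made up for illustration, the service name comes from the config above, and real Push Replication publishers are considerably more involved):

import java.util.Map;

import com.tangosol.net.AbstractInvocable;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.InvocationService;
import com.tangosol.net.NamedCache;

// Runs on the remote (Extend) side; from there it can call the EntryProcessors
// you know and love against the remote caches.
public class ContingencyUpdate extends AbstractInvocable {

    private final Object key;
    private final Object value;

    public ContingencyUpdate(Object key, Object value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public void run() {
        NamedCache cache = CacheFactory.getCache("contingency-cache"); // hypothetical cache name
        cache.put(key, value);                // or cache.invoke(key, anEntryProcessor)
        setResult(Boolean.TRUE);
    }

    // Production-site client: look up the remote invocation service configured
    // above and send the Invocable across the wire.
    public static void main(String[] args) {
        InvocationService service = (InvocationService)
                CacheFactory.getService("RemoteContingencyInvocationService");
        Map results = service.query(new ContingencyUpdate("someKey", "someValue"), null);
        System.out.println(results);
    }
}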

As for LittleGrid configuration, your primary config will need something like:

        Builder builder = newBuilder()
            .setCacheConfiguration("FILE_WITH_CLIENT_XML_ABOVE")
.
.

and the Extend server config appears to need something like:

            newBuilder().setStorageEnabledExtendProxyCount(1)...

[1] Oracle Coherence 3.5.