Sunday, April 24, 2011

To Infinispan and Beyond!

I've been playing for the last few weeks with JBoss's Infinispan - a distributed cache and the successor to JBossCache.

It's a nice piece of technology. But for our needs, it may be inappropriate.

It may be OK for storing users' HTTP sessions and the like. But we were thinking about storing data for which integrity is absolutely essential (bond orders that typically run into the billions of dollars). Let me run through a few of the arguments that made me think Infinispan is not for us.

The Infinispan FAQ says it uses MVCC to optimize performance by having write locks not block read locks. However, dig deeper and you see this is not MVCC as most people who are familiar with Oracle and Postgres would know it. In these database implementations, a reading thread works with a snapshot of the data. If a writing thread updates a row the reader is looking at, that's OK: the reading thread continues with its snapshot. This is great since nobody is blocking anybody, which can improve performance.
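To make the snapshot idea concrete, here is a minimal sketch of the behaviour a reader observes. It is not how Oracle or Postgres implement MVCC; the AtomicReference and the class name SnapshotReadDemo are assumptions made purely for illustration. The point is that each version is immutable, so a writer publishes a new version rather than changing the one the reader holds.

```java
import java.util.concurrent.atomic.AtomicReference;

final class SnapshotReadDemo {

    // Returns { what the reader still holds, what the store now contains }.
    static String[] readerAndLatest() {
        // Each version of the "row" is an immutable String.
        AtomicReference<String> row = new AtomicReference<>("v1");

        // The reader takes its snapshot.
        String readerSnapshot = row.get();

        // The writer publishes a new version; it does not touch the old one.
        row.set("v2");

        return new String[] { readerSnapshot, row.get() };
    }

    public static void main(String[] args) {
        String[] result = readerAndLatest();
        System.out.println("reader sees: " + result[0]); // reader sees: v1
        System.out.println("latest is:   " + result[1]); // latest is:   v2
    }
}
```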

However, with Infinispan, this is not quite what is happening. It's true that write threads do not block read threads. But the cached object itself may be mutable, and a reader is handed a reference to it rather than a copy.

In the Infinispan mailing lists, we read:

"[I]n this case you may well see the change before [a write transaction] commits, we don't explicitly clone or copy mutable objects ... I suppose though we could add the ability to defensively copy mutable objects, but we'd need a way of knowing which are immutable, etc. Also, this would be more expensive, depending on the size of the atomic map."

A quick-and-dirty test I wrote demonstrates this:


package org.infinispan.replication;

import static org.testng.AssertJUnit.assertNull;

import java.util.concurrent.CountDownLatch;

import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;

import org.infinispan.Cache;
import org.infinispan.config.Configuration;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.AbstractCacheTest.CleanupPhase;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = "functional", testName = "replication.SyncReplLockingAtomicityPHTest")
public class SyncReplLockingAtomicityPHTest extends MultipleCacheManagersTest {

    private static final String CACHE_NAME = "testcache";

    private static final String k = "key", v = "value";

    public SyncReplLockingAtomicityPHTest() {
        cleanup = CleanupPhase.AFTER_METHOD;
    }

    protected Configuration.CacheMode getCacheMode() {
        return Configuration.CacheMode.REPL_SYNC;
    }

    protected void createCacheManagers() throws Throwable {
        Configuration cfg = getDefaultClusteredConfig(getCacheMode(), true);
        cfg.setLockAcquisitionTimeout(500);
        createClusteredCaches(2, CACHE_NAME, cfg);
    }

    public void testUpdateWhileReadLock() throws Exception {
        final Cache cache = cache(0, CACHE_NAME);
        final CountDownLatch latchAfterRead = new CountDownLatch(1);
        final CountDownLatch latchBeforeCommit = new CountDownLatch(1);
        final ReaderThread readerRunnable = new ReaderThread(k, cache, latchAfterRead, latchBeforeCommit);
        final UpdateThread updateRunnable = new UpdateThread(k, cache);
        updateWhileReadTX(latchAfterRead, latchBeforeCommit, readerRunnable, updateRunnable);
    }

    private void updateWhileReadTX(
            CountDownLatch latchAfterRead,
            CountDownLatch latchBeforeReadCommit,
            ReaderThread readerRunnable,
            UpdateThread updateRunnable)
            throws SecurityException, IllegalStateException, RollbackException,
                   HeuristicMixedException, HeuristicRollbackException, SystemException,
                   NotSupportedException, InterruptedException {
        Cache cache1 = cache(0, CACHE_NAME);
        Cache cache2 = cache(1, CACHE_NAME);
        assertClusterSize("Should only be 2 caches in the cluster!!!", 2);
        assertNull("Should be null", cache1.get(k));
        assertNull("Should be null", cache2.get(k));

        final StringBuffer initialObj = populateCache(cache1);

        // The reader starts a transaction, reads the value and then waits before committing.
        final Thread readThread = new Thread(readerRunnable);
        readThread.start();
        latchAfterRead.await();
        final StringBuffer readObject = readerRunnable.getFromCache();

        // While the read transaction is still open, the writer updates the value and commits.
        final Thread updateThread = new Thread(updateRunnable);
        updateThread.start();
        updateThread.join();
        final StringBuffer writeObject = updateRunnable.getValue();
        final String writeSnapshot = writeObject.toString();
        final String readSnapshot = readObject.toString();

        // Let the reader commit.
        latchBeforeReadCommit.countDown();

        // Reader and writer were handed the very same object...
        AssertJUnit.assertSame(readObject, writeObject);
        // ...so the reader's view is not isolated from the write. This assertion fails.
        AssertJUnit.assertFalse(readSnapshot.equals(writeSnapshot));
    }

    private StringBuffer populateCache(Cache cache)
            throws SecurityException, IllegalStateException, RollbackException,
                   HeuristicMixedException, HeuristicRollbackException, SystemException,
                   NotSupportedException {
        StringBuffer mutableObject = new StringBuffer("test");

        TransactionManager mgr = TestingUtil.getTransactionManager(cache);
        mgr.begin();
        cache.getAdvancedCache().lock(k);
        cache.put(k, mutableObject);
        mgr.commit();
        return mutableObject;
    }

    /** Reads the value in a transaction, appends to it in place, puts it back and commits. */
    private class UpdateThread implements Runnable {

        protected final String key;
        protected final Cache cache;
        protected final TransactionManager mgr;
        private StringBuffer fromCache;

        public UpdateThread(String k, Cache cache) {
            this.key = k;
            this.cache = cache;
            mgr = TestingUtil.getTransactionManager(cache);
        }

        @Override
        public void run() {
            try {
                mgr.begin();
                getValueFromCache();
                fromCache.append(System.currentTimeMillis());
                cache.put(key, fromCache);
            } catch (Exception x) {
                x.printStackTrace();
            } finally {
                finishTx();
            }
        }

        private void finishTx() {
            try {
                mgr.commit();
            } catch (Exception x) {
                x.printStackTrace();
            }
        }

        private void getValueFromCache() {
            fromCache = (StringBuffer) cache.get(key);
        }

        public StringBuffer getValue() {
            return fromCache;
        }

    }

    /** Reads the value in a transaction and holds that transaction open until released. */
    private class ReaderThread implements Runnable {

        private final String key;
        private final Cache cache;
        private final CountDownLatch latchAfterRead, latchBeforeCommit;

        private StringBuffer fromCache;

        public ReaderThread(String k, Cache cache,
                CountDownLatch latchAfterRead, CountDownLatch latchBeforeCommit) {
            this.key = k;
            this.cache = cache;
            this.latchAfterRead = latchAfterRead;
            this.latchBeforeCommit = latchBeforeCommit;
        }

        public StringBuffer getFromCache() {
            return fromCache;
        }

        @Override
        public void run() {
            TransactionManager mgr = TestingUtil.getTransactionManager(cache);
            try {
                mgr.begin();
                fromCache = (StringBuffer) cache.get(key);
                latchAfterRead.countDown();
            } catch (Exception x) {
                x.printStackTrace();
            } finally {
                try {
                    // Keep the read transaction open until the test releases it.
                    latchBeforeCommit.await();
                    mgr.commit();
                } catch (Exception x) {
                    x.printStackTrace();
                }
            }
        }

    }
}



The final assertion, assertFalse(readSnapshot.equals(writeSnapshot)), is where the test fails. The read-thread's view of the value is the same as the write-thread's, even though it read the object from the cache before the update happened. This is simply because both the read- and write-threads hold the same object reference.
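The same effect can be reproduced without a cluster. The following is a stripped-down sketch in which a plain ConcurrentHashMap stands in for the cache; that substitution, the class name SharedReferenceDemo and the defensiveCopy flag are my assumptions for illustration and not Infinispan's API. It also shows the defensive copy that the mailing-list reply mentions as a possible remedy.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SharedReferenceDemo {

    // Returns what the reader sees after a writer has modified the entry in place.
    static String readerViewAfterWrite(boolean defensiveCopy) {
        // Stand-in for the cache (assumption: a plain map, not Infinispan).
        Map<String, StringBuffer> cache = new ConcurrentHashMap<>();
        cache.put("key", new StringBuffer("test"));

        // The reader fetches the value, optionally taking its own copy.
        StringBuffer stored = cache.get("key");
        StringBuffer readerView = defensiveCopy
                ? new StringBuffer(stored.toString())
                : stored;

        // The writer fetches the same entry and mutates it in place.
        StringBuffer writerView = cache.get("key");
        writerView.append("-updated");
        cache.put("key", writerView);

        return readerView.toString();
    }

    public static void main(String[] args) {
        System.out.println(readerViewAfterWrite(false)); // test-updated
        System.out.println(readerViewAfterWrite(true));  // test
    }
}
```

Copying on every read has a cost, of course, which is the trade-off the mailing-list reply points out.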

Note that if the write transaction were rolled back, the read-thread would still see the change the write-thread had made, because the object was modified in place. In fact, the whole cache from this point on would hold incorrect data.
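Here is a rough sketch of why a rollback cannot help. ToyTxStore is a hypothetical toy written for this post, not Infinispan: it buffers puts until commit and throws them away on rollback. That undoes the put, but it cannot undo a change made directly to an object that was already committed.

```java
import java.util.HashMap;
import java.util.Map;

final class RollbackLeakDemo {

    /** Hypothetical toy store: puts are buffered until commit and discarded on rollback. */
    static final class ToyTxStore {
        private final Map<String, StringBuffer> committed = new HashMap<>();
        private final Map<String, StringBuffer> pending = new HashMap<>();

        StringBuffer get(String key) {
            StringBuffer p = pending.get(key);
            return p != null ? p : committed.get(key);
        }

        void put(String key, StringBuffer value) {
            pending.put(key, value);
        }

        void commit() {
            committed.putAll(pending);
            pending.clear();
        }

        void rollback() {
            pending.clear();
        }
    }

    static String valueAfterRolledBackUpdate() {
        ToyTxStore store = new ToyTxStore();
        store.put("key", new StringBuffer("test"));
        store.commit();

        // "Transaction": read, modify in place, put, then roll back.
        StringBuffer value = store.get("key");
        value.append("-dirty");   // mutates the committed object itself
        store.put("key", value);
        store.rollback();         // discards the pending put, nothing more

        return store.get("key").toString();
    }

    public static void main(String[] args) {
        System.out.println(valueAfterRolledBackUpdate()); // test-dirty
    }
}
```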

This is not dissimilar to Hibernate updating an object (e.g., assigning a primary key or a timestamp) even when the DB transaction it was in was rolled back. I've seen this happen when there were DB deadlocks and my transaction was chosen as the victim.

There are many other reasons why I think business-critical data should not be put into Infinispan. I hope to present them in upcoming blog entries.
