tweaked fastObjectPool

This commit is contained in:
nathan 2014-10-02 01:45:30 +02:00
parent 2cfdec4875
commit 99c32e9863
4 changed files with 58 additions and 88 deletions

View File

@ -1,4 +1,4 @@
package dorkbox.util; package dorkbox.util.objectPool;
/* /*
* *
@ -22,13 +22,36 @@ package dorkbox.util;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.security.AccessController; import java.security.AccessController;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import sun.misc.Unsafe; import sun.misc.Unsafe;
public class FastObjectPool<T> { public class FastObjectPool<T> {
public static final Unsafe THE_UNSAFE;
static {
try {
final PrivilegedExceptionAction<Unsafe> action = new PrivilegedExceptionAction<Unsafe>() {
@Override
public Unsafe run() throws Exception {
Class<sun.misc.Unsafe> unsafeClass = sun.misc.Unsafe.class;
Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Object unsafeObject = theUnsafe.get(null);
if (unsafeClass.isInstance(unsafeObject)) {
return unsafeClass.cast(unsafeObject);
}
throw new NoSuchFieldError("the Unsafe");
}
};
THE_UNSAFE = AccessController.doPrivileged(action);
}
catch (Exception e) {
throw new RuntimeException("Unable to load unsafe", e);
}
}
private Holder<T>[] objects; private Holder<T>[] objects;
@ -43,21 +66,23 @@ public class FastObjectPool<T> {
public ReentrantLock lock = new ReentrantLock(); public ReentrantLock lock = new ReentrantLock();
private ThreadLocal<Holder<T>> localValue = new ThreadLocal<>(); private ThreadLocal<Holder<T>> localValue = new ThreadLocal<>();
@SuppressWarnings("unchecked") public FastObjectPool(PoolFactory<T> factory, int size) {
public FastObjectPool(PoolFactory<T> factory , int size)
{
int newSize=1; int newSize = 1;
while(newSize<size) while (newSize < size) {
{
newSize = newSize << 1; newSize = newSize << 1;
} }
size = newSize; size = newSize;
this.objects = new Holder[size];
for(int x=0;x<size;x++) @SuppressWarnings({"unchecked", "rawtypes"})
{ Holder<T>[] stuff = new Holder[size];
this.objects = stuff;
for (int x=0;x<size;x++) {
this.objects[x] = new Holder<T>(factory.create()); this.objects[x] = new Holder<T>(factory.create());
} }
this.mask = size-1; this.mask = size-1;
this.releasePointer = size; this.releasePointer = size;
this.BASE = THE_UNSAFE.arrayBaseOffset(Holder[].class); this.BASE = THE_UNSAFE.arrayBaseOffset(Holder[].class);
@ -65,29 +90,23 @@ public class FastObjectPool<T> {
this.ASHIFT = 31 - Integer.numberOfLeadingZeros((int) this.INDEXSCALE); this.ASHIFT = 31 - Integer.numberOfLeadingZeros((int) this.INDEXSCALE);
} }
public Holder<T> take() public Holder<T> take() {
{
int localTakePointer; int localTakePointer;
Holder<T> localObject = this.localValue.get(); Holder<T> localObject = this.localValue.get();
if(localObject!=null) if (localObject != null) {
{ if(localObject.state.compareAndSet(Holder.FREE, Holder.USED)) {
if(localObject.state.compareAndSet(Holder.FREE, Holder.USED))
{
return localObject; return localObject;
} }
} }
while(this.releasePointer != (localTakePointer=this.takePointer) ) while (this.releasePointer != (localTakePointer=this.takePointer)) {
{
int index = localTakePointer & this.mask; int index = localTakePointer & this.mask;
Holder<T> holder = this.objects[index]; Holder<T> holder = this.objects[index];
//if(holder!=null && THE_UNSAFE.compareAndSwapObject(objects, (index*INDEXSCALE)+BASE, holder, null)) //if(holder!=null && THE_UNSAFE.compareAndSwapObject(objects, (index*INDEXSCALE)+BASE, holder, null))
if(holder!=null && THE_UNSAFE.compareAndSwapObject(this.objects, (index<<this.ASHIFT)+this.BASE, holder, null)) if (holder != null && THE_UNSAFE.compareAndSwapObject(this.objects, (index<<this.ASHIFT)+this.BASE, holder, null)) {
{
this.takePointer = localTakePointer+1; this.takePointer = localTakePointer+1;
if(holder.state.compareAndSet(Holder.FREE, Holder.USED)) if (holder.state.compareAndSet(Holder.FREE, Holder.USED)) {
{
this.localValue.set(holder); this.localValue.set(holder);
return holder; return holder;
} }
@ -96,72 +115,24 @@ public class FastObjectPool<T> {
return null; return null;
} }
public void release(Holder<T> object) throws InterruptedException public void release(Holder<T> object) throws InterruptedException {
{
this.lock.lockInterruptibly(); this.lock.lockInterruptibly();
try{
try {
int localValue=this.releasePointer; int localValue=this.releasePointer;
//long index = ((localValue & mask) * INDEXSCALE ) + BASE; //long index = ((localValue & mask) * INDEXSCALE ) + BASE;
long index = ((localValue & this.mask)<<this.ASHIFT ) + this.BASE; long index = ((localValue & this.mask)<<this.ASHIFT ) + this.BASE;
if(object.state.compareAndSet(Holder.USED, Holder.FREE))
{ if (object.state.compareAndSet(Holder.USED, Holder.FREE)) {
THE_UNSAFE.putOrderedObject(this.objects, index, object); THE_UNSAFE.putOrderedObject(this.objects, index, object);
this.releasePointer = localValue+1; this.releasePointer = localValue+1;
} }
else else {
{
throw new IllegalArgumentException("Invalid reference passed"); throw new IllegalArgumentException("Invalid reference passed");
} }
} }
finally finally {
{
this.lock.unlock(); this.lock.unlock();
} }
} }
public static class Holder<T>
{
private T value;
public static final int FREE=0;
public static final int USED=1;
private AtomicInteger state = new AtomicInteger(FREE);
public Holder(T value)
{
this.value = value;
}
public T getValue() {
return this.value;
}
}
public static interface PoolFactory<T>
{
public T create();
}
public static final Unsafe THE_UNSAFE;
static
{
try
{
final PrivilegedExceptionAction<Unsafe> action = new PrivilegedExceptionAction<Unsafe>()
{
@Override
public Unsafe run() throws Exception
{
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
return (Unsafe) theUnsafe.get(null);
}
};
THE_UNSAFE = AccessController.doPrivileged(action);
}
catch (Exception e)
{
throw new RuntimeException("Unable to load unsafe", e);
}
}
} }

View File

@ -221,18 +221,18 @@ public class Metadata {
lock.release(); lock.release();
// Sys.printArray(buf, buf.length, false, 0); // Sys.printArray(buf, buf.length, false, 0);
return buf; return buf;
} }
/** /**
* Reads the record data for the given record header. * Reads the record data for the given record header.
*/ */
@SuppressWarnings("unchecked")
<T> T readData(Kryo kryo, InflaterInputStream inputStream) throws IOException { <T> T readData(Kryo kryo, InflaterInputStream inputStream) throws IOException {
Input input = new Input(inputStream, 1024); // read 1024 at a time Input input = new Input(inputStream, 1024); // read 1024 at a time
Object readObject = kryo.readClassAndObject(input); @SuppressWarnings("unchecked")
return (T) readObject; T readObject = (T) kryo.readClassAndObject(input);
return readObject;
} }
/** /**

View File

@ -324,14 +324,14 @@ public class Storage {
} }
/** /**
* Reads a object using the default (blank) key * Reads a object using the default (blank) key, and casts it to the expected class
*/ */
public final <T> T get() { public final <T> T get() {
return get0(this.defaultKey); return get0(this.defaultKey);
} }
/** /**
* Reads a object using the specific key. * Reads a object using the specific key, and casts it to the expected class
*/ */
public final <T> T get(String key) { public final <T> T get(String key) {
ByteArrayWrapper wrap = wrap(key); ByteArrayWrapper wrap = wrap(key);

View File

@ -140,10 +140,10 @@ public class StorageBase {
this.kryo.setRegistrationRequired(false); this.kryo.setRegistrationRequired(false);
this.deflater = new Deflater(7, true); this.deflater = new Deflater(7, true);
this.outputStream = new DeflaterOutputStream(new FileOutputStream(this.file.getFD()), this.deflater, 1024, true); this.outputStream = new DeflaterOutputStream(new FileOutputStream(this.file.getFD()), this.deflater, 65536, true);
this.inflater = new Inflater(true); this.inflater = new Inflater(true);
this.inputStream = new InflaterInputStream(new FileInputStream(this.file.getFD()), this.inflater, 1024); this.inputStream = new InflaterInputStream(new FileInputStream(this.file.getFD()), this.inflater, 65536);
this.weight = .5F; this.weight = .5F;
this.memoryIndex = new ConcurrentHashMap<ByteArrayWrapper, Metadata>(this.numberOfRecords); this.memoryIndex = new ConcurrentHashMap<ByteArrayWrapper, Metadata>(this.numberOfRecords);
@ -250,8 +250,7 @@ public class StorageBase {
this.inflater.reset(); this.inflater.reset();
this.file.seek(meta.dataPointer); this.file.seek(meta.dataPointer);
@SuppressWarnings("unchecked") T readRecordData = meta.readData(this.kryo, this.inputStream);
T readRecordData = (T) meta.readData(this.kryo, this.inputStream);
if (readRecordData != null) { if (readRecordData != null) {
// now stuff it into our reference cache for future lookups! // now stuff it into our reference cache for future lookups!