Added intellij support. Compiled as java6. Pool.take() will wait if there are no more objects in the pool.

This commit is contained in:
nathan 2015-06-28 00:45:33 +02:00
parent 88147a0811
commit 399ee303c7
10 changed files with 159 additions and 216 deletions

View File

@ -1,6 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src"/>
<classpathentry kind="src" path="test"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="/Dependencies/JCTools/JCTools-v1.1-alpha-MTAQ.jar" sourcepath="/Dependencies/JCTools/JCTools-1.1-alpha-MTAQ-src.zip"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="output" path="classes"/>
</classpath>

View File

@ -1,12 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/test" isTestSource="true" />
</content>
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="inheritedJdk" />
<orderEntry type="jdk" jdkName="1.6" jdkType="JavaSDK" />
<orderEntry type="module-library">
<library name="JUnit4">
<CLASSES>
<root url="jar://$APPLICATION_HOME_DIR$/lib/junit-4.11.jar!/" />
<root url="jar://$APPLICATION_HOME_DIR$/lib/hamcrest-core-1.3.jar!/" />
<root url="jar://$APPLICATION_HOME_DIR$/lib/hamcrest-library-1.3.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
</orderEntry>
<orderEntry type="library" name="JCTools (1.1 alpha MTAQ)" level="application" />
</component>
<component name="org.twodividedbyzero.idea.findbugs">

View File

@ -1,38 +0,0 @@
/*
* Copyright 2014 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.objectPool;
import java.lang.reflect.Field;
import java.security.PrivilegedExceptionAction;
public class GetUnsafe implements PrivilegedExceptionAction<sun.misc.Unsafe> {
GetUnsafe() {
}
@Override
public sun.misc.Unsafe run() throws Exception {
Class<sun.misc.Unsafe> unsafeClass = sun.misc.Unsafe.class;
Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Object unsafeObject = theUnsafe.get(null);
if (unsafeClass.isInstance(unsafeObject)) {
return unsafeClass.cast(unsafeObject);
}
throw new NoSuchFieldError("the Unsafe");
}
}

View File

@ -15,14 +15,20 @@
*/
package dorkbox.util.objectPool;
public interface ObjectPool<T> {
/**
* Takes an object from the pool
*/
public ObjectPoolHolder<T> take();
public
interface ObjectPool<T> {
/**
* Takes an object from the pool
*/
T take() throws InterruptedException;
/**
* Return object to the pool
*/
public void release(ObjectPoolHolder<T> object);
/**
* Return object to the pool
*/
void release(T object);
/**
* @return a new object instance created by the pool.
*/
T newInstance();
}

View File

@ -16,15 +16,18 @@
package dorkbox.util.objectPool;
public class ObjectPoolFactory {
public
class ObjectPoolFactory {
private ObjectPoolFactory() {
private
ObjectPoolFactory() {
}
/**
* Creates a pool of the specified size
*/
public static <T> ObjectPool<T> create(PoolableObject<T> poolableObject, int size) {
public static
<T> ObjectPool<T> create(PoolableObject<T> poolableObject, int size) {
try {
// here we use FAST (via UNSAFE)
UnsafeObjectPool<T> fastObjectPool = new UnsafeObjectPool<T>(poolableObject, size);
@ -35,4 +38,4 @@ public class ObjectPoolFactory {
return slowObjectPool;
}
}
}
}

View File

@ -1,32 +0,0 @@
/*
* Copyright 2014 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.objectPool;
import java.util.concurrent.atomic.AtomicBoolean;
public class ObjectPoolHolder<T> {
private T value;
AtomicBoolean state = new AtomicBoolean(true);
public ObjectPoolHolder(T value) {
this.value = value;
}
public T getValue() {
return this.value;
}
}

View File

@ -15,9 +15,10 @@
*/
package dorkbox.util.objectPool;
public interface PoolableObject<T> {
public
interface PoolableObject<T> {
/**
* called when a new instance is created
*/
public T create();
T create();
}

View File

@ -18,63 +18,39 @@
*/
package dorkbox.util.objectPool;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ArrayBlockingQueue;
class SafeObjectPool<T> implements ObjectPool<T> {
private static final boolean FREE = true;
private static final boolean USED = false;
private final LinkedBlockingDeque<ObjectPoolHolder<T>> queue;
private ThreadLocal<ObjectPoolHolder<T>> localValue = new ThreadLocal<>();
private final ArrayBlockingQueue<T> queue;
private final PoolableObject<T> poolableObject;
SafeObjectPool(PoolableObject<T> poolableObject, int size) {
this.poolableObject = poolableObject;
this.queue = new LinkedBlockingDeque<ObjectPoolHolder<T>>(size);
this.queue = new ArrayBlockingQueue<T>(size);
for (int x=0;x<size;x++) {
this.queue.add(new ObjectPoolHolder<T>(poolableObject.create()));
for (int x = 0; x < size; x++) {
this.queue.add(poolableObject.create());
}
}
@Override
public ObjectPoolHolder<T> take() {
// if we have an object available in the cache, use it instead.
ObjectPoolHolder<T> localObject = this.localValue.get();
if (localObject != null) {
if (localObject.state.compareAndSet(FREE, USED)) {
return localObject;
}
}
ObjectPoolHolder<T> holder = this.queue.poll();
if (holder == null) {
return null;
}
// the use of a threadlocal reference here helps eliminates contention. This also checks OTHER threads,
// as they might have one sitting on the cache
if (holder.state.compareAndSet(FREE, USED)) {
this.localValue.set(holder);
return holder;
} else {
// put it back into the queue
this.queue.offer(holder);
return null;
}
public
T take() throws InterruptedException {
return this.queue.take();
}
@Override
public void release(ObjectPoolHolder<T> object) {
if (object.state.compareAndSet(USED, FREE)) {
this.queue.offer(object);
}
else {
throw new IllegalArgumentException("Invalid reference passed");
}
public
void release(T object) {
this.queue.offer(object);
}
}
@Override
public
T newInstance() {
return poolableObject.create();
}
}

View File

@ -19,112 +19,74 @@
*/
package dorkbox.util.objectPool;
import java.security.AccessController;
import org.jctools.queues.MpmcArrayQueue;
import org.jctools.util.Pow2;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class UnsafeObjectPool<T> implements ObjectPool<T> {
private final MpmcArrayQueue<T> objects;
private final sun.misc.Unsafe unsafe;
private final Lock lock = new ReentrantLock();
private final Condition empty = lock.newCondition();
private final PoolableObject<T> poolableObject;
private static final boolean FREE = true;
private static final boolean USED = false;
UnsafeObjectPool(final PoolableObject<T> poolableObject, final int size) throws Throwable {
this.poolableObject = poolableObject;
int newSize = Pow2.roundToPowerOfTwo(size);
objects = new MpmcArrayQueue(newSize);
private final ObjectPoolHolder<T>[] objects;
private volatile int takePointer;
private volatile int releasePointer;
private final int mask;
private final long BASE;
private final long INDEXSCALE;
private final long ASHIFT;
public ReentrantLock lock = new ReentrantLock();
private ThreadLocal<ObjectPoolHolder<T>> localValue = new ThreadLocal<>();
UnsafeObjectPool(PoolableObject<T> poolableObject, int size) throws Throwable {
this.unsafe = AccessController.doPrivileged(new GetUnsafe());
if (this.unsafe == null) {
throw new Exception("Unable to load unsafe");
for (int x = 0; x < newSize; x++) {
objects.offer(poolableObject.create());
}
int newSize = 1;
while (newSize < size) {
newSize = newSize << 1;
}
size = newSize;
@SuppressWarnings({"unchecked", "rawtypes"})
ObjectPoolHolder<T>[] stuff = new ObjectPoolHolder[size];
this.objects = stuff;
for (int x=0;x<size;x++) {
this.objects[x] = new ObjectPoolHolder<T>(poolableObject.create());
}
this.mask = size-1;
this.releasePointer = size;
this.BASE = this.unsafe.arrayBaseOffset(ObjectPoolHolder[].class);
this.INDEXSCALE = this.unsafe.arrayIndexScale(ObjectPoolHolder[].class);
this.ASHIFT = 31 - Integer.numberOfLeadingZeros((int) this.INDEXSCALE);
}
@Override
public ObjectPoolHolder<T> take() {
int localTakePointer;
// if we have an object available in the cache, use it instead.
ObjectPoolHolder<T> localObject = this.localValue.get();
if (localObject != null) {
if (localObject.state.compareAndSet(FREE, USED)) {
return localObject;
public
T take() throws InterruptedException {
T poll = objects.poll();
if (poll == null) {
lock.lock();
try {
while ((poll = objects.poll()) == null) {
empty.await();
}
} finally {
lock.unlock();
}
}
sun.misc.Unsafe unsafe = this.unsafe;
return poll;
}
while (this.releasePointer != (localTakePointer=this.takePointer)) {
int index = localTakePointer & this.mask;
@Override
public
void release(T object) {
boolean waiting = objects.peek() == null;
ObjectPoolHolder<T> holder = this.objects[index];
//if(holder!=null && THE_UNSAFE.compareAndSwapObject(objects, (index*INDEXSCALE)+BASE, holder, null))
if (holder != null && unsafe.compareAndSwapObject(this.objects, (index<<this.ASHIFT)+this.BASE, holder, null)) {
this.takePointer = localTakePointer+1;
if (!objects.offer(object)) {
throw new RuntimeException("Unable to insert item " + object.getClass() + " into pool. take/release calls MUST be symmetric!");
}
// the use of a threadlocal reference here helps eliminates contention. This also checks OTHER threads,
// as they might have one sitting on the cache
if (holder.state.compareAndSet(FREE, USED)) {
this.localValue.set(holder);
return holder;
if (waiting) {
lock.lock();
if (objects.peek() == null) {
try {
// we only need to signal one, since the take/release calls must be symmetric
empty.signal();
} finally {
lock.unlock();
}
}
}
return null;
}
@Override
public void release(ObjectPoolHolder<T> object) {
try {
this.lock.lockInterruptibly();
int localValue = this.releasePointer;
//long index = ((localValue & mask) * INDEXSCALE ) + BASE;
long index = ((localValue & this.mask)<<this.ASHIFT ) + this.BASE;
if (object.state.compareAndSet(USED, FREE)) {
this.unsafe.putOrderedObject(this.objects, index, object);
this.releasePointer = localValue+1;
}
else {
throw new IllegalArgumentException("Invalid reference passed");
}
} catch (InterruptedException e) {
}
finally {
this.lock.unlock();
}
public
T newInstance() {
return poolableObject.create();
}
}
}

View File

@ -0,0 +1,50 @@
package dorkbox.util.objectPool;
import org.junit.Test;
/**
*
*/
public
class ObjectPoolFactoryTest {
public static
void main(String[] args) throws Exception {
new ObjectPoolFactoryTest().testCreate();
}
@Test
public
void testCreate() throws Exception {
final ObjectPool<Integer> pool = ObjectPoolFactory.create(new PoolableObject<Integer>() {
int id = 1;
@Override
public
Integer create() {
return this.id++;
}
}, 4);
Integer one = pool.take();
Integer two = pool.take();
Integer three = pool.take();
Integer four = pool.take();
pool.release(one);
pool.release(two);
pool.release(three);
pool.release(four);
one = pool.take();
two = pool.take();
pool.release(one);
pool.release(two);
three = pool.take();
four = pool.take();
one = pool.take();
two = pool.take();
}
}