From 42230706c77fc98c5696c4b608645d2a312aa100 Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 19 Apr 2015 14:25:12 +0200 Subject: [PATCH] WIP - got queue to no longer deadlock --- build.gradle | 10 - hs_err_pid26955.log | 742 ------------------ pom.xml | 281 ------- .../com/lmax/disruptor/DisruptorQueue.java | 2 +- .../com/lmax/disruptor/EventProcessor2.java | 2 +- .../com/lmax/disruptor/MessageHolder.java | 2 +- .../java/com/lmax/disruptor/MessageType.java | 7 +- .../com/lmax/disruptor/MessageTypeOLD.java | 8 + .../util/messagebus/MultiMBassador.java | 61 +- .../common/simpleq/MpmcExchangerQueue.java | 4 +- .../util/messagebus/common/simpleq/Node.java | 21 +- .../messagebus/common/simpleq/NodeState.java | 8 + .../messagebus/common/simpleq/NodeType.java | 13 - .../common/simpleq/SimpleQueue.java | 694 ++++++++-------- .../jctools/ConcurrentCircularArrayQueue.java | 2 +- .../simpleq/jctools/MpmcArrayQueue.java | 8 +- .../util/messagebus/MpmcQueueAltPerfTest.java | 70 +- .../MpmcQueueBaselineNodePerfTest.java | 2 +- .../util/messagebus/PerformanceTest.java | 69 +- .../messagebus/SimpleQueueAltPerfTest.java | 97 +++ 20 files changed, 594 insertions(+), 1509 deletions(-) delete mode 100644 build.gradle delete mode 100644 hs_err_pid26955.log delete mode 100644 pom.xml create mode 100644 src/main/java/com/lmax/disruptor/MessageTypeOLD.java create mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/NodeState.java delete mode 100644 src/main/java/dorkbox/util/messagebus/common/simpleq/NodeType.java create mode 100644 src/test/java/dorkbox/util/messagebus/SimpleQueueAltPerfTest.java diff --git a/build.gradle b/build.gradle deleted file mode 100644 index 9384e2d..0000000 --- a/build.gradle +++ /dev/null @@ -1,10 +0,0 @@ -usePlugin('java') - -group="org.mbassy" -version="1.1.2" - -dependencies { - addMavenRepo() - - testCompile "junit:junit:4.10@jar" -} \ No newline at end of file diff --git a/hs_err_pid26955.log b/hs_err_pid26955.log deleted file mode 100644 index e72b75f..0000000 --- a/hs_err_pid26955.log +++ /dev/null @@ -1,742 +0,0 @@ -# -# A fatal error has been detected by the Java Runtime Environment: -# -# SIGSEGV (0xb) at pc=0x00007f2e07f877a4, pid=26955, tid=139835696731904 -# -# JRE version: Java(TM) SE Runtime Environment (7.0_55-b13) (build 1.7.0_55-b13) -# Java VM: Java HotSpot(TM) 64-Bit Server VM (24.55-b03 mixed mode linux-amd64 compressed oops) -# Problematic frame: -# V [libjvm.so+0x9697a4] Unsafe_GetObject+0xc4 -# -# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again -# -# If you would like to submit a bug report, please visit: -# http://bugreport.sun.com/bugreport/crash.jsp -# - ---------------- T H R E A D --------------- - -Current thread (0x00007f2e0000a000): JavaThread "main" [_thread_in_vm, id=26960, stack(0x00007f2e08f7d000,0x00007f2e0907e000)] - -siginfo:si_signo=SIGSEGV: si_errno=0, si_code=1 (SEGV_MAPERR), si_addr=0x0000000000000010 - -Registers: -RAX=0x0000000000000000, RBX=0x00007f2e0000a000, RCX=0x0000000000000010, RDX=0x00007f2e08455a50 -RSP=0x00007f2e0907c650, RBP=0x00007f2e0907c6a0, RSI=0x00007f2e0907c750, RDI=0x00007f2e0000a1e8 -R8 =0x00000000ffffffff, R9 =0x0000000000005300, R10=0x00007f2dfd012cac, R11=0x00007f2e07ec5e40 -R12=0x0000000000000000, R13=0x0000000000000010, R14=0x00007f2e0000a1e8, R15=0x00007f2e0844be58 -RIP=0x00007f2e07f877a4, EFLAGS=0x0000000000010202, CSGSFS=0x0000000000000033, ERR=0x0000000000000004 - TRAPNO=0x000000000000000e - -Top of Stack: (sp=0x00007f2e0907c650) -0x00007f2e0907c650: 00007f2e0000a000 00007f2dfd01da03 -0x00007f2e0907c660: 00007f2e0000a000 00007f2d00000000 -0x00007f2e0907c670: 00007f2e0907c798 0000000708ce72b8 -0x00007f2e0907c680: 0000000000000000 0000000708ce72b8 -0x00007f2e0907c690: 00007f2e0907c750 00007f2e0000a000 -0x00007f2e0907c6a0: 00007f2e0907c718 00007f2dfd012cd8 -0x00007f2e0907c6b0: 0000000709241b7f 00007f2e000000b6 -0x00007f2e0907c6c0: 00007f2e0000add8 00007f2e00198b90 -0x00007f2e0907c6d0: 00007f2e0000a000 00007f2e0907c6d8 -0x00007f2e0907c6e0: 0000000000000000 00007f2e0907c750 -0x00007f2e0907c6f0: 0000000708cecc78 0000000000000000 -0x00007f2e0907c700: 0000000708ce72b8 0000000000000000 -0x00007f2e0907c710: 00007f2e0907c738 00007f2e0907c798 -0x00007f2e0907c720: 00007f2dfd006233 0000000000000000 -0x00007f2e0907c730: 00007f2dfd00ecdb 0000000000000010 -0x00007f2e0907c740: 00007f2dfd01d01f 0000000000000000 -0x00007f2e0907c750: 000000070de009a0 00007f2e0907c758 -0x00007f2e0907c760: 0000000709241b7f 00007f2e0907c7a8 -0x00007f2e0907c770: 0000000709248f30 0000000000000000 -0x00007f2e0907c780: 0000000709241b98 00007f2e0907c738 -0x00007f2e0907c790: 00007f2e0907c7a8 00007f2e0907c7f0 -0x00007f2e0907c7a0: 00007f2dfd006233 0000000000000000 -0x00007f2e0907c7b0: 00007f2e0907c7b0 000000070924200d -0x00007f2e0907c7c0: 00007f2e0907c888 0000000709248f30 -0x00007f2e0907c7d0: 0000000000000000 00000007092420e8 -0x00007f2e0907c7e0: 00007f2e0907c7a8 00007f2e0907c888 -0x00007f2e0907c7f0: 00007f2e0907c8d0 00007f2dfd006233 -0x00007f2e0907c800: 0000000000000000 0000000000000000 -0x00007f2e0907c810: 0000000000000090 0000000000000000 -0x00007f2e0907c820: 0000000000000000 0000000000000000 -0x00007f2e0907c830: 0000000000000001 0000000000000000 -0x00007f2e0907c840: 0000000000000001 0000000000000000 - -Instructions: (pc=0x00007f2e07f877a4) -0x00007f2e07f87784: 83 60 02 00 00 06 00 00 00 48 89 5d b0 74 04 49 -0x00007f2e07f87794: 8b 04 24 4c 8b 3d ca ef 48 00 41 80 3f 00 74 7c -0x00007f2e07f877a4: 42 8b 14 28 31 f6 85 d2 0f 85 ae 00 00 00 4c 89 -0x00007f2e07f877b4: f7 e8 c6 3d d0 ff 48 89 c3 48 8b 05 ac c9 48 00 - -Register to memory mapping: - -RAX=0x0000000000000000 is an unknown value -RBX=0x00007f2e0000a000 is a thread -RCX=0x0000000000000010 is an unknown value -RDX=0x00007f2e08455a50: in /home/user/dorkbox/eclipse/jre/jre/lib/amd64/server/libjvm.so at 0x00007f2e0761e000 -RSP=0x00007f2e0907c650 is pointing into the stack for thread: 0x00007f2e0000a000 -RBP=0x00007f2e0907c6a0 is pointing into the stack for thread: 0x00007f2e0000a000 -RSI=0x00007f2e0907c750 is pointing into the stack for thread: 0x00007f2e0000a000 -RDI=0x00007f2e0000a1e8 is an unknown value -R8 =0x00000000ffffffff is an unknown value -R9 =0x0000000000005300 is an unknown value -R10=0x00007f2dfd012cac is at code_begin+620 in an Interpreter codelet -method entry point (kind = native) [0x00007f2dfd012a40, 0x00007f2dfd0132a0] 2144 bytes -R11=0x00007f2e07ec5e40: in /home/user/dorkbox/eclipse/jre/jre/lib/amd64/server/libjvm.so at 0x00007f2e0761e000 -R12=0x0000000000000000 is an unknown value -R13=0x0000000000000010 is an unknown value -R14=0x00007f2e0000a1e8 is an unknown value -R15=0x00007f2e0844be58: in /home/user/dorkbox/eclipse/jre/jre/lib/amd64/server/libjvm.so at 0x00007f2e0761e000 - - -Stack: [0x00007f2e08f7d000,0x00007f2e0907e000], sp=0x00007f2e0907c650, free space=1021k -Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code) -V [libjvm.so+0x9697a4] Unsafe_GetObject+0xc4 -j sun.misc.Unsafe.getObject(Ljava/lang/Object;J)Ljava/lang/Object;+0 -j dorkbox.util.messagebus.common.simpleq.MpmcExchangerQueue.getMessage1(Ljava/lang/Object;)Ljava/lang/Object;+7 -j dorkbox.util.messagebus.common.simpleq.MpmcExchangerQueue.take()Ljava/lang/Object;+85 -j dorkbox.util.messagebus.MpmcQueueAltPerfTest.performanceRun(ILdorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue;)J+32 -j dorkbox.util.messagebus.MpmcQueueAltPerfTest.main([Ljava/lang/String;)V+86 -v ~StubRoutines::call_stub -V [libjvm.so+0x5f84b5] JavaCalls::call_helper(JavaValue*, methodHandle*, JavaCallArguments*, Thread*)+0x365 -V [libjvm.so+0x5f6f18] JavaCalls::call(JavaValue*, methodHandle, JavaCallArguments*, Thread*)+0x28 -V [libjvm.so+0x630109] jni_invoke_static(JNIEnv_*, JavaValue*, _jobject*, JNICallType, _jmethodID*, JNI_ArgumentPusher*, Thread*)+0x219 -V [libjvm.so+0x639192] jni_CallStaticVoidMethod+0x162 -C [libjli.so+0x36d9] JavaMain+0x7e9 - -Java frames: (J=compiled Java code, j=interpreted, Vv=VM code) -j sun.misc.Unsafe.getObject(Ljava/lang/Object;J)Ljava/lang/Object;+0 -j dorkbox.util.messagebus.common.simpleq.MpmcExchangerQueue.getMessage1(Ljava/lang/Object;)Ljava/lang/Object;+7 -j dorkbox.util.messagebus.common.simpleq.MpmcExchangerQueue.take()Ljava/lang/Object;+85 -j dorkbox.util.messagebus.MpmcQueueAltPerfTest.performanceRun(ILdorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue;)J+32 -j dorkbox.util.messagebus.MpmcQueueAltPerfTest.main([Ljava/lang/String;)V+86 -v ~StubRoutines::call_stub - ---------------- P R O C E S S --------------- - -Java Threads: ( => current thread ) - 0x00007f2e0019c800 JavaThread "Thread-1" [_thread_in_Java, id=27002, stack(0x00007f2de1f06000,0x00007f2de2007000)] - 0x00007f2dc4001000 JavaThread "Attach Listener" daemon [_thread_blocked, id=27000, stack(0x00007f2de1d03000,0x00007f2de1e04000)] - 0x00007f2e00120800 JavaThread "process reaper" daemon [_thread_blocked, id=26979, stack(0x00007f2de24cb000,0x00007f2de2504000)] - 0x00007f2e0009f800 JavaThread "Service Thread" daemon [_thread_blocked, id=26975, stack(0x00007f2de280d000,0x00007f2de290e000)] - 0x00007f2e0009d000 JavaThread "C2 CompilerThread1" daemon [_thread_blocked, id=26974, stack(0x00007f2de290e000,0x00007f2de2a0f000)] - 0x00007f2e0009a000 JavaThread "C2 CompilerThread0" daemon [_thread_blocked, id=26973, stack(0x00007f2de2a0f000,0x00007f2de2b10000)] - 0x00007f2e00090000 JavaThread "Signal Dispatcher" daemon [_thread_blocked, id=26972, stack(0x00007f2de2b10000,0x00007f2de2c11000)] - 0x00007f2e00079000 JavaThread "Finalizer" daemon [_thread_blocked, id=26971, stack(0x00007f2de32f3000,0x00007f2de33f4000)] - 0x00007f2e00075000 JavaThread "Reference Handler" daemon [_thread_blocked, id=26970, stack(0x00007f2de33f4000,0x00007f2de34f5000)] -=>0x00007f2e0000a000 JavaThread "main" [_thread_in_vm, id=26960, stack(0x00007f2e08f7d000,0x00007f2e0907e000)] - -Other Threads: - 0x00007f2e00072800 VMThread [stack: 0x00007f2de34f5000,0x00007f2de35f6000] [id=26969] - 0x00007f2e000b2000 WatcherThread [stack: 0x00007f2de270c000,0x00007f2de280d000] [id=26976] - -VM state:not at safepoint (normal execution) - -VM Mutex/Monitor currently owned by a thread: None - -Heap - PSYoungGen total 72704K, used 2498K [0x00000007af480000, 0x00000007b4580000, 0x0000000800000000) - eden space 62464K, 4% used [0x00000007af480000,0x00000007af6f0ab0,0x00000007b3180000) - from space 10240K, 0% used [0x00000007b3180000,0x00000007b3180000,0x00000007b3b80000) - to space 10240K, 0% used [0x00000007b3b80000,0x00000007b3b80000,0x00000007b4580000) - ParOldGen total 164864K, used 7104K [0x000000070de00000, 0x0000000717f00000, 0x00000007af480000) - object space 164864K, 4% used [0x000000070de00000,0x000000070e4f01e8,0x0000000717f00000) - PSPermGen total 21504K, used 6446K [0x0000000708c00000, 0x000000070a100000, 0x000000070de00000) - object space 21504K, 29% used [0x0000000708c00000,0x000000070924b910,0x000000070a100000) - -Card table byte_map: [0x00007f2e05ac5000,0x00007f2e06280000] byte_map_base: 0x00007f2e0227f000 - -Polling page: 0x00007f2e090a1000 - -Code Cache [0x00007f2dfd000000, 0x00007f2dfd270000, 0x00007f2e00000000) - total_blobs=292 nmethods=29 adapters=217 free_code_cache=48669Kb largest_free_block=49810368 - -Compilation events (10 events): -Event: 0.477 Thread 0x00007f2e0009a000 24 java.io.UnixFileSystem::normalize (75 bytes) -Event: 0.480 Thread 0x00007f2e0009a000 nmethod 24 0x00007f2dfd076a50 code [0x00007f2dfd076bc0, 0x00007f2dfd076e18] -Event: 0.485 Thread 0x00007f2e0009d000 25 dorkbox.util.messagebus.common.simpleq.jctools.ConcurrentSequencedCircularArrayQueue::calcSequenceOffset (9 bytes) -Event: 0.485 Thread 0x00007f2e0009a000 26 dorkbox.util.messagebus.common.simpleq.jctools.ConcurrentSequencedCircularArrayQueue::calcSequenceOffset (12 bytes) -Event: 0.485 Thread 0x00007f2e0009a000 nmethod 26 0x00007f2dfd07ae50 code [0x00007f2dfd07af80, 0x00007f2dfd07afd8] -Event: 0.485 Thread 0x00007f2e0009a000 27 dorkbox.util.messagebus.common.simpleq.jctools.ConcurrentSequencedCircularArrayQueue::soSequence (11 bytes) -Event: 0.485 Thread 0x00007f2e0009d000 nmethod 25 0x00007f2dfd076490 code [0x00007f2dfd0765c0, 0x00007f2dfd076638] -Event: 0.485 Thread 0x00007f2e0009a000 nmethod 27 0x00007f2dfd076210 code [0x00007f2dfd076340, 0x00007f2dfd076398] -Event: 0.486 Thread 0x00007f2e0009d000 29 % dorkbox.util.messagebus.common.simpleq.jctools.ConcurrentSequencedCircularArrayQueue:: @ 35 (61 bytes) -Event: 0.487 Thread 0x00007f2e0009d000 nmethod 29% 0x00007f2dfd06f450 code [0x00007f2dfd06f5a0, 0x00007f2dfd06f698] - -GC Heap History (4 events): -Event: 0.488 GC heap before -{Heap before GC invocations=1 (full 0): - PSYoungGen total 72704K, used 14896K [0x00000007af480000, 0x00000007b4580000, 0x0000000800000000) - eden space 62464K, 23% used [0x00000007af480000,0x00000007b030c030,0x00000007b3180000) - from space 10240K, 0% used [0x00000007b3b80000,0x00000007b3b80000,0x00000007b4580000) - to space 10240K, 0% used [0x00000007b3180000,0x00000007b3180000,0x00000007b3b80000) - ParOldGen total 164864K, used 0K [0x000000070de00000, 0x0000000717f00000, 0x00000007af480000) - object space 164864K, 0% used [0x000000070de00000,0x000000070de00000,0x0000000717f00000) - PSPermGen total 21504K, used 6443K [0x0000000708c00000, 0x000000070a100000, 0x000000070de00000) - object space 21504K, 29% used [0x0000000708c00000,0x000000070924ae18,0x000000070a100000) -Event: 0.492 GC heap after -Heap after GC invocations=1 (full 0): - PSYoungGen total 72704K, used 7264K [0x00000007af480000, 0x00000007b4580000, 0x0000000800000000) - eden space 62464K, 0% used [0x00000007af480000,0x00000007af480000,0x00000007b3180000) - from space 10240K, 70% used [0x00000007b3180000,0x00000007b3898340,0x00000007b3b80000) - to space 10240K, 0% used [0x00000007b3b80000,0x00000007b3b80000,0x00000007b4580000) - ParOldGen total 164864K, used 8K [0x000000070de00000, 0x0000000717f00000, 0x00000007af480000) - object space 164864K, 0% used [0x000000070de00000,0x000000070de02000,0x0000000717f00000) - PSPermGen total 21504K, used 6443K [0x0000000708c00000, 0x000000070a100000, 0x000000070de00000) - object space 21504K, 29% used [0x0000000708c00000,0x000000070924ae18,0x000000070a100000) -} -Event: 0.492 GC heap before -{Heap before GC invocations=2 (full 1): - PSYoungGen total 72704K, used 7264K [0x00000007af480000, 0x00000007b4580000, 0x0000000800000000) - eden space 62464K, 0% used [0x00000007af480000,0x00000007af480000,0x00000007b3180000) - from space 10240K, 70% used [0x00000007b3180000,0x00000007b3898340,0x00000007b3b80000) - to space 10240K, 0% used [0x00000007b3b80000,0x00000007b3b80000,0x00000007b4580000) - ParOldGen total 164864K, used 8K [0x000000070de00000, 0x0000000717f00000, 0x00000007af480000) - object space 164864K, 0% used [0x000000070de00000,0x000000070de02000,0x0000000717f00000) - PSPermGen total 21504K, used 6443K [0x0000000708c00000, 0x000000070a100000, 0x000000070de00000) - object space 21504K, 29% used [0x0000000708c00000,0x000000070924ae18,0x000000070a100000) -Event: 0.516 GC heap after -Heap after GC invocations=2 (full 1): - PSYoungGen total 72704K, used 0K [0x00000007af480000, 0x00000007b4580000, 0x0000000800000000) - eden space 62464K, 0% used [0x00000007af480000,0x00000007af480000,0x00000007b3180000) - from space 10240K, 0% used [0x00000007b3180000,0x00000007b3180000,0x00000007b3b80000) - to space 10240K, 0% used [0x00000007b3b80000,0x00000007b3b80000,0x00000007b4580000) - ParOldGen total 164864K, used 7104K [0x000000070de00000, 0x0000000717f00000, 0x00000007af480000) - object space 164864K, 4% used [0x000000070de00000,0x000000070e4f01e8,0x0000000717f00000) - PSPermGen total 21504K, used 6441K [0x0000000708c00000, 0x000000070a100000, 0x000000070de00000) - object space 21504K, 29% used [0x0000000708c00000,0x000000070924a658,0x000000070a100000) -} - -Deoptimization events (0 events): -No events - -Internal exceptions (10 events): -Event: 0.476 Thread 0x00007f2e0000a000 Threw 0x00000007af9cf800 at /HUDSON/workspace/7u-2-build-linux-amd64/jdk7u55/869/hotspot/src/share/vm/prims/jvm.cpp:1244 -Event: 0.476 Thread 0x00007f2e0000a000 Threw 0x00000007af9d2250 at /HUDSON/workspace/7u-2-build-linux-amd64/jdk7u55/869/hotspot/src/share/vm/prims/jvm.cpp:1244 -Event: 0.476 Thread 0x00007f2e0000a000 Threw 0x00000007af9d5920 at /HUDSON/workspace/7u-2-build-linux-amd64/jdk7u55/869/hotspot/src/share/vm/prims/jvm.cpp:1244 -Event: 0.477 Thread 0x00007f2e0000a000 Threw 0x00000007af9d86d8 at /HUDSON/workspace/7u-2-build-linux-amd64/jdk7u55/869/hotspot/src/share/vm/prims/jvm.cpp:1244 -Event: 0.477 Thread 0x00007f2e0000a000 Threw 0x00000007af9dc4e8 at /HUDSON/workspace/7u-2-build-linux-amd64/jdk7u55/869/hotspot/src/share/vm/prims/jvm.cpp:1244 -Event: 0.477 Thread 0x00007f2e0000a000 Threw 0x00000007af9e1b48 at /HUDSON/workspace/7u-2-build-linux-amd64/jdk7u55/869/hotspot/src/share/vm/prims/jvm.cpp:1244 -Event: 0.477 Thread 0x00007f2e0000a000 Threw 0x00000007af9e4c70 at /HUDSON/workspace/7u-2-build-linux-amd64/jdk7u55/869/hotspot/src/share/vm/prims/jvm.cpp:1244 -Event: 0.478 Thread 0x00007f2e0000a000 Threw 0x00000007af9e7e68 at /HUDSON/workspace/7u-2-build-linux-amd64/jdk7u55/869/hotspot/src/share/vm/prims/jvm.cpp:1244 -Event: 0.478 Thread 0x00007f2e0000a000 Threw 0x00000007af9f0298 at /HUDSON/workspace/7u-2-build-linux-amd64/jdk7u55/869/hotspot/src/share/vm/prims/jvm.cpp:1244 -Event: 0.516 Thread 0x00007f2e0000a000 Threw 0x00000007af480538 at /HUDSON/workspace/7u-2-build-linux-amd64/jdk7u55/869/hotspot/src/share/vm/prims/jvm.cpp:1244 - -Events (10 events): -Event: 0.477 loading class 0x00007f2e0019e130 done -Event: 0.478 loading class 0x00007f2e00163280 -Event: 0.478 loading class 0x00007f2e00163280 done -Event: 0.478 loading class 0x00007f2e0019d6d0 -Event: 0.478 loading class 0x00007f2e0019d6d0 done -Event: 0.487 Executing VM operation: ParallelGCSystemGC -Event: 0.516 Executing VM operation: ParallelGCSystemGC done -Event: 0.516 loading class 0x00007f2e000b8fd0 -Event: 0.516 loading class 0x00007f2e000b8fd0 done -Event: 0.516 Thread 0x00007f2e0019c800 Thread added: 0x00007f2e0019c800 - - -Dynamic libraries: -00400000-00401000 r-xp 00000000 fc:02 16917903 /home/user/dorkbox/eclipse/jre/bin/java -00600000-00601000 rw-p 00000000 fc:02 16917903 /home/user/dorkbox/eclipse/jre/bin/java -0120b000-0122c000 rw-p 00000000 00:00 0 [heap] -708c00000-70a100000 rw-p 00000000 00:00 0 -70a100000-70de00000 rw-p 00000000 00:00 0 -70de00000-717f00000 rw-p 00000000 00:00 0 -717f00000-7af480000 rw-p 00000000 00:00 0 -7af480000-7b4580000 rw-p 00000000 00:00 0 -7b4580000-800000000 rw-p 00000000 00:00 0 -7f2da0000000-7f2da0021000 rw-p 00000000 00:00 0 -7f2da0021000-7f2da4000000 ---p 00000000 00:00 0 -7f2da8000000-7f2da8021000 rw-p 00000000 00:00 0 -7f2da8021000-7f2dac000000 ---p 00000000 00:00 0 -7f2dac000000-7f2dac033000 rw-p 00000000 00:00 0 -7f2dac033000-7f2db0000000 ---p 00000000 00:00 0 -7f2db0000000-7f2db0021000 rw-p 00000000 00:00 0 -7f2db0021000-7f2db4000000 ---p 00000000 00:00 0 -7f2db4000000-7f2db4021000 rw-p 00000000 00:00 0 -7f2db4021000-7f2db8000000 ---p 00000000 00:00 0 -7f2db8000000-7f2db8219000 rw-p 00000000 00:00 0 -7f2db8219000-7f2dbc000000 ---p 00000000 00:00 0 -7f2dbc000000-7f2dbc1db000 rw-p 00000000 00:00 0 -7f2dbc1db000-7f2dc0000000 ---p 00000000 00:00 0 -7f2dc0000000-7f2dc0021000 rw-p 00000000 00:00 0 -7f2dc0021000-7f2dc4000000 ---p 00000000 00:00 0 -7f2dc4000000-7f2dc4021000 rw-p 00000000 00:00 0 -7f2dc4021000-7f2dc8000000 ---p 00000000 00:00 0 -7f2dc8000000-7f2dc8021000 rw-p 00000000 00:00 0 -7f2dc8021000-7f2dcc000000 ---p 00000000 00:00 0 -7f2dcc460000-7f2dd4000000 rw-p 00000000 00:00 0 -7f2dd4000000-7f2dd4021000 rw-p 00000000 00:00 0 -7f2dd4021000-7f2dd8000000 ---p 00000000 00:00 0 -7f2dd8000000-7f2dd8021000 rw-p 00000000 00:00 0 -7f2dd8021000-7f2ddc000000 ---p 00000000 00:00 0 -7f2ddc000000-7f2ddc021000 rw-p 00000000 00:00 0 -7f2ddc021000-7f2de0000000 ---p 00000000 00:00 0 -7f2de198e000-7f2de1af9000 rw-p 00000000 00:00 0 -7f2de1af9000-7f2de1b03000 r-xp 00000000 fc:02 15997771 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libinstrument.so -7f2de1b03000-7f2de1d02000 ---p 0000a000 fc:02 15997771 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libinstrument.so -7f2de1d02000-7f2de1d03000 rw-p 00009000 fc:02 15997771 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libinstrument.so -7f2de1d03000-7f2de1d06000 ---p 00000000 00:00 0 -7f2de1d06000-7f2de1e04000 rw-p 00000000 00:00 0 [stack:27000] -7f2de1e2c000-7f2de1f06000 rw-p 00000000 00:00 0 -7f2de1f06000-7f2de1f09000 ---p 00000000 00:00 0 -7f2de1f09000-7f2de2007000 rw-p 00000000 00:00 0 [stack:27002] -7f2de2007000-7f2de2072000 r--s 00e1f000 fc:02 15603316 /home/user/dorkbox/eclipse/jre/lib/tools.jar -7f2de2072000-7f2de2082000 r-xp 00000000 fc:02 15997753 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libnio.so -7f2de2082000-7f2de2282000 ---p 00010000 fc:02 15997753 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libnio.so -7f2de2282000-7f2de2283000 rw-p 00010000 fc:02 15997753 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libnio.so -7f2de2283000-7f2de22b4000 rw-p 00000000 00:00 0 -7f2de22b4000-7f2de22ca000 r-xp 00000000 fc:02 15997793 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libnet.so -7f2de22ca000-7f2de24ca000 ---p 00016000 fc:02 15997793 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libnet.so -7f2de24ca000-7f2de24cb000 rw-p 00016000 fc:02 15997793 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libnet.so -7f2de24cb000-7f2de24ce000 ---p 00000000 00:00 0 -7f2de24ce000-7f2de2504000 rw-p 00000000 00:00 0 [stack:26979] -7f2de2504000-7f2de250c000 r-xp 00000000 fc:02 15997764 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libmanagement.so -7f2de250c000-7f2de270b000 ---p 00008000 fc:02 15997764 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libmanagement.so -7f2de270b000-7f2de270c000 rw-p 00007000 fc:02 15997764 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libmanagement.so -7f2de270c000-7f2de270d000 ---p 00000000 00:00 0 -7f2de270d000-7f2de280d000 rw-p 00000000 00:00 0 [stack:26976] -7f2de280d000-7f2de2810000 ---p 00000000 00:00 0 -7f2de2810000-7f2de290e000 rw-p 00000000 00:00 0 [stack:26975] -7f2de290e000-7f2de2911000 ---p 00000000 00:00 0 -7f2de2911000-7f2de2a0f000 rw-p 00000000 00:00 0 [stack:26974] -7f2de2a0f000-7f2de2a12000 ---p 00000000 00:00 0 -7f2de2a12000-7f2de2b10000 rw-p 00000000 00:00 0 [stack:26973] -7f2de2b10000-7f2de2b13000 ---p 00000000 00:00 0 -7f2de2b13000-7f2de2c11000 rw-p 00000000 00:00 0 [stack:26972] -7f2de2c11000-7f2de32f3000 r--p 00000000 fc:02 17046099 /usr/lib/locale/locale-archive -7f2de32f3000-7f2de32f6000 ---p 00000000 00:00 0 -7f2de32f6000-7f2de33f4000 rw-p 00000000 00:00 0 [stack:26971] -7f2de33f4000-7f2de33f7000 ---p 00000000 00:00 0 -7f2de33f7000-7f2de34f5000 rw-p 00000000 00:00 0 [stack:26970] -7f2de34f5000-7f2de34f6000 ---p 00000000 00:00 0 -7f2de34f6000-7f2de4000000 rw-p 00000000 00:00 0 [stack:26969] -7f2de4000000-7f2de4021000 rw-p 00000000 00:00 0 -7f2de4021000-7f2de8000000 ---p 00000000 00:00 0 -7f2de8000000-7f2de8021000 rw-p 00000000 00:00 0 -7f2de8021000-7f2dec000000 ---p 00000000 00:00 0 -7f2dec000000-7f2dec021000 rw-p 00000000 00:00 0 -7f2dec021000-7f2df0000000 ---p 00000000 00:00 0 -7f2df0000000-7f2df0021000 rw-p 00000000 00:00 0 -7f2df0021000-7f2df4000000 ---p 00000000 00:00 0 -7f2df4000000-7f2df4021000 rw-p 00000000 00:00 0 -7f2df4021000-7f2df8000000 ---p 00000000 00:00 0 -7f2df8000000-7f2df8021000 rw-p 00000000 00:00 0 -7f2df8021000-7f2dfc000000 ---p 00000000 00:00 0 -7f2dfc005000-7f2dfc02b000 r--s 0020e000 fc:02 14943167 /home/user/dorkbox/resources/Dependencies/netty/netty-all-4.1.0.jar -7f2dfc02b000-7f2dfd000000 rw-p 00000000 00:00 0 -7f2dfd000000-7f2dfd270000 rwxp 00000000 00:00 0 -7f2dfd270000-7f2e001e1000 rw-p 00000000 00:00 0 -7f2e001e1000-7f2e04000000 ---p 00000000 00:00 0 -7f2e04019000-7f2e042c9000 rw-p 00000000 00:00 0 -7f2e042c9000-7f2e04487000 r--s 039d0000 fc:02 15736512 /home/user/dorkbox/eclipse/jre/jre/lib/rt.jar -7f2e04487000-7f2e04d88000 rw-p 00000000 00:00 0 -7f2e04d88000-7f2e04d89000 ---p 00000000 00:00 0 -7f2e04d89000-7f2e04e89000 rw-p 00000000 00:00 0 [stack:26968] -7f2e04e89000-7f2e04e8a000 ---p 00000000 00:00 0 -7f2e04e8a000-7f2e04f8a000 rw-p 00000000 00:00 0 [stack:26967] -7f2e04f8a000-7f2e04f8b000 ---p 00000000 00:00 0 -7f2e04f8b000-7f2e0508b000 rw-p 00000000 00:00 0 [stack:26966] -7f2e0508b000-7f2e0508c000 ---p 00000000 00:00 0 -7f2e0508c000-7f2e0518c000 rw-p 00000000 00:00 0 [stack:26965] -7f2e0518c000-7f2e0518d000 ---p 00000000 00:00 0 -7f2e0518d000-7f2e0528d000 rw-p 00000000 00:00 0 [stack:26964] -7f2e0528d000-7f2e0528e000 ---p 00000000 00:00 0 -7f2e0528e000-7f2e0538e000 rw-p 00000000 00:00 0 [stack:26963] -7f2e0538e000-7f2e0538f000 ---p 00000000 00:00 0 -7f2e0538f000-7f2e0548f000 rw-p 00000000 00:00 0 [stack:26962] -7f2e0548f000-7f2e05490000 ---p 00000000 00:00 0 -7f2e05490000-7f2e0559b000 rw-p 00000000 00:00 0 [stack:26961] -7f2e0559b000-7f2e055b9000 rw-p 00000000 00:00 0 -7f2e055b9000-7f2e0560a000 rw-p 00000000 00:00 0 -7f2e0560a000-7f2e05ac5000 rw-p 00000000 00:00 0 -7f2e05ac5000-7f2e05ad0000 rw-p 00000000 00:00 0 -7f2e05ad0000-7f2e05aee000 rw-p 00000000 00:00 0 -7f2e05aee000-7f2e05b3f000 rw-p 00000000 00:00 0 -7f2e05b3f000-7f2e05ff9000 rw-p 00000000 00:00 0 -7f2e05ff9000-7f2e06022000 rw-p 00000000 00:00 0 -7f2e06022000-7f2e0627f000 rw-p 00000000 00:00 0 -7f2e0627f000-7f2e06280000 rw-p 00000000 00:00 0 -7f2e06280000-7f2e0629a000 r-xp 00000000 fc:02 15997747 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libzip.so -7f2e0629a000-7f2e0649a000 ---p 0001a000 fc:02 15997747 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libzip.so -7f2e0649a000-7f2e0649b000 rw-p 0001a000 fc:02 15997747 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libzip.so -7f2e0649b000-7f2e064a6000 r-xp 00000000 fc:02 27003108 /lib/x86_64-linux-gnu/libnss_files-2.19.so -7f2e064a6000-7f2e066a5000 ---p 0000b000 fc:02 27003108 /lib/x86_64-linux-gnu/libnss_files-2.19.so -7f2e066a5000-7f2e066a6000 r--p 0000a000 fc:02 27003108 /lib/x86_64-linux-gnu/libnss_files-2.19.so -7f2e066a6000-7f2e066a7000 rw-p 0000b000 fc:02 27003108 /lib/x86_64-linux-gnu/libnss_files-2.19.so -7f2e066a7000-7f2e066b2000 r-xp 00000000 fc:02 27003173 /lib/x86_64-linux-gnu/libnss_nis-2.19.so -7f2e066b2000-7f2e068b1000 ---p 0000b000 fc:02 27003173 /lib/x86_64-linux-gnu/libnss_nis-2.19.so -7f2e068b1000-7f2e068b2000 r--p 0000a000 fc:02 27003173 /lib/x86_64-linux-gnu/libnss_nis-2.19.so -7f2e068b2000-7f2e068b3000 rw-p 0000b000 fc:02 27003173 /lib/x86_64-linux-gnu/libnss_nis-2.19.so -7f2e068b3000-7f2e068ca000 r-xp 00000000 fc:02 27003162 /lib/x86_64-linux-gnu/libnsl-2.19.so -7f2e068ca000-7f2e06ac9000 ---p 00017000 fc:02 27003162 /lib/x86_64-linux-gnu/libnsl-2.19.so -7f2e06ac9000-7f2e06aca000 r--p 00016000 fc:02 27003162 /lib/x86_64-linux-gnu/libnsl-2.19.so -7f2e06aca000-7f2e06acb000 rw-p 00017000 fc:02 27003162 /lib/x86_64-linux-gnu/libnsl-2.19.so -7f2e06acb000-7f2e06acd000 rw-p 00000000 00:00 0 -7f2e06acd000-7f2e06ad6000 r-xp 00000000 fc:02 27003161 /lib/x86_64-linux-gnu/libnss_compat-2.19.so -7f2e06ad6000-7f2e06cd5000 ---p 00009000 fc:02 27003161 /lib/x86_64-linux-gnu/libnss_compat-2.19.so -7f2e06cd5000-7f2e06cd6000 r--p 00008000 fc:02 27003161 /lib/x86_64-linux-gnu/libnss_compat-2.19.so -7f2e06cd6000-7f2e06cd7000 rw-p 00009000 fc:02 27003161 /lib/x86_64-linux-gnu/libnss_compat-2.19.so -7f2e06cd7000-7f2e06d00000 r-xp 00000000 fc:02 15997776 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libjava.so -7f2e06d00000-7f2e06f00000 ---p 00029000 fc:02 15997776 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libjava.so -7f2e06f00000-7f2e06f02000 rw-p 00029000 fc:02 15997776 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libjava.so -7f2e06f02000-7f2e06f0f000 r-xp 00000000 fc:02 15997780 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libverify.so -7f2e06f0f000-7f2e0710e000 ---p 0000d000 fc:02 15997780 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libverify.so -7f2e0710e000-7f2e07110000 rw-p 0000c000 fc:02 15997780 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/libverify.so -7f2e07110000-7f2e07117000 r-xp 00000000 fc:02 27003170 /lib/x86_64-linux-gnu/librt-2.19.so -7f2e07117000-7f2e07316000 ---p 00007000 fc:02 27003170 /lib/x86_64-linux-gnu/librt-2.19.so -7f2e07316000-7f2e07317000 r--p 00006000 fc:02 27003170 /lib/x86_64-linux-gnu/librt-2.19.so -7f2e07317000-7f2e07318000 rw-p 00007000 fc:02 27003170 /lib/x86_64-linux-gnu/librt-2.19.so -7f2e07318000-7f2e0741d000 r-xp 00000000 fc:02 27003107 /lib/x86_64-linux-gnu/libm-2.19.so -7f2e0741d000-7f2e0761c000 ---p 00105000 fc:02 27003107 /lib/x86_64-linux-gnu/libm-2.19.so -7f2e0761c000-7f2e0761d000 r--p 00104000 fc:02 27003107 /lib/x86_64-linux-gnu/libm-2.19.so -7f2e0761d000-7f2e0761e000 rw-p 00105000 fc:02 27003107 /lib/x86_64-linux-gnu/libm-2.19.so -7f2e0761e000-7f2e08180000 r-xp 00000000 fc:02 15997803 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/server/libjvm.so -7f2e08180000-7f2e08380000 ---p 00b62000 fc:02 15997803 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/server/libjvm.so -7f2e08380000-7f2e08443000 rw-p 00b62000 fc:02 15997803 /home/user/dorkbox/eclipse/jre/jre/lib/amd64/server/libjvm.so -7f2e08443000-7f2e08484000 rw-p 00000000 00:00 0 -7f2e08484000-7f2e0863f000 r-xp 00000000 fc:02 27003167 /lib/x86_64-linux-gnu/libc-2.19.so -7f2e0863f000-7f2e0883e000 ---p 001bb000 fc:02 27003167 /lib/x86_64-linux-gnu/libc-2.19.so -7f2e0883e000-7f2e08842000 r--p 001ba000 fc:02 27003167 /lib/x86_64-linux-gnu/libc-2.19.so -7f2e08842000-7f2e08844000 rw-p 001be000 fc:02 27003167 /lib/x86_64-linux-gnu/libc-2.19.so -7f2e08844000-7f2e08849000 rw-p 00000000 00:00 0 -7f2e08849000-7f2e0884c000 r-xp 00000000 fc:02 27003118 /lib/x86_64-linux-gnu/libdl-2.19.so -7f2e0884c000-7f2e08a4b000 ---p 00003000 fc:02 27003118 /lib/x86_64-linux-gnu/libdl-2.19.so -7f2e08a4b000-7f2e08a4c000 r--p 00002000 fc:02 27003118 /lib/x86_64-linux-gnu/libdl-2.19.so -7f2e08a4c000-7f2e08a4d000 rw-p 00003000 fc:02 27003118 /lib/x86_64-linux-gnu/libdl-2.19.so -7f2e08a4d000-7f2e08a63000 r-xp 00000000 fc:02 15736033 /home/user/dorkbox/eclipse/jre/lib/amd64/jli/libjli.so -7f2e08a63000-7f2e08c63000 ---p 00016000 fc:02 15736033 /home/user/dorkbox/eclipse/jre/lib/amd64/jli/libjli.so -7f2e08c63000-7f2e08c64000 rw-p 00016000 fc:02 15736033 /home/user/dorkbox/eclipse/jre/lib/amd64/jli/libjli.so -7f2e08c64000-7f2e08c7d000 r-xp 00000000 fc:02 27003168 /lib/x86_64-linux-gnu/libpthread-2.19.so -7f2e08c7d000-7f2e08e7c000 ---p 00019000 fc:02 27003168 /lib/x86_64-linux-gnu/libpthread-2.19.so -7f2e08e7c000-7f2e08e7d000 r--p 00018000 fc:02 27003168 /lib/x86_64-linux-gnu/libpthread-2.19.so -7f2e08e7d000-7f2e08e7e000 rw-p 00019000 fc:02 27003168 /lib/x86_64-linux-gnu/libpthread-2.19.so -7f2e08e7e000-7f2e08e82000 rw-p 00000000 00:00 0 -7f2e08e82000-7f2e08ea5000 r-xp 00000000 fc:02 27003164 /lib/x86_64-linux-gnu/ld-2.19.so -7f2e08ead000-7f2e08eb1000 r--s 0008a000 fc:02 15736510 /home/user/dorkbox/eclipse/jre/jre/lib/jsse.jar -7f2e08eb1000-7f2e08eb4000 r--s 00013000 fc:02 21495935 /home/user/dorkbox/resources/Dependencies/jol-core-0.3.1.jar -7f2e08eb4000-7f2e08ebd000 r--s 0005c000 fc:02 20712771 /home/user/dorkbox/resources/Dependencies/logging/logback-core-1.0.13.jar -7f2e08ebd000-7f2e08ec7000 rw-p 00000000 00:00 0 -7f2e08ec7000-7f2e08f7d000 rw-p 00000000 00:00 0 -7f2e08f7d000-7f2e08f80000 ---p 00000000 00:00 0 -7f2e08f80000-7f2e09083000 rw-p 00000000 00:00 0 [stack:26960] -7f2e09083000-7f2e09084000 rw-p 00000000 00:00 0 -7f2e09084000-7f2e09087000 r--s 00011000 fc:02 20060212 /home/user/dorkbox/resources/Dependencies/disruptor/disruptor-3.3.0.jar -7f2e09087000-7f2e09088000 r--s 00002000 fc:02 14945430 /home/user/dorkbox/resources/Dependencies/concurrent-locks/concurrent-locks-1.0.0.jar -7f2e09088000-7f2e09089000 r--s 00004000 fc:02 15728677 /home/user/dorkbox/resources/Dependencies/kryo/reflectasm.jar -7f2e09089000-7f2e0908a000 r--s 0000c000 fc:02 20845186 /home/user/dorkbox/resources/Dependencies/asm/asm-5.0.3.jar -7f2e0908a000-7f2e0908f000 r--s 0003a000 fc:02 20712769 /home/user/dorkbox/resources/Dependencies/logging/logback-classic-1.0.13.jar -7f2e0908f000-7f2e09091000 r--s 0000c000 fc:02 16918224 /home/user/dorkbox/eclipse/plugins/org.hamcrest.core_1.3.0.v201303031735.jar -7f2e09091000-7f2e09097000 r--s 00036000 fc:02 17700265 /home/user/dorkbox/eclipse/plugins/org.junit_4.11.0.v201303080030/junit.jar -7f2e09097000-7f2e09098000 r--s 00005000 fc:02 20712766 /home/user/dorkbox/resources/Dependencies/logging/slf4j-api-1.7.5.jar -7f2e09098000-7f2e090a0000 rw-s 00000000 fc:02 13633475 /tmp/hsperfdata_user/26955 -7f2e090a0000-7f2e090a1000 rw-p 00000000 00:00 0 -7f2e090a1000-7f2e090a2000 r--p 00000000 00:00 0 -7f2e090a2000-7f2e090a4000 rw-p 00000000 00:00 0 -7f2e090a4000-7f2e090a5000 r--p 00022000 fc:02 27003164 /lib/x86_64-linux-gnu/ld-2.19.so -7f2e090a5000-7f2e090a6000 rw-p 00023000 fc:02 27003164 /lib/x86_64-linux-gnu/ld-2.19.so -7f2e090a6000-7f2e090a7000 rw-p 00000000 00:00 0 -7fff565c7000-7fff565e8000 rw-p 00000000 00:00 0 [stack] -7fff565e8000-7fff565ea000 r-xp 00000000 00:00 0 [vdso] -ffffffffff600000-ffffffffff601000 r-xp 00000000 00:00 0 [vsyscall] - -VM Arguments: -jvm_args: -Dfile.encoding=UTF-8 -java_command: dorkbox.util.messagebus.MpmcQueueAltPerfTest -Launcher Type: SUN_STANDARD - -Environment Variables: -PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games -SHELL=/bin/bash -DISPLAY=:0 - -Signal Handlers: -SIGSEGV: [libjvm.so+0x993980], sa_mask[0]=0x7ffbfeff, sa_flags=0x10000004 -SIGBUS: [libjvm.so+0x993980], sa_mask[0]=0x7ffbfeff, sa_flags=0x10000004 -SIGFPE: [libjvm.so+0x8106e0], sa_mask[0]=0x7ffbfeff, sa_flags=0x10000004 -SIGPIPE: [libjvm.so+0x8106e0], sa_mask[0]=0x7ffbfeff, sa_flags=0x10000004 -SIGXFSZ: [libjvm.so+0x8106e0], sa_mask[0]=0x7ffbfeff, sa_flags=0x10000004 -SIGILL: [libjvm.so+0x8106e0], sa_mask[0]=0x7ffbfeff, sa_flags=0x10000004 -SIGUSR1: SIG_DFL, sa_mask[0]=0x00000000, sa_flags=0x00000000 -SIGUSR2: [libjvm.so+0x811bd0], sa_mask[0]=0x00000004, sa_flags=0x10000004 -SIGHUP: [libjvm.so+0x812b40], sa_mask[0]=0x7ffbfeff, sa_flags=0x10000004 -SIGINT: [libjvm.so+0x812b40], sa_mask[0]=0x7ffbfeff, sa_flags=0x10000004 -SIGTERM: [libjvm.so+0x812b40], sa_mask[0]=0x7ffbfeff, sa_flags=0x10000004 -SIGQUIT: [libjvm.so+0x812b40], sa_mask[0]=0x7ffbfeff, sa_flags=0x10000004 - - ---------------- S Y S T E M --------------- - -OS:jessie/sid - -uname:Linux 3.13.0-46-generic #79-Ubuntu SMP Tue Mar 10 20:06:50 UTC 2015 x86_64 -libc:glibc 2.19 NPTL 2.19 -rlimit: STACK 8192k, CORE 0k, NPROC 123695, NOFILE 4096, AS infinity -load average:1.52 1.02 1.16 - -/proc/meminfo: -MemTotal: 15863996 kB -MemFree: 1765612 kB -Buffers: 175376 kB -Cached: 1862528 kB -SwapCached: 369940 kB -Active: 8051080 kB -Inactive: 1975324 kB -Active(anon): 7632440 kB -Inactive(anon): 1566592 kB -Active(file): 418640 kB -Inactive(file): 408732 kB -Unevictable: 288 kB -Mlocked: 288 kB -SwapTotal: 33554428 kB -SwapFree: 31429596 kB -Dirty: 192 kB -Writeback: 4 kB -AnonPages: 7811544 kB -Mapped: 3827496 kB -Shmem: 1210532 kB -Slab: 339396 kB -SReclaimable: 234608 kB -SUnreclaim: 104788 kB -KernelStack: 10136 kB -PageTables: 136940 kB -NFS_Unstable: 0 kB -Bounce: 0 kB -WritebackTmp: 0 kB -CommitLimit: 41486424 kB -Committed_AS: 29935756 kB -VmallocTotal: 34359738367 kB -VmallocUsed: 389944 kB -VmallocChunk: 34359337300 kB -HardwareCorrupted: 0 kB -AnonHugePages: 2963456 kB -HugePages_Total: 0 -HugePages_Free: 0 -HugePages_Rsvd: 0 -HugePages_Surp: 0 -Hugepagesize: 2048 kB -DirectMap4k: 935372 kB -DirectMap2M: 15263744 kB -DirectMap1G: 0 kB - - -CPU:total 8 (4 cores per cpu, 2 threads per core) family 6 model 60 stepping 3, cmov, cx8, fxsr, mmx, sse, sse2, sse3, ssse3, sse4.1, sse4.2, popcnt, avx, avx2, aes, erms, ht, tsc, tscinvbit - -/proc/cpuinfo: -processor : 0 -vendor_id : GenuineIntel -cpu family : 6 -model : 60 -model name : Intel(R) Core(TM) i7-4700HQ CPU @ 2.40GHz -stepping : 3 -microcode : 0x17 -cpu MHz : 2401.000 -cache size : 6144 KB -physical id : 0 -siblings : 8 -core id : 0 -cpu cores : 4 -apicid : 0 -initial apicid : 0 -fpu : yes -fpu_exception : yes -cpuid level : 13 -wp : yes -flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm ida arat epb xsaveopt pln pts dtherm tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid -bogomips : 4788.47 -clflush size : 64 -cache_alignment : 64 -address sizes : 39 bits physical, 48 bits virtual -power management: - -processor : 1 -vendor_id : GenuineIntel -cpu family : 6 -model : 60 -model name : Intel(R) Core(TM) i7-4700HQ CPU @ 2.40GHz -stepping : 3 -microcode : 0x17 -cpu MHz : 800.000 -cache size : 6144 KB -physical id : 0 -siblings : 8 -core id : 1 -cpu cores : 4 -apicid : 2 -initial apicid : 2 -fpu : yes -fpu_exception : yes -cpuid level : 13 -wp : yes -flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm ida arat epb xsaveopt pln pts dtherm tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid -bogomips : 4788.47 -clflush size : 64 -cache_alignment : 64 -address sizes : 39 bits physical, 48 bits virtual -power management: - -processor : 2 -vendor_id : GenuineIntel -cpu family : 6 -model : 60 -model name : Intel(R) Core(TM) i7-4700HQ CPU @ 2.40GHz -stepping : 3 -microcode : 0x17 -cpu MHz : 1300.000 -cache size : 6144 KB -physical id : 0 -siblings : 8 -core id : 2 -cpu cores : 4 -apicid : 4 -initial apicid : 4 -fpu : yes -fpu_exception : yes -cpuid level : 13 -wp : yes -flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm ida arat epb xsaveopt pln pts dtherm tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid -bogomips : 4788.47 -clflush size : 64 -cache_alignment : 64 -address sizes : 39 bits physical, 48 bits virtual -power management: - -processor : 3 -vendor_id : GenuineIntel -cpu family : 6 -model : 60 -model name : Intel(R) Core(TM) i7-4700HQ CPU @ 2.40GHz -stepping : 3 -microcode : 0x17 -cpu MHz : 800.000 -cache size : 6144 KB -physical id : 0 -siblings : 8 -core id : 3 -cpu cores : 4 -apicid : 6 -initial apicid : 6 -fpu : yes -fpu_exception : yes -cpuid level : 13 -wp : yes -flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm ida arat epb xsaveopt pln pts dtherm tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid -bogomips : 4788.47 -clflush size : 64 -cache_alignment : 64 -address sizes : 39 bits physical, 48 bits virtual -power management: - -processor : 4 -vendor_id : GenuineIntel -cpu family : 6 -model : 60 -model name : Intel(R) Core(TM) i7-4700HQ CPU @ 2.40GHz -stepping : 3 -microcode : 0x17 -cpu MHz : 1600.000 -cache size : 6144 KB -physical id : 0 -siblings : 8 -core id : 0 -cpu cores : 4 -apicid : 1 -initial apicid : 1 -fpu : yes -fpu_exception : yes -cpuid level : 13 -wp : yes -flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm ida arat epb xsaveopt pln pts dtherm tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid -bogomips : 4788.47 -clflush size : 64 -cache_alignment : 64 -address sizes : 39 bits physical, 48 bits virtual -power management: - -processor : 5 -vendor_id : GenuineIntel -cpu family : 6 -model : 60 -model name : Intel(R) Core(TM) i7-4700HQ CPU @ 2.40GHz -stepping : 3 -microcode : 0x17 -cpu MHz : 800.000 -cache size : 6144 KB -physical id : 0 -siblings : 8 -core id : 1 -cpu cores : 4 -apicid : 3 -initial apicid : 3 -fpu : yes -fpu_exception : yes -cpuid level : 13 -wp : yes -flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm ida arat epb xsaveopt pln pts dtherm tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid -bogomips : 4788.47 -clflush size : 64 -cache_alignment : 64 -address sizes : 39 bits physical, 48 bits virtual -power management: - -processor : 6 -vendor_id : GenuineIntel -cpu family : 6 -model : 60 -model name : Intel(R) Core(TM) i7-4700HQ CPU @ 2.40GHz -stepping : 3 -microcode : 0x17 -cpu MHz : 1100.000 -cache size : 6144 KB -physical id : 0 -siblings : 8 -core id : 2 -cpu cores : 4 -apicid : 5 -initial apicid : 5 -fpu : yes -fpu_exception : yes -cpuid level : 13 -wp : yes -flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm ida arat epb xsaveopt pln pts dtherm tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid -bogomips : 4788.47 -clflush size : 64 -cache_alignment : 64 -address sizes : 39 bits physical, 48 bits virtual -power management: - -processor : 7 -vendor_id : GenuineIntel -cpu family : 6 -model : 60 -model name : Intel(R) Core(TM) i7-4700HQ CPU @ 2.40GHz -stepping : 3 -microcode : 0x17 -cpu MHz : 1300.000 -cache size : 6144 KB -physical id : 0 -siblings : 8 -core id : 3 -cpu cores : 4 -apicid : 7 -initial apicid : 7 -fpu : yes -fpu_exception : yes -cpuid level : 13 -wp : yes -flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm ida arat epb xsaveopt pln pts dtherm tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid -bogomips : 4788.47 -clflush size : 64 -cache_alignment : 64 -address sizes : 39 bits physical, 48 bits virtual -power management: - - - -Memory: 4k page, physical 15863996k(1765612k free), swap 33554428k(31429596k free) - -vm_info: Java HotSpot(TM) 64-Bit Server VM (24.55-b03) for linux-amd64 JRE (1.7.0_55-b13), built on Mar 17 2014 19:43:58 by "java_re" with gcc 4.3.0 20080428 (Red Hat 4.3.0-8) - -time: Sun Apr 12 23:24:36 2015 -elapsed time: 0 seconds - diff --git a/pom.xml b/pom.xml deleted file mode 100644 index 72680f0..0000000 --- a/pom.xml +++ /dev/null @@ -1,281 +0,0 @@ - - - - org.sonatype.oss - oss-parent - 7 - - - 4.0.0 - net.engio - mbassador - 1.2.1-SNAPSHOT - bundle - mbassador - - Mbassador is a fast and flexible message bus system following the publish subscribe pattern. - It is designed for ease of use and aims to be feature rich and extensible - while preserving resource efficiency and performance. - - It features: - declarative handler definition via annotations, - sync and/or async message delivery, - weak-references, - message filtering, - ordering of message handlers etc. - - - https://github.com/bennidi/mbassador - - - MIT license - http://www.opensource.org/licenses/mit-license.php - - - - git@github.com:bennidi/mbassador.git - scm:git:git@github.com:bennidi/mbassador.git - mbassador-1.2.0 - scm:git:git@github.com:bennidi/mbassador.git - - - - - bennidi - Benjamin Diedrichsen - +1 - b.diedrichsen@googlemail.com - - - - - 2.0.1 - 1.6 - 3.0.1 - - UTF-8 - 1.6 - file://${project.basedir}/mvn-local-repo - - - - - - junit - junit - 4.10 - test - - - - org.slf4j - slf4j-api - 1.7.5 - test - - - - org.slf4j - slf4j-log4j12 - 1.7.5 - test - - - - javax.el - el-api - provided - 2.2 - - - de.odysseus.juel - juel-impl - 2.2.7 - runtime - true - - - de.odysseus.juel - juel-spi - 2.2.7 - runtime - true - - - - - - - - - - - org.apache.felix - maven-bundle-plugin - 2.3.7 - true - - - ${project.groupId}.${project.artifactId} - {local-packages} - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - ${project.build.java.version} - ${project.build.java.version} - - - - - org.apache.maven.plugins - maven-release-plugin - 2.4 - - forked-path - - - - - org.apache.maven.plugins - maven-surefire-plugin - - false - - - AllTests.java - - - - - - - org.apache.maven.plugins - maven-source-plugin - - - attach-sources - - jar - - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - - - attach-javadocs - - jar - - - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - 2.9.1 - - true - public - true -
mbassador, ${project.version}
-
mbassador, ${project.version}
- mbassador, ${project.version} -
-
- - org.apache.maven.plugins - maven-scm-publish-plugin - 1.0-beta-2 - - ${project.build.directory}/scmpublish - Publishing javadoc for ${project.artifactId}:${project.version} - ${project.reporting.outputDirectory}/apidocs - true - scm:git:git@github.com:bennidi/mbassador.git - gh-pages - - -
-
- - - - release-sign-artifacts - - - performRelease - true - - - - - - org.apache.maven.plugins - maven-gpg-plugin - 1.4 - - - sign-artifacts - verify - - sign - - - - - - - - -
diff --git a/src/main/java/com/lmax/disruptor/DisruptorQueue.java b/src/main/java/com/lmax/disruptor/DisruptorQueue.java index a6bd2cc..48044d0 100644 --- a/src/main/java/com/lmax/disruptor/DisruptorQueue.java +++ b/src/main/java/com/lmax/disruptor/DisruptorQueue.java @@ -68,7 +68,7 @@ public class DisruptorQueue { try { // System.err.println("+(" + seq + ") " + message1); MessageHolder eventJob = ringBuffer.get(seq); - eventJob.messageType = MessageType.ONE; + eventJob.messageType = MessageTypeOLD.ONE; eventJob.message1 = message1; // eventJob.message2 = message2; // eventJob.message3 = message3; diff --git a/src/main/java/com/lmax/disruptor/EventProcessor2.java b/src/main/java/com/lmax/disruptor/EventProcessor2.java index 9795f16..5c3821e 100644 --- a/src/main/java/com/lmax/disruptor/EventProcessor2.java +++ b/src/main/java/com/lmax/disruptor/EventProcessor2.java @@ -21,7 +21,7 @@ public class EventProcessor2 implements WorkHandlerEarlyRelease { @Override public void onEvent(long sequence, MessageHolder event) throws Exception { - MessageType messageType = event.messageType; + MessageTypeOLD messageType = event.messageType; switch (messageType) { case ONE: { Object message1 = event.message1; diff --git a/src/main/java/com/lmax/disruptor/MessageHolder.java b/src/main/java/com/lmax/disruptor/MessageHolder.java index 3932f51..63f6eb2 100644 --- a/src/main/java/com/lmax/disruptor/MessageHolder.java +++ b/src/main/java/com/lmax/disruptor/MessageHolder.java @@ -5,7 +5,7 @@ package com.lmax.disruptor; * @author dorkbox, llc Date: 2/2/15 */ public class MessageHolder { - public MessageType messageType = MessageType.ONE; + public MessageTypeOLD messageType = MessageTypeOLD.ONE; public Object message1 = null; public Object message2 = null; diff --git a/src/main/java/com/lmax/disruptor/MessageType.java b/src/main/java/com/lmax/disruptor/MessageType.java index dda4777..5aadb86 100644 --- a/src/main/java/com/lmax/disruptor/MessageType.java +++ b/src/main/java/com/lmax/disruptor/MessageType.java @@ -3,6 +3,9 @@ package com.lmax.disruptor; * @author dorkbox, llc * Date: 2/2/15 */ -public enum MessageType { - ONE, TWO, THREE, ARRAY +public class MessageType { + public static final short ONE = 1; + public static final short TWO = 2; + public static final short THREE = 3; + public static final short ARRAY = 4; } \ No newline at end of file diff --git a/src/main/java/com/lmax/disruptor/MessageTypeOLD.java b/src/main/java/com/lmax/disruptor/MessageTypeOLD.java new file mode 100644 index 0000000..be7c410 --- /dev/null +++ b/src/main/java/com/lmax/disruptor/MessageTypeOLD.java @@ -0,0 +1,8 @@ +package com.lmax.disruptor; +/** + * @author dorkbox, llc + * Date: 2/2/15 + */ +public enum MessageTypeOLD { + ONE, TWO, THREE, ARRAY +} \ No newline at end of file diff --git a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java index 9594ec1..7c9613a 100644 --- a/src/main/java/dorkbox/util/messagebus/MultiMBassador.java +++ b/src/main/java/dorkbox/util/messagebus/MultiMBassador.java @@ -13,6 +13,7 @@ import dorkbox.util.messagebus.common.NamedThreadFactory; import dorkbox.util.messagebus.common.StrongConcurrentSetV8; import dorkbox.util.messagebus.common.simpleq.HandlerFactory; import dorkbox.util.messagebus.common.simpleq.SimpleQueue; +import dorkbox.util.messagebus.common.simpleq.jctools.Pow2; import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.PublicationError; import dorkbox.util.messagebus.subscription.Subscription; @@ -83,9 +84,11 @@ public class MultiMBassador implements IMessageBus { * @param numberOfThreads how many threads to have for dispatching async messages */ public MultiMBassador(boolean forceExactMatches, int numberOfThreads) { - if (numberOfThreads < 1) { - numberOfThreads = 1; // at LEAST 1 thread + if (numberOfThreads < 2) { + numberOfThreads = 2; // at LEAST 2 threads } + numberOfThreads = Pow2.roundToPowerOfTwo(numberOfThreads); + this.numberOfThreads = numberOfThreads; // this.dispatchQueue = new DisruptorQueue(this, numberOfThreads); @@ -97,7 +100,7 @@ public class MultiMBassador implements IMessageBus { } }; - this.dispatchQueue = new SimpleQueue(numberOfThreads, 1<<14); + this.dispatchQueue = new SimpleQueue(numberOfThreads); this.subscriptionManager = new SubscriptionManager(numberOfThreads); this.threads = new ArrayDeque(numberOfThreads); @@ -108,31 +111,18 @@ public class MultiMBassador implements IMessageBus { Runnable runnable = new Runnable() { @Override public void run() { -// TransferQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; SimpleQueue IN_QUEUE = MultiMBassador.this.dispatchQueue; -// Runnable event = null; - int spins; Object message1; - while (true) { -// spins = maxSpins; -// while ((event = IN_QUEUE.poll()) == null) { -// if (spins > 100) { -// --spins; -// } else if (spins > 0) { -// --spins; -// LockSupport.parkNanos(1L); -// } else { + try { + while (true) { message1 = IN_QUEUE.take(); -// break; -// } -// } - -// IN_QUEUE.release(event); -// event.run(); publish(message1); - - } + } + } catch (InterruptedException e) { + Thread.interrupted(); + return; + } } }; @@ -441,8 +431,15 @@ public class MultiMBassador implements IMessageBus { // } // }; -// this.dispatchQueue.transfer(runnable); - this.dispatchQueue.put(message); + try { +// this.dispatchQueue.transfer(runnable); + this.dispatchQueue.put(message); + } catch (InterruptedException e) { + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message)); + } } } @@ -456,7 +453,14 @@ public class MultiMBassador implements IMessageBus { } }; + try { this.dispatchQueue.put(runnable); + } catch (InterruptedException e) { + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message1, message2)); + } } } @@ -471,7 +475,14 @@ public class MultiMBassador implements IMessageBus { }; + try { this.dispatchQueue.put(runnable); + } catch (InterruptedException e) { + handlePublicationError(new PublicationError() + .setMessage("Error while adding an asynchronous message") + .setCause(e) + .setPublishedObject(message1, message2, message3)); + } } } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java index a6a5ddc..ee56971 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/MpmcExchangerQueue.java @@ -12,8 +12,8 @@ public final class MpmcExchangerQueue extends MpmcArrayQueueConsumerField static { try { TYPE_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("nodeType")); - ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1")); - THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread")); + ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1")); + THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread")); } catch (NoSuchFieldException e) { throw new RuntimeException(e); } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java index 116fdb8..7dc0c75 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/Node.java @@ -1,32 +1,33 @@ package dorkbox.util.messagebus.common.simpleq; +import com.lmax.disruptor.MessageType; // mpmc sparse.shift = 2, for this to be fast. abstract class PrePad { - volatile long z0, z1, z2, z4, z5, z6 = 7L; +// volatile long z0, z1, z2, z4, z5, z6 = 7L; } -abstract class ColdItems extends PrePad { - public short nodeType = NodeType.FREE; - +abstract class ColdItems { + public short type = MessageType.ONE; + public boolean isConsumer = false; public Object item1 = null; public Object item2 = null; public Object item3 = null; public Object[] item4 = null; } -abstract class Pad1 extends ColdItems { - volatile long z0, z1, z2, z4, z5, z6 = 7L; +abstract class Pad0 extends ColdItems { +// volatile long z0, z1, z2, z4, z5, z6 = 7L; } -abstract class HotItem1 extends Pad1 { - public Thread thread; - +abstract class HotItem1 extends ColdItems { + public volatile Thread thread; } + public class Node extends HotItem1 { // post-padding - volatile long z0, z1, z2, z4, z5, z6 = 7L; +// volatile long z0, z1, z2, z4, z5, z6 = 7L; public Node() { } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/NodeState.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/NodeState.java new file mode 100644 index 0000000..83102d9 --- /dev/null +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/NodeState.java @@ -0,0 +1,8 @@ +package dorkbox.util.messagebus.common.simpleq; + +public class NodeState { + public static final short FREE = 0; + public static final short CANCELLED = 1; + + private NodeState() {} +} \ No newline at end of file diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/NodeType.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/NodeType.java deleted file mode 100644 index 920ef4a..0000000 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/NodeType.java +++ /dev/null @@ -1,13 +0,0 @@ -package dorkbox.util.messagebus.common.simpleq; -public class NodeType { - public static final short FREE = 0; - - public static final short P_INIT = 1; - public static final short P_DONE = 2; - - public static final short C_INIT = 1; - public static final short C_DONE = 2; - - - private NodeType() {} -} \ No newline at end of file diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java index 6655e22..d9ea9ff 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/SimpleQueue.java @@ -5,30 +5,95 @@ import static dorkbox.util.messagebus.common.simpleq.jctools.UnsafeAccess.UNSAFE import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; -import com.lmax.disruptor.MessageHolder; - import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueueConsumerField; public final class SimpleQueue extends MpmcArrayQueueConsumerField { + private final static long THREAD; + private static final long ITEM1_OFFSET; + private static final long TYPE; + private static final long CONSUMER; -// private final static long NODE_OFFSET; - private final static long MESSAGE1_OFFSET; - static { - try { -// NODE_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("n")); - MESSAGE1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - - // Prevent rare disastrous classloading in first call to LockSupport.park. - // See: https://bugs.openjdk.java.net/browse/JDK-8074773 - @SuppressWarnings("unused") - Class ensureLoaded = LockSupport.class; - LockSupport.unpark(Thread.currentThread()); + static { + try { + CONSUMER = UNSAFE.objectFieldOffset(Node.class.getField("isConsumer")); + THREAD = UNSAFE.objectFieldOffset(Node.class.getField("thread")); + TYPE = UNSAFE.objectFieldOffset(Node.class.getField("type")); + ITEM1_OFFSET = UNSAFE.objectFieldOffset(Node.class.getField("item1")); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } } + private static final void spIsConsumer(Object node, boolean value) { + UNSAFE.putBoolean(node, CONSUMER, value); + } + + private static final boolean lpIsConsumer(Object node) { + return UNSAFE.getBoolean(node, CONSUMER); + } + + private static final boolean lvIsConsumer(Object node) { + return UNSAFE.getBooleanVolatile(node, CONSUMER); + } + + + private static final void spType(Object node, short type) { + UNSAFE.putShort(node, TYPE, type); + } + + private static final short lpType(Object node) { + return UNSAFE.getShort(node, TYPE); + } + + private static final void soItem1(Object node, Object item) { + UNSAFE.putOrderedObject(node, ITEM1_OFFSET, item); + } + + private static final void spItem1(Object node, Object item) { + UNSAFE.putObject(node, ITEM1_OFFSET, item); + } + + private static final Object lvItem1(Object node) { + return UNSAFE.getObjectVolatile(node, ITEM1_OFFSET); + } + + private static final Object lpItem1(Object node) { + return UNSAFE.getObject(node, ITEM1_OFFSET); + } + + private static final Thread lvThread(Object node) { + return (Thread) UNSAFE.getObjectVolatile(node, THREAD); + } + + private static final Thread lpThread(Object node) { + return (Thread) UNSAFE.getObject(node, THREAD); + } + + private static final Object cancelledMarker = new Object(); + + private static final boolean lpIsCanceled(Object node) { + return cancelledMarker == UNSAFE.getObject(node, THREAD); + } + + private static final void spIsCancelled(Object node) { + UNSAFE.putObject(node, THREAD, cancelledMarker); + } + + private static final void soThread(Object node, Thread newValue) { + UNSAFE.putOrderedObject(node, THREAD, newValue); + } + + private static final void spThread(Object node, Thread newValue) { + UNSAFE.putObject(node, THREAD, newValue); + } + + private static final boolean casThread(Object node, Object expect, Object newValue) { + return UNSAFE.compareAndSwapObject(node, THREAD, expect, newValue); + } + + + /** The number of CPUs */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); @@ -41,28 +106,36 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { */ private static final int SPINS = NCPU == 1 ? 0 : 600; // orig: 2000 - private static final int SIZE = 1<<14; + /** + * The number of times to spin before blocking in timed waits. + * The value is empirically derived -- it works well across a + * variety of processors and OSes. Empirically, the best value + * seems not to vary with number of CPUs (beyond 2) so is just + * a constant. + */ + static final int maxTimedSpins = NCPU < 2 ? 0 : 32; + static final int negMaxTimedSpins = -maxTimedSpins; + + /** + * The number of times to spin before blocking in untimed waits. + * This is greater than timed value because untimed waits spin + * faster since they don't need to check times on each spin. + */ + static final int maxUntimedSpins = maxTimedSpins * 16; + + /** + * The number of nanoseconds for which it is faster to spin + * rather than to use timed park. A rough estimate suffices. + */ + static final long spinForTimeoutThreshold = 1000L; + + long p40, p41, p42, p43, p44, p45, p46; long p30, p31, p32, p33, p34, p35, p36, p37; -// // EMPTY == TRUE -// if (currentConsumerIndex == currentProducerIndex) { -// // automatically park, since we are the first one on the Q -// } -// -// -// // other consumers may have grabbed the element, or queue might be empty -// Node fbject = lpElement(calcElementOffset(currentConsumerIndex)); -// -// - - - private final int numberConsumerThreads; - - public SimpleQueue(int numberConsumerThreads, int size) { + public SimpleQueue(final int size) { super(size); - this.numberConsumerThreads = numberConsumerThreads; // pre-fill our data structures @@ -77,105 +150,41 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { } } - - /** * PRODUCER */ - public void put(Object item) { + public void put(Object item) throws InterruptedException { + xfer(item, false, 0); + } + + + /** + * CONSUMER + */ + public Object take() throws InterruptedException { + return xfer(null, false, 0); + } + + private Object xfer(Object item, boolean timed, long nanos) throws InterruptedException { + + boolean isConsumer = item == null; // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long capacity = this.mask + 1; final long[] sBuffer = this.sequenceBuffer; -// long currentConsumerIndex; - long currentProducerIndex; - long pSeqOffset; - long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it - - while (true) { - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is - // nothing we can do to make this an exact method. -// currentConsumerIndex = lvConsumerIndex(); // LoadLoad - currentProducerIndex = lvProducerIndex(); // LoadLoad - - pSeqOffset = calcSequenceOffset(currentProducerIndex, mask); - final long seq = lvSequence(sBuffer, pSeqOffset); // LoadLoad - final long delta = seq - currentProducerIndex; - - if (delta == 0) { - // this is expected if we see this first time around - if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) { - // Successful CAS: full barrier - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(currentProducerIndex, mask); - Node lpElement = lpElement(offset); - setMessage1(lpElement, item); - -// lpspElement(offset, fakeVal, NODE_OFFSET, MESSAGE1_OFFSET); -// spElement(offset, item); - - -// lpElement(offset); -// ((Node)e).setMessage1(null); -// node.setMessage1(((Node)item).getMessage1()); -// Object lpElement = lpElement(offset); -// setMessage1(lpElement, 445); -// this.fakeNode.setMessage1(Integer.valueOf(12)); -// e.setMessage1(((Node)item).item); - -////// NodeState2 message1 = e.getMessage1(); - - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, pSeqOffset, currentProducerIndex + 1); // StoreStore - - return; - } - // failed cas, retry 1 - } else if (delta < 0 && // poll has not moved this value forward - currentProducerIndex - capacity <= cIndex && // test against cached cIndex - currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex - // Extra check required to ensure [Queue.offer == false iff queue is full] -// return null; - busySpin(); - } - - // another producer has moved the sequence by one, retry 2 - - // only producer will busySpin if contention -// busySpin(); - } - } - - private static final void setMessage1(Object node, Object item) { -// final Object o = UNSAFE.getObject(node, NODE_OFFSET); - - UNSAFE.putObject(node, MESSAGE1_OFFSET, item); - } - - private static final Object getMessage1(Object node) { -// final Object o = UNSAFE.getObject(node, NODE_OFFSET); - return UNSAFE.getObject(node, MESSAGE1_OFFSET); - } - - /** - * CONSUMER - * @return null iff empty - */ - public Object take() { - // local load of field to avoid repeated loads after volatile reads - final long mask = this.mask; - final long[] sBuffer = this.sequenceBuffer; - long currentConsumerIndex; -// long currentProducerIndex; + long currentProducerIndex; + long cSeqOffset; - long pIndex = -1; // start with bogus value, hope we don't need it + long pSeqOffset; + + long pSeq; + long pDelta = -1; + + boolean sameMode = false; + boolean empty = false; while (true) { // Order matters! @@ -183,224 +192,195 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is // nothing we can do to make this an exact method. currentConsumerIndex = lvConsumerIndex(); // LoadLoad -// currentProducerIndex = lvProducerIndex(); // LoadLoad + currentProducerIndex = lvProducerIndex(); // LoadLoad + // empty or same mode + // check what was last placed on the queue + if (currentProducerIndex == currentConsumerIndex) { + empty = true; + } else { + final long previousProducerIndex = currentProducerIndex - 1; - cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask); - final long seq = lvSequence(sBuffer, cSeqOffset); // LoadLoad - final long delta = seq - (currentConsumerIndex + 1); + final long ppSeqOffset = calcSequenceOffset(previousProducerIndex, mask); + final long ppSeq = lvSequence(sBuffer, ppSeqOffset); // LoadLoad + final long ppDelta = ppSeq - previousProducerIndex; - if (delta == 0) { - if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) { - // Successful CAS: full barrier + if (ppDelta == 1) { + // same mode check // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(currentConsumerIndex, mask); - final Object e = lpElement(offset); - Object item = getMessage1(e); -// final Node node = (Node)e; -// -// Object item = node.getMessage1(); -// soElement(offset, null); + final long offset = calcElementOffset(previousProducerIndex, mask); + Object element = lpElement(offset); + sameMode = lpIsConsumer(element) == isConsumer; + } else if (ppDelta < 1 && // slot has not been moved by producer + currentConsumerIndex >= currentProducerIndex && // test against cached pIndex + currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(sBuffer, cSeqOffset, currentConsumerIndex + mask + 1); // StoreStore - - return item; + // is empty + empty = true; + } else { + // hasn't been moved yet. retry 2 + busySpin(); + continue; } - // failed cas, retry 1 - } else if (delta < 0 && // slot has not been moved by producer - currentConsumerIndex >= pIndex && // test against cached pIndex - currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must - // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] -// return null; - - // contention. we WILL have data in the Q, we just got to it too quickly - busySpin(); } - // another consumer beat us and moved sequence ahead, retry 2 - // only producer busyspins - } - } + + if (empty || sameMode) { + // push+park onto queue + + // we add ourselves to the queue and wait + pSeqOffset = calcSequenceOffset(currentProducerIndex, mask); + pSeq = lvSequence(sBuffer, pSeqOffset); // LoadLoad + pDelta = pSeq - currentProducerIndex; + if (pDelta == 0) { + // this is expected if we see this first time around + final long nextProducerIndex = currentProducerIndex + 1; + if (casProducerIndex(currentProducerIndex, nextProducerIndex)) { + // Successful CAS: full barrier + // it is possible that two threads check the queue at the exact same time, + // BOTH can think that the queue is empty, resulting in a deadlock between threads + // it is ALSO possible that the consumer pops the previous node, and so we thought it was not-empty, when + // in reality, it is. + currentConsumerIndex = lvConsumerIndex(); + + if (empty && currentProducerIndex != currentConsumerIndex) { + // RESET the push of this element. + empty = false; + casProducerIndex(nextProducerIndex, currentProducerIndex); + continue; + } else if (sameMode && currentProducerIndex == currentConsumerIndex) { + sameMode = false; + casProducerIndex(nextProducerIndex, currentProducerIndex); + continue; + } + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(currentProducerIndex, mask); + final Object element = lpElement(offset); + spIsConsumer(element, isConsumer); + spThread(element, Thread.currentThread()); + if (isConsumer) { + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore - - - public void putOLD(Object message1) throws InterruptedException { -// // decrement count -// // <0: no consumers available, add to Q, park and wait -// // >=0: consumers available, get one from the parking lot -// -// Thread myThread = Thread.currentThread(); -// for (;;) { -// final int count = this.currentCount.get(); -// if (this.currentCount.compareAndSet(count, count - 1)) { -// if (count <= 0) { -// // <=0: no consumers available (PUSH_P, PARK_P) -// Node producer = this.producersWaiting.put(); -// if (producer == null || producer.item == null) { -// System.err.println("KAPOW"); -// } -// producer.item.message1 = message1; -// -// if (!park(producer, myThread)) { -// throw new InterruptedException(); -// } -// -// return; -// } else { -// // >0: consumers available (TAKE_C, UNPARK_C) -// Node consumer = this.consumersWaiting.take(); -// while (consumer == null) { -//// busySpin(); -// consumer = this.consumersWaiting.take(); -// } -// -// consumer.item.message1 = message1; -// -// unpark(consumer, myThread); -// return; -// } -// } -// -// // contention -// busySpin(); -// } - } - - public void takeOLD(MessageHolder item) throws InterruptedException { -// // increment count -// // >=0: no producers available, park and wait -// // <0: producers available, get one from the Q -// -// Thread myThread = Thread.currentThread(); -// for (;;) { -// final int count = this.currentCount.get(); -// if (this.currentCount.compareAndSet(count, count + 1)) { -// if (count >= 0) { -// // >=0: no producers available (PUT_C, PARK_C) -// Node consumer = this.consumersWaiting.put(); -// -// if (!park(consumer, myThread)) { -// throw new InterruptedException(); -// } -// if (consumer.item == null || consumer.item.message1 == null) { -// System.err.println("KAPOW"); -// } -// item.message1 = consumer.item.message1; -// -// return; -// } else { -// // <0: producers available (TAKE_P, UNPARK_P) -// Node producer = this.producersWaiting.take(); -// while (producer == null) { -//// busySpin(); -// producer = this.producersWaiting.take(); -// } -// -// item.message1 = producer.item.message1; -// unpark(producer, myThread); -// -// if (item.message1 == null) { -// System.err.println("KAPOW"); -// } -// -// return; -// } -// } -// -// // contention -// busySpin(); -// } - } - - /** - * @param myThread - * @return false if we were interrupted, true if we were unparked by another thread - */ - private boolean park(Node myNode, Thread myThread) { - PaddedObject waiter = myNode.waiter; - Thread thread; - - for (;;) { - thread = waiter.get(); - if (waiter.compareAndSet(thread, myThread)) { - if (thread == null) { - // busy spin for the amount of time (roughly) of a CPU context switch - int spins = SPINS; - for (;;) { - if (spins > 0) { - --spins; - } else if (waiter.get() != myThread) { - break; + // now we wait + park(element, timed, nanos); + return lvItem1(element); } else { - // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. - LockSupport.park(); - if (myThread.isInterrupted()) { - waiter.set(null); - return false; - } - break; + spItem1(element, item); + + // increment sequence by 1, the value expected by consumer + // (seeing this value from a producer will lead to retry 2) + soSequence(sBuffer, pSeqOffset, nextProducerIndex); // StoreStore + + // now we wait + park(element, timed, nanos); + return null; } } - -// do { -// // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. -// LockSupport.park(); -// if (myThread.isInterrupted()) { -// myNode.waiter.set(null); -// return false; -// } -// } while (myNode.waiter.get() == myThread); - - waiter.set(null); - return true; - } else if (thread != myThread) { - // no parking - return true; - } else { - // contention + // failed cas, retry 1 + } else if (pDelta < 0 && // poll has not moved this value forward + currentProducerIndex - capacity <= currentConsumerIndex && // test against cached cIndex + currentProducerIndex - capacity <= (currentConsumerIndex = lvConsumerIndex())) { // test against latest cIndex + // Extra check required to ensure [Queue.offer == false iff queue is full] + // return; busySpin(); } - } - } - } + } else { + // complimentary mode - /** - * Unparks the other node (if it was waiting) - */ - private void unpark(Node otherNode, Thread myThread) { - PaddedObject waiter = otherNode.waiter; - Thread thread; + // get item + cSeqOffset = calcSequenceOffset(currentConsumerIndex, mask); + final long cSeq = lvSequence(sBuffer, cSeqOffset); // LoadLoad + final long nextConsumerIndex = currentConsumerIndex + 1; + final long cDelta = cSeq - nextConsumerIndex; - for (;;) { - thread = waiter.get(); - if (waiter.compareAndSet(thread, myThread)) { - if (thread == null) { - // no parking - return; - } else if (thread != myThread) { - // park will always set the waiter back to null - LockSupport.unpark(thread); - return; - } else { - // contention + if (cDelta == 0) { + // on 64bit(no compressed oops) JVM this is the same as seqOffset + final long offset = calcElementOffset(currentConsumerIndex, mask); + final Object element = lpElement(offset); + final Thread thread = lpThread(element); + + if (isConsumer) { + if (thread == null || // is cancelled/fulfilled already + !casThread(element, thread, null)) { // failed cas state + + // pop off queue + if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { + // Successful CAS: full barrier + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + } + continue; + } + + // success + Object item1 = lpItem1(element); + + // pop off queue + if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { + // Successful CAS: full barrier + LockSupport.unpark(thread); + + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + return item1; + } + + continue; + } else { + soItem1(element, item); + + if (thread == null || // is cancelled/fulfilled already + !casThread(element, thread, null)) { // failed cas state + + // pop off queue + if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { + // Successful CAS: full barrier + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + } + + continue; + } + + // success + + // pop off queue + if (casConsumerIndex(currentConsumerIndex, nextConsumerIndex)) { + // Successful CAS: full barrier + + LockSupport.unpark(thread); + soSequence(sBuffer, cSeqOffset, nextConsumerIndex + mask); // StoreStore + + return null; + } + + // lost CAS + busySpin(); + continue; + } + + } else if (cDelta < 0 && // slot has not been moved by producer + currentConsumerIndex >= currentProducerIndex && // test against cached pIndex + currentConsumerIndex == (currentProducerIndex = lvProducerIndex())) { // update pIndex if we must + // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] + // return null; busySpin(); + } } + // contention. + busySpin(); } } - private static final void busySpin() { // busy spin for the amount of time (roughly) of a CPU context switch - int spins = SPINS; + int spins = maxUntimedSpins; for (;;) { if (spins > 0) { --spins; @@ -410,19 +390,73 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { } } - public boolean hasPendingMessages() { - // count the number of consumers waiting, it should be the same as the number of threads configured -// return this.consumersWaiting.size() == this.numberConsumerThreads; + /** + * @param myThread + * @return + * @return false if we were interrupted, true if we were unparked by another thread + */ + private static final boolean park(Object myNode, boolean timed, long nanos) throws InterruptedException { +// long lastTime = timed ? System.nanoTime() : 0; +// int spins = timed ? maxTimedSpins : maxUntimedSpins; + int spins = maxUntimedSpins; + Thread myThread = Thread.currentThread(); - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is - // nothing we can do to make this an exact method. - return lvConsumerIndex() != lvProducerIndex(); +// if (timed) { +// long now = System.nanoTime(); +// nanos -= now - lastTime; +// lastTime = now; +// if (nanos <= 0) { +//// s.tryCancel(e); +// continue; +// } +// } + + // busy spin for the amount of time (roughly) of a CPU context switch + // then park (if necessary) + int spin = spins; + for (;;) { + if (lvThread(myNode) == null) { + return true; + } else if (spin > 0) { + --spin; + } else if (spin > negMaxTimedSpins) { + LockSupport.parkNanos(1); + } else { + // park can return for NO REASON. Subsequent loops will hit this if it has not been ACTUALLY unlocked. + LockSupport.park(); + + if (myThread.isInterrupted()) { + casThread(myNode, myThread, null); + return false; + } + } + } } - public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException { - } +// /** +// * Unparks the other node (if it was waiting) +// */ +// private static final void unpark(Object otherNode) { +// Thread myThread = Thread.currentThread(); +// Thread thread; +// +// for (;;) { +// thread = getThread(otherNode); +// if (threadCAS(otherNode, thread, myThread)) { +// if (thread == null) { +// // no parking (UNPARK won the race) +// return; +// } else if (thread != myThread) { +// // park will always set the waiter back to null +// LockSupport.unpark(thread); +// return; +// } else { +// // contention +// busySpin(); +// } +// } +// } +// } @Override public boolean offer(Node message) { @@ -443,4 +477,24 @@ public final class SimpleQueue extends MpmcArrayQueueConsumerField { public int size() { return 0; } + + @Override + public boolean isEmpty() { + // Order matters! + // Loading consumer before producer allows for producer increments after consumer index is read. + // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is + // nothing we can do to make this an exact method. + return lvConsumerIndex() == lvProducerIndex(); + } + + public boolean hasPendingMessages() { + // count the number of consumers waiting, it should be the same as the number of threads configured +// return this.consumersWaiting.size() == this.numberConsumerThreads; + return false; + } + + public void tryTransfer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException { + // TODO Auto-generated method stub + + } } diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java index 8029a05..66f59eb 100644 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/ConcurrentCircularArrayQueue.java @@ -41,7 +41,7 @@ abstract class ConcurrentCircularArrayQueueL0Pad extends AbstractQueue imp * @param */ public abstract class ConcurrentCircularArrayQueue extends ConcurrentCircularArrayQueueL0Pad { - protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 2); + protected static final int SPARSE_SHIFT = Integer.getInteger("sparse.shift", 0); protected static final int BUFFER_PAD; private static final long REF_ARRAY_BASE; private static final int REF_ELEMENT_SHIFT; diff --git a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueue.java b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueue.java index d80e302..d194ab3 100755 --- a/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueue.java +++ b/src/main/java/dorkbox/util/messagebus/common/simpleq/jctools/MpmcArrayQueue.java @@ -101,11 +101,11 @@ public class MpmcArrayQueue extends MpmcArrayQueueConsumerField { currentProducerIndex - capacity <= cIndex && // test against cached cIndex currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex // Extra check required to ensure [Queue.offer == false iff queue is full] -// return false; - busySpin(); + return false; } // another producer has moved the sequence by one, retry 2 + busySpin(); } } @@ -151,11 +151,11 @@ public class MpmcArrayQueue extends MpmcArrayQueueConsumerField { currentConsumerIndex >= pIndex && // test against cached pIndex currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] -// return null; - busySpin(); + return null; } // another consumer beat us and moved sequence ahead, retry 2 + // only producer will busy spin } } diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java index 580c940..45542bb 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/MpmcQueueAltPerfTest.java @@ -18,12 +18,13 @@ package dorkbox.util.messagebus; import org.openjdk.jol.info.ClassLayout; import org.openjdk.jol.util.VMSupport; -import dorkbox.util.messagebus.common.simpleq.MpmcExchangerQueue; import dorkbox.util.messagebus.common.simpleq.Node; +import dorkbox.util.messagebus.common.simpleq.jctools.MpmcArrayQueue; public class MpmcQueueAltPerfTest { // 15 == 32 * 1024 public static final int REPETITIONS = Integer.getInteger("reps", 50) * 1000 * 1000; + public static final Integer TEST_VALUE = Integer.valueOf(777); public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); @@ -31,10 +32,8 @@ public class MpmcQueueAltPerfTest { System.out.println(VMSupport.vmDetails()); System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); - - final MpmcExchangerQueue queue = new MpmcExchangerQueue(QUEUE_CAPACITY); + final MpmcArrayQueue queue = new MpmcArrayQueue(QUEUE_CAPACITY); final long[] results = new long[20]; for (int i = 0; i < 20; i++) { @@ -49,19 +48,20 @@ public class MpmcQueueAltPerfTest { System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10); } + private static long performanceRun(int runNumber, MpmcArrayQueue queue) throws Exception { + Producer p = new Producer(queue); + Thread thread = new Thread(p); + thread.start(); // producer will timestamp start - private static long performanceRun(int runNumber, MpmcExchangerQueue queue) throws Exception { -// for (int i=0;i consumer = queue; + Node result; int i = REPETITIONS; + int queueEmpty = 0; do { - result = consumer.take(); + while (null == (result = consumer.poll())) { + queueEmpty++; + Thread.yield(); + } } while (0 != --i); long end = System.nanoTime(); @@ -69,30 +69,46 @@ public class MpmcQueueAltPerfTest { long duration = end - p.start; long ops = REPETITIONS * 1000L * 1000L * 1000L / duration; String qName = queue.getClass().getSimpleName(); - System.out.format("%d - ops/sec=%,d - %s result=%d\n", runNumber, ops, qName, result); + System.out.format("%d - ops/sec=%,d - %s result=%d failed.poll=%d failed.offer=%d\n", runNumber, ops, + qName, result.item1, queueEmpty, p.queueFull); return ops; } - private static final Integer val = Integer.valueOf(234); - - + @SuppressWarnings("rawtypes") public static class Producer implements Runnable { - private final MpmcExchangerQueue queue; + private final MpmcArrayQueue queue; + int queueFull = 0; long start; - public Producer(MpmcExchangerQueue queue) { + public Producer(MpmcArrayQueue queue) { this.queue = queue; } @Override public void run() { - MpmcExchangerQueue producer = this.queue; - int i = REPETITIONS; - long s = System.nanoTime(); - do { - producer.put(val); - } while (0 != --i); - this.start = s; + MpmcArrayQueue producer = this.queue; + int i = REPETITIONS; + int f = 0; + long s = System.nanoTime(); + + MpmcArrayQueue pool = new MpmcArrayQueue(2); + pool.offer(new Node()); + pool.offer(new Node()); + + Node node; + do { + node = pool.poll(); + node.item1 = TEST_VALUE; + + while (!producer.offer(node)) { + Thread.yield(); + f++; + } + + pool.offer(node); + } while (0 != --i); + this.queueFull = f; + this.start = s; } } } diff --git a/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselineNodePerfTest.java b/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselineNodePerfTest.java index d55f3d4..207baa9 100644 --- a/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselineNodePerfTest.java +++ b/src/test/java/dorkbox/util/messagebus/MpmcQueueBaselineNodePerfTest.java @@ -30,7 +30,7 @@ public class MpmcQueueBaselineNodePerfTest { public static void main(final String[] args) throws Exception { System.out.println(VMSupport.vmDetails()); - System.out.println(ClassLayout.parseClass(Integer.class).toPrintable()); + System.out.println(ClassLayout.parseClass(Node.class).toPrintable()); System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); final MpmcArrayQueue queue = new MpmcArrayQueue(QUEUE_CAPACITY); diff --git a/src/test/java/dorkbox/util/messagebus/PerformanceTest.java b/src/test/java/dorkbox/util/messagebus/PerformanceTest.java index 38cbf62..b9cd326 100644 --- a/src/test/java/dorkbox/util/messagebus/PerformanceTest.java +++ b/src/test/java/dorkbox/util/messagebus/PerformanceTest.java @@ -6,7 +6,6 @@ package dorkbox.util.messagebus; import junit.framework.Assert; import dorkbox.util.messagebus.annotations.Handler; import dorkbox.util.messagebus.common.ConcurrentExecutor; -import dorkbox.util.messagebus.common.simpleq.SimpleQueue; import dorkbox.util.messagebus.error.IPublicationErrorHandler; import dorkbox.util.messagebus.error.PublicationError; @@ -19,7 +18,7 @@ public class PerformanceTest { public static final int QUEUE_CAPACITY = 1 << Integer.getInteger("pow2.capacity", 17); - public static final int CONCURRENCY_LEVEL = 1; + public static final int CONCURRENCY_LEVEL = 2; private static long count = 0; @@ -32,72 +31,6 @@ public class PerformanceTest { }; public static void main(String[] args) throws Exception { -// testSpeed(); - tesCorrectness(); - } - - private static void testSpeed() throws Exception { - System.out.println("capacity:" + QUEUE_CAPACITY + " reps:" + REPETITIONS); - - final SimpleQueue queue = new SimpleQueue (QUEUE_CAPACITY, 1 << 14); - - final long[] results = new long[20]; - for (int i = 0; i < 20; i++) { - System.gc(); - results[i] = performanceRun(i, queue); - } - // only average last 10 results for summary - long sum = 0; - for (int i = 10; i < 20; i++) { - sum += results[i]; - } - System.out.format("summary,QueuePerfTest,%s,%d\n", queue.getClass().getSimpleName(), sum / 10); - } - - private static long performanceRun(int runNumber, SimpleQueue queue) throws Exception { -// for (int i=0;i