Code cleanup, restored sub/unsub unit test to original version
This commit is contained in:
parent
45614e8bdd
commit
7d410e78be
@ -6,6 +6,7 @@ import java.util.Collections;
|
|||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
|
import dorkbox.util.messagebus.common.ConcurrentHashMapV8;
|
||||||
import dorkbox.util.messagebus.common.HashMapTree;
|
import dorkbox.util.messagebus.common.HashMapTree;
|
||||||
@ -73,6 +74,9 @@ public class SubscriptionManager {
|
|||||||
private final Map<Class<?>, Collection<Subscription>> varArgSubscriptions;
|
private final Map<Class<?>, Collection<Subscription>> varArgSubscriptions;
|
||||||
private final Map<Class<?>, Collection<Subscription>> varArgSuperClassSubscriptions;
|
private final Map<Class<?>, Collection<Subscription>> varArgSuperClassSubscriptions;
|
||||||
|
|
||||||
|
// to keep track if we really need to clear our maps
|
||||||
|
private final AtomicBoolean superCheck = new AtomicBoolean();
|
||||||
|
|
||||||
// stripe size of maps for concurrency
|
// stripe size of maps for concurrency
|
||||||
private final int STRIPE_SIZE;
|
private final int STRIPE_SIZE;
|
||||||
|
|
||||||
@ -142,9 +146,12 @@ public class SubscriptionManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// these are concurrent collections
|
// these are concurrent collections
|
||||||
this.superClassSubscriptions.clear();
|
boolean compareAndSet = this.superCheck.compareAndSet(true, false);
|
||||||
this.varArgSubscriptions.clear();
|
if (compareAndSet) {
|
||||||
this.varArgSuperClassSubscriptions.clear();
|
this.superClassSubscriptions.clear();
|
||||||
|
this.varArgSubscriptions.clear();
|
||||||
|
this.varArgSuperClassSubscriptions.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go.
|
// no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go.
|
||||||
@ -254,10 +261,12 @@ public class SubscriptionManager {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// these are concurrent collections
|
boolean compareAndSet = this.superCheck.compareAndSet(true, false);
|
||||||
this.superClassSubscriptions.clear();
|
if (compareAndSet) {
|
||||||
this.varArgSubscriptions.clear();
|
this.superClassSubscriptions.clear();
|
||||||
this.varArgSuperClassSubscriptions.clear();
|
this.varArgSubscriptions.clear();
|
||||||
|
this.varArgSuperClassSubscriptions.clear();
|
||||||
|
}
|
||||||
|
|
||||||
// no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go.
|
// no point in locking everything. We lock on the class object being subscribed, since that is as coarse as we can go.
|
||||||
// the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility, but for concurrency
|
// the listenerClass is GUARANTEED to be unique and the same object, per classloader. We do NOT LOCK for visibility, but for concurrency
|
||||||
@ -345,6 +354,7 @@ public class SubscriptionManager {
|
|||||||
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
|
// check to see if the messageType can convert/publish to the "array" version, without the hit to JNI
|
||||||
// and then, returns the array'd version subscriptions
|
// and then, returns the array'd version subscriptions
|
||||||
public Collection<Subscription> getVarArgSubscriptions(Class<?> varArgType) {
|
public Collection<Subscription> getVarArgSubscriptions(Class<?> varArgType) {
|
||||||
|
this.superCheck.set(true);
|
||||||
Map<Class<?>, Collection<Subscription>> local = this.varArgSubscriptions;
|
Map<Class<?>, Collection<Subscription>> local = this.varArgSubscriptions;
|
||||||
|
|
||||||
// whenever our subscriptions change, this map is cleared.
|
// whenever our subscriptions change, this map is cleared.
|
||||||
@ -378,6 +388,7 @@ public class SubscriptionManager {
|
|||||||
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
|
// check to see if the messageType can convert/publish to the "array" superclass version, without the hit to JNI
|
||||||
// and then, returns the array'd version subscriptions
|
// and then, returns the array'd version subscriptions
|
||||||
public Collection<Subscription> getVarArgSuperSubscriptions(Class<?> varArgType) {
|
public Collection<Subscription> getVarArgSuperSubscriptions(Class<?> varArgType) {
|
||||||
|
this.superCheck.set(true);
|
||||||
Map<Class<?>, Collection<Subscription>> local = this.varArgSuperClassSubscriptions;
|
Map<Class<?>, Collection<Subscription>> local = this.varArgSuperClassSubscriptions;
|
||||||
|
|
||||||
// whenever our subscriptions change, this map is cleared.
|
// whenever our subscriptions change, this map is cleared.
|
||||||
@ -421,6 +432,7 @@ public class SubscriptionManager {
|
|||||||
|
|
||||||
// ALSO checks to see if the superClass accepts subtypes.
|
// ALSO checks to see if the superClass accepts subtypes.
|
||||||
public final Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
|
public final Collection<Subscription> getSuperSubscriptions(Class<?> superType) {
|
||||||
|
this.superCheck.set(true);
|
||||||
Map<Class<?>, Collection<Subscription>> local = this.superClassSubscriptions;
|
Map<Class<?>, Collection<Subscription>> local = this.superClassSubscriptions;
|
||||||
|
|
||||||
// whenever our subscriptions change, this map is cleared.
|
// whenever our subscriptions change, this map is cleared.
|
||||||
@ -462,6 +474,7 @@ public class SubscriptionManager {
|
|||||||
// must be protected by read lock
|
// must be protected by read lock
|
||||||
// ALSO checks to see if the superClass accepts subtypes.
|
// ALSO checks to see if the superClass accepts subtypes.
|
||||||
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
|
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2) {
|
||||||
|
this.superCheck.set(true);
|
||||||
HashMapTree<Class<?>, Collection<Subscription>> local = this.superClassSubscriptionsMulti;
|
HashMapTree<Class<?>, Collection<Subscription>> local = this.superClassSubscriptionsMulti;
|
||||||
|
|
||||||
// whenever our subscriptions change, this map is cleared.
|
// whenever our subscriptions change, this map is cleared.
|
||||||
@ -535,6 +548,7 @@ public class SubscriptionManager {
|
|||||||
// must be protected by read lock
|
// must be protected by read lock
|
||||||
// ALSO checks to see if the superClass accepts subtypes.
|
// ALSO checks to see if the superClass accepts subtypes.
|
||||||
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
|
public Collection<Subscription> getSuperSubscriptions(Class<?> superType1, Class<?> superType2, Class<?> superType3) {
|
||||||
|
this.superCheck.set(true);
|
||||||
HashMapTree<Class<?>, Collection<Subscription>> local = this.superClassSubscriptionsMulti;
|
HashMapTree<Class<?>, Collection<Subscription>> local = this.superClassSubscriptionsMulti;
|
||||||
|
|
||||||
// whenever our subscriptions change, this map is cleared.
|
// whenever our subscriptions change, this map is cleared.
|
||||||
|
@ -92,6 +92,9 @@ public class Subscription {
|
|||||||
return this.listeners.size();
|
return this.listeners.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if there were listeners for this publication, false if there was nothing
|
||||||
|
*/
|
||||||
public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message) {
|
public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message) {
|
||||||
Collection<Object> listeners = this.listeners;
|
Collection<Object> listeners = this.listeners;
|
||||||
|
|
||||||
@ -143,6 +146,9 @@ public class Subscription {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if there were listeners for this publication, false if there was nothing
|
||||||
|
*/
|
||||||
public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) {
|
public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2) {
|
||||||
Collection<Object> listeners = this.listeners;
|
Collection<Object> listeners = this.listeners;
|
||||||
|
|
||||||
@ -198,6 +204,9 @@ public class Subscription {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if there were listeners for this publication, false if there was nothing
|
||||||
|
*/
|
||||||
public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) {
|
public boolean publishToSubscription(ErrorHandlingSupport errorHandler, Object message1, Object message2, Object message3) {
|
||||||
Collection<Object> listeners = this.listeners;
|
Collection<Object> listeners = this.listeners;
|
||||||
|
|
||||||
|
@ -2,7 +2,6 @@ package dorkbox.util.messagebus;
|
|||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import dorkbox.util.messagebus.SubscriptionManager;
|
|
||||||
import dorkbox.util.messagebus.common.AssertSupport;
|
import dorkbox.util.messagebus.common.AssertSupport;
|
||||||
import dorkbox.util.messagebus.common.ConcurrentExecutor;
|
import dorkbox.util.messagebus.common.ConcurrentExecutor;
|
||||||
import dorkbox.util.messagebus.common.ListenerFactory;
|
import dorkbox.util.messagebus.common.ListenerFactory;
|
||||||
@ -36,7 +35,6 @@ import dorkbox.util.messagebus.messages.StandardMessage;
|
|||||||
public class SubscriptionManagerTest extends AssertSupport {
|
public class SubscriptionManagerTest extends AssertSupport {
|
||||||
|
|
||||||
private static final int InstancesPerListener = 5000;
|
private static final int InstancesPerListener = 5000;
|
||||||
private static final int ConcurrentUnits = 1;
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIMessageListener(){
|
public void testIMessageListener(){
|
||||||
@ -190,7 +188,6 @@ public class SubscriptionManagerTest extends AssertSupport {
|
|||||||
ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(subscriptionManager, listeners), 1);
|
ConcurrentExecutor.runConcurrent(TestUtil.unsubscriber(subscriptionManager, listeners), 1);
|
||||||
|
|
||||||
listeners.clear();
|
listeners.clear();
|
||||||
validator.clear();
|
|
||||||
|
|
||||||
validator.validate(subscriptionManager);
|
validator.validate(subscriptionManager);
|
||||||
}
|
}
|
||||||
|
@ -26,11 +26,11 @@ public class SubscriptionValidator extends AssertSupport{
|
|||||||
this.subscribedListener = subscribedListener;
|
this.subscribedListener = subscribedListener;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Expectation listener(Class<?> subscriber) {
|
public Expectation listener(Class subscriber){
|
||||||
return new Expectation(subscriber);
|
return new Expectation(subscriber);
|
||||||
}
|
}
|
||||||
|
|
||||||
private SubscriptionValidator expect(Class<?> subscriber, Class<?> messageType) {
|
private SubscriptionValidator expect(Class subscriber, Class messageType){
|
||||||
this.validations.add(new ValidationEntry(messageType, subscriber));
|
this.validations.add(new ValidationEntry(messageType, subscriber));
|
||||||
this.messageTypes.add(messageType);
|
this.messageTypes.add(messageType);
|
||||||
return this;
|
return this;
|
||||||
@ -38,8 +38,8 @@ public class SubscriptionValidator extends AssertSupport{
|
|||||||
|
|
||||||
// match subscriptions with existing validation entries
|
// match subscriptions with existing validation entries
|
||||||
// for each tuple of subscriber and message type the specified number of listeners must exist
|
// for each tuple of subscriber and message type the specified number of listeners must exist
|
||||||
public void validate(SubscriptionManager manager) {
|
public void validate(SubscriptionManager manager){
|
||||||
for (Class<?> messageType : this.messageTypes) {
|
for (Class messageType : this.messageTypes){
|
||||||
Collection<ValidationEntry> validationEntries = getEntries(messageType);
|
Collection<ValidationEntry> validationEntries = getEntries(messageType);
|
||||||
|
|
||||||
// we split subs + superSubs into TWO calls.
|
// we split subs + superSubs into TWO calls.
|
||||||
@ -54,6 +54,8 @@ public class SubscriptionValidator extends AssertSupport{
|
|||||||
}
|
}
|
||||||
|
|
||||||
assertEquals(validationEntries.size(), collection.size());
|
assertEquals(validationEntries.size(), collection.size());
|
||||||
|
|
||||||
|
|
||||||
for(ValidationEntry validationValidationEntry : validationEntries){
|
for(ValidationEntry validationValidationEntry : validationEntries){
|
||||||
Subscription matchingSub = null;
|
Subscription matchingSub = null;
|
||||||
// one of the subscriptions must belong to the subscriber type
|
// one of the subscriptions must belong to the subscriber type
|
||||||
@ -70,15 +72,11 @@ public class SubscriptionValidator extends AssertSupport{
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void clear() {
|
private Collection<ValidationEntry> getEntries(Class<?> messageType){
|
||||||
this.validations.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private Collection<ValidationEntry> getEntries(Class<?> messageType) {
|
|
||||||
Collection<ValidationEntry> matching = new LinkedList<ValidationEntry>();
|
Collection<ValidationEntry> matching = new LinkedList<ValidationEntry>();
|
||||||
for (ValidationEntry validationValidationEntry : this.validations){
|
for (ValidationEntry validationValidationEntry : this.validations){
|
||||||
if (validationValidationEntry.messageType.equals(messageType)) {
|
|
||||||
|
if(validationValidationEntry.messageType.equals(messageType)) {
|
||||||
matching.add(validationValidationEntry);
|
matching.add(validationValidationEntry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -86,16 +84,18 @@ public class SubscriptionValidator extends AssertSupport{
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public class Expectation {
|
|
||||||
|
|
||||||
private Class<?> listener;
|
|
||||||
|
|
||||||
private Expectation(Class<?> listener) {
|
public class Expectation{
|
||||||
|
|
||||||
|
private Class listener;
|
||||||
|
|
||||||
|
private Expectation(Class listener) {
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SubscriptionValidator handles(Class<?> ...messages){
|
public SubscriptionValidator handles(Class ...messages){
|
||||||
for(Class<?> message : messages) {
|
for(Class message : messages) {
|
||||||
expect(this.listener, message);
|
expect(this.listener, message);
|
||||||
}
|
}
|
||||||
return SubscriptionValidator.this;
|
return SubscriptionValidator.this;
|
||||||
@ -103,12 +103,18 @@ public class SubscriptionValidator extends AssertSupport{
|
|||||||
}
|
}
|
||||||
|
|
||||||
private class ValidationEntry {
|
private class ValidationEntry {
|
||||||
private Class<?> subscriber;
|
|
||||||
private Class<?> messageType;
|
|
||||||
|
|
||||||
private ValidationEntry(Class<?> messageType, Class<?> subscriber) {
|
|
||||||
|
private Class subscriber;
|
||||||
|
|
||||||
|
private Class messageType;
|
||||||
|
|
||||||
|
private ValidationEntry(Class messageType, Class subscriber) {
|
||||||
this.messageType = messageType;
|
this.messageType = messageType;
|
||||||
this.subscriber = subscriber;
|
this.subscriber = subscriber;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user