Comment polish and better null checks
This commit is contained in:
parent
ad34679e71
commit
666b391514
@ -38,9 +38,8 @@ import static dorkbox.util.collections.ConcurrentIterator.headREF;
|
|||||||
public
|
public
|
||||||
class ConnectionManager<C extends Connection> implements ListenerBridge, ISessionManager<C>, ConnectionPoint, ConnectionBridgeServer<C>,
|
class ConnectionManager<C extends Connection> implements ListenerBridge, ISessionManager<C>, ConnectionPoint, ConnectionBridgeServer<C>,
|
||||||
ConnectionExceptSpecifiedBridgeServer<C> {
|
ConnectionExceptSpecifiedBridgeServer<C> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Specifies the load-factor for the IdentityMap used
|
* Specifies the load-factor for the IdentityMap used to manage keeping track of the number of connections + listeners
|
||||||
*/
|
*/
|
||||||
@Property
|
@Property
|
||||||
public static final float LOAD_FACTOR = 0.8F;
|
public static final float LOAD_FACTOR = 0.8F;
|
||||||
@ -50,6 +49,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
private volatile IdentityMap<Connection, ConnectionManager<C>> localManagers = new IdentityMap<Connection, ConnectionManager<C>>(8, ConnectionManager.LOAD_FACTOR);
|
private volatile IdentityMap<Connection, ConnectionManager<C>> localManagers = new IdentityMap<Connection, ConnectionManager<C>>(8, ConnectionManager.LOAD_FACTOR);
|
||||||
|
@SuppressWarnings("unused")
|
||||||
private volatile IdentityMap<Type, ConcurrentIterator> listeners = new IdentityMap<Type, ConcurrentIterator>(32, LOAD_FACTOR);
|
private volatile IdentityMap<Type, ConcurrentIterator> listeners = new IdentityMap<Type, ConcurrentIterator>(32, LOAD_FACTOR);
|
||||||
|
|
||||||
@SuppressWarnings({"FieldCanBeLocal", "unused"})
|
@SuppressWarnings({"FieldCanBeLocal", "unused"})
|
||||||
@ -61,10 +61,9 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
// In order to force the "single writer principle" for subscribe & unsubscribe, they are within SYNCHRONIZED.
|
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
|
||||||
//
|
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
||||||
// These methods **COULD** be dispatched via another thread (so it's only one thread ever touching them), however we do NOT want them
|
// use-case 99% of the time)
|
||||||
// asynchronous - as publish() should ALWAYS succeed if a correct subscribe() is called before. 'Synchronized' is good enough here.
|
|
||||||
private final Object singleWriterLock1 = new Object();
|
private final Object singleWriterLock1 = new Object();
|
||||||
private final Object singleWriterLock2 = new Object();
|
private final Object singleWriterLock2 = new Object();
|
||||||
private final Object singleWriterLock3 = new Object();
|
private final Object singleWriterLock3 = new Object();
|
||||||
@ -160,7 +159,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
||||||
// use-case 99% of the time)
|
// use-case 99% of the time)
|
||||||
synchronized (singleWriterLock1) {
|
synchronized (singleWriterLock1) {
|
||||||
// access a snapshot of the subscriptions (single-writer-principle)
|
// access a snapshot of the listeners (single-writer-principle)
|
||||||
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
||||||
|
|
||||||
ConcurrentIterator subscribedListeners = listeners.get(type);
|
ConcurrentIterator subscribedListeners = listeners.get(type);
|
||||||
@ -207,7 +206,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
||||||
// use-case 99% of the time)
|
// use-case 99% of the time)
|
||||||
synchronized (singleWriterLock1) {
|
synchronized (singleWriterLock1) {
|
||||||
// access a snapshot of the subscriptions (single-writer-principle)
|
// access a snapshot of the listeners (single-writer-principle)
|
||||||
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
||||||
final ConcurrentIterator concurrentIterator = listeners.get(type);
|
final ConcurrentIterator concurrentIterator = listeners.get(type);
|
||||||
if (concurrentIterator != null) {
|
if (concurrentIterator != null) {
|
||||||
@ -239,7 +238,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
||||||
// use-case 99% of the time)
|
// use-case 99% of the time)
|
||||||
synchronized (singleWriterLock1) {
|
synchronized (singleWriterLock1) {
|
||||||
// access a snapshot of the subscriptions (single-writer-principle)
|
// access a snapshot of the listeners (single-writer-principle)
|
||||||
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
||||||
|
|
||||||
listeners.clear();
|
listeners.clear();
|
||||||
@ -270,7 +269,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
||||||
// use-case 99% of the time)
|
// use-case 99% of the time)
|
||||||
synchronized (singleWriterLock1) {
|
synchronized (singleWriterLock1) {
|
||||||
// access a snapshot of the subscriptions (single-writer-principle)
|
// access a snapshot of the listeners (single-writer-principle)
|
||||||
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
||||||
|
|
||||||
listeners.remove(classType);
|
listeners.remove(classType);
|
||||||
@ -386,7 +385,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
|
|
||||||
|
|
||||||
// now have to account for additional connection listener managers (non-global).
|
// now have to account for additional connection listener managers (non-global).
|
||||||
// access a snapshot of the subscriptions (single-writer-principle)
|
// access a snapshot of the managers (single-writer-principle)
|
||||||
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
|
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
|
||||||
ConnectionManager<C> localManager = localManagers.get(connection);
|
ConnectionManager<C> localManager = localManagers.get(connection);
|
||||||
if (localManager != null) {
|
if (localManager != null) {
|
||||||
@ -427,10 +426,10 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
||||||
|
|
||||||
boolean foundListener = false;
|
boolean foundListener = false;
|
||||||
final IdentityMap.Values<ConcurrentIterator> values = listeners.values();
|
final IdentityMap.Entries<Type, ConcurrentIterator> entries = listeners.entries(); // entries is necessary for multiple threads
|
||||||
for (ConcurrentIterator concurrentIterator : values) {
|
for (IdentityMap.Entry<Type, ConcurrentIterator> entry : entries) {
|
||||||
if (concurrentIterator != null) {
|
if (entry != null && entry.value != null) {
|
||||||
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(concurrentIterator);
|
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(entry.value);
|
||||||
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
|
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
|
||||||
ListenerRaw<C, Object> listener;
|
ListenerRaw<C, Object> listener;
|
||||||
while (current != null) {
|
while (current != null) {
|
||||||
@ -459,7 +458,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
}
|
}
|
||||||
|
|
||||||
// now have to account for additional (local) listener managers.
|
// now have to account for additional (local) listener managers.
|
||||||
// access a snapshot of the subscriptions (single-writer-principle)
|
// access a snapshot of the managers (single-writer-principle)
|
||||||
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
|
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
|
||||||
ConnectionManager<C> localManager = localManagers.get(connection);
|
ConnectionManager<C> localManager = localManagers.get(connection);
|
||||||
if (localManager != null) {
|
if (localManager != null) {
|
||||||
@ -481,10 +480,10 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
||||||
|
|
||||||
boolean foundListener = false;
|
boolean foundListener = false;
|
||||||
final IdentityMap.Values<ConcurrentIterator> values = listeners.values();
|
final IdentityMap.Entries<Type, ConcurrentIterator> entries = listeners.entries();
|
||||||
for (ConcurrentIterator concurrentIterator : values) {
|
for (IdentityMap.Entry<Type, ConcurrentIterator> entry : entries) {
|
||||||
if (concurrentIterator != null) {
|
if (entry != null && entry.value != null) {
|
||||||
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(concurrentIterator);
|
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(entry.value);
|
||||||
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
|
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
|
||||||
ListenerRaw<C, Object> listener;
|
ListenerRaw<C, Object> listener;
|
||||||
while (current != null) {
|
while (current != null) {
|
||||||
@ -513,7 +512,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
}
|
}
|
||||||
|
|
||||||
// now have to account for additional (local) listener managers.
|
// now have to account for additional (local) listener managers.
|
||||||
// access a snapshot of the subscriptions (single-writer-principle)
|
// access a snapshot of the managers (single-writer-principle)
|
||||||
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
|
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
|
||||||
ConnectionManager<C> localManager = localManagers.get(connection);
|
ConnectionManager<C> localManager = localManagers.get(connection);
|
||||||
if (localManager != null) {
|
if (localManager != null) {
|
||||||
@ -533,10 +532,10 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
|
||||||
|
|
||||||
boolean foundListener = false;
|
boolean foundListener = false;
|
||||||
final IdentityMap.Values<ConcurrentIterator> values = listeners.values();
|
final IdentityMap.Entries<Type, ConcurrentIterator> entries = listeners.entries(); // entries is necessary for multiple threads
|
||||||
for (ConcurrentIterator concurrentIterator : values) {
|
for (IdentityMap.Entry<Type, ConcurrentIterator> entry : entries) {
|
||||||
if (concurrentIterator != null) {
|
if (entry != null && entry.value != null) {
|
||||||
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(concurrentIterator);
|
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(entry.value);
|
||||||
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
|
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
|
||||||
ListenerRaw<C, Object> listener;
|
ListenerRaw<C, Object> listener;
|
||||||
while (current != null) {
|
while (current != null) {
|
||||||
@ -567,7 +566,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
|
|
||||||
// now have to account for additional (local) listener managers.
|
// now have to account for additional (local) listener managers.
|
||||||
|
|
||||||
// access a snapshot of the subscriptions (single-writer-principle)
|
// access a snapshot of the managers (single-writer-principle)
|
||||||
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
|
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
|
||||||
ConnectionManager<C> localManager = localManagers.get(connection);
|
ConnectionManager<C> localManager = localManagers.get(connection);
|
||||||
if (localManager != null) {
|
if (localManager != null) {
|
||||||
@ -593,7 +592,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
||||||
// use-case 99% of the time)
|
// use-case 99% of the time)
|
||||||
synchronized (singleWriterLock2) {
|
synchronized (singleWriterLock2) {
|
||||||
// access a snapshot of the subscriptions (single-writer-principle)
|
// access a snapshot of the connections (single-writer-principle)
|
||||||
ConcurrentEntry head = connectionsREF.get(this);
|
ConcurrentEntry head = connectionsREF.get(this);
|
||||||
|
|
||||||
if (!connectionEntries.containsKey(connection)) {
|
if (!connectionEntries.containsKey(connection)) {
|
||||||
@ -620,7 +619,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
||||||
// use-case 99% of the time)
|
// use-case 99% of the time)
|
||||||
synchronized (singleWriterLock2) {
|
synchronized (singleWriterLock2) {
|
||||||
// access a snapshot of the subscriptions (single-writer-principle)
|
// access a snapshot of the connections (single-writer-principle)
|
||||||
ConcurrentEntry concurrentEntry = connectionEntries.get(connection);
|
ConcurrentEntry concurrentEntry = connectionEntries.get(connection);
|
||||||
|
|
||||||
if (concurrentEntry != null) {
|
if (concurrentEntry != null) {
|
||||||
@ -671,7 +670,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
||||||
// use-case 99% of the time)
|
// use-case 99% of the time)
|
||||||
synchronized (singleWriterLock3) {
|
synchronized (singleWriterLock3) {
|
||||||
// access a snapshot of the subscriptions (single-writer-principle)
|
// access a snapshot of the managers (single-writer-principle)
|
||||||
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
|
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
|
||||||
|
|
||||||
manager = localManagers.get(connection);
|
manager = localManagers.get(connection);
|
||||||
@ -703,7 +702,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
|
|||||||
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
|
||||||
// use-case 99% of the time)
|
// use-case 99% of the time)
|
||||||
synchronized (singleWriterLock3) {
|
synchronized (singleWriterLock3) {
|
||||||
// access a snapshot of the subscriptions (single-writer-principle)
|
// access a snapshot of the managers (single-writer-principle)
|
||||||
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
|
final IdentityMap<Connection, ConnectionManager<C>> localManagers = localManagersREF.get(this);
|
||||||
|
|
||||||
final ConnectionManager<C> removed = localManagers.remove(connection);
|
final ConnectionManager<C> removed = localManagers.remove(connection);
|
||||||
|
Loading…
Reference in New Issue
Block a user