From 76599e370e0f7e331f86a348a6bbb48114b97bf4 Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 3 Apr 2016 18:58:38 +0200 Subject: [PATCH] Refactored listener dispatch events (connect/disconnect/idle/message-received) so that they are now interfaces, and the backing implementation uses the single-writer principle --- .../network/connection/ConnectionImpl.java | 4 +- .../network/connection/ConnectionManager.java | 394 +++++------------- src/dorkbox/network/connection/EndPoint.java | 4 +- src/dorkbox/network/connection/Listener.java | 82 +++- .../network/connection/ListenerBridge.java | 4 +- .../network/connection/ListenerRaw.java | 142 ------- .../connection/RegisterRmiSystemListener.java | 2 +- .../network/connection/idle/IdleListener.java | 8 +- .../connection/idle/IdleListenerTCP.java | 3 +- .../connection/idle/IdleListenerUDP.java | 3 +- .../connection/idle/IdleListenerUDT.java | 3 +- .../network/connection/idle/IdleSender.java | 4 +- .../OnConnectedManager.java | 156 +++++++ .../OnDisconnectedManager.java | 157 +++++++ .../listenerManagement/OnIdleManager.java | 159 +++++++ .../OnMessageReceivedManager.java | 320 ++++++++++++++ .../connection/ping/PingSystemListener.java | 4 +- .../network/rmi/RemoteInvocationResponse.java | 24 ++ .../rmi/RemoteObjectInvocationHandler.java | 17 +- src/dorkbox/network/rmi/RmiBridge.java | 6 +- test/dorkbox/network/ChunkedDataIdleTest.java | 4 +- test/dorkbox/network/ClientSendTest.java | 4 +- test/dorkbox/network/ConnectionTest.java | 7 +- test/dorkbox/network/DiscoverHostTest.java | 2 +- test/dorkbox/network/IdleTest.java | 15 +- .../network/LargeResizeBufferTest.java | 4 +- test/dorkbox/network/ListenerTest.java | 241 +++++++---- test/dorkbox/network/MultipleServerTest.java | 8 +- test/dorkbox/network/MultipleThreadTest.java | 88 ++-- test/dorkbox/network/PingPongLocalTest.java | 52 ++- test/dorkbox/network/PingPongTest.java | 247 +++++------ test/dorkbox/network/ReconnectTest.java | 4 +- test/dorkbox/network/ReuseTest.java | 86 ++-- .../network/UnregisteredClassTest.java | 133 +++--- test/dorkbox/network/rmi/RmiGlobalTest.java | 7 +- .../rmi/RmiSendObjectOverrideMethodTest.java | 4 +- .../network/rmi/RmiSendObjectTest.java | 4 +- test/dorkbox/network/rmi/RmiTest.java | 41 +- 38 files changed, 1575 insertions(+), 872 deletions(-) delete mode 100644 src/dorkbox/network/connection/ListenerRaw.java create mode 100644 src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java create mode 100644 src/dorkbox/network/connection/listenerManagement/OnDisconnectedManager.java create mode 100644 src/dorkbox/network/connection/listenerManagement/OnIdleManager.java create mode 100644 src/dorkbox/network/connection/listenerManagement/OnMessageReceivedManager.java create mode 100644 src/dorkbox/network/rmi/RemoteInvocationResponse.java diff --git a/src/dorkbox/network/connection/ConnectionImpl.java b/src/dorkbox/network/connection/ConnectionImpl.java index 4902d0b1..cb9ddf24 100644 --- a/src/dorkbox/network/connection/ConnectionImpl.java +++ b/src/dorkbox/network/connection/ConnectionImpl.java @@ -772,7 +772,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn @SuppressWarnings("rawtypes") @Override public final - void add(ListenerRaw listener) { + void add(Listener listener) { if (this.endPoint instanceof EndPointServer) { // when we are a server, NORMALLY listeners are added at the GLOBAL level // meaning -- @@ -812,7 +812,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn @SuppressWarnings("rawtypes") @Override public final - void remove(ListenerRaw listener) { + void remove(Listener listener) { if (this.endPoint instanceof EndPointServer) { // when we are a server, NORMALLY listeners are added at the GLOBAL level // meaning -- diff --git a/src/dorkbox/network/connection/ConnectionManager.java b/src/dorkbox/network/connection/ConnectionManager.java index 7220533b..8333bab8 100644 --- a/src/dorkbox/network/connection/ConnectionManager.java +++ b/src/dorkbox/network/connection/ConnectionManager.java @@ -18,21 +18,20 @@ package dorkbox.network.connection; import com.esotericsoftware.kryo.util.IdentityMap; import dorkbox.network.connection.bridge.ConnectionBridgeServer; import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer; -import dorkbox.network.rmi.RmiMessages; +import dorkbox.network.connection.listenerManagement.OnConnectedManager; +import dorkbox.network.connection.listenerManagement.OnDisconnectedManager; +import dorkbox.network.connection.listenerManagement.OnIdleManager; +import dorkbox.network.connection.listenerManagement.OnMessageReceivedManager; import dorkbox.util.ClassHelper; import dorkbox.util.Property; import dorkbox.util.collections.ConcurrentEntry; -import dorkbox.util.collections.ConcurrentIterator; import org.slf4j.Logger; import java.io.IOException; -import java.lang.reflect.Type; -import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import static dorkbox.util.collections.ConcurrentIterator.headREF; - // .equals() compares the identity on purpose,this because we cannot create two separate objects that are somehow equal to each other. @SuppressWarnings("unchecked") public @@ -44,27 +43,29 @@ class ConnectionManager implements ListenerBridge, ISessio @Property public static final float LOAD_FACTOR = 0.8F; - private static Listener unRegisteredType_Listener = null; private final String loggerName; - @SuppressWarnings("unused") - private volatile IdentityMap> localManagers = new IdentityMap>(8, ConnectionManager.LOAD_FACTOR); - @SuppressWarnings("unused") - private volatile IdentityMap listeners = new IdentityMap(32, LOAD_FACTOR); + private final OnConnectedManager onConnectedManager; + private final OnDisconnectedManager onDisconnectedManager; + private final OnIdleManager onIdleManager; + private final OnMessageReceivedManager onMessageReceivedManager; + + @SuppressWarnings({"FieldCanBeLocal", "unused"}) private volatile ConcurrentEntry connectionsHead = null; // reference to the first element - // This is only touched by a single thread, maintains a map of entries for FAST lookup during connection remove. + // This is ONLY touched by a single thread, maintains a map of entries for FAST lookup during connection remove. private final IdentityMap connectionEntries = new IdentityMap(32, ConnectionManager.LOAD_FACTOR); + @SuppressWarnings("unused") + private volatile IdentityMap> localManagers = new IdentityMap>(8, ConnectionManager.LOAD_FACTOR); // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our // use-case 99% of the time) - private final Object singleWriterLock1 = new Object(); private final Object singleWriterLock2 = new Object(); private final Object singleWriterLock3 = new Object(); @@ -75,11 +76,7 @@ class ConnectionManager implements ListenerBridge, ISessio IdentityMap.class, "localManagers"); - // Recommended for best performance while adhering to the "single writer principle". Must be static-final - private static final AtomicReferenceFieldUpdater listenersREF = - AtomicReferenceFieldUpdater.newUpdater(ConnectionManager.class, - IdentityMap.class, - "listeners"); + // Recommended for best performance while adhering to the "single writer principle". Must be static-final private static final AtomicReferenceFieldUpdater connectionsREF = @@ -93,14 +90,19 @@ class ConnectionManager implements ListenerBridge, ISessio */ private final Class baseClass; protected final org.slf4j.Logger logger; - volatile boolean shutdown = false; + private final AtomicBoolean hasAddedAtLeastOnce = new AtomicBoolean(false); + final AtomicBoolean shutdown = new AtomicBoolean(false); + - public ConnectionManager(final String loggerName, final Class baseClass) { this.loggerName = loggerName; this.logger = org.slf4j.LoggerFactory.getLogger(loggerName); - this.baseClass = baseClass; + + onConnectedManager = new OnConnectedManager(logger); + onDisconnectedManager = new OnDisconnectedManager(logger); + onIdleManager = new OnIdleManager(logger); + onMessageReceivedManager = new OnMessageReceivedManager(logger); } /** @@ -115,16 +117,21 @@ class ConnectionManager implements ListenerBridge, ISessio @SuppressWarnings("rawtypes") @Override public final - void add(final ListenerRaw listener) { + void add(final Listener listener) { if (listener == null) { throw new IllegalArgumentException("listener cannot be null."); } // find the class that uses Listener.class. Class clazz = listener.getClass(); - while (clazz.getSuperclass() != ListenerRaw.class) { - clazz = clazz.getSuperclass(); - } + Class[] interfaces = clazz.getInterfaces(); + +// for (Class anInterface : interfaces) { +// } +// +// while (!(clazz.getSuperclass() != Object.class)) { +// clazz = clazz.getSuperclass(); +// } // this is the connection generic parameter for the listener Class genericClass = ClassHelper.getGenericParameterAsClassForSuperClass(clazz, 0); @@ -152,34 +159,39 @@ class ConnectionManager implements ListenerBridge, ISessio */ @SuppressWarnings({"unchecked", "rawtypes"}) private - void addListener0(final ListenerRaw listener) { - Class type = listener.getObjectType(); - - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (singleWriterLock1) { - // access a snapshot of the listeners (single-writer-principle) - final IdentityMap listeners = listenersREF.get(this); - - ConcurrentIterator subscribedListeners = listeners.get(type); - if (subscribedListeners == null) { - subscribedListeners = new ConcurrentIterator(); - listeners.put(type, subscribedListeners); - } - - subscribedListeners.add(listener); - - // save this snapshot back to the original (single writer principle) - listenersREF.lazySet(this, listeners); + void addListener0(final Listener listener) { + boolean found = false; + if (listener instanceof Listener.OnConnected) { + onConnectedManager.add((Listener.OnConnected) listener); + found = true; + } + if (listener instanceof Listener.OnDisconnected) { + onDisconnectedManager.add((Listener.OnDisconnected) listener); + found = true; + } + if (listener instanceof Listener.OnIdle) { + onIdleManager.add((Listener.OnIdle) listener); + found = true; } - Logger logger2 = this.logger; - if (logger2.isTraceEnabled()) { - logger2.trace("listener added: {} <{}>", + if (listener instanceof Listener.OnMessageReceived) { + onMessageReceivedManager.add((Listener.OnMessageReceived) listener); + found = true; + } + + final Logger logger2 = this.logger; + if (!found) { + logger2.error("No matching listener types. Unable to add listener: {}", listener.getClass() - .getName(), - listener.getObjectType()); + .getName()); + } + else { + hasAddedAtLeastOnce.set(true); + if (logger2.isTraceEnabled()) { + logger2.trace("listener added: {}", + listener.getClass() + .getName()); + } } } @@ -195,35 +207,36 @@ class ConnectionManager implements ListenerBridge, ISessio @SuppressWarnings("rawtypes") @Override public final - void remove(final ListenerRaw listener) { + void remove(final Listener listener) { if (listener == null) { throw new IllegalArgumentException("listener cannot be null."); } - Class type = listener.getObjectType(); - - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (singleWriterLock1) { - // access a snapshot of the listeners (single-writer-principle) - final IdentityMap listeners = listenersREF.get(this); - final ConcurrentIterator concurrentIterator = listeners.get(type); - if (concurrentIterator != null) { - concurrentIterator.remove(listener); - } - - // save this snapshot back to the original (single writer principle) - listenersREF.lazySet(this, listeners); + boolean found = false; + if (listener instanceof Listener.OnConnected) { + found = onConnectedManager.remove((Listener.OnConnected) listener); + } + if (listener instanceof Listener.OnDisconnected) { + found |= onDisconnectedManager.remove((Listener.OnDisconnected) listener); + } + if (listener instanceof Listener.OnIdle) { + found |= onIdleManager.remove((Listener.OnIdle) listener); + } + if (listener instanceof Listener.OnMessageReceived) { + found |= onMessageReceivedManager.remove((Listener.OnMessageReceived) listener); } - - Logger logger2 = this.logger; - if (logger2.isTraceEnabled()) { - logger2.trace("listener removed: {} <{}>", + final Logger logger2 = this.logger; + if (!found) { + logger2.error("No matching listener types. Unable to remove listener: {}", listener.getClass() - .getName(), - listener.getObjectType()); + .getName()); + + } + else if (logger2.isTraceEnabled()) { + logger2.trace("listener removed: {}", + listener.getClass() + .getName()); } } @@ -234,22 +247,11 @@ class ConnectionManager implements ListenerBridge, ISessio @Override public final void removeAll() { - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (singleWriterLock1) { - // access a snapshot of the listeners (single-writer-principle) - final IdentityMap listeners = listenersREF.get(this); - - listeners.clear(); - - // save this snapshot back to the original (single writer principle) - listenersREF.lazySet(this, listeners); - } + onMessageReceivedManager.removeAll(); Logger logger2 = this.logger; if (logger2.isTraceEnabled()) { - logger2.trace("all listeners removed !!"); + logger2.trace("ALL listeners removed !!"); } } @@ -265,22 +267,15 @@ class ConnectionManager implements ListenerBridge, ISessio throw new IllegalArgumentException("classType cannot be null."); } - // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this - // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our - // use-case 99% of the time) - synchronized (singleWriterLock1) { - // access a snapshot of the listeners (single-writer-principle) - final IdentityMap listeners = listenersREF.get(this); - - listeners.remove(classType); - - // save this snapshot back to the original (single writer principle) - listenersREF.lazySet(this, listeners); - } - - Logger logger2 = this.logger; - if (logger2.isTraceEnabled()) { - logger2.trace("all listeners removed for type: {}", + final Logger logger2 = this.logger; + if (onMessageReceivedManager.removeAll(classType)) { + if (logger2.isTraceEnabled()) { + logger2.trace("All listeners removed for type: {}", + classType.getClass() + .getName()); + } + } else { + logger2.warn("No listeners found to remove for type: {}", classType.getClass() .getName()); } @@ -304,85 +299,7 @@ class ConnectionManager implements ListenerBridge, ISessio @SuppressWarnings("Duplicates") private boolean notifyOnMessage0(final C connection, final Object message, boolean foundListener) { - Class objectType = message.getClass(); - - // this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version) - final IdentityMap listeners = listenersREF.get(this); - ConcurrentIterator concurrentIterator = listeners.get(objectType); - - if (concurrentIterator != null) { - ConcurrentEntry> head = headREF.get(concurrentIterator); - ConcurrentEntry> current = head; - ListenerRaw listener; - while (current != null) { - if (this.shutdown) { - return true; - } - - listener = current.getValue(); - current = current.next(); - - try { - listener.received(connection, message); - } catch (Exception e) { - logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.", - objectType, - listener, - connection, - e); - listener.error(connection, e); - } - } - - foundListener = head != null; // true if we have something to publish to, otherwise false - } - - if (!(message instanceof RmiMessages)) { - // we march through all super types of the object, and find the FIRST set - // of listeners that are registered and cast it as that, and notify the method. - // NOTICE: we do NOT call ALL TYPE -- meaning, if we have Object->Foo->Bar - // and have listeners for Object and Foo - // we will call Bar (from the above code) - // we will call Foo (from this code) - // we will NOT call Object (since we called Foo). If Foo was not registered, THEN we would call object! - - objectType = objectType.getSuperclass(); - while (objectType != null) { - // check to see if we have what we are looking for in our CURRENT class - concurrentIterator = listeners.get(objectType); - if (concurrentIterator != null) { - ConcurrentEntry> head = headREF.get(concurrentIterator); - ConcurrentEntry> current = head; - ListenerRaw listener; - while (current != null) { - if (this.shutdown) { - return true; - } - - listener = current.getValue(); - current = current.next(); - - try { - listener.received(connection, message); - } catch (Exception e) { - logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.", - objectType, - listener, - connection, - e); - listener.error(connection, e); - } - } - - foundListener = head != null; // true if we have something to publish to, otherwise false - break; - } - - // NO MATCH, so walk up. - objectType = objectType.getSuperclass(); - } - } - + foundListener |= onMessageReceivedManager.notifyReceived(connection, message, shutdown); // now have to account for additional connection listener managers (non-global). // access a snapshot of the managers (single-writer-principle) @@ -400,9 +317,6 @@ class ConnectionManager implements ListenerBridge, ISessio connection.send() .flush(); } - else if (unRegisteredType_Listener != null) { - unRegisteredType_Listener.received(connection, null); - } else { Logger logger2 = this.logger; if (logger2.isErrorEnabled()) { @@ -422,35 +336,7 @@ class ConnectionManager implements ListenerBridge, ISessio @Override public final void notifyOnIdle(final C connection) { - // this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version) - final IdentityMap listeners = listenersREF.get(this); - - boolean foundListener = false; - final IdentityMap.Entries entries = listeners.entries(); // entries is necessary for multiple threads - for (IdentityMap.Entry entry : entries) { - if (entry != null && entry.value != null) { - ConcurrentEntry> head = headREF.get(entry.value); - ConcurrentEntry> current = head; - ListenerRaw listener; - while (current != null) { - if (this.shutdown) { - return; - } - - listener = current.getValue(); - current = current.next(); - - try { - listener.idle(connection); - } catch (Exception e) { - logger.error("Unable to notify listener on 'idle' for listener '{}', connection '{}'.", listener, connection, e); - listener.error(connection, e); - } - } - - foundListener |= head != null; // true if we have something to publish to, otherwise false - } - } + boolean foundListener = onIdleManager.notifyIdle(connection, shutdown); if (foundListener) { connection.send() @@ -477,34 +363,7 @@ class ConnectionManager implements ListenerBridge, ISessio void connectionConnected(final C connection) { addConnection(connection); - final IdentityMap listeners = listenersREF.get(this); - - boolean foundListener = false; - final IdentityMap.Entries entries = listeners.entries(); - for (IdentityMap.Entry entry : entries) { - if (entry != null && entry.value != null) { - ConcurrentEntry> head = headREF.get(entry.value); - ConcurrentEntry> current = head; - ListenerRaw listener; - while (current != null) { - if (this.shutdown) { - return; - } - - listener = current.getValue(); - current = current.next(); - - try { - listener.connected(connection); - } catch (Exception e) { - logger.error("Unable to notify listener on 'connected' for listener '{}', connection '{}'.", listener, connection, e); - listener.error(connection, e); - } - } - - foundListener |= head != null; // true if we have something to publish to, otherwise false - } - } + boolean foundListener = onConnectedManager.notifyConnected(connection, shutdown); if (foundListener) { connection.send() @@ -529,41 +388,13 @@ class ConnectionManager implements ListenerBridge, ISessio @Override public void connectionDisconnected(final C connection) { - final IdentityMap listeners = listenersREF.get(this); - - boolean foundListener = false; - final IdentityMap.Entries entries = listeners.entries(); // entries is necessary for multiple threads - for (IdentityMap.Entry entry : entries) { - if (entry != null && entry.value != null) { - ConcurrentEntry> head = headREF.get(entry.value); - ConcurrentEntry> current = head; - ListenerRaw listener; - while (current != null) { - if (this.shutdown) { - return; - } - - listener = current.getValue(); - current = current.next(); - - try { - listener.disconnected(connection); - } catch (Exception e) { - logger.error("Unable to notify listener on 'disconnected' for listener '{}', connection '{}'.", listener, connection, e); - listener.error(connection, e); - } - } - - foundListener |= head != null; // true if we have something to publish to, otherwise false - } - } + boolean foundListener = onDisconnectedManager.notifyDisconnected(connection, shutdown); if (foundListener) { connection.send() .flush(); } - // now have to account for additional (local) listener managers. // access a snapshot of the managers (single-writer-principle) @@ -614,6 +445,7 @@ class ConnectionManager implements ListenerBridge, ISessio * * @param connection the connection to remove */ + private void removeConnection(C connection) { // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our @@ -676,7 +508,8 @@ class ConnectionManager implements ListenerBridge, ISessio manager = localManagers.get(connection); if (manager == null) { created = true; - manager = new ConnectionManager(loggerName + "-" + connection.toString() + " Specific", ConnectionManager.this.baseClass); + manager = new ConnectionManager(loggerName + "-" + connection.toString() + " Specific", + ConnectionManager.this.baseClass); localManagers.put(connection, manager); // save this snapshot back to the original (single writer principle) @@ -746,7 +579,7 @@ class ConnectionManager implements ListenerBridge, ISessio */ final boolean hasListeners() { - return listenersREF.get(this).size == 0; + return hasAddedAtLeastOnce.get(); } /** @@ -754,24 +587,15 @@ class ConnectionManager implements ListenerBridge, ISessio */ final void stop() { - this.shutdown = true; + this.shutdown.set(true); // disconnect the sessions closeConnections(); - synchronized (singleWriterLock1) { - final IdentityMap listeners = listenersREF.get(this); - final Iterator iterator = listeners.values() - .iterator(); - while (iterator.hasNext()) { - final ConcurrentIterator next = iterator.next(); - next.clear(); - iterator.remove(); - } - - // save this snapshot back to the original (single writer principle) - listenersREF.lazySet(this, listeners); - } + onConnectedManager.clear(); + onDisconnectedManager.clear(); + onIdleManager.clear(); + onMessageReceivedManager.clear(); } /** diff --git a/src/dorkbox/network/connection/EndPoint.java b/src/dorkbox/network/connection/EndPoint.java index 84a2bc94..c98fa319 100644 --- a/src/dorkbox/network/connection/EndPoint.java +++ b/src/dorkbox/network/connection/EndPoint.java @@ -296,7 +296,7 @@ class EndPoint { public void run() { // connectionManager.shutdown accurately reflects the state of the app. Safe to use here - if (EndPoint.this.connectionManager != null && !EndPoint.this.connectionManager.shutdown) { + if (EndPoint.this.connectionManager != null && !EndPoint.this.connectionManager.shutdown.get()) { EndPoint.this.stop(); } } @@ -364,7 +364,7 @@ class EndPoint { } /** - * The amount of milli-seconds that must elapse with no read or write before {@link ListenerRaw#idle(Connection)} } + * The amount of milli-seconds that must elapse with no read or write before {@link Listener.OnIdle#idle(Connection)} } * will be triggered */ public diff --git a/src/dorkbox/network/connection/Listener.java b/src/dorkbox/network/connection/Listener.java index d0048adf..2ae7054c 100644 --- a/src/dorkbox/network/connection/Listener.java +++ b/src/dorkbox/network/connection/Listener.java @@ -15,10 +15,82 @@ */ package dorkbox.network.connection; -public abstract -class Listener extends ListenerRaw { - public - Listener() { - super(0); +import java.io.IOException; + +public +interface Listener { + /** + * Called when the remote end has been connected. This will be invoked before any objects are received by the network. + * This method should not block for long periods as other network activity will not be processed + * until it returns. + */ + interface OnConnected extends Listener { + /** + * Called when the remote end has been connected. This will be invoked before any objects are received by the network. + * This method should not block for long periods as other network activity will not be processed + * until it returns. + */ + void connected(C connection); + } + + /** + * Called when the remote end is no longer connected. There is no guarantee as to what thread will invoke this method. + *

+ * Do not write data in this method! The channel can already be closed, resulting in an error if you attempt to do so. + */ + interface OnDisconnected extends Listener { + /** + * Called when the remote end is no longer connected. There is no guarantee as to what thread will invoke this method. + *

+ * Do not write data in this method! The channel can already be closed, resulting in an error if you attempt to do so. + */ + void disconnected(C connection); + } + + + /** + * Called when there is an error of some kind during the up/down stream process (to/from the socket or otherwise) + */ + interface OnError extends Listener { + /** + * Called when there is an error of some kind during the up/down stream process (to/from the socket or otherwise). + * + * The error is sent to an error log before this method is called. + */ + void error(C connection, Throwable throwable); + } + + + /** + * Called when the connection is idle for longer than the {@link EndPoint#setIdleTimeout(int)} idle threshold. + */ + interface OnIdle extends Listener { + + /** + * Called when the connection is idle for longer than the {@link EndPoint#setIdleTimeout(int)} idle threshold. + */ + void idle(C connection) throws IOException; + } + + + /** + * Called when an object has been received from the remote end of the connection. + * This method should not block for long periods as other network activity will not be processed until it returns. + */ + interface OnMessageReceived extends Listener { + void received(C connection, M message); + } + + + /** + * Permits a listener to specify it's own referenced object type, if passing in a generic parameter doesn't work. This is necessary since + * the system looks up incoming message types to determine what listeners to dispatch them to. + */ + interface SelfDefinedType { + /** + * Permits a listener to specify it's own referenced object type, if passing in a generic parameter doesn't work. This is necessary since + * the system looks up incoming message types to determine what listeners to dispatch them to. + */ + Class getType(); } } diff --git a/src/dorkbox/network/connection/ListenerBridge.java b/src/dorkbox/network/connection/ListenerBridge.java index b92d0734..6a70f658 100644 --- a/src/dorkbox/network/connection/ListenerBridge.java +++ b/src/dorkbox/network/connection/ListenerBridge.java @@ -38,7 +38,7 @@ interface ListenerBridge { * the connection is notified on that event (ie, admin type listeners) */ @SuppressWarnings("rawtypes") - void add(ListenerRaw listener); + void add(Listener listener); /** * Removes a listener from this connection/endpoint to NO LONGER be notified @@ -53,7 +53,7 @@ interface ListenerBridge { * the connection is removed */ @SuppressWarnings("rawtypes") - void remove(ListenerRaw listener); + void remove(Listener listener); /** * Removes all registered listeners from this connection/endpoint to NO diff --git a/src/dorkbox/network/connection/ListenerRaw.java b/src/dorkbox/network/connection/ListenerRaw.java deleted file mode 100644 index 7959986f..00000000 --- a/src/dorkbox/network/connection/ListenerRaw.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.connection; - -import dorkbox.util.ClassHelper; - -import java.io.IOException; - -public abstract -class ListenerRaw { - - private final Class objectType; - - // for compile time code. The generic type parameter #2 (index 1) is pulled from type arguments. - // generic parameters cannot be primitive types - public - ListenerRaw() { - this(1); - } - - // for sub-classed listeners, we might have to specify which parameter to use. - protected - ListenerRaw(int lastParameterIndex) { - if (lastParameterIndex > -1) { - @SuppressWarnings("rawtypes") - Class class1 = getClass(); - - Class objectType = ClassHelper.getGenericParameterAsClassForSuperClass(class1, lastParameterIndex); - - if (objectType != null) { - // SOMETIMES generics get confused on which parameter we actually mean (when sub-classing) - if (objectType != Object.class && ClassHelper.hasInterface(Connection.class, objectType)) { - Class objectType2 = ClassHelper.getGenericParameterAsClassForSuperClass(class1, lastParameterIndex + 1); - if (objectType2 != null) { - objectType = objectType2; - } - } - - this.objectType = objectType; - } - else { - this.objectType = Object.class; - } - } - else { - // for when we want to override it - this.objectType = Object.class; - } - } - - /** - * Gets the referenced object type. - *

- * non-final so this can be overridden by listeners that aren't able to define their type as a generic parameter - */ - public - Class getObjectType() { - return this.objectType; - } - - /** - * Called when the remote end has been connected. This will be invoked before any objects are received by the network. - * This method should not block for long periods as other network activity will not be processed - * until it returns. - */ - @SuppressWarnings("unused") - public - void connected(C connection) { - } - - /** - * Called when the remote end is no longer connected. There is no guarantee as to what thread will invoke this method. - *

- * Do not write data in this method! The channel can be closed, resulting in an error if you attempt to do so. - */ - @SuppressWarnings("unused") - public - void disconnected(C connection) { - } - - /** - * Called when an object has been received from the remote end of the connection. - * This method should not block for long periods as other network activity will not be processed until it returns. - */ - @SuppressWarnings("unused") - public - void received(C connection, M message) { - } - - /** - * Called when the connection is idle for longer than the {@link EndPoint#setIdleTimeout(int)} idle threshold. - */ - @SuppressWarnings("unused") - public - void idle(C connection) throws IOException { - } - - /** - * Called when there is an error of some kind during the up/down stream process (to/from the socket or otherwise) - */ - @SuppressWarnings("unused") - public - void error(C connection, Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public - int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + (this.objectType == null ? 0 : this.objectType.hashCode()); - return result; - } - - // only possible way for it to be equal, is if it is the same object - @Override - public - boolean equals(Object obj) { - return this == obj; - - } - - @Override - public - String toString() { - return "Listener [type=" + getObjectType() + "]"; - } -} diff --git a/src/dorkbox/network/connection/RegisterRmiSystemListener.java b/src/dorkbox/network/connection/RegisterRmiSystemListener.java index 4f7ffaf0..a293d1a1 100644 --- a/src/dorkbox/network/connection/RegisterRmiSystemListener.java +++ b/src/dorkbox/network/connection/RegisterRmiSystemListener.java @@ -17,7 +17,7 @@ package dorkbox.network.connection; import dorkbox.network.rmi.RmiRegistration; -class RegisterRmiSystemListener extends ListenerRaw { +class RegisterRmiSystemListener implements Listener.OnMessageReceived { RegisterRmiSystemListener() { } diff --git a/src/dorkbox/network/connection/idle/IdleListener.java b/src/dorkbox/network/connection/idle/IdleListener.java index ac2f7d8b..00fcd1f9 100644 --- a/src/dorkbox/network/connection/idle/IdleListener.java +++ b/src/dorkbox/network/connection/idle/IdleListener.java @@ -16,14 +16,12 @@ package dorkbox.network.connection.idle; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.ListenerRaw; - -public abstract -class IdleListener extends ListenerRaw { +import dorkbox.network.connection.Listener; +public +interface IdleListener extends Listener { /** * used by the Idle Sender */ - abstract void send(C connection, M message); } diff --git a/src/dorkbox/network/connection/idle/IdleListenerTCP.java b/src/dorkbox/network/connection/idle/IdleListenerTCP.java index e421cfb5..c8473b2c 100644 --- a/src/dorkbox/network/connection/idle/IdleListenerTCP.java +++ b/src/dorkbox/network/connection/idle/IdleListenerTCP.java @@ -18,7 +18,7 @@ package dorkbox.network.connection.idle; import dorkbox.network.connection.Connection; public -class IdleListenerTCP extends IdleListener { +class IdleListenerTCP implements IdleListener { /** * used by the Idle Sender @@ -31,6 +31,7 @@ class IdleListenerTCP extends IdleListener { * used by the Idle Sender */ @Override + public void send(C connection, M message) { connection.send() .TCP(message); diff --git a/src/dorkbox/network/connection/idle/IdleListenerUDP.java b/src/dorkbox/network/connection/idle/IdleListenerUDP.java index 2b2df7be..fa4ac6cf 100644 --- a/src/dorkbox/network/connection/idle/IdleListenerUDP.java +++ b/src/dorkbox/network/connection/idle/IdleListenerUDP.java @@ -18,7 +18,7 @@ package dorkbox.network.connection.idle; import dorkbox.network.connection.Connection; public -class IdleListenerUDP extends IdleListener { +class IdleListenerUDP implements IdleListener { /** * used by the Idle Sender @@ -31,6 +31,7 @@ class IdleListenerUDP extends IdleListener { * used by the Idle Sender */ @Override + public void send(C connection, M message) { connection.send() .UDP(message); diff --git a/src/dorkbox/network/connection/idle/IdleListenerUDT.java b/src/dorkbox/network/connection/idle/IdleListenerUDT.java index 9530738e..68fb13cd 100644 --- a/src/dorkbox/network/connection/idle/IdleListenerUDT.java +++ b/src/dorkbox/network/connection/idle/IdleListenerUDT.java @@ -18,7 +18,7 @@ package dorkbox.network.connection.idle; import dorkbox.network.connection.Connection; public -class IdleListenerUDT extends IdleListener { +class IdleListenerUDT implements IdleListener { /** * used by the Idle Sender @@ -31,6 +31,7 @@ class IdleListenerUDT extends IdleListener { * used by the Idle Sender */ @Override + public void send(C connection, M message) { connection.send() .UDT(message); diff --git a/src/dorkbox/network/connection/idle/IdleSender.java b/src/dorkbox/network/connection/idle/IdleSender.java index d3c0a42e..4990992a 100644 --- a/src/dorkbox/network/connection/idle/IdleSender.java +++ b/src/dorkbox/network/connection/idle/IdleSender.java @@ -16,12 +16,12 @@ package dorkbox.network.connection.idle; import dorkbox.network.connection.Connection; -import dorkbox.network.connection.ListenerRaw; +import dorkbox.network.connection.Listener; import java.io.IOException; public abstract -class IdleSender extends ListenerRaw { +class IdleSender implements Listener.OnIdle { final IdleListener idleListener; volatile boolean started; diff --git a/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java b/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java new file mode 100644 index 00000000..dc8514ee --- /dev/null +++ b/src/dorkbox/network/connection/listenerManagement/OnConnectedManager.java @@ -0,0 +1,156 @@ +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.connection.listenerManagement; + +import com.esotericsoftware.kryo.util.IdentityMap; +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.ConnectionManager; +import dorkbox.network.connection.Listener.OnConnected; +import dorkbox.network.connection.Listener.OnError; +import dorkbox.util.collections.ConcurrentEntry; +import org.slf4j.Logger; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +/** + * Called when the remote end has been connected. This will be invoked before any objects are received by the network. + * This method should not block for long periods as other network activity will not be processed + * until it returns. + */ +@SuppressWarnings("Duplicates") +public final +class OnConnectedManager { + private final Logger logger; + + // + // The iterators for IdentityMap are NOT THREAD SAFE! + // + // This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove. + private final IdentityMap, ConcurrentEntry> entries = new IdentityMap, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR); + private volatile ConcurrentEntry> head = null; // reference to the first element + + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + private final Object lock = new Object(); + + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater REF = + AtomicReferenceFieldUpdater.newUpdater(OnConnectedManager.class, + ConcurrentEntry.class, + "head"); + + public + OnConnectedManager(final Logger logger) { + this.logger = logger; + } + + public void add(final OnConnected listener) { + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + // access a snapshot (single-writer-principle) + ConcurrentEntry head = REF.get(this); + + if (!entries.containsKey(listener)) { + head = new ConcurrentEntry(listener, head); + + entries.put(listener, head); + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, head); + } + } + } + + /** + * @return true if the listener was removed, false otherwise + */ + public + boolean remove(final OnConnected listener) { + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + // access a snapshot (single-writer-principle) + ConcurrentEntry concurrentEntry = entries.get(listener); + + if (concurrentEntry != null) { + ConcurrentEntry head1 = REF.get(this); + + if (concurrentEntry == head1) { + // if it was second, now it's first + head1 = head1.next(); + //oldHead.clear(); // optimize for GC not possible because of potentially running iterators + } + else { + concurrentEntry.remove(); + } + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, head1); + entries.remove(listener); + return true; + } else { + return false; + } + } + } + + /** + * @return true if a listener was found, false otherwise + */ + @SuppressWarnings("unchecked") + public + boolean notifyConnected(final C connection, final AtomicBoolean shutdown) { + ConcurrentEntry> head = REF.get(this); + ConcurrentEntry> current = head; + OnConnected listener; + while (current != null && !shutdown.get()) { + listener = current.getValue(); + current = current.next(); + + try { + listener.connected(connection); + } catch (Exception e) { + if (listener instanceof OnError) { + ((OnError) listener).error(connection, e); + } + else { + logger.error("Unable to notify listener on 'connected' for listener '{}', connection '{}'.", listener, connection, e); + } + } + } + + return head != null; // true if we have something, otherwise false + } + + /** + * called on shutdown + */ + public + void clear() { + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + this.entries.clear(); + this.head = null; + } + } +} diff --git a/src/dorkbox/network/connection/listenerManagement/OnDisconnectedManager.java b/src/dorkbox/network/connection/listenerManagement/OnDisconnectedManager.java new file mode 100644 index 00000000..9aa3022a --- /dev/null +++ b/src/dorkbox/network/connection/listenerManagement/OnDisconnectedManager.java @@ -0,0 +1,157 @@ +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.connection.listenerManagement; + +import com.esotericsoftware.kryo.util.IdentityMap; +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.ConnectionManager; +import dorkbox.network.connection.Listener.OnDisconnected; +import dorkbox.network.connection.Listener.OnError; +import dorkbox.util.collections.ConcurrentEntry; +import org.slf4j.Logger; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +/** + * Called when the remote end has been connected. This will be invoked before any objects are received by the network. + * This method should not block for long periods as other network activity will not be processed + * until it returns. + */ +@SuppressWarnings("Duplicates") +public final +class OnDisconnectedManager { + private final Logger logger; + + // + // The iterators for IdentityMap are NOT THREAD SAFE! + // + // This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove. + private final IdentityMap, ConcurrentEntry> entries = new IdentityMap, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR); + private volatile ConcurrentEntry> head = null; // reference to the first element + + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + private final Object lock = new Object(); + + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater REF = + AtomicReferenceFieldUpdater.newUpdater(OnDisconnectedManager.class, + ConcurrentEntry.class, + "head"); + + public + OnDisconnectedManager(final Logger logger) { + this.logger = logger; + } + + public void add(final OnDisconnected listener) { + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + // access a snapshot (single-writer-principle) + ConcurrentEntry head = REF.get(this); + + if (!entries.containsKey(listener)) { + head = new ConcurrentEntry(listener, head); + + entries.put(listener, head); + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, head); + } + } + } + + /** + * @return true if the listener was removed, false otherwise + */ + public + boolean remove(final OnDisconnected listener) { + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + // access a snapshot (single-writer-principle) + ConcurrentEntry concurrentEntry = entries.get(listener); + + if (concurrentEntry != null) { + ConcurrentEntry head1 = REF.get(this); + + if (concurrentEntry == head1) { + // if it was second, now it's first + head1 = head1.next(); + //oldHead.clear(); // optimize for GC not possible because of potentially running iterators + } + else { + concurrentEntry.remove(); + } + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, head1); + entries.remove(listener); + return true; + } else { + return false; + } + } + } + + + /** + * @return true if a listener was found, false otherwise + */ + @SuppressWarnings("unchecked") + public + boolean notifyDisconnected(final C connection, final AtomicBoolean shutdown) { + ConcurrentEntry> head = REF.get(this); + ConcurrentEntry> current = head; + OnDisconnected listener; + while (current != null && !shutdown.get()) { + listener = current.getValue(); + current = current.next(); + + try { + listener.disconnected(connection); + } catch (Exception e) { + if (listener instanceof OnError) { + ((OnError) listener).error(connection, e); + } + else { + logger.error("Unable to notify listener on 'disconnected' for listener '{}', connection '{}'.", listener, connection, e); + } + } + } + + return head != null; // true if we have something, otherwise false + } + + /** + * called on shutdown + */ + public + void clear() { + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + this.entries.clear(); + this.head = null; + } + } +} diff --git a/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java b/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java new file mode 100644 index 00000000..ccdcaa3c --- /dev/null +++ b/src/dorkbox/network/connection/listenerManagement/OnIdleManager.java @@ -0,0 +1,159 @@ +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.connection.listenerManagement; + +import com.esotericsoftware.kryo.util.IdentityMap; +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.ConnectionManager; +import dorkbox.network.connection.Listener.OnError; +import dorkbox.network.connection.Listener.OnIdle; +import dorkbox.util.collections.ConcurrentEntry; +import org.slf4j.Logger; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +/** + * Called when the remote end has been connected. This will be invoked before any objects are received by the network. + * This method should not block for long periods as other network activity will not be processed + * until it returns. + */ +@SuppressWarnings("Duplicates") +public final +class OnIdleManager { + private final Logger logger; + + // + // The iterators for IdentityMap are NOT THREAD SAFE! + // + // This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove. + private final IdentityMap, ConcurrentEntry> entries = new IdentityMap, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR); + private volatile ConcurrentEntry> head = null; // reference to the first element + + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + private final Object lock = new Object(); + + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater REF = + AtomicReferenceFieldUpdater.newUpdater(OnIdleManager.class, + ConcurrentEntry.class, + "head"); + + public + OnIdleManager(final Logger logger) { + this.logger = logger; + } + + public void add(final OnIdle listener) { + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + // access a snapshot (single-writer-principle) + ConcurrentEntry head = REF.get(this); + + if (!entries.containsKey(listener)) { + head = new ConcurrentEntry(listener, head); + + entries.put(listener, head); + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, head); + } + } + } + + /** + * @return true if the listener was removed, false otherwise + */ + public + boolean remove(final OnIdle listener) { + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + // access a snapshot (single-writer-principle) + ConcurrentEntry concurrentEntry = entries.get(listener); + + if (concurrentEntry != null) { + ConcurrentEntry head1 = REF.get(this); + + if (concurrentEntry == head1) { + // if it was second, now it's first + head1 = head1.next(); + //oldHead.clear(); // optimize for GC not possible because of potentially running iterators + } + else { + concurrentEntry.remove(); + } + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, head1); + entries.remove(listener); + return true; + } + else { + return false; + } + } + } + + /** + * @return true if a listener was found, false otherwise + */ + @SuppressWarnings("unchecked") + public + boolean notifyIdle(final C connection, final AtomicBoolean shutdown) { + ConcurrentEntry> head = REF.get(this); + ConcurrentEntry> current = head; + OnIdle listener; + while (current != null && !shutdown.get()) { + listener = current.getValue(); + current = current.next(); + + try { + listener.idle(connection); + } catch (Exception e) { + if (listener instanceof OnError) { + ((OnError) listener).error(connection, e); + } + else { + logger.error("Unable to notify listener on 'idle' for listener '{}', connection '{}'.", listener, connection, e); + } + + } + } + + return head != null; // true if we have something, otherwise false + } + + + /** + * called on shutdown + */ + public + void clear() { + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + this.entries.clear(); + this.head = null; + } + } +} diff --git a/src/dorkbox/network/connection/listenerManagement/OnMessageReceivedManager.java b/src/dorkbox/network/connection/listenerManagement/OnMessageReceivedManager.java new file mode 100644 index 00000000..1dbf95a2 --- /dev/null +++ b/src/dorkbox/network/connection/listenerManagement/OnMessageReceivedManager.java @@ -0,0 +1,320 @@ +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.connection.listenerManagement; + +import com.esotericsoftware.kryo.util.IdentityMap; +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.ConnectionManager; +import dorkbox.network.connection.Listener.OnError; +import dorkbox.network.connection.Listener.OnMessageReceived; +import dorkbox.network.connection.Listener.SelfDefinedType; +import dorkbox.network.rmi.RmiMessages; +import dorkbox.util.ClassHelper; +import dorkbox.util.collections.ConcurrentEntry; +import dorkbox.util.collections.ConcurrentIterator; +import org.slf4j.Logger; + +import java.lang.reflect.Type; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static dorkbox.util.collections.ConcurrentIterator.headREF; + +/** + * Called when the remote end has been connected. This will be invoked before any objects are received by the network. + * This method should not block for long periods as other network activity will not be processed + * until it returns. + */ +@SuppressWarnings("Duplicates") +public final +class OnMessageReceivedManager { + private final Logger logger; + + // + // The iterators for IdentityMap are NOT THREAD SAFE! + // + @SuppressWarnings("unused") + private volatile IdentityMap listeners = new IdentityMap(32, ConnectionManager.LOAD_FACTOR); + + // Recommended for best performance while adhering to the "single writer principle". Must be static-final + private static final AtomicReferenceFieldUpdater REF = + AtomicReferenceFieldUpdater.newUpdater(OnMessageReceivedManager.class, + IdentityMap.class, + "listeners"); + + + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + private final Object lock = new Object(); + + + public + OnMessageReceivedManager(final Logger logger) { + this.logger = logger; + } + + public void add(final OnMessageReceived listener) { + final Class type; + if (listener instanceof SelfDefinedType) { + type = ((SelfDefinedType) listener).getType(); + } + else { + type = identifyType(listener); + } + + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + // access a snapshot of the listeners (single-writer-principle) + @SuppressWarnings("unchecked") + final IdentityMap listeners = REF.get(this); + + ConcurrentIterator subscribedListeners = listeners.get(type); + if (subscribedListeners == null) { + subscribedListeners = new ConcurrentIterator(); + listeners.put(type, subscribedListeners); + } + + subscribedListeners.add(listener); + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, listeners); + } + } + + /** + * @return true if the listener was removed, false otherwise + */ + public + boolean remove(final OnMessageReceived listener) { + final Class type; + if (listener instanceof SelfDefinedType) { + type = ((SelfDefinedType) listener).getType(); + } + else { + type = identifyType(listener); + } + + boolean found = false; + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + // access a snapshot of the listeners (single-writer-principle) + @SuppressWarnings("unchecked") + final IdentityMap listeners = REF.get(this); + final ConcurrentIterator concurrentIterator = listeners.get(type); + if (concurrentIterator != null) { + concurrentIterator.remove(listener); + found = true; + } + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, listeners); + } + + return found; + } + + /** + * @return true if a listener was found, false otherwise + */ + @SuppressWarnings("unchecked") + public + boolean notifyReceived(final C connection, final Object message, final AtomicBoolean shutdown) { + boolean found = false; + Class objectType = message.getClass(); + + + // this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version) + final IdentityMap listeners = REF.get(this); + ConcurrentIterator concurrentIterator = listeners.get(objectType); + + if (concurrentIterator != null) { + ConcurrentEntry> head = headREF.get(concurrentIterator); + ConcurrentEntry> current = head; + OnMessageReceived listener; + while (current != null && !shutdown.get()) { + listener = current.getValue(); + current = current.next(); + + try { + listener.received(connection, message); + } catch (Exception e) { + if (listener instanceof OnError) { + ((OnError) listener).error(connection, e); + } + else { + logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.", + objectType, + listener, + connection, + e); + } + } + } + + found = head != null; // true if we have something to publish to, otherwise false + } + + if (!(message instanceof RmiMessages)) { + // we march through all super types of the object, and find the FIRST set + // of listeners that are registered and cast it as that, and notify the method. + // NOTICE: we do NOT call ALL TYPE -- meaning, if we have Object->Foo->Bar + // and have listeners for Object and Foo + // we will call Bar (from the above code) + // we will call Foo (from this code) + // we will NOT call Object (since we called Foo). If Foo was not registered, THEN we would call object! + + objectType = objectType.getSuperclass(); + while (objectType != null) { + // check to see if we have what we are looking for in our CURRENT class + concurrentIterator = listeners.get(objectType); + if (concurrentIterator != null) { + ConcurrentEntry> head = headREF.get(concurrentIterator); + ConcurrentEntry> current = head; + OnMessageReceived listener; + while (current != null && !shutdown.get()) { + listener = current.getValue(); + current = current.next(); + + try { + listener.received(connection, message); + } catch (Exception e) { + if (listener instanceof OnError) { + ((OnError) listener).error(connection, e); + } + else { + logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.", + objectType, + listener, + connection, + e); + } + } + } + + found = head != null; // true if we have something to publish to, otherwise false + break; + } + + // NO MATCH, so walk up. + objectType = objectType.getSuperclass(); + } + } + + return found; + } + + public + void removeAll() { + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + // access a snapshot of the listeners (single-writer-principle) + @SuppressWarnings("unchecked") + final IdentityMap listeners = REF.get(this); + + listeners.clear(); + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, listeners); + } + } + + /** + * @return true if the listener was removed, false otherwise + */ + public + boolean removeAll(final Class classType) { + boolean found; + + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + // access a snapshot of the listeners (single-writer-principle) + @SuppressWarnings("unchecked") + final IdentityMap listeners = REF.get(this); + + found = listeners.remove(classType) != null; + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, listeners); + } + + return found; + } + + /** + * called on shutdown + */ + public + void clear() { + // synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this + // section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our + // use-case 99% of the time) + synchronized (lock) { + @SuppressWarnings("unchecked") + final IdentityMap listeners = REF.get(this); + + // The iterators for this map are NOT THREAD SAFE! + // using .entries() is what we are supposed to use! + final IdentityMap.Entries entries = listeners.entries(); + for (final IdentityMap.Entry next : entries) { + if (next.value != null) { + next.value.clear(); + } + } + + listeners.clear(); + + // save this snapshot back to the original (single writer principle) + REF.lazySet(this, listeners); + } + } + + + /** + * Gets the referenced object type for a specific listener, but ONLY necessary for listeners that receive messages + *

+ * This works for compile time code. The generic type parameter #2 (index 1) is pulled from type arguments. + * generic parameters cannot be primitive types + */ + private static Class identifyType(final Object listener) { + final Class clazz = listener.getClass(); + Class objectType = ClassHelper.getGenericParameterAsClassForSuperClass(clazz, 1); + + if (objectType != null) { + // SOMETIMES generics get confused on which parameter we actually mean (when sub-classing) + if (objectType != Object.class && ClassHelper.hasInterface(Connection.class, objectType)) { + Class objectType2 = ClassHelper.getGenericParameterAsClassForSuperClass(clazz, 2); + if (objectType2 != null) { + objectType = objectType2; + } + } + + return objectType; + } + else { + // there was no defined parameters + return Object.class; + } + } +} diff --git a/src/dorkbox/network/connection/ping/PingSystemListener.java b/src/dorkbox/network/connection/ping/PingSystemListener.java index 60a95571..82d733b8 100644 --- a/src/dorkbox/network/connection/ping/PingSystemListener.java +++ b/src/dorkbox/network/connection/ping/PingSystemListener.java @@ -16,10 +16,10 @@ package dorkbox.network.connection.ping; import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.connection.ListenerRaw; +import dorkbox.network.connection.Listener; public -class PingSystemListener extends ListenerRaw { +class PingSystemListener implements Listener.OnMessageReceived { public PingSystemListener() { diff --git a/src/dorkbox/network/rmi/RemoteInvocationResponse.java b/src/dorkbox/network/rmi/RemoteInvocationResponse.java new file mode 100644 index 00000000..c9dbb56b --- /dev/null +++ b/src/dorkbox/network/rmi/RemoteInvocationResponse.java @@ -0,0 +1,24 @@ +/* + * Copyright 2010 dorkbox, llc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dorkbox.network.rmi; + +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.Listener; + +abstract +class RemoteInvocationResponse implements Listener.OnDisconnected, + Listener.OnMessageReceived { +} diff --git a/src/dorkbox/network/rmi/RemoteObjectInvocationHandler.java b/src/dorkbox/network/rmi/RemoteObjectInvocationHandler.java index 2aa0986c..2f341190 100644 --- a/src/dorkbox/network/rmi/RemoteObjectInvocationHandler.java +++ b/src/dorkbox/network/rmi/RemoteObjectInvocationHandler.java @@ -37,7 +37,6 @@ package dorkbox.network.rmi; import dorkbox.network.connection.Connection; import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.ListenerRaw; import dorkbox.network.util.RMISerializationManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,20 +52,19 @@ import java.util.concurrent.locks.ReentrantLock; /** * Handles network communication when methods are invoked on a proxy. */ -public class RemoteObjectInvocationHandler implements InvocationHandler { private static final Logger logger = LoggerFactory.getLogger(RemoteObjectInvocationHandler.class); - final ReentrantLock lock = new ReentrantLock(); - final Condition responseCondition = this.lock.newCondition(); + private final ReentrantLock lock = new ReentrantLock(); + private final Condition responseCondition = this.lock.newCondition(); - final InvokeMethodResult[] responseTable = new InvokeMethodResult[64]; - final boolean[] pendingResponses = new boolean[64]; + private final InvokeMethodResult[] responseTable = new InvokeMethodResult[64]; + private final boolean[] pendingResponses = new boolean[64]; private final Connection connection; public final int objectID; private final String proxyString; - private final ListenerRaw responseListener; + private final RemoteInvocationResponse responseListener; private int timeoutMillis = 3000; private boolean isAsync = false; @@ -82,14 +80,13 @@ class RemoteObjectInvocationHandler implements InvocationHandler { private Byte lastResponseID; private byte nextResponseId = (byte) 1; - public RemoteObjectInvocationHandler(final Connection connection, final int objectID) { super(); this.connection = connection; this.objectID = objectID; this.proxyString = ""; - this.responseListener = new ListenerRaw() { + this.responseListener = new RemoteInvocationResponse() { @Override public void disconnected(Connection connection) { @@ -433,7 +430,7 @@ class RemoteObjectInvocationHandler implements InvocationHandler { throw new TimeoutException("Response timed out."); } - + private void close() { this.connection.listeners() .remove(this.responseListener); diff --git a/src/dorkbox/network/rmi/RmiBridge.java b/src/dorkbox/network/rmi/RmiBridge.java index 2cfc9cc2..65a0980a 100644 --- a/src/dorkbox/network/rmi/RmiBridge.java +++ b/src/dorkbox/network/rmi/RmiBridge.java @@ -38,7 +38,7 @@ import com.esotericsoftware.kryo.util.IntMap; import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.ListenerRaw; +import dorkbox.network.connection.Listener; import dorkbox.util.collections.ObjectIntMap; import org.slf4j.Logger; @@ -114,7 +114,7 @@ class RmiBridge { private final ObjectIntMap objectToID = new ObjectIntMap(); private final Executor executor; - private final ListenerRaw invokeListener = new ListenerRaw() { + private final Listener.OnMessageReceived invokeListener = new Listener.OnMessageReceived() { @SuppressWarnings("AutoBoxing") @Override public @@ -188,7 +188,7 @@ class RmiBridge { */ @SuppressWarnings("rawtypes") public - ListenerRaw getListener() { + Listener getListener() { return this.invokeListener; } diff --git a/test/dorkbox/network/ChunkedDataIdleTest.java b/test/dorkbox/network/ChunkedDataIdleTest.java index 7768d56b..1758cdbf 100644 --- a/test/dorkbox/network/ChunkedDataIdleTest.java +++ b/test/dorkbox/network/ChunkedDataIdleTest.java @@ -86,7 +86,7 @@ public class ChunkedDataIdleTest extends BaseTest { addEndPoint(server); server.setIdleTimeout(10); server.bind(false); - server.listeners().add(new Listener() { + server.listeners().add(new Listener.OnConnected() { @Override public void connected (Connection connection) { @@ -108,7 +108,7 @@ public class ChunkedDataIdleTest extends BaseTest { Client client = new Client(configuration); client.setIdleTimeout(10); addEndPoint(client); - client.listeners().add(new Listener() { + client.listeners().add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, Data object) { if (mainData.equals(object)) { diff --git a/test/dorkbox/network/ClientSendTest.java b/test/dorkbox/network/ClientSendTest.java index a4c90eba..97496512 100644 --- a/test/dorkbox/network/ClientSendTest.java +++ b/test/dorkbox/network/ClientSendTest.java @@ -52,7 +52,7 @@ class ClientSendTest extends BaseTest { server.bind(false); server.listeners() - .add(new Listener() { + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, AMessage object) { @@ -67,7 +67,7 @@ class ClientSendTest extends BaseTest { client.connect(5000); client.listeners() - .add(new Listener() { + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, AMessage object) { diff --git a/test/dorkbox/network/ConnectionTest.java b/test/dorkbox/network/ConnectionTest.java index 0ac5a916..58236451 100644 --- a/test/dorkbox/network/ConnectionTest.java +++ b/test/dorkbox/network/ConnectionTest.java @@ -141,7 +141,7 @@ class ConnectionTest extends BaseTest { server.bind(false); server.listeners() - .add(new Listener() { + .add(new Listener.OnConnected() { Timer timer = new Timer(); @Override @@ -156,7 +156,10 @@ class ConnectionTest extends BaseTest { } }, 1000); } + }); + server.listeners() + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, Object message) { System.err.println("Received message from client: " + message.getClass().getSimpleName()); @@ -178,7 +181,7 @@ class ConnectionTest extends BaseTest { addEndPoint(client); client.listeners() - .add(new Listener() { + .add(new Listener.OnDisconnected() { @Override public void disconnected(Connection connection) { diff --git a/test/dorkbox/network/DiscoverHostTest.java b/test/dorkbox/network/DiscoverHostTest.java index 21cb149b..797dc585 100644 --- a/test/dorkbox/network/DiscoverHostTest.java +++ b/test/dorkbox/network/DiscoverHostTest.java @@ -58,7 +58,7 @@ class DiscoverHostTest extends BaseTest { Client client = new Client(configuration); addEndPoint(client); client.listeners() - .add(new Listener() { + .add(new Listener.OnConnected() { @Override public void connected(Connection connection) { diff --git a/test/dorkbox/network/IdleTest.java b/test/dorkbox/network/IdleTest.java index f9cc6729..70d780d6 100644 --- a/test/dorkbox/network/IdleTest.java +++ b/test/dorkbox/network/IdleTest.java @@ -23,7 +23,12 @@ import dorkbox.network.PingPongTest.TYPE; import dorkbox.network.connection.Connection; import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.connection.Listener; -import dorkbox.network.connection.idle.*; +import dorkbox.network.connection.idle.IdleBridge; +import dorkbox.network.connection.idle.IdleListener; +import dorkbox.network.connection.idle.IdleListenerTCP; +import dorkbox.network.connection.idle.IdleListenerUDP; +import dorkbox.network.connection.idle.IdleListenerUDT; +import dorkbox.network.connection.idle.InputStreamSender; import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -125,7 +130,7 @@ class IdleTest extends BaseTest { server.setIdleTimeout(100); server.bind(false); server.listeners() - .add(new Listener() { + .add(new Listener.OnConnected() { @Override public @@ -151,7 +156,7 @@ class IdleTest extends BaseTest { Client client = new Client(configuration); addEndPoint(client); client.listeners() - .add(new Listener() { + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, Data object) { @@ -182,7 +187,7 @@ class IdleTest extends BaseTest { server.setIdleTimeout(100); server.bind(false); server.listeners() - .add(new Listener() { + .add(new Listener.OnConnected() { @Override public void connected(Connection connection) { @@ -243,7 +248,7 @@ class IdleTest extends BaseTest { Client client = new Client(configuration); addEndPoint(client); client.listeners() - .add(new Listener() { + .add(new Listener.OnMessageReceived() { int total; @Override diff --git a/test/dorkbox/network/LargeResizeBufferTest.java b/test/dorkbox/network/LargeResizeBufferTest.java index 671023b9..acede3dd 100644 --- a/test/dorkbox/network/LargeResizeBufferTest.java +++ b/test/dorkbox/network/LargeResizeBufferTest.java @@ -59,7 +59,7 @@ class LargeResizeBufferTest extends BaseTest { server.bind(false); server.listeners() - .add(new Listener() { + .add(new Listener.OnMessageReceived() { AtomicInteger received = new AtomicInteger(); AtomicInteger receivedBytes = new AtomicInteger(); @@ -86,7 +86,7 @@ class LargeResizeBufferTest extends BaseTest { addEndPoint(client); client.listeners() - .add(new Listener() { + .add(new Listener.OnMessageReceived() { AtomicInteger received = new AtomicInteger(); AtomicInteger receivedBytes = new AtomicInteger(); diff --git a/test/dorkbox/network/ListenerTest.java b/test/dorkbox/network/ListenerTest.java index c42c5b06..a400ca71 100644 --- a/test/dorkbox/network/ListenerTest.java +++ b/test/dorkbox/network/ListenerTest.java @@ -19,7 +19,11 @@ */ package dorkbox.network; -import dorkbox.network.connection.*; +import dorkbox.network.connection.Connection; +import dorkbox.network.connection.ConnectionImpl; +import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.Listener; +import dorkbox.network.connection.ListenerBridge; import dorkbox.network.rmi.RmiBridge; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -30,7 +34,10 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class ListenerTest extends BaseTest { @@ -39,12 +46,18 @@ class ListenerTest extends BaseTest { private final int limit = 20; private AtomicInteger count = new AtomicInteger(0); - volatile String fail = null; - AtomicBoolean subClassWorkedOK = new AtomicBoolean(false); - AtomicBoolean subClassWorkedOK2 = new AtomicBoolean(false); - AtomicBoolean superClassWorkedOK = new AtomicBoolean(false); - AtomicBoolean superClass2WorkedOK = new AtomicBoolean(false); - AtomicBoolean disconnectWorkedOK = new AtomicBoolean(false); + AtomicBoolean checkFail1 = new AtomicBoolean(false); + AtomicBoolean checkFail2 = new AtomicBoolean(false); + + AtomicBoolean check1 = new AtomicBoolean(false); + AtomicBoolean check2 = new AtomicBoolean(false); + AtomicBoolean check3 = new AtomicBoolean(false); + AtomicBoolean check4 = new AtomicBoolean(false); + AtomicBoolean check5 = new AtomicBoolean(false); + AtomicBoolean check6 = new AtomicBoolean(false); + AtomicBoolean check7 = new AtomicBoolean(false); + AtomicBoolean check8 = new AtomicBoolean(false); + AtomicBoolean check9 = new AtomicBoolean(false); // quick and dirty test to also test connection sub-classing @@ -56,7 +69,7 @@ class ListenerTest extends BaseTest { public void check() { - ListenerTest.this.subClassWorkedOK.set(true); + ListenerTest.this.check1.set(true); } } @@ -70,10 +83,18 @@ class ListenerTest extends BaseTest { @Override public void check() { - ListenerTest.this.subClassWorkedOK.set(true); + ListenerTest.this.checkFail1.set(true); } } + abstract class SubListener implements Listener.OnMessageReceived { + } + + abstract class SubListener2 extends SubListener { + } + + abstract class SubListener3 implements Listener.OnMessageReceived, Listener.SelfDefinedType { + } @SuppressWarnings("rawtypes") @@ -94,76 +115,123 @@ class ListenerTest extends BaseTest { addEndPoint(server); server.bind(false); + final ListenerBridge listeners = server.listeners(); - server.listeners() - .add(new ListenerRaw() { - @Override - public - void received(TestConnectionA connection, String string) { - connection.check(); -// System.err.println("default check"); - connection.send() - .TCP(string); - } - }); + // standard listener + listeners.add(new Listener.OnMessageReceived() { + @Override + public + void received(TestConnectionA connection, String string) { + connection.check(); + connection.send() + .TCP(string); + } + }); - server.listeners() - .add(new Listener() { - @Override - public - void received(Connection connection, String string) { -// System.err.println("subclass check"); - ListenerTest.this.subClassWorkedOK2.set(true); - } - }); + // standard listener with connection subclassed + listeners.add(new Listener.OnMessageReceived() { + @Override + public + void received(Connection connection, String string) { + ListenerTest.this.check2.set(true); + } + }); - // should be able to happen! - server.listeners() - .add(new Listener() { - @Override - public - void received(Connection connection, Object string) { -// System.err.println("generic class check"); - ListenerTest.this.superClassWorkedOK.set(true); - } - }); + // standard listener with message subclassed + listeners.add(new Listener.OnMessageReceived() { + @Override + public + void received(TestConnectionA connection, Object string) { + ListenerTest.this.check3.set(true); + } + }); + + // standard listener with connection subclassed AND message subclassed + listeners.add(new Listener.OnMessageReceived() { + @Override + public + void received(Connection connection, Object string) { + ListenerTest.this.check4.set(true); + } + }); + + // standard listener with connection subclassed AND message subclassed NO GENERICS + listeners.add(new Listener.OnMessageReceived() { + @Override + public + void received(Connection connection, Object string) { + ListenerTest.this.check5.set(true); + } + }); + + // subclassed listener with connection subclassed AND message subclassed NO GENERICS + listeners.add(new SubListener() { + @Override + public + void received(Connection connection, String string) { + ListenerTest.this.check6.set(true); + } + }); + + // subclassed listener with connection subclassed AND message subclassed NO GENERICS + listeners.add(new SubListener() { + @Override + public + void received(Connection connection, String string) { + ListenerTest.this.check6.set(true); + } + }); - // should be able to happen! - server.listeners() - .add(new ListenerRaw() { - @Override - public - void received(Connection connection, Object string) { -// System.err.println("generic class check"); - ListenerTest.this.superClass2WorkedOK.set(true); - } - }); + // subclassed listener with connection subclassed x 2 AND message subclassed NO GENERICS + listeners.add(new SubListener2() { + @Override + public + void received(Connection connection, String string) { + ListenerTest.this.check8.set(true); + } + }); + + + // subclassed listener with connection subclassed AND message subclassed NO GENERICS + listeners.add(new SubListener3() { + @Override + public + Class getType() { + return String.class; + } + + @Override + public + void received(Connection connection, String string) { + ListenerTest.this.check9.set(true); + } + }); + + + // standard listener disconnect check + listeners.add(new Listener.OnDisconnected() { + @Override + public + void disconnected(Connection connection) { + ListenerTest.this.check7.set(true); + } + }); - server.listeners() - .add(new Listener() { - @Override - public - void disconnected(Connection connection) { -// System.err.println("disconnect check"); - ListenerTest.this.disconnectWorkedOK.set(true); - } - }); // should not let this happen! try { - server.listeners() - .add(new ListenerRaw() { - @Override - public - void received(TestConnectionB connection, String string) { - connection.check(); - System.err.println(string); - connection.send() - .TCP(string); - } - }); - this.fail = "Should not be able to ADD listeners that are NOT the basetype or the interface"; + listeners.add(new Listener.OnMessageReceived() { + @Override + public + void received(TestConnectionB connection, String string) { + connection.check(); + System.err.println(string); + connection.send() + .TCP(string); + } + }); + fail("Should not be able to ADD listeners that are NOT the basetype or the interface"); } catch (Exception e) { System.err.println("Successfully did NOT add listener that was not the base class"); } @@ -175,14 +243,17 @@ class ListenerTest extends BaseTest { addEndPoint(client); client.listeners() - .add(new Listener() { + .add(new Listener.OnConnected() { @Override public void connected(Connection connection) { connection.send() .TCP(ListenerTest.this.origString); // 20 a's } + }); + client.listeners() + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, String string) { @@ -193,7 +264,8 @@ class ListenerTest extends BaseTest { } else { if (!ListenerTest.this.origString.equals(string)) { - ListenerTest.this.fail = "original string not equal to the string received"; + checkFail2.set(true); + System.err.println("original string not equal to the string received"); } stopEndPoints(); } @@ -204,15 +276,20 @@ class ListenerTest extends BaseTest { client.connect(5000); waitForThreads(); - assertEquals(this.limit, this.count.get()); - assertTrue(this.subClassWorkedOK.get()); - assertTrue(this.subClassWorkedOK2.get()); - assertTrue(this.superClassWorkedOK.get()); - assertTrue(this.superClass2WorkedOK.get()); - assertTrue(this.disconnectWorkedOK.get()); - if (this.fail != null) { - fail(this.fail); - } + assertEquals(this.limit, this.count.get()); + + assertTrue(this.check1.get()); + assertTrue(this.check2.get()); + assertTrue(this.check3.get()); + assertTrue(this.check4.get()); + assertTrue(this.check5.get()); + assertTrue(this.check6.get()); + assertTrue(this.check7.get()); + assertTrue(this.check8.get()); + assertTrue(this.check9.get()); + + assertFalse(this.checkFail1.get()); + assertFalse(this.checkFail2.get()); } } diff --git a/test/dorkbox/network/MultipleServerTest.java b/test/dorkbox/network/MultipleServerTest.java index 7611a691..888f05e6 100644 --- a/test/dorkbox/network/MultipleServerTest.java +++ b/test/dorkbox/network/MultipleServerTest.java @@ -51,7 +51,7 @@ class MultipleServerTest extends BaseTest { server1.bind(false); server1.listeners() - .add(new Listener() { + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, String object) { @@ -74,7 +74,7 @@ class MultipleServerTest extends BaseTest { addEndPoint(server2); server2.bind(false); server2.listeners() - .add(new Listener() { + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, String object) { @@ -95,7 +95,7 @@ class MultipleServerTest extends BaseTest { Client client1 = new Client(configuration1); addEndPoint(client1); client1.listeners() - .add(new Listener() { + .add(new Listener.OnConnected() { @Override public void connected(Connection connection) { @@ -112,7 +112,7 @@ class MultipleServerTest extends BaseTest { Client client2 = new Client(configuration2); addEndPoint(client2); client2.listeners() - .add(new Listener() { + .add(new Listener.OnConnected() { @Override public void connected(Connection connection) { diff --git a/test/dorkbox/network/MultipleThreadTest.java b/test/dorkbox/network/MultipleThreadTest.java index 91ad9b79..58540b5b 100644 --- a/test/dorkbox/network/MultipleThreadTest.java +++ b/test/dorkbox/network/MultipleThreadTest.java @@ -22,6 +22,7 @@ package dorkbox.network; import dorkbox.network.connection.Connection; import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.connection.Listener; +import dorkbox.network.connection.ListenerBridge; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; import org.junit.Test; @@ -70,53 +71,54 @@ class MultipleThreadTest extends BaseTest { server.bind(false); - server.listeners() - .add(new Listener() { + final ListenerBridge listeners = server.listeners(); + listeners.add(new Listener.OnConnected() { - @Override - public - void connected(final Connection connection) { - System.err.println("Client connected to server."); + @Override + public + void connected(final Connection connection) { + System.err.println("Client connected to server."); - // kickoff however many threads we need, and send data to the client. - for (int i = 1; i <= MultipleThreadTest.this.threadCount; i++) { - final int index = i; - new Thread() { - @Override - public - void run() { - for (int i = 1; i <= MultipleThreadTest.this.messageCount; i++) { - int incrementAndGet = MultipleThreadTest.this.sent.getAndIncrement(); - DataClass dataClass = new DataClass( - "Server -> client. Thread #" + index + " message# " + incrementAndGet, - incrementAndGet); + // kickoff however many threads we need, and send data to the client. + for (int i = 1; i <= MultipleThreadTest.this.threadCount; i++) { + final int index = i; + new Thread() { + @Override + public + void run() { + for (int i = 1; i <= MultipleThreadTest.this.messageCount; i++) { + int incrementAndGet = MultipleThreadTest.this.sent.getAndIncrement(); + DataClass dataClass = new DataClass("Server -> client. Thread #" + index + " message# " + incrementAndGet, + incrementAndGet); - //System.err.println(dataClass.data); - MultipleThreadTest.this.sentStringsToClientDebug.put(incrementAndGet, dataClass); - connection.send() - .TCP(dataClass) - .flush(); - } - } - }.start(); - } - } + //System.err.println(dataClass.data); + MultipleThreadTest.this.sentStringsToClientDebug.put(incrementAndGet, dataClass); + connection.send() + .TCP(dataClass) + .flush(); + } + } + }.start(); + } + } + }); - @Override - public - void received(Connection connection, DataClass object) { - int incrementAndGet = MultipleThreadTest.this.receivedServer.getAndIncrement(); + listeners.add(new Listener.OnMessageReceived() { + @Override + public + void received(Connection connection, DataClass object) { + int incrementAndGet = MultipleThreadTest.this.receivedServer.getAndIncrement(); - //System.err.println("server #" + incrementAndGet); - if (incrementAndGet == serverReceiveTotal) { - System.err.println("Server DONE " + incrementAndGet); - stopEndPoints(); - synchronized (MultipleThreadTest.this.lock) { - MultipleThreadTest.this.lock.notifyAll(); - } - } - } - }); + //System.err.println("server #" + incrementAndGet); + if (incrementAndGet == serverReceiveTotal) { + System.err.println("Server DONE " + incrementAndGet); + stopEndPoints(); + synchronized (MultipleThreadTest.this.lock) { + MultipleThreadTest.this.lock.notifyAll(); + } + } + } + }); // ---- @@ -128,7 +130,7 @@ class MultipleThreadTest extends BaseTest { addEndPoint(client); client.listeners() - .add(new Listener() { + .add(new Listener.OnMessageReceived() { final int clientIndex = index; final AtomicInteger received = new AtomicInteger(1); diff --git a/test/dorkbox/network/PingPongLocalTest.java b/test/dorkbox/network/PingPongLocalTest.java index 2738a27a..363068cb 100644 --- a/test/dorkbox/network/PingPongLocalTest.java +++ b/test/dorkbox/network/PingPongLocalTest.java @@ -22,6 +22,7 @@ package dorkbox.network; import dorkbox.network.connection.Connection; import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.connection.Listener; +import dorkbox.network.connection.ListenerBridge; import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -33,9 +34,10 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.fail; -public class PingPongLocalTest extends BaseTest { +public +class PingPongLocalTest extends BaseTest { + int tries = 10000; private volatile String fail; - int tries = 10000; @Test public void pingPongLocal() throws InitializationException, SecurityException, IOException, InterruptedException { @@ -50,53 +52,73 @@ public class PingPongLocalTest extends BaseTest { Server server = new Server(); addEndPoint(server); server.bind(false); - server.listeners().add(new Listener() { + final ListenerBridge listeners = server.listeners(); + listeners.add(new Listener.OnError() { @Override - public void error(Connection connection, Throwable throwable) { + public + void error(Connection connection, Throwable throwable) { PingPongLocalTest.this.fail = "Error during processing. " + throwable; } - + }); + listeners.add(new Listener.OnMessageReceived() { @Override - public void received(Connection connection, Data data) { + public + void received(Connection connection, Data data) { connection.id(); if (!data.equals(dataLOCAL)) { PingPongLocalTest.this.fail = "data is not equal on server."; throw new RuntimeException("Fail! " + PingPongLocalTest.this.fail); } - connection.send().TCP(data); + connection.send() + .TCP(data); } }); - // ---- + // ---- Client client = new Client(); addEndPoint(client); - client.listeners().add(new Listener() { + final ListenerBridge listeners1 = client.listeners(); + listeners1.add(new Listener.OnConnected() { AtomicInteger check = new AtomicInteger(0); @Override - public void connected(Connection connection) { + public + void connected(Connection connection) { PingPongLocalTest.this.fail = null; - connection.send().TCP(dataLOCAL); + connection.send() + .TCP(dataLOCAL); // connection.sendUDP(dataUDP); // TCP and UDP are the same for a local channel. } + }); + + listeners1.add(new Listener.OnError() { + AtomicInteger check = new AtomicInteger(0); @Override - public void error(Connection connection, Throwable throwable) { + public + void error(Connection connection, Throwable throwable) { PingPongLocalTest.this.fail = "Error during processing. " + throwable; System.err.println(PingPongLocalTest.this.fail); } + }); + + listeners1.add(new Listener.OnMessageReceived() { + AtomicInteger check = new AtomicInteger(0); @Override - public void received(Connection connection, Data data) { + public + void received(Connection connection, Data data) { if (!data.equals(dataLOCAL)) { PingPongLocalTest.this.fail = "data is not equal on client."; throw new RuntimeException("Fail! " + PingPongLocalTest.this.fail); } if (this.check.getAndIncrement() <= PingPongLocalTest.this.tries) { - connection.send().TCP(data); - } else { + connection.send() + .TCP(data); + } + else { System.err.println("Ran LOCAL " + PingPongLocalTest.this.tries + " times"); stopEndPoints(); } diff --git a/test/dorkbox/network/PingPongTest.java b/test/dorkbox/network/PingPongTest.java index ac6e26f8..08facc34 100644 --- a/test/dorkbox/network/PingPongTest.java +++ b/test/dorkbox/network/PingPongTest.java @@ -23,6 +23,7 @@ import dorkbox.network.connection.Connection; import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.connection.Listener; +import dorkbox.network.connection.ListenerBridge; import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -75,136 +76,142 @@ class PingPongTest extends BaseTest { Server server = new Server(configuration); addEndPoint(server); server.bind(false); - server.listeners() - .add(new Listener() { - @Override - public - void error(Connection connection, Throwable throwable) { - PingPongTest.this.fail = "Error during processing. " + throwable; - } + final ListenerBridge listeners1 = server.listeners(); + listeners1.add(new Listener.OnError() { + @Override + public + void error(Connection connection, Throwable throwable) { + PingPongTest.this.fail = "Error during processing. " + throwable; + } + }); - @Override - public - void received(Connection connection, Data data) { - if (data.type == TYPE.TCP) { - if (!data.equals(dataTCP)) { - PingPongTest.this.fail = "TCP data is not equal on server."; - throw new RuntimeException("Fail! " + PingPongTest.this.fail); - } - connection.send() - .TCP(dataTCP); - } - else if (data.type == TYPE.UDP) { - if (!data.equals(dataUDP)) { - PingPongTest.this.fail = "UDP data is not equal on server."; - throw new RuntimeException("Fail! " + PingPongTest.this.fail); - } - connection.send() - .UDP(dataUDP); - } - else if (data.type == TYPE.UDT) { - if (!data.equals(dataUDT)) { - PingPongTest.this.fail = "UDT data is not equal on server."; - throw new RuntimeException("Fail! " + PingPongTest.this.fail); - } - connection.send() - .UDT(dataUDT); - } - else { - PingPongTest.this.fail = "Unknown data type on server."; - throw new RuntimeException("Fail! " + PingPongTest.this.fail); - } - } - }); + listeners1.add(new Listener.OnMessageReceived() { + @Override + public + void received(Connection connection, Data data) { + if (data.type == TYPE.TCP) { + if (!data.equals(dataTCP)) { + PingPongTest.this.fail = "TCP data is not equal on server."; + throw new RuntimeException("Fail! " + PingPongTest.this.fail); + } + connection.send() + .TCP(dataTCP); + } + else if (data.type == TYPE.UDP) { + if (!data.equals(dataUDP)) { + PingPongTest.this.fail = "UDP data is not equal on server."; + throw new RuntimeException("Fail! " + PingPongTest.this.fail); + } + connection.send() + .UDP(dataUDP); + } + else if (data.type == TYPE.UDT) { + if (!data.equals(dataUDT)) { + PingPongTest.this.fail = "UDT data is not equal on server."; + throw new RuntimeException("Fail! " + PingPongTest.this.fail); + } + connection.send() + .UDT(dataUDT); + } + else { + PingPongTest.this.fail = "Unknown data type on server."; + throw new RuntimeException("Fail! " + PingPongTest.this.fail); + } + } + }); // ---- Client client = new Client(configuration); addEndPoint(client); - client.listeners() - .add(new Listener() { - AtomicInteger checkTCP = new AtomicInteger(0); - AtomicInteger checkUDP = new AtomicInteger(0); - AtomicInteger checkUDT = new AtomicInteger(0); - AtomicBoolean doneTCP = new AtomicBoolean(false); - AtomicBoolean doneUDP = new AtomicBoolean(false); - AtomicBoolean doneUDT = new AtomicBoolean(false); + final ListenerBridge listeners = client.listeners(); + listeners.add(new Listener.OnConnected() { + @Override + public + void connected(Connection connection) { + PingPongTest.this.fail = null; + connection.send() + .TCP(dataTCP); + connection.send() + .UDP(dataUDP); // UDP ping pong stops if a UDP packet is lost. + connection.send() + .UDT(dataUDT); + } + }); - @Override - public - void connected(Connection connection) { - PingPongTest.this.fail = null; - connection.send() - .TCP(dataTCP); - connection.send() - .UDP(dataUDP); // UDP ping pong stops if a UDP packet is lost. - connection.send() - .UDT(dataUDT); - } + listeners.add(new Listener.OnError() { + @Override + public + void error(Connection connection, Throwable throwable) { + PingPongTest.this.fail = "Error during processing. " + throwable; + throwable.printStackTrace(); + } + }); - @Override - public - void error(Connection connection, Throwable throwable) { - PingPongTest.this.fail = "Error during processing. " + throwable; - throwable.printStackTrace(); - } + listeners.add(new Listener.OnMessageReceived() { + AtomicInteger checkTCP = new AtomicInteger(0); + AtomicInteger checkUDP = new AtomicInteger(0); + AtomicInteger checkUDT = new AtomicInteger(0); + AtomicBoolean doneTCP = new AtomicBoolean(false); + AtomicBoolean doneUDP = new AtomicBoolean(false); + AtomicBoolean doneUDT = new AtomicBoolean(false); - @Override - public - void received(Connection connection, Data data) { - if (data.type == TYPE.TCP) { - if (!data.equals(dataTCP)) { - PingPongTest.this.fail = "TCP data is not equal on client."; - throw new RuntimeException("Fail! " + PingPongTest.this.fail); - } - if (this.checkTCP.getAndIncrement() <= PingPongTest.this.tries) { - connection.send() - .TCP(dataTCP); - } - else { - System.err.println("TCP done."); - this.doneTCP.set(true); - } - } - else if (data.type == TYPE.UDP) { - if (!data.equals(dataUDP)) { - PingPongTest.this.fail = "UDP data is not equal on client."; - throw new RuntimeException("Fail! " + PingPongTest.this.fail); - } - if (this.checkUDP.getAndIncrement() <= PingPongTest.this.tries) { - connection.send() - .UDP(dataUDP); - } - else { - System.err.println("UDP done."); - this.doneUDP.set(true); - } - } - else if (data.type == TYPE.UDT) { - if (!data.equals(dataUDT)) { - PingPongTest.this.fail = "UDT data is not equal on client."; - throw new RuntimeException("Fail! " + PingPongTest.this.fail); - } - if (this.checkUDT.getAndIncrement() <= PingPongTest.this.tries) { - connection.send() - .UDT(dataUDT); - } - else { - System.err.println("UDT done."); - this.doneUDT.set(true); - } - } - else { - PingPongTest.this.fail = "Unknown data type on client."; - throw new RuntimeException("Fail! " + PingPongTest.this.fail); - } + @Override + public + void received(Connection connection, Data data) { + if (data.type == TYPE.TCP) { + if (!data.equals(dataTCP)) { + PingPongTest.this.fail = "TCP data is not equal on client."; + throw new RuntimeException("Fail! " + PingPongTest.this.fail); + } + if (this.checkTCP.getAndIncrement() <= PingPongTest.this.tries) { + connection.send() + .TCP(dataTCP); + } + else { + System.err.println("TCP done."); + this.doneTCP.set(true); + } + } + else if (data.type == TYPE.UDP) { + if (!data.equals(dataUDP)) { + PingPongTest.this.fail = "UDP data is not equal on client."; + throw new RuntimeException("Fail! " + PingPongTest.this.fail); + } + if (this.checkUDP.getAndIncrement() <= PingPongTest.this.tries) { + connection.send() + .UDP(dataUDP); + } + else { + System.err.println("UDP done."); + this.doneUDP.set(true); + } + } + else if (data.type == TYPE.UDT) { + if (!data.equals(dataUDT)) { + PingPongTest.this.fail = "UDT data is not equal on client."; + throw new RuntimeException("Fail! " + PingPongTest.this.fail); + } + if (this.checkUDT.getAndIncrement() <= PingPongTest.this.tries) { + connection.send() + .UDT(dataUDT); + } + else { + System.err.println("UDT done."); + this.doneUDT.set(true); + } + } + else { + PingPongTest.this.fail = "Unknown data type on client."; + throw new RuntimeException("Fail! " + PingPongTest.this.fail); + } - if (this.doneTCP.get() && this.doneUDP.get() && this.doneUDT.get()) { - System.err.println("Ran TCP, UDP, UDT " + PingPongTest.this.tries + " times each"); - stopEndPoints(); - } - } - }); + if (this.doneTCP.get() && this.doneUDP.get() && this.doneUDT.get()) { + System.err.println("Ran TCP, UDP, UDT " + PingPongTest.this.tries + " times each"); + stopEndPoints(); + } + } + }); client.connect(5000); diff --git a/test/dorkbox/network/ReconnectTest.java b/test/dorkbox/network/ReconnectTest.java index 76480dda..97bad79b 100644 --- a/test/dorkbox/network/ReconnectTest.java +++ b/test/dorkbox/network/ReconnectTest.java @@ -47,7 +47,7 @@ class ReconnectTest extends BaseTest { addEndPoint(server); server.listeners() - .add(new Listener() { + .add(new Listener.OnConnected() { @Override public void connected(final Connection connection) { @@ -68,7 +68,7 @@ class ReconnectTest extends BaseTest { final Client client = new Client(configuration); addEndPoint(client); client.listeners() - .add(new Listener() { + .add(new Listener.OnDisconnected() { @Override public void disconnected(Connection connection) { diff --git a/test/dorkbox/network/ReuseTest.java b/test/dorkbox/network/ReuseTest.java index 2f4291c9..3f550703 100644 --- a/test/dorkbox/network/ReuseTest.java +++ b/test/dorkbox/network/ReuseTest.java @@ -21,6 +21,7 @@ package dorkbox.network; import dorkbox.network.connection.Connection; import dorkbox.network.connection.Listener; +import dorkbox.network.connection.ListenerBridge; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; import org.junit.Test; @@ -48,47 +49,49 @@ class ReuseTest extends BaseTest { Server server = new Server(configuration); addEndPoint(server); - server.listeners() - .add(new Listener() { - @Override - public - void connected(Connection connection) { - connection.send() - .TCP("-- TCP from server"); - connection.send() - .UDP("-- UDP from server"); - } - - @Override - public - void received(Connection connection, String object) { - int incrementAndGet = ReuseTest.this.serverCount.incrementAndGet(); - System.err.println(" " + incrementAndGet + " : " + object); - } - }); + final ListenerBridge listeners = server.listeners(); + listeners.add(new Listener.OnConnected() { + @Override + public + void connected(Connection connection) { + connection.send() + .TCP("-- TCP from server"); + connection.send() + .UDP("-- UDP from server"); + } + }); + listeners.add(new Listener.OnMessageReceived() { + @Override + public + void received(Connection connection, String object) { + int incrementAndGet = ReuseTest.this.serverCount.incrementAndGet(); + System.err.println(" " + incrementAndGet + " : " + object); + } + }); // ---- Client client = new Client(configuration); addEndPoint(client); - client.listeners() - .add(new Listener() { - @Override - public - void connected(Connection connection) { - connection.send() - .TCP("-- TCP from client"); - connection.send() - .UDP("-- UDP from client"); - } - - @Override - public - void received(Connection connection, String object) { - int incrementAndGet = ReuseTest.this.clientCount.incrementAndGet(); - System.err.println(" " + incrementAndGet + " : " + object); - } - }); + final ListenerBridge listeners1 = client.listeners(); + listeners1.add(new Listener.OnConnected() { + @Override + public + void connected(Connection connection) { + connection.send() + .TCP("-- TCP from client"); + connection.send() + .UDP("-- UDP from client"); + } + }); + listeners1.add(new Listener.OnMessageReceived() { + @Override + public + void received(Connection connection, String object) { + int incrementAndGet = ReuseTest.this.clientCount.incrementAndGet(); + System.err.println(" " + incrementAndGet + " : " + object); + } + }); server.bind(false); int count = 10; @@ -122,14 +125,16 @@ class ReuseTest extends BaseTest { Server server = new Server(); addEndPoint(server); server.listeners() - .add(new Listener() { + .add(new Listener.OnConnected() { @Override public void connected(Connection connection) { connection.send() .TCP("-- LOCAL from server"); } - + }); + server.listeners() + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, String object) { @@ -143,14 +148,17 @@ class ReuseTest extends BaseTest { Client client = new Client(); addEndPoint(client); client.listeners() - .add(new Listener() { + .add(new Listener.OnConnected() { @Override public void connected(Connection connection) { connection.send() .TCP("-- LOCAL from client"); } + }); + client.listeners() + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, String object) { diff --git a/test/dorkbox/network/UnregisteredClassTest.java b/test/dorkbox/network/UnregisteredClassTest.java index d6cc465f..ff144ac5 100644 --- a/test/dorkbox/network/UnregisteredClassTest.java +++ b/test/dorkbox/network/UnregisteredClassTest.java @@ -23,6 +23,7 @@ import dorkbox.network.connection.Connection; import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.connection.Listener; +import dorkbox.network.connection.ListenerBridge; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; import org.junit.Test; @@ -66,13 +67,15 @@ class UnregisteredClassTest extends BaseTest { addEndPoint(server); server.bind(false); server.listeners() - .add(new Listener() { + .add(new Listener.OnError() { @Override public void error(Connection connection, Throwable throwable) { UnregisteredClassTest.this.fail = "Error during processing. " + throwable; } - + }); + server.listeners() + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, Data data) { @@ -101,70 +104,72 @@ class UnregisteredClassTest extends BaseTest { Client client = new Client(configuration); addEndPoint(client); - client.listeners() - .add(new Listener() { - AtomicInteger checkTCP = new AtomicInteger(0); - AtomicInteger checkUDP = new AtomicInteger(0); - AtomicBoolean doneTCP = new AtomicBoolean(false); - AtomicBoolean doneUDP = new AtomicBoolean(false); + final ListenerBridge listeners = client.listeners(); + listeners.add(new Listener.OnConnected() { + @Override + public + void connected(Connection connection) { + UnregisteredClassTest.this.fail = null; + connection.send() + .TCP(dataTCP); + connection.send() + .UDP(dataUDP); // UDP ping pong stops if a UDP packet is lost. + } + }); + listeners.add(new Listener.OnError() { + @Override + public + void error(Connection connection, Throwable throwable) { + UnregisteredClassTest.this.fail = "Error during processing. " + throwable; + System.err.println(UnregisteredClassTest.this.fail); + } + }); + listeners.add(new Listener.OnMessageReceived() { + AtomicInteger checkTCP = new AtomicInteger(0); + AtomicInteger checkUDP = new AtomicInteger(0); + AtomicBoolean doneTCP = new AtomicBoolean(false); + AtomicBoolean doneUDP = new AtomicBoolean(false); - @Override - public - void connected(Connection connection) { - UnregisteredClassTest.this.fail = null; - connection.send() - .TCP(dataTCP); - connection.send() - .UDP(dataUDP); // UDP ping pong stops if a UDP packet is lost. - } + @Override + public + void received(Connection connection, Data data) { + if (data.isTCP) { + if (!data.equals(dataTCP)) { + UnregisteredClassTest.this.fail = "TCP data is not equal on client."; + throw new RuntimeException("Fail! " + UnregisteredClassTest.this.fail); + } + if (this.checkTCP.getAndIncrement() <= UnregisteredClassTest.this.tries) { + connection.send() + .TCP(data); + UnregisteredClassTest.this.receivedTCP.incrementAndGet(); + } + else { + System.err.println("TCP done."); + this.doneTCP.set(true); + } + } + else { + if (!data.equals(dataUDP)) { + UnregisteredClassTest.this.fail = "UDP data is not equal on client."; + throw new RuntimeException("Fail! " + UnregisteredClassTest.this.fail); + } + if (this.checkUDP.getAndIncrement() <= UnregisteredClassTest.this.tries) { + connection.send() + .UDP(data); + UnregisteredClassTest.this.receivedUDP.incrementAndGet(); + } + else { + System.err.println("UDP done."); + this.doneUDP.set(true); + } + } - @Override - public - void error(Connection connection, Throwable throwable) { - UnregisteredClassTest.this.fail = "Error during processing. " + throwable; - System.err.println(UnregisteredClassTest.this.fail); - } - - @Override - public - void received(Connection connection, Data data) { - if (data.isTCP) { - if (!data.equals(dataTCP)) { - UnregisteredClassTest.this.fail = "TCP data is not equal on client."; - throw new RuntimeException("Fail! " + UnregisteredClassTest.this.fail); - } - if (this.checkTCP.getAndIncrement() <= UnregisteredClassTest.this.tries) { - connection.send() - .TCP(data); - UnregisteredClassTest.this.receivedTCP.incrementAndGet(); - } - else { - System.err.println("TCP done."); - this.doneTCP.set(true); - } - } - else { - if (!data.equals(dataUDP)) { - UnregisteredClassTest.this.fail = "UDP data is not equal on client."; - throw new RuntimeException("Fail! " + UnregisteredClassTest.this.fail); - } - if (this.checkUDP.getAndIncrement() <= UnregisteredClassTest.this.tries) { - connection.send() - .UDP(data); - UnregisteredClassTest.this.receivedUDP.incrementAndGet(); - } - else { - System.err.println("UDP done."); - this.doneUDP.set(true); - } - } - - if (this.doneTCP.get() && this.doneUDP.get()) { - System.err.println("Ran TCP & UDP " + UnregisteredClassTest.this.tries + " times each"); - stopEndPoints(); - } - } - }); + if (this.doneTCP.get() && this.doneUDP.get()) { + System.err.println("Ran TCP & UDP " + UnregisteredClassTest.this.tries + " times each"); + stopEndPoints(); + } + } + }); client.connect(5000); waitForThreads(); diff --git a/test/dorkbox/network/rmi/RmiGlobalTest.java b/test/dorkbox/network/rmi/RmiGlobalTest.java index 22dc1d25..a6b2c7b7 100644 --- a/test/dorkbox/network/rmi/RmiGlobalTest.java +++ b/test/dorkbox/network/rmi/RmiGlobalTest.java @@ -229,13 +229,16 @@ class RmiGlobalTest extends BaseTest { server.bind(false); server.listeners() - .add(new Listener() { + .add(new Listener.OnConnected() { @Override public void connected(final Connection connection) { RmiGlobalTest.runTest(connection, globalRemoteClientObject, CLIENT_GLOBAL_OBJECT_ID); } + }); + server.listeners() + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, MessageWithTestObject m) { @@ -260,7 +263,7 @@ class RmiGlobalTest extends BaseTest { addEndPoint(client); client.listeners() - .add(new Listener() { + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, MessageWithTestObject m) { diff --git a/test/dorkbox/network/rmi/RmiSendObjectOverrideMethodTest.java b/test/dorkbox/network/rmi/RmiSendObjectOverrideMethodTest.java index 34d0d1cd..7bbd77ae 100644 --- a/test/dorkbox/network/rmi/RmiSendObjectOverrideMethodTest.java +++ b/test/dorkbox/network/rmi/RmiSendObjectOverrideMethodTest.java @@ -86,7 +86,7 @@ class RmiSendObjectOverrideMethodTest extends BaseTest { server.listeners() - .add(new Listener() { + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, OtherObjectImpl object) { @@ -108,7 +108,7 @@ class RmiSendObjectOverrideMethodTest extends BaseTest { addEndPoint(client); client.listeners() - .add(new Listener() { + .add(new Listener.OnConnected() { @Override public void connected(final Connection connection) { diff --git a/test/dorkbox/network/rmi/RmiSendObjectTest.java b/test/dorkbox/network/rmi/RmiSendObjectTest.java index 54f1f184..535a8758 100644 --- a/test/dorkbox/network/rmi/RmiSendObjectTest.java +++ b/test/dorkbox/network/rmi/RmiSendObjectTest.java @@ -84,7 +84,7 @@ class RmiSendObjectTest extends BaseTest { server.listeners() - .add(new Listener() { + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, OtherObjectImpl object) { @@ -104,7 +104,7 @@ class RmiSendObjectTest extends BaseTest { addEndPoint(client); client.listeners() - .add(new Listener() { + .add(new Listener.OnConnected() { @Override public void connected(final Connection connection) { diff --git a/test/dorkbox/network/rmi/RmiTest.java b/test/dorkbox/network/rmi/RmiTest.java index e2a7567c..c0e34df5 100644 --- a/test/dorkbox/network/rmi/RmiTest.java +++ b/test/dorkbox/network/rmi/RmiTest.java @@ -41,6 +41,7 @@ import dorkbox.network.Server; import dorkbox.network.connection.Connection; import dorkbox.network.connection.KryoCryptoSerializationManager; import dorkbox.network.connection.Listener; +import dorkbox.network.connection.ListenerBridge; import dorkbox.network.util.CryptoSerializationManager; import dorkbox.util.exceptions.InitializationException; import dorkbox.util.exceptions.SecurityException; @@ -210,27 +211,29 @@ class RmiTest extends BaseTest { addEndPoint(server); server.bind(false); - server.listeners() - .add(new Listener() { - @Override - public - void connected(final Connection connection) { - System.err.println("Starting test for: Server -> Client"); - RmiTest.runTest(connection, 1); - } + final ListenerBridge listeners = server.listeners(); + listeners.add(new Listener.OnConnected() { + @Override + public + void connected(final Connection connection) { + System.err.println("Starting test for: Server -> Client"); + RmiTest.runTest(connection, 1); + } + }); + listeners.add(new Listener.OnMessageReceived() { - @Override - public - void received(Connection connection, MessageWithTestObject m) { - TestObject object = m.testObject; - final int id = object.id(); - assertEquals(2, id); - System.err.println("Client -> Server Finished!"); + @Override + public + void received(Connection connection, MessageWithTestObject m) { + TestObject object = m.testObject; + final int id = object.id(); + assertEquals(2, id); + System.err.println("Client -> Server Finished!"); - stopEndPoints(2000); - } + stopEndPoints(2000); + } - }); + }); // ---- @@ -241,7 +244,7 @@ class RmiTest extends BaseTest { addEndPoint(client); client.listeners() - .add(new Listener() { + .add(new Listener.OnMessageReceived() { @Override public void received(Connection connection, MessageWithTestObject m) {