Refactored listener dispatch events

(connect/disconnect/idle/message-received) so that they are now
interfaces, and the backing implementation uses the single-writer
principle
This commit is contained in:
nathan 2016-04-03 18:58:38 +02:00
parent 55dd4c4d55
commit 76599e370e
38 changed files with 1575 additions and 872 deletions

View File

@ -772,7 +772,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
@SuppressWarnings("rawtypes")
@Override
public final
void add(ListenerRaw listener) {
void add(Listener listener) {
if (this.endPoint instanceof EndPointServer) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level
// meaning --
@ -812,7 +812,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements ICryptoConn
@SuppressWarnings("rawtypes")
@Override
public final
void remove(ListenerRaw listener) {
void remove(Listener listener) {
if (this.endPoint instanceof EndPointServer) {
// when we are a server, NORMALLY listeners are added at the GLOBAL level
// meaning --

View File

@ -18,21 +18,20 @@ package dorkbox.network.connection;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.bridge.ConnectionBridgeServer;
import dorkbox.network.connection.bridge.ConnectionExceptSpecifiedBridgeServer;
import dorkbox.network.rmi.RmiMessages;
import dorkbox.network.connection.listenerManagement.OnConnectedManager;
import dorkbox.network.connection.listenerManagement.OnDisconnectedManager;
import dorkbox.network.connection.listenerManagement.OnIdleManager;
import dorkbox.network.connection.listenerManagement.OnMessageReceivedManager;
import dorkbox.util.ClassHelper;
import dorkbox.util.Property;
import dorkbox.util.collections.ConcurrentEntry;
import dorkbox.util.collections.ConcurrentIterator;
import org.slf4j.Logger;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static dorkbox.util.collections.ConcurrentIterator.headREF;
// .equals() compares the identity on purpose,this because we cannot create two separate objects that are somehow equal to each other.
@SuppressWarnings("unchecked")
public
@ -44,27 +43,29 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
@Property
public static final float LOAD_FACTOR = 0.8F;
private static Listener<?> unRegisteredType_Listener = null;
private final String loggerName;
@SuppressWarnings("unused")
private volatile IdentityMap<Connection, ConnectionManager<C>> localManagers = new IdentityMap<Connection, ConnectionManager<C>>(8, ConnectionManager.LOAD_FACTOR);
@SuppressWarnings("unused")
private volatile IdentityMap<Type, ConcurrentIterator> listeners = new IdentityMap<Type, ConcurrentIterator>(32, LOAD_FACTOR);
private final OnConnectedManager<C> onConnectedManager;
private final OnDisconnectedManager<C> onDisconnectedManager;
private final OnIdleManager<C> onIdleManager;
private final OnMessageReceivedManager<C> onMessageReceivedManager;
@SuppressWarnings({"FieldCanBeLocal", "unused"})
private volatile ConcurrentEntry<C> connectionsHead = null; // reference to the first element
// This is only touched by a single thread, maintains a map of entries for FAST lookup during connection remove.
// This is ONLY touched by a single thread, maintains a map of entries for FAST lookup during connection remove.
private final IdentityMap<C, ConcurrentEntry> connectionEntries = new IdentityMap<C, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR);
@SuppressWarnings("unused")
private volatile IdentityMap<Connection, ConnectionManager<C>> localManagers = new IdentityMap<Connection, ConnectionManager<C>>(8, ConnectionManager.LOAD_FACTOR);
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
private final Object singleWriterLock1 = new Object();
private final Object singleWriterLock2 = new Object();
private final Object singleWriterLock3 = new Object();
@ -75,11 +76,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
IdentityMap.class,
"localManagers");
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<ConnectionManager, IdentityMap> listenersREF =
AtomicReferenceFieldUpdater.newUpdater(ConnectionManager.class,
IdentityMap.class,
"listeners");
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<ConnectionManager, ConcurrentEntry> connectionsREF =
@ -93,14 +90,19 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
*/
private final Class<?> baseClass;
protected final org.slf4j.Logger logger;
volatile boolean shutdown = false;
private final AtomicBoolean hasAddedAtLeastOnce = new AtomicBoolean(false);
final AtomicBoolean shutdown = new AtomicBoolean(false);
public
ConnectionManager(final String loggerName, final Class<?> baseClass) {
this.loggerName = loggerName;
this.logger = org.slf4j.LoggerFactory.getLogger(loggerName);
this.baseClass = baseClass;
onConnectedManager = new OnConnectedManager<C>(logger);
onDisconnectedManager = new OnDisconnectedManager<C>(logger);
onIdleManager = new OnIdleManager<C>(logger);
onMessageReceivedManager = new OnMessageReceivedManager<C>(logger);
}
/**
@ -115,16 +117,21 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
@SuppressWarnings("rawtypes")
@Override
public final
void add(final ListenerRaw listener) {
void add(final Listener listener) {
if (listener == null) {
throw new IllegalArgumentException("listener cannot be null.");
}
// find the class that uses Listener.class.
Class<?> clazz = listener.getClass();
while (clazz.getSuperclass() != ListenerRaw.class) {
clazz = clazz.getSuperclass();
}
Class<?>[] interfaces = clazz.getInterfaces();
// for (Class<?> anInterface : interfaces) {
// }
//
// while (!(clazz.getSuperclass() != Object.class)) {
// clazz = clazz.getSuperclass();
// }
// this is the connection generic parameter for the listener
Class<?> genericClass = ClassHelper.getGenericParameterAsClassForSuperClass(clazz, 0);
@ -152,34 +159,39 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private
void addListener0(final ListenerRaw listener) {
Class<?> type = listener.getObjectType();
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock1) {
// access a snapshot of the listeners (single-writer-principle)
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
ConcurrentIterator subscribedListeners = listeners.get(type);
if (subscribedListeners == null) {
subscribedListeners = new ConcurrentIterator();
listeners.put(type, subscribedListeners);
}
subscribedListeners.add(listener);
// save this snapshot back to the original (single writer principle)
listenersREF.lazySet(this, listeners);
void addListener0(final Listener listener) {
boolean found = false;
if (listener instanceof Listener.OnConnected) {
onConnectedManager.add((Listener.OnConnected<C>) listener);
found = true;
}
if (listener instanceof Listener.OnDisconnected) {
onDisconnectedManager.add((Listener.OnDisconnected<C>) listener);
found = true;
}
if (listener instanceof Listener.OnIdle) {
onIdleManager.add((Listener.OnIdle<C>) listener);
found = true;
}
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("listener added: {} <{}>",
if (listener instanceof Listener.OnMessageReceived) {
onMessageReceivedManager.add((Listener.OnMessageReceived<C, Object>) listener);
found = true;
}
final Logger logger2 = this.logger;
if (!found) {
logger2.error("No matching listener types. Unable to add listener: {}",
listener.getClass()
.getName(),
listener.getObjectType());
.getName());
}
else {
hasAddedAtLeastOnce.set(true);
if (logger2.isTraceEnabled()) {
logger2.trace("listener added: {}",
listener.getClass()
.getName());
}
}
}
@ -195,35 +207,36 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
@SuppressWarnings("rawtypes")
@Override
public final
void remove(final ListenerRaw listener) {
void remove(final Listener listener) {
if (listener == null) {
throw new IllegalArgumentException("listener cannot be null.");
}
Class<?> type = listener.getObjectType();
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock1) {
// access a snapshot of the listeners (single-writer-principle)
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
final ConcurrentIterator concurrentIterator = listeners.get(type);
if (concurrentIterator != null) {
concurrentIterator.remove(listener);
}
// save this snapshot back to the original (single writer principle)
listenersREF.lazySet(this, listeners);
boolean found = false;
if (listener instanceof Listener.OnConnected) {
found = onConnectedManager.remove((Listener.OnConnected<C>) listener);
}
if (listener instanceof Listener.OnDisconnected) {
found |= onDisconnectedManager.remove((Listener.OnDisconnected<C>) listener);
}
if (listener instanceof Listener.OnIdle) {
found |= onIdleManager.remove((Listener.OnIdle<C>) listener);
}
if (listener instanceof Listener.OnMessageReceived) {
found |= onMessageReceivedManager.remove((Listener.OnMessageReceived<C, Object>) listener);
}
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("listener removed: {} <{}>",
final Logger logger2 = this.logger;
if (!found) {
logger2.error("No matching listener types. Unable to remove listener: {}",
listener.getClass()
.getName(),
listener.getObjectType());
.getName());
}
else if (logger2.isTraceEnabled()) {
logger2.trace("listener removed: {}",
listener.getClass()
.getName());
}
}
@ -234,22 +247,11 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
@Override
public final
void removeAll() {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock1) {
// access a snapshot of the listeners (single-writer-principle)
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
listeners.clear();
// save this snapshot back to the original (single writer principle)
listenersREF.lazySet(this, listeners);
}
onMessageReceivedManager.removeAll();
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("all listeners removed !!");
logger2.trace("ALL listeners removed !!");
}
}
@ -265,22 +267,15 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
throw new IllegalArgumentException("classType cannot be null.");
}
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (singleWriterLock1) {
// access a snapshot of the listeners (single-writer-principle)
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
listeners.remove(classType);
// save this snapshot back to the original (single writer principle)
listenersREF.lazySet(this, listeners);
}
Logger logger2 = this.logger;
if (logger2.isTraceEnabled()) {
logger2.trace("all listeners removed for type: {}",
final Logger logger2 = this.logger;
if (onMessageReceivedManager.removeAll(classType)) {
if (logger2.isTraceEnabled()) {
logger2.trace("All listeners removed for type: {}",
classType.getClass()
.getName());
}
} else {
logger2.warn("No listeners found to remove for type: {}",
classType.getClass()
.getName());
}
@ -304,85 +299,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
@SuppressWarnings("Duplicates")
private
boolean notifyOnMessage0(final C connection, final Object message, boolean foundListener) {
Class<?> objectType = message.getClass();
// this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version)
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
ConcurrentIterator concurrentIterator = listeners.get(objectType);
if (concurrentIterator != null) {
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(concurrentIterator);
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
ListenerRaw<C, Object> listener;
while (current != null) {
if (this.shutdown) {
return true;
}
listener = current.getValue();
current = current.next();
try {
listener.received(connection, message);
} catch (Exception e) {
logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.",
objectType,
listener,
connection,
e);
listener.error(connection, e);
}
}
foundListener = head != null; // true if we have something to publish to, otherwise false
}
if (!(message instanceof RmiMessages)) {
// we march through all super types of the object, and find the FIRST set
// of listeners that are registered and cast it as that, and notify the method.
// NOTICE: we do NOT call ALL TYPE -- meaning, if we have Object->Foo->Bar
// and have listeners for Object and Foo
// we will call Bar (from the above code)
// we will call Foo (from this code)
// we will NOT call Object (since we called Foo). If Foo was not registered, THEN we would call object!
objectType = objectType.getSuperclass();
while (objectType != null) {
// check to see if we have what we are looking for in our CURRENT class
concurrentIterator = listeners.get(objectType);
if (concurrentIterator != null) {
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(concurrentIterator);
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
ListenerRaw<C, Object> listener;
while (current != null) {
if (this.shutdown) {
return true;
}
listener = current.getValue();
current = current.next();
try {
listener.received(connection, message);
} catch (Exception e) {
logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.",
objectType,
listener,
connection,
e);
listener.error(connection, e);
}
}
foundListener = head != null; // true if we have something to publish to, otherwise false
break;
}
// NO MATCH, so walk up.
objectType = objectType.getSuperclass();
}
}
foundListener |= onMessageReceivedManager.notifyReceived(connection, message, shutdown);
// now have to account for additional connection listener managers (non-global).
// access a snapshot of the managers (single-writer-principle)
@ -400,9 +317,6 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
connection.send()
.flush();
}
else if (unRegisteredType_Listener != null) {
unRegisteredType_Listener.received(connection, null);
}
else {
Logger logger2 = this.logger;
if (logger2.isErrorEnabled()) {
@ -422,35 +336,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
@Override
public final
void notifyOnIdle(final C connection) {
// this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version)
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
boolean foundListener = false;
final IdentityMap.Entries<Type, ConcurrentIterator> entries = listeners.entries(); // entries is necessary for multiple threads
for (IdentityMap.Entry<Type, ConcurrentIterator> entry : entries) {
if (entry != null && entry.value != null) {
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(entry.value);
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
ListenerRaw<C, Object> listener;
while (current != null) {
if (this.shutdown) {
return;
}
listener = current.getValue();
current = current.next();
try {
listener.idle(connection);
} catch (Exception e) {
logger.error("Unable to notify listener on 'idle' for listener '{}', connection '{}'.", listener, connection, e);
listener.error(connection, e);
}
}
foundListener |= head != null; // true if we have something to publish to, otherwise false
}
}
boolean foundListener = onIdleManager.notifyIdle(connection, shutdown);
if (foundListener) {
connection.send()
@ -477,34 +363,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
void connectionConnected(final C connection) {
addConnection(connection);
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
boolean foundListener = false;
final IdentityMap.Entries<Type, ConcurrentIterator> entries = listeners.entries();
for (IdentityMap.Entry<Type, ConcurrentIterator> entry : entries) {
if (entry != null && entry.value != null) {
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(entry.value);
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
ListenerRaw<C, Object> listener;
while (current != null) {
if (this.shutdown) {
return;
}
listener = current.getValue();
current = current.next();
try {
listener.connected(connection);
} catch (Exception e) {
logger.error("Unable to notify listener on 'connected' for listener '{}', connection '{}'.", listener, connection, e);
listener.error(connection, e);
}
}
foundListener |= head != null; // true if we have something to publish to, otherwise false
}
}
boolean foundListener = onConnectedManager.notifyConnected(connection, shutdown);
if (foundListener) {
connection.send()
@ -529,41 +388,13 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
@Override
public
void connectionDisconnected(final C connection) {
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
boolean foundListener = false;
final IdentityMap.Entries<Type, ConcurrentIterator> entries = listeners.entries(); // entries is necessary for multiple threads
for (IdentityMap.Entry<Type, ConcurrentIterator> entry : entries) {
if (entry != null && entry.value != null) {
ConcurrentEntry<ListenerRaw<C, Object>> head = headREF.get(entry.value);
ConcurrentEntry<ListenerRaw<C, Object>> current = head;
ListenerRaw<C, Object> listener;
while (current != null) {
if (this.shutdown) {
return;
}
listener = current.getValue();
current = current.next();
try {
listener.disconnected(connection);
} catch (Exception e) {
logger.error("Unable to notify listener on 'disconnected' for listener '{}', connection '{}'.", listener, connection, e);
listener.error(connection, e);
}
}
foundListener |= head != null; // true if we have something to publish to, otherwise false
}
}
boolean foundListener = onDisconnectedManager.notifyDisconnected(connection, shutdown);
if (foundListener) {
connection.send()
.flush();
}
// now have to account for additional (local) listener managers.
// access a snapshot of the managers (single-writer-principle)
@ -614,6 +445,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
*
* @param connection the connection to remove
*/
private
void removeConnection(C connection) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
@ -676,7 +508,8 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
manager = localManagers.get(connection);
if (manager == null) {
created = true;
manager = new ConnectionManager<C>(loggerName + "-" + connection.toString() + " Specific", ConnectionManager.this.baseClass);
manager = new ConnectionManager<C>(loggerName + "-" + connection.toString() + " Specific",
ConnectionManager.this.baseClass);
localManagers.put(connection, manager);
// save this snapshot back to the original (single writer principle)
@ -746,7 +579,7 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
*/
final
boolean hasListeners() {
return listenersREF.get(this).size == 0;
return hasAddedAtLeastOnce.get();
}
/**
@ -754,24 +587,15 @@ class ConnectionManager<C extends Connection> implements ListenerBridge, ISessio
*/
final
void stop() {
this.shutdown = true;
this.shutdown.set(true);
// disconnect the sessions
closeConnections();
synchronized (singleWriterLock1) {
final IdentityMap<Type, ConcurrentIterator> listeners = listenersREF.get(this);
final Iterator<ConcurrentIterator> iterator = listeners.values()
.iterator();
while (iterator.hasNext()) {
final ConcurrentIterator next = iterator.next();
next.clear();
iterator.remove();
}
// save this snapshot back to the original (single writer principle)
listenersREF.lazySet(this, listeners);
}
onConnectedManager.clear();
onDisconnectedManager.clear();
onIdleManager.clear();
onMessageReceivedManager.clear();
}
/**

View File

@ -296,7 +296,7 @@ class EndPoint<C extends Connection> {
public
void run() {
// connectionManager.shutdown accurately reflects the state of the app. Safe to use here
if (EndPoint.this.connectionManager != null && !EndPoint.this.connectionManager.shutdown) {
if (EndPoint.this.connectionManager != null && !EndPoint.this.connectionManager.shutdown.get()) {
EndPoint.this.stop();
}
}
@ -364,7 +364,7 @@ class EndPoint<C extends Connection> {
}
/**
* The amount of milli-seconds that must elapse with no read or write before {@link ListenerRaw#idle(Connection)} }
* The amount of milli-seconds that must elapse with no read or write before {@link Listener.OnIdle#idle(Connection)} }
* will be triggered
*/
public

View File

@ -15,10 +15,82 @@
*/
package dorkbox.network.connection;
public abstract
class Listener<M extends Object> extends ListenerRaw<Connection, M> {
public
Listener() {
super(0);
import java.io.IOException;
public
interface Listener {
/**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network.
* This method should not block for long periods as other network activity will not be processed
* until it returns.
*/
interface OnConnected<C extends Connection> extends Listener {
/**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network.
* This method should not block for long periods as other network activity will not be processed
* until it returns.
*/
void connected(C connection);
}
/**
* Called when the remote end is no longer connected. There is no guarantee as to what thread will invoke this method.
* <p/>
* Do not write data in this method! The channel can already be closed, resulting in an error if you attempt to do so.
*/
interface OnDisconnected<C extends Connection> extends Listener {
/**
* Called when the remote end is no longer connected. There is no guarantee as to what thread will invoke this method.
* <p/>
* Do not write data in this method! The channel can already be closed, resulting in an error if you attempt to do so.
*/
void disconnected(C connection);
}
/**
* Called when there is an error of some kind during the up/down stream process (to/from the socket or otherwise)
*/
interface OnError<C extends Connection> extends Listener {
/**
* Called when there is an error of some kind during the up/down stream process (to/from the socket or otherwise).
*
* The error is sent to an error log before this method is called.
*/
void error(C connection, Throwable throwable);
}
/**
* Called when the connection is idle for longer than the {@link EndPoint#setIdleTimeout(int)} idle threshold.
*/
interface OnIdle<C extends Connection> extends Listener {
/**
* Called when the connection is idle for longer than the {@link EndPoint#setIdleTimeout(int)} idle threshold.
*/
void idle(C connection) throws IOException;
}
/**
* Called when an object has been received from the remote end of the connection.
* This method should not block for long periods as other network activity will not be processed until it returns.
*/
interface OnMessageReceived<C extends Connection, M extends Object> extends Listener {
void received(C connection, M message);
}
/**
* Permits a listener to specify it's own referenced object type, if passing in a generic parameter doesn't work. This is necessary since
* the system looks up incoming message types to determine what listeners to dispatch them to.
*/
interface SelfDefinedType {
/**
* Permits a listener to specify it's own referenced object type, if passing in a generic parameter doesn't work. This is necessary since
* the system looks up incoming message types to determine what listeners to dispatch them to.
*/
Class<?> getType();
}
}

View File

@ -38,7 +38,7 @@ interface ListenerBridge {
* the connection is notified on that event (ie, admin type listeners)
*/
@SuppressWarnings("rawtypes")
void add(ListenerRaw listener);
void add(Listener listener);
/**
* Removes a listener from this connection/endpoint to NO LONGER be notified
@ -53,7 +53,7 @@ interface ListenerBridge {
* the connection is removed
*/
@SuppressWarnings("rawtypes")
void remove(ListenerRaw listener);
void remove(Listener listener);
/**
* Removes all registered listeners from this connection/endpoint to NO

View File

@ -1,142 +0,0 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection;
import dorkbox.util.ClassHelper;
import java.io.IOException;
public abstract
class ListenerRaw<C extends Connection, M extends Object> {
private final Class<?> objectType;
// for compile time code. The generic type parameter #2 (index 1) is pulled from type arguments.
// generic parameters cannot be primitive types
public
ListenerRaw() {
this(1);
}
// for sub-classed listeners, we might have to specify which parameter to use.
protected
ListenerRaw(int lastParameterIndex) {
if (lastParameterIndex > -1) {
@SuppressWarnings("rawtypes")
Class<? extends ListenerRaw> class1 = getClass();
Class<?> objectType = ClassHelper.getGenericParameterAsClassForSuperClass(class1, lastParameterIndex);
if (objectType != null) {
// SOMETIMES generics get confused on which parameter we actually mean (when sub-classing)
if (objectType != Object.class && ClassHelper.hasInterface(Connection.class, objectType)) {
Class<?> objectType2 = ClassHelper.getGenericParameterAsClassForSuperClass(class1, lastParameterIndex + 1);
if (objectType2 != null) {
objectType = objectType2;
}
}
this.objectType = objectType;
}
else {
this.objectType = Object.class;
}
}
else {
// for when we want to override it
this.objectType = Object.class;
}
}
/**
* Gets the referenced object type.
* <p/>
* non-final so this can be overridden by listeners that aren't able to define their type as a generic parameter
*/
public
Class<?> getObjectType() {
return this.objectType;
}
/**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network.
* This method should not block for long periods as other network activity will not be processed
* until it returns.
*/
@SuppressWarnings("unused")
public
void connected(C connection) {
}
/**
* Called when the remote end is no longer connected. There is no guarantee as to what thread will invoke this method.
* <p/>
* Do not write data in this method! The channel can be closed, resulting in an error if you attempt to do so.
*/
@SuppressWarnings("unused")
public
void disconnected(C connection) {
}
/**
* Called when an object has been received from the remote end of the connection.
* This method should not block for long periods as other network activity will not be processed until it returns.
*/
@SuppressWarnings("unused")
public
void received(C connection, M message) {
}
/**
* Called when the connection is idle for longer than the {@link EndPoint#setIdleTimeout(int)} idle threshold.
*/
@SuppressWarnings("unused")
public
void idle(C connection) throws IOException {
}
/**
* Called when there is an error of some kind during the up/down stream process (to/from the socket or otherwise)
*/
@SuppressWarnings("unused")
public
void error(C connection, Throwable throwable) {
throwable.printStackTrace();
}
@Override
public
int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (this.objectType == null ? 0 : this.objectType.hashCode());
return result;
}
// only possible way for it to be equal, is if it is the same object
@Override
public
boolean equals(Object obj) {
return this == obj;
}
@Override
public
String toString() {
return "Listener [type=" + getObjectType() + "]";
}
}

View File

@ -17,7 +17,7 @@ package dorkbox.network.connection;
import dorkbox.network.rmi.RmiRegistration;
class RegisterRmiSystemListener extends ListenerRaw<ConnectionImpl, RmiRegistration> {
class RegisterRmiSystemListener implements Listener.OnMessageReceived<ConnectionImpl, RmiRegistration> {
RegisterRmiSystemListener() {
}

View File

@ -16,14 +16,12 @@
package dorkbox.network.connection.idle;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ListenerRaw;
public abstract
class IdleListener<C extends Connection, M> extends ListenerRaw<C, M> {
import dorkbox.network.connection.Listener;
public
interface IdleListener<C extends Connection, M> extends Listener {
/**
* used by the Idle Sender
*/
abstract
void send(C connection, M message);
}

View File

@ -18,7 +18,7 @@ package dorkbox.network.connection.idle;
import dorkbox.network.connection.Connection;
public
class IdleListenerTCP<C extends Connection, M> extends IdleListener<C, M> {
class IdleListenerTCP<C extends Connection, M> implements IdleListener<C, M> {
/**
* used by the Idle Sender
@ -31,6 +31,7 @@ class IdleListenerTCP<C extends Connection, M> extends IdleListener<C, M> {
* used by the Idle Sender
*/
@Override
public
void send(C connection, M message) {
connection.send()
.TCP(message);

View File

@ -18,7 +18,7 @@ package dorkbox.network.connection.idle;
import dorkbox.network.connection.Connection;
public
class IdleListenerUDP<C extends Connection, M> extends IdleListener<C, M> {
class IdleListenerUDP<C extends Connection, M> implements IdleListener<C, M> {
/**
* used by the Idle Sender
@ -31,6 +31,7 @@ class IdleListenerUDP<C extends Connection, M> extends IdleListener<C, M> {
* used by the Idle Sender
*/
@Override
public
void send(C connection, M message) {
connection.send()
.UDP(message);

View File

@ -18,7 +18,7 @@ package dorkbox.network.connection.idle;
import dorkbox.network.connection.Connection;
public
class IdleListenerUDT<C extends Connection, M> extends IdleListener<C, M> {
class IdleListenerUDT<C extends Connection, M> implements IdleListener<C, M> {
/**
* used by the Idle Sender
@ -31,6 +31,7 @@ class IdleListenerUDT<C extends Connection, M> extends IdleListener<C, M> {
* used by the Idle Sender
*/
@Override
public
void send(C connection, M message) {
connection.send()
.UDT(message);

View File

@ -16,12 +16,12 @@
package dorkbox.network.connection.idle;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ListenerRaw;
import dorkbox.network.connection.Listener;
import java.io.IOException;
public abstract
class IdleSender<C extends Connection, M> extends ListenerRaw<C, M> {
class IdleSender<C extends Connection, M> implements Listener.OnIdle<C> {
final IdleListener<C, M> idleListener;
volatile boolean started;

View File

@ -0,0 +1,156 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.listenerManagement;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionManager;
import dorkbox.network.connection.Listener.OnConnected;
import dorkbox.network.connection.Listener.OnError;
import dorkbox.util.collections.ConcurrentEntry;
import org.slf4j.Logger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network.
* This method should not block for long periods as other network activity will not be processed
* until it returns.
*/
@SuppressWarnings("Duplicates")
public final
class OnConnectedManager<C extends Connection> {
private final Logger logger;
//
// The iterators for IdentityMap are NOT THREAD SAFE!
//
// This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove.
private final IdentityMap<OnConnected<C>, ConcurrentEntry> entries = new IdentityMap<OnConnected<C>, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR);
private volatile ConcurrentEntry<OnConnected<C>> head = null; // reference to the first element
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
private final Object lock = new Object();
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnConnectedManager, ConcurrentEntry> REF =
AtomicReferenceFieldUpdater.newUpdater(OnConnectedManager.class,
ConcurrentEntry.class,
"head");
public
OnConnectedManager(final Logger logger) {
this.logger = logger;
}
public void add(final OnConnected<C> listener) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot (single-writer-principle)
ConcurrentEntry head = REF.get(this);
if (!entries.containsKey(listener)) {
head = new ConcurrentEntry<Object>(listener, head);
entries.put(listener, head);
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
}
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public
boolean remove(final OnConnected<C> listener) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot (single-writer-principle)
ConcurrentEntry concurrentEntry = entries.get(listener);
if (concurrentEntry != null) {
ConcurrentEntry head1 = REF.get(this);
if (concurrentEntry == head1) {
// if it was second, now it's first
head1 = head1.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
else {
concurrentEntry.remove();
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head1);
entries.remove(listener);
return true;
} else {
return false;
}
}
}
/**
* @return true if a listener was found, false otherwise
*/
@SuppressWarnings("unchecked")
public
boolean notifyConnected(final C connection, final AtomicBoolean shutdown) {
ConcurrentEntry<OnConnected<C>> head = REF.get(this);
ConcurrentEntry<OnConnected<C>> current = head;
OnConnected<C> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
current = current.next();
try {
listener.connected(connection);
} catch (Exception e) {
if (listener instanceof OnError) {
((OnError<C>) listener).error(connection, e);
}
else {
logger.error("Unable to notify listener on 'connected' for listener '{}', connection '{}'.", listener, connection, e);
}
}
}
return head != null; // true if we have something, otherwise false
}
/**
* called on shutdown
*/
public
void clear() {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
this.entries.clear();
this.head = null;
}
}
}

View File

@ -0,0 +1,157 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.listenerManagement;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionManager;
import dorkbox.network.connection.Listener.OnDisconnected;
import dorkbox.network.connection.Listener.OnError;
import dorkbox.util.collections.ConcurrentEntry;
import org.slf4j.Logger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network.
* This method should not block for long periods as other network activity will not be processed
* until it returns.
*/
@SuppressWarnings("Duplicates")
public final
class OnDisconnectedManager<C extends Connection> {
private final Logger logger;
//
// The iterators for IdentityMap are NOT THREAD SAFE!
//
// This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove.
private final IdentityMap<OnDisconnected<C>, ConcurrentEntry> entries = new IdentityMap<OnDisconnected<C>, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR);
private volatile ConcurrentEntry<OnDisconnected<C>> head = null; // reference to the first element
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
private final Object lock = new Object();
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnDisconnectedManager, ConcurrentEntry> REF =
AtomicReferenceFieldUpdater.newUpdater(OnDisconnectedManager.class,
ConcurrentEntry.class,
"head");
public
OnDisconnectedManager(final Logger logger) {
this.logger = logger;
}
public void add(final OnDisconnected<C> listener) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot (single-writer-principle)
ConcurrentEntry head = REF.get(this);
if (!entries.containsKey(listener)) {
head = new ConcurrentEntry<Object>(listener, head);
entries.put(listener, head);
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
}
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public
boolean remove(final OnDisconnected<C> listener) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot (single-writer-principle)
ConcurrentEntry concurrentEntry = entries.get(listener);
if (concurrentEntry != null) {
ConcurrentEntry head1 = REF.get(this);
if (concurrentEntry == head1) {
// if it was second, now it's first
head1 = head1.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
else {
concurrentEntry.remove();
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head1);
entries.remove(listener);
return true;
} else {
return false;
}
}
}
/**
* @return true if a listener was found, false otherwise
*/
@SuppressWarnings("unchecked")
public
boolean notifyDisconnected(final C connection, final AtomicBoolean shutdown) {
ConcurrentEntry<OnDisconnected<C>> head = REF.get(this);
ConcurrentEntry<OnDisconnected<C>> current = head;
OnDisconnected<C> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
current = current.next();
try {
listener.disconnected(connection);
} catch (Exception e) {
if (listener instanceof OnError) {
((OnError<C>) listener).error(connection, e);
}
else {
logger.error("Unable to notify listener on 'disconnected' for listener '{}', connection '{}'.", listener, connection, e);
}
}
}
return head != null; // true if we have something, otherwise false
}
/**
* called on shutdown
*/
public
void clear() {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
this.entries.clear();
this.head = null;
}
}
}

View File

@ -0,0 +1,159 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.listenerManagement;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionManager;
import dorkbox.network.connection.Listener.OnError;
import dorkbox.network.connection.Listener.OnIdle;
import dorkbox.util.collections.ConcurrentEntry;
import org.slf4j.Logger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network.
* This method should not block for long periods as other network activity will not be processed
* until it returns.
*/
@SuppressWarnings("Duplicates")
public final
class OnIdleManager<C extends Connection> {
private final Logger logger;
//
// The iterators for IdentityMap are NOT THREAD SAFE!
//
// This is only touched by a single thread, maintains a map of entries for FAST lookup during listener remove.
private final IdentityMap<OnIdle<C>, ConcurrentEntry> entries = new IdentityMap<OnIdle<C>, ConcurrentEntry>(32, ConnectionManager.LOAD_FACTOR);
private volatile ConcurrentEntry<OnIdle<C>> head = null; // reference to the first element
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
private final Object lock = new Object();
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnIdleManager, ConcurrentEntry> REF =
AtomicReferenceFieldUpdater.newUpdater(OnIdleManager.class,
ConcurrentEntry.class,
"head");
public
OnIdleManager(final Logger logger) {
this.logger = logger;
}
public void add(final OnIdle<C> listener) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot (single-writer-principle)
ConcurrentEntry head = REF.get(this);
if (!entries.containsKey(listener)) {
head = new ConcurrentEntry<Object>(listener, head);
entries.put(listener, head);
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head);
}
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public
boolean remove(final OnIdle<C> listener) {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot (single-writer-principle)
ConcurrentEntry concurrentEntry = entries.get(listener);
if (concurrentEntry != null) {
ConcurrentEntry head1 = REF.get(this);
if (concurrentEntry == head1) {
// if it was second, now it's first
head1 = head1.next();
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
}
else {
concurrentEntry.remove();
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, head1);
entries.remove(listener);
return true;
}
else {
return false;
}
}
}
/**
* @return true if a listener was found, false otherwise
*/
@SuppressWarnings("unchecked")
public
boolean notifyIdle(final C connection, final AtomicBoolean shutdown) {
ConcurrentEntry<OnIdle<C>> head = REF.get(this);
ConcurrentEntry<OnIdle<C>> current = head;
OnIdle<C> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
current = current.next();
try {
listener.idle(connection);
} catch (Exception e) {
if (listener instanceof OnError) {
((OnError<C>) listener).error(connection, e);
}
else {
logger.error("Unable to notify listener on 'idle' for listener '{}', connection '{}'.", listener, connection, e);
}
}
}
return head != null; // true if we have something, otherwise false
}
/**
* called on shutdown
*/
public
void clear() {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
this.entries.clear();
this.head = null;
}
}
}

View File

@ -0,0 +1,320 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.connection.listenerManagement;
import com.esotericsoftware.kryo.util.IdentityMap;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionManager;
import dorkbox.network.connection.Listener.OnError;
import dorkbox.network.connection.Listener.OnMessageReceived;
import dorkbox.network.connection.Listener.SelfDefinedType;
import dorkbox.network.rmi.RmiMessages;
import dorkbox.util.ClassHelper;
import dorkbox.util.collections.ConcurrentEntry;
import dorkbox.util.collections.ConcurrentIterator;
import org.slf4j.Logger;
import java.lang.reflect.Type;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static dorkbox.util.collections.ConcurrentIterator.headREF;
/**
* Called when the remote end has been connected. This will be invoked before any objects are received by the network.
* This method should not block for long periods as other network activity will not be processed
* until it returns.
*/
@SuppressWarnings("Duplicates")
public final
class OnMessageReceivedManager<C extends Connection> {
private final Logger logger;
//
// The iterators for IdentityMap are NOT THREAD SAFE!
//
@SuppressWarnings("unused")
private volatile IdentityMap<Type, ConcurrentIterator> listeners = new IdentityMap<Type, ConcurrentIterator>(32, ConnectionManager.LOAD_FACTOR);
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
private static final AtomicReferenceFieldUpdater<OnMessageReceivedManager, IdentityMap> REF =
AtomicReferenceFieldUpdater.newUpdater(OnMessageReceivedManager.class,
IdentityMap.class,
"listeners");
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
private final Object lock = new Object();
public
OnMessageReceivedManager(final Logger logger) {
this.logger = logger;
}
public void add(final OnMessageReceived<C, Object> listener) {
final Class<?> type;
if (listener instanceof SelfDefinedType) {
type = ((SelfDefinedType) listener).getType();
}
else {
type = identifyType(listener);
}
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot of the listeners (single-writer-principle)
@SuppressWarnings("unchecked")
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
ConcurrentIterator subscribedListeners = listeners.get(type);
if (subscribedListeners == null) {
subscribedListeners = new ConcurrentIterator();
listeners.put(type, subscribedListeners);
}
subscribedListeners.add(listener);
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public
boolean remove(final OnMessageReceived<C, Object> listener) {
final Class<?> type;
if (listener instanceof SelfDefinedType) {
type = ((SelfDefinedType) listener).getType();
}
else {
type = identifyType(listener);
}
boolean found = false;
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot of the listeners (single-writer-principle)
@SuppressWarnings("unchecked")
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
final ConcurrentIterator concurrentIterator = listeners.get(type);
if (concurrentIterator != null) {
concurrentIterator.remove(listener);
found = true;
}
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
}
return found;
}
/**
* @return true if a listener was found, false otherwise
*/
@SuppressWarnings("unchecked")
public
boolean notifyReceived(final C connection, final Object message, final AtomicBoolean shutdown) {
boolean found = false;
Class<?> objectType = message.getClass();
// this is the GLOBAL version (unless it's the call from below, then it's the connection scoped version)
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
ConcurrentIterator concurrentIterator = listeners.get(objectType);
if (concurrentIterator != null) {
ConcurrentEntry<OnMessageReceived<C, Object>> head = headREF.get(concurrentIterator);
ConcurrentEntry<OnMessageReceived<C, Object>> current = head;
OnMessageReceived<C, Object> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
current = current.next();
try {
listener.received(connection, message);
} catch (Exception e) {
if (listener instanceof OnError) {
((OnError<C>) listener).error(connection, e);
}
else {
logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.",
objectType,
listener,
connection,
e);
}
}
}
found = head != null; // true if we have something to publish to, otherwise false
}
if (!(message instanceof RmiMessages)) {
// we march through all super types of the object, and find the FIRST set
// of listeners that are registered and cast it as that, and notify the method.
// NOTICE: we do NOT call ALL TYPE -- meaning, if we have Object->Foo->Bar
// and have listeners for Object and Foo
// we will call Bar (from the above code)
// we will call Foo (from this code)
// we will NOT call Object (since we called Foo). If Foo was not registered, THEN we would call object!
objectType = objectType.getSuperclass();
while (objectType != null) {
// check to see if we have what we are looking for in our CURRENT class
concurrentIterator = listeners.get(objectType);
if (concurrentIterator != null) {
ConcurrentEntry<OnMessageReceived<C, Object>> head = headREF.get(concurrentIterator);
ConcurrentEntry<OnMessageReceived<C, Object>> current = head;
OnMessageReceived<C, Object> listener;
while (current != null && !shutdown.get()) {
listener = current.getValue();
current = current.next();
try {
listener.received(connection, message);
} catch (Exception e) {
if (listener instanceof OnError) {
((OnError<C>) listener).error(connection, e);
}
else {
logger.error("Unable to notify on message '{}' for listener '{}', connection '{}'.",
objectType,
listener,
connection,
e);
}
}
}
found = head != null; // true if we have something to publish to, otherwise false
break;
}
// NO MATCH, so walk up.
objectType = objectType.getSuperclass();
}
}
return found;
}
public
void removeAll() {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot of the listeners (single-writer-principle)
@SuppressWarnings("unchecked")
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
listeners.clear();
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
}
}
/**
* @return true if the listener was removed, false otherwise
*/
public
boolean removeAll(final Class<?> classType) {
boolean found;
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
// access a snapshot of the listeners (single-writer-principle)
@SuppressWarnings("unchecked")
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
found = listeners.remove(classType) != null;
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
}
return found;
}
/**
* called on shutdown
*/
public
void clear() {
// synchronized is used here to ensure the "single writer principle", and make sure that ONLY one thread at a time can enter this
// section. Because of this, we can have unlimited reader threads all going at the same time, without contention (which is our
// use-case 99% of the time)
synchronized (lock) {
@SuppressWarnings("unchecked")
final IdentityMap<Type, ConcurrentIterator> listeners = REF.get(this);
// The iterators for this map are NOT THREAD SAFE!
// using .entries() is what we are supposed to use!
final IdentityMap.Entries<Type, ConcurrentIterator> entries = listeners.entries();
for (final IdentityMap.Entry<Type, ConcurrentIterator> next : entries) {
if (next.value != null) {
next.value.clear();
}
}
listeners.clear();
// save this snapshot back to the original (single writer principle)
REF.lazySet(this, listeners);
}
}
/**
* Gets the referenced object type for a specific listener, but ONLY necessary for listeners that receive messages
* <p/>
* This works for compile time code. The generic type parameter #2 (index 1) is pulled from type arguments.
* generic parameters cannot be primitive types
*/
private static Class<?> identifyType(final Object listener) {
final Class<?> clazz = listener.getClass();
Class<?> objectType = ClassHelper.getGenericParameterAsClassForSuperClass(clazz, 1);
if (objectType != null) {
// SOMETIMES generics get confused on which parameter we actually mean (when sub-classing)
if (objectType != Object.class && ClassHelper.hasInterface(Connection.class, objectType)) {
Class<?> objectType2 = ClassHelper.getGenericParameterAsClassForSuperClass(clazz, 2);
if (objectType2 != null) {
objectType = objectType2;
}
}
return objectType;
}
else {
// there was no defined parameters
return Object.class;
}
}
}

View File

@ -16,10 +16,10 @@
package dorkbox.network.connection.ping;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.ListenerRaw;
import dorkbox.network.connection.Listener;
public
class PingSystemListener extends ListenerRaw<ConnectionImpl, PingMessage> {
class PingSystemListener implements Listener.OnMessageReceived<ConnectionImpl, PingMessage> {
public
PingSystemListener() {

View File

@ -0,0 +1,24 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.rmi;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
abstract
class RemoteInvocationResponse<C extends Connection> implements Listener.OnDisconnected<C>,
Listener.OnMessageReceived<C, InvokeMethodResult> {
}

View File

@ -37,7 +37,6 @@ package dorkbox.network.rmi;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.ListenerRaw;
import dorkbox.network.util.RMISerializationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,20 +52,19 @@ import java.util.concurrent.locks.ReentrantLock;
/**
* Handles network communication when methods are invoked on a proxy.
*/
public
class RemoteObjectInvocationHandler implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(RemoteObjectInvocationHandler.class);
final ReentrantLock lock = new ReentrantLock();
final Condition responseCondition = this.lock.newCondition();
private final ReentrantLock lock = new ReentrantLock();
private final Condition responseCondition = this.lock.newCondition();
final InvokeMethodResult[] responseTable = new InvokeMethodResult[64];
final boolean[] pendingResponses = new boolean[64];
private final InvokeMethodResult[] responseTable = new InvokeMethodResult[64];
private final boolean[] pendingResponses = new boolean[64];
private final Connection connection;
public final int objectID;
private final String proxyString;
private final ListenerRaw<Connection, InvokeMethodResult> responseListener;
private final RemoteInvocationResponse<Connection> responseListener;
private int timeoutMillis = 3000;
private boolean isAsync = false;
@ -82,14 +80,13 @@ class RemoteObjectInvocationHandler implements InvocationHandler {
private Byte lastResponseID;
private byte nextResponseId = (byte) 1;
public
RemoteObjectInvocationHandler(final Connection connection, final int objectID) {
super();
this.connection = connection;
this.objectID = objectID;
this.proxyString = "<proxy #" + objectID + ">";
this.responseListener = new ListenerRaw<Connection, InvokeMethodResult>() {
this.responseListener = new RemoteInvocationResponse<Connection>() {
@Override
public
void disconnected(Connection connection) {
@ -433,7 +430,7 @@ class RemoteObjectInvocationHandler implements InvocationHandler {
throw new TimeoutException("Response timed out.");
}
private
void close() {
this.connection.listeners()
.remove(this.responseListener);

View File

@ -38,7 +38,7 @@ import com.esotericsoftware.kryo.util.IntMap;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.ListenerRaw;
import dorkbox.network.connection.Listener;
import dorkbox.util.collections.ObjectIntMap;
import org.slf4j.Logger;
@ -114,7 +114,7 @@ class RmiBridge {
private final ObjectIntMap<Object> objectToID = new ObjectIntMap<Object>();
private final Executor executor;
private final ListenerRaw<ConnectionImpl, InvokeMethod> invokeListener = new ListenerRaw<ConnectionImpl, InvokeMethod>() {
private final Listener.OnMessageReceived<ConnectionImpl, InvokeMethod> invokeListener = new Listener.OnMessageReceived<ConnectionImpl, InvokeMethod>() {
@SuppressWarnings("AutoBoxing")
@Override
public
@ -188,7 +188,7 @@ class RmiBridge {
*/
@SuppressWarnings("rawtypes")
public
ListenerRaw getListener() {
Listener getListener() {
return this.invokeListener;
}

View File

@ -86,7 +86,7 @@ public class ChunkedDataIdleTest extends BaseTest {
addEndPoint(server);
server.setIdleTimeout(10);
server.bind(false);
server.listeners().add(new Listener<Data>() {
server.listeners().add(new Listener.OnConnected<Connection>() {
@Override
public void connected (Connection connection) {
@ -108,7 +108,7 @@ public class ChunkedDataIdleTest extends BaseTest {
Client client = new Client(configuration);
client.setIdleTimeout(10);
addEndPoint(client);
client.listeners().add(new Listener<Data>() {
client.listeners().add(new Listener.OnMessageReceived<Connection, Data>() {
@Override
public void received(Connection connection, Data object) {
if (mainData.equals(object)) {

View File

@ -52,7 +52,7 @@ class ClientSendTest extends BaseTest {
server.bind(false);
server.listeners()
.add(new Listener<AMessage>() {
.add(new Listener.OnMessageReceived<Connection, AMessage>() {
@Override
public
void received(Connection connection, AMessage object) {
@ -67,7 +67,7 @@ class ClientSendTest extends BaseTest {
client.connect(5000);
client.listeners()
.add(new Listener<AMessage>() {
.add(new Listener.OnMessageReceived<Connection, AMessage>() {
@Override
public
void received(Connection connection, AMessage object) {

View File

@ -141,7 +141,7 @@ class ConnectionTest extends BaseTest {
server.bind(false);
server.listeners()
.add(new Listener<Object>() {
.add(new Listener.OnConnected<Connection>() {
Timer timer = new Timer();
@Override
@ -156,7 +156,10 @@ class ConnectionTest extends BaseTest {
}
}, 1000);
}
});
server.listeners()
.add(new Listener.OnMessageReceived<Connection, Object>() {
@Override
public void received(Connection connection, Object message) {
System.err.println("Received message from client: " + message.getClass().getSimpleName());
@ -178,7 +181,7 @@ class ConnectionTest extends BaseTest {
addEndPoint(client);
client.listeners()
.add(new Listener<Object>() {
.add(new Listener.OnDisconnected<Connection>() {
@Override
public
void disconnected(Connection connection) {

View File

@ -58,7 +58,7 @@ class DiscoverHostTest extends BaseTest {
Client client = new Client(configuration);
addEndPoint(client);
client.listeners()
.add(new Listener<Object>() {
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {

View File

@ -23,7 +23,12 @@ import dorkbox.network.PingPongTest.TYPE;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.KryoCryptoSerializationManager;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.idle.*;
import dorkbox.network.connection.idle.IdleBridge;
import dorkbox.network.connection.idle.IdleListener;
import dorkbox.network.connection.idle.IdleListenerTCP;
import dorkbox.network.connection.idle.IdleListenerUDP;
import dorkbox.network.connection.idle.IdleListenerUDT;
import dorkbox.network.connection.idle.InputStreamSender;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
@ -125,7 +130,7 @@ class IdleTest extends BaseTest {
server.setIdleTimeout(100);
server.bind(false);
server.listeners()
.add(new Listener<Data>() {
.add(new Listener.OnConnected<Connection>() {
@Override
public
@ -151,7 +156,7 @@ class IdleTest extends BaseTest {
Client client = new Client(configuration);
addEndPoint(client);
client.listeners()
.add(new Listener<Data>() {
.add(new Listener.OnMessageReceived<Connection, Data>() {
@Override
public
void received(Connection connection, Data object) {
@ -182,7 +187,7 @@ class IdleTest extends BaseTest {
server.setIdleTimeout(100);
server.bind(false);
server.listeners()
.add(new Listener<byte[]>() {
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
@ -243,7 +248,7 @@ class IdleTest extends BaseTest {
Client client = new Client(configuration);
addEndPoint(client);
client.listeners()
.add(new Listener<byte[]>() {
.add(new Listener.OnMessageReceived<Connection, byte[]>() {
int total;
@Override

View File

@ -59,7 +59,7 @@ class LargeResizeBufferTest extends BaseTest {
server.bind(false);
server.listeners()
.add(new Listener<LargeMessage>() {
.add(new Listener.OnMessageReceived<Connection, LargeMessage>() {
AtomicInteger received = new AtomicInteger();
AtomicInteger receivedBytes = new AtomicInteger();
@ -86,7 +86,7 @@ class LargeResizeBufferTest extends BaseTest {
addEndPoint(client);
client.listeners()
.add(new Listener<LargeMessage>() {
.add(new Listener.OnMessageReceived<Connection, LargeMessage>() {
AtomicInteger received = new AtomicInteger();
AtomicInteger receivedBytes = new AtomicInteger();

View File

@ -19,7 +19,11 @@
*/
package dorkbox.network;
import dorkbox.network.connection.*;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.ListenerBridge;
import dorkbox.network.rmi.RmiBridge;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
@ -30,7 +34,10 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public
class ListenerTest extends BaseTest {
@ -39,12 +46,18 @@ class ListenerTest extends BaseTest {
private final int limit = 20;
private AtomicInteger count = new AtomicInteger(0);
volatile String fail = null;
AtomicBoolean subClassWorkedOK = new AtomicBoolean(false);
AtomicBoolean subClassWorkedOK2 = new AtomicBoolean(false);
AtomicBoolean superClassWorkedOK = new AtomicBoolean(false);
AtomicBoolean superClass2WorkedOK = new AtomicBoolean(false);
AtomicBoolean disconnectWorkedOK = new AtomicBoolean(false);
AtomicBoolean checkFail1 = new AtomicBoolean(false);
AtomicBoolean checkFail2 = new AtomicBoolean(false);
AtomicBoolean check1 = new AtomicBoolean(false);
AtomicBoolean check2 = new AtomicBoolean(false);
AtomicBoolean check3 = new AtomicBoolean(false);
AtomicBoolean check4 = new AtomicBoolean(false);
AtomicBoolean check5 = new AtomicBoolean(false);
AtomicBoolean check6 = new AtomicBoolean(false);
AtomicBoolean check7 = new AtomicBoolean(false);
AtomicBoolean check8 = new AtomicBoolean(false);
AtomicBoolean check9 = new AtomicBoolean(false);
// quick and dirty test to also test connection sub-classing
@ -56,7 +69,7 @@ class ListenerTest extends BaseTest {
public
void check() {
ListenerTest.this.subClassWorkedOK.set(true);
ListenerTest.this.check1.set(true);
}
}
@ -70,10 +83,18 @@ class ListenerTest extends BaseTest {
@Override
public
void check() {
ListenerTest.this.subClassWorkedOK.set(true);
ListenerTest.this.checkFail1.set(true);
}
}
abstract class SubListener implements Listener.OnMessageReceived<Connection, String> {
}
abstract class SubListener2 extends SubListener {
}
abstract class SubListener3 implements Listener.OnMessageReceived<Connection, String>, Listener.SelfDefinedType {
}
@SuppressWarnings("rawtypes")
@ -94,76 +115,123 @@ class ListenerTest extends BaseTest {
addEndPoint(server);
server.bind(false);
final ListenerBridge listeners = server.listeners();
server.listeners()
.add(new ListenerRaw<TestConnectionA, String>() {
@Override
public
void received(TestConnectionA connection, String string) {
connection.check();
// System.err.println("default check");
connection.send()
.TCP(string);
}
});
// standard listener
listeners.add(new Listener.OnMessageReceived<TestConnectionA, String>() {
@Override
public
void received(TestConnectionA connection, String string) {
connection.check();
connection.send()
.TCP(string);
}
});
server.listeners()
.add(new Listener<String>() {
@Override
public
void received(Connection connection, String string) {
// System.err.println("subclass check");
ListenerTest.this.subClassWorkedOK2.set(true);
}
});
// standard listener with connection subclassed
listeners.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String string) {
ListenerTest.this.check2.set(true);
}
});
// should be able to happen!
server.listeners()
.add(new Listener() {
@Override
public
void received(Connection connection, Object string) {
// System.err.println("generic class check");
ListenerTest.this.superClassWorkedOK.set(true);
}
});
// standard listener with message subclassed
listeners.add(new Listener.OnMessageReceived<TestConnectionA, Object>() {
@Override
public
void received(TestConnectionA connection, Object string) {
ListenerTest.this.check3.set(true);
}
});
// standard listener with connection subclassed AND message subclassed
listeners.add(new Listener.OnMessageReceived<Connection, Object>() {
@Override
public
void received(Connection connection, Object string) {
ListenerTest.this.check4.set(true);
}
});
// standard listener with connection subclassed AND message subclassed NO GENERICS
listeners.add(new Listener.OnMessageReceived() {
@Override
public
void received(Connection connection, Object string) {
ListenerTest.this.check5.set(true);
}
});
// subclassed listener with connection subclassed AND message subclassed NO GENERICS
listeners.add(new SubListener() {
@Override
public
void received(Connection connection, String string) {
ListenerTest.this.check6.set(true);
}
});
// subclassed listener with connection subclassed AND message subclassed NO GENERICS
listeners.add(new SubListener() {
@Override
public
void received(Connection connection, String string) {
ListenerTest.this.check6.set(true);
}
});
// should be able to happen!
server.listeners()
.add(new ListenerRaw() {
@Override
public
void received(Connection connection, Object string) {
// System.err.println("generic class check");
ListenerTest.this.superClass2WorkedOK.set(true);
}
});
// subclassed listener with connection subclassed x 2 AND message subclassed NO GENERICS
listeners.add(new SubListener2() {
@Override
public
void received(Connection connection, String string) {
ListenerTest.this.check8.set(true);
}
});
// subclassed listener with connection subclassed AND message subclassed NO GENERICS
listeners.add(new SubListener3() {
@Override
public
Class<?> getType() {
return String.class;
}
@Override
public
void received(Connection connection, String string) {
ListenerTest.this.check9.set(true);
}
});
// standard listener disconnect check
listeners.add(new Listener.OnDisconnected<Connection>() {
@Override
public
void disconnected(Connection connection) {
ListenerTest.this.check7.set(true);
}
});
server.listeners()
.add(new Listener() {
@Override
public
void disconnected(Connection connection) {
// System.err.println("disconnect check");
ListenerTest.this.disconnectWorkedOK.set(true);
}
});
// should not let this happen!
try {
server.listeners()
.add(new ListenerRaw<TestConnectionB, String>() {
@Override
public
void received(TestConnectionB connection, String string) {
connection.check();
System.err.println(string);
connection.send()
.TCP(string);
}
});
this.fail = "Should not be able to ADD listeners that are NOT the basetype or the interface";
listeners.add(new Listener.OnMessageReceived<TestConnectionB, String>() {
@Override
public
void received(TestConnectionB connection, String string) {
connection.check();
System.err.println(string);
connection.send()
.TCP(string);
}
});
fail("Should not be able to ADD listeners that are NOT the basetype or the interface");
} catch (Exception e) {
System.err.println("Successfully did NOT add listener that was not the base class");
}
@ -175,14 +243,17 @@ class ListenerTest extends BaseTest {
addEndPoint(client);
client.listeners()
.add(new Listener<String>() {
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
connection.send()
.TCP(ListenerTest.this.origString); // 20 a's
}
});
client.listeners()
.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String string) {
@ -193,7 +264,8 @@ class ListenerTest extends BaseTest {
}
else {
if (!ListenerTest.this.origString.equals(string)) {
ListenerTest.this.fail = "original string not equal to the string received";
checkFail2.set(true);
System.err.println("original string not equal to the string received");
}
stopEndPoints();
}
@ -204,15 +276,20 @@ class ListenerTest extends BaseTest {
client.connect(5000);
waitForThreads();
assertEquals(this.limit, this.count.get());
assertTrue(this.subClassWorkedOK.get());
assertTrue(this.subClassWorkedOK2.get());
assertTrue(this.superClassWorkedOK.get());
assertTrue(this.superClass2WorkedOK.get());
assertTrue(this.disconnectWorkedOK.get());
if (this.fail != null) {
fail(this.fail);
}
assertEquals(this.limit, this.count.get());
assertTrue(this.check1.get());
assertTrue(this.check2.get());
assertTrue(this.check3.get());
assertTrue(this.check4.get());
assertTrue(this.check5.get());
assertTrue(this.check6.get());
assertTrue(this.check7.get());
assertTrue(this.check8.get());
assertTrue(this.check9.get());
assertFalse(this.checkFail1.get());
assertFalse(this.checkFail2.get());
}
}

View File

@ -51,7 +51,7 @@ class MultipleServerTest extends BaseTest {
server1.bind(false);
server1.listeners()
.add(new Listener<String>() {
.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {
@ -74,7 +74,7 @@ class MultipleServerTest extends BaseTest {
addEndPoint(server2);
server2.bind(false);
server2.listeners()
.add(new Listener<String>() {
.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {
@ -95,7 +95,7 @@ class MultipleServerTest extends BaseTest {
Client client1 = new Client(configuration1);
addEndPoint(client1);
client1.listeners()
.add(new Listener<String>() {
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
@ -112,7 +112,7 @@ class MultipleServerTest extends BaseTest {
Client client2 = new Client(configuration2);
addEndPoint(client2);
client2.listeners()
.add(new Listener<String>() {
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {

View File

@ -22,6 +22,7 @@ package dorkbox.network;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.KryoCryptoSerializationManager;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.ListenerBridge;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
import org.junit.Test;
@ -70,53 +71,54 @@ class MultipleThreadTest extends BaseTest {
server.bind(false);
server.listeners()
.add(new Listener<DataClass>() {
final ListenerBridge listeners = server.listeners();
listeners.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(final Connection connection) {
System.err.println("Client connected to server.");
@Override
public
void connected(final Connection connection) {
System.err.println("Client connected to server.");
// kickoff however many threads we need, and send data to the client.
for (int i = 1; i <= MultipleThreadTest.this.threadCount; i++) {
final int index = i;
new Thread() {
@Override
public
void run() {
for (int i = 1; i <= MultipleThreadTest.this.messageCount; i++) {
int incrementAndGet = MultipleThreadTest.this.sent.getAndIncrement();
DataClass dataClass = new DataClass(
"Server -> client. Thread #" + index + " message# " + incrementAndGet,
incrementAndGet);
// kickoff however many threads we need, and send data to the client.
for (int i = 1; i <= MultipleThreadTest.this.threadCount; i++) {
final int index = i;
new Thread() {
@Override
public
void run() {
for (int i = 1; i <= MultipleThreadTest.this.messageCount; i++) {
int incrementAndGet = MultipleThreadTest.this.sent.getAndIncrement();
DataClass dataClass = new DataClass("Server -> client. Thread #" + index + " message# " + incrementAndGet,
incrementAndGet);
//System.err.println(dataClass.data);
MultipleThreadTest.this.sentStringsToClientDebug.put(incrementAndGet, dataClass);
connection.send()
.TCP(dataClass)
.flush();
}
}
}.start();
}
}
//System.err.println(dataClass.data);
MultipleThreadTest.this.sentStringsToClientDebug.put(incrementAndGet, dataClass);
connection.send()
.TCP(dataClass)
.flush();
}
}
}.start();
}
}
});
@Override
public
void received(Connection connection, DataClass object) {
int incrementAndGet = MultipleThreadTest.this.receivedServer.getAndIncrement();
listeners.add(new Listener.OnMessageReceived<Connection, DataClass>() {
@Override
public
void received(Connection connection, DataClass object) {
int incrementAndGet = MultipleThreadTest.this.receivedServer.getAndIncrement();
//System.err.println("server #" + incrementAndGet);
if (incrementAndGet == serverReceiveTotal) {
System.err.println("Server DONE " + incrementAndGet);
stopEndPoints();
synchronized (MultipleThreadTest.this.lock) {
MultipleThreadTest.this.lock.notifyAll();
}
}
}
});
//System.err.println("server #" + incrementAndGet);
if (incrementAndGet == serverReceiveTotal) {
System.err.println("Server DONE " + incrementAndGet);
stopEndPoints();
synchronized (MultipleThreadTest.this.lock) {
MultipleThreadTest.this.lock.notifyAll();
}
}
}
});
// ----
@ -128,7 +130,7 @@ class MultipleThreadTest extends BaseTest {
addEndPoint(client);
client.listeners()
.add(new Listener<DataClass>() {
.add(new Listener.OnMessageReceived<Connection, DataClass>() {
final int clientIndex = index;
final AtomicInteger received = new AtomicInteger(1);

View File

@ -22,6 +22,7 @@ package dorkbox.network;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.KryoCryptoSerializationManager;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.ListenerBridge;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
@ -33,9 +34,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.fail;
public class PingPongLocalTest extends BaseTest {
public
class PingPongLocalTest extends BaseTest {
int tries = 10000;
private volatile String fail;
int tries = 10000;
@Test
public void pingPongLocal() throws InitializationException, SecurityException, IOException, InterruptedException {
@ -50,53 +52,73 @@ public class PingPongLocalTest extends BaseTest {
Server server = new Server();
addEndPoint(server);
server.bind(false);
server.listeners().add(new Listener<Data>() {
final ListenerBridge listeners = server.listeners();
listeners.add(new Listener.OnError<Connection>() {
@Override
public void error(Connection connection, Throwable throwable) {
public
void error(Connection connection, Throwable throwable) {
PingPongLocalTest.this.fail = "Error during processing. " + throwable;
}
});
listeners.add(new Listener.OnMessageReceived<Connection, Data>() {
@Override
public void received(Connection connection, Data data) {
public
void received(Connection connection, Data data) {
connection.id();
if (!data.equals(dataLOCAL)) {
PingPongLocalTest.this.fail = "data is not equal on server.";
throw new RuntimeException("Fail! " + PingPongLocalTest.this.fail);
}
connection.send().TCP(data);
connection.send()
.TCP(data);
}
});
// ----
// ----
Client client = new Client();
addEndPoint(client);
client.listeners().add(new Listener<Data>() {
final ListenerBridge listeners1 = client.listeners();
listeners1.add(new Listener.OnConnected<Connection>() {
AtomicInteger check = new AtomicInteger(0);
@Override
public void connected(Connection connection) {
public
void connected(Connection connection) {
PingPongLocalTest.this.fail = null;
connection.send().TCP(dataLOCAL);
connection.send()
.TCP(dataLOCAL);
// connection.sendUDP(dataUDP); // TCP and UDP are the same for a local channel.
}
});
listeners1.add(new Listener.OnError<Connection>() {
AtomicInteger check = new AtomicInteger(0);
@Override
public void error(Connection connection, Throwable throwable) {
public
void error(Connection connection, Throwable throwable) {
PingPongLocalTest.this.fail = "Error during processing. " + throwable;
System.err.println(PingPongLocalTest.this.fail);
}
});
listeners1.add(new Listener.OnMessageReceived<Connection, Data>() {
AtomicInteger check = new AtomicInteger(0);
@Override
public void received(Connection connection, Data data) {
public
void received(Connection connection, Data data) {
if (!data.equals(dataLOCAL)) {
PingPongLocalTest.this.fail = "data is not equal on client.";
throw new RuntimeException("Fail! " + PingPongLocalTest.this.fail);
}
if (this.check.getAndIncrement() <= PingPongLocalTest.this.tries) {
connection.send().TCP(data);
} else {
connection.send()
.TCP(data);
}
else {
System.err.println("Ran LOCAL " + PingPongLocalTest.this.tries + " times");
stopEndPoints();
}

View File

@ -23,6 +23,7 @@ import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.KryoCryptoSerializationManager;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.ListenerBridge;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
@ -75,136 +76,142 @@ class PingPongTest extends BaseTest {
Server server = new Server(configuration);
addEndPoint(server);
server.bind(false);
server.listeners()
.add(new Listener<Data>() {
@Override
public
void error(Connection connection, Throwable throwable) {
PingPongTest.this.fail = "Error during processing. " + throwable;
}
final ListenerBridge listeners1 = server.listeners();
listeners1.add(new Listener.OnError<Connection>() {
@Override
public
void error(Connection connection, Throwable throwable) {
PingPongTest.this.fail = "Error during processing. " + throwable;
}
});
@Override
public
void received(Connection connection, Data data) {
if (data.type == TYPE.TCP) {
if (!data.equals(dataTCP)) {
PingPongTest.this.fail = "TCP data is not equal on server.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
connection.send()
.TCP(dataTCP);
}
else if (data.type == TYPE.UDP) {
if (!data.equals(dataUDP)) {
PingPongTest.this.fail = "UDP data is not equal on server.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
connection.send()
.UDP(dataUDP);
}
else if (data.type == TYPE.UDT) {
if (!data.equals(dataUDT)) {
PingPongTest.this.fail = "UDT data is not equal on server.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
connection.send()
.UDT(dataUDT);
}
else {
PingPongTest.this.fail = "Unknown data type on server.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
}
});
listeners1.add(new Listener.OnMessageReceived<Connection, Data>() {
@Override
public
void received(Connection connection, Data data) {
if (data.type == TYPE.TCP) {
if (!data.equals(dataTCP)) {
PingPongTest.this.fail = "TCP data is not equal on server.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
connection.send()
.TCP(dataTCP);
}
else if (data.type == TYPE.UDP) {
if (!data.equals(dataUDP)) {
PingPongTest.this.fail = "UDP data is not equal on server.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
connection.send()
.UDP(dataUDP);
}
else if (data.type == TYPE.UDT) {
if (!data.equals(dataUDT)) {
PingPongTest.this.fail = "UDT data is not equal on server.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
connection.send()
.UDT(dataUDT);
}
else {
PingPongTest.this.fail = "Unknown data type on server.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
}
});
// ----
Client client = new Client(configuration);
addEndPoint(client);
client.listeners()
.add(new Listener<Data>() {
AtomicInteger checkTCP = new AtomicInteger(0);
AtomicInteger checkUDP = new AtomicInteger(0);
AtomicInteger checkUDT = new AtomicInteger(0);
AtomicBoolean doneTCP = new AtomicBoolean(false);
AtomicBoolean doneUDP = new AtomicBoolean(false);
AtomicBoolean doneUDT = new AtomicBoolean(false);
final ListenerBridge listeners = client.listeners();
listeners.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
PingPongTest.this.fail = null;
connection.send()
.TCP(dataTCP);
connection.send()
.UDP(dataUDP); // UDP ping pong stops if a UDP packet is lost.
connection.send()
.UDT(dataUDT);
}
});
@Override
public
void connected(Connection connection) {
PingPongTest.this.fail = null;
connection.send()
.TCP(dataTCP);
connection.send()
.UDP(dataUDP); // UDP ping pong stops if a UDP packet is lost.
connection.send()
.UDT(dataUDT);
}
listeners.add(new Listener.OnError<Connection>() {
@Override
public
void error(Connection connection, Throwable throwable) {
PingPongTest.this.fail = "Error during processing. " + throwable;
throwable.printStackTrace();
}
});
@Override
public
void error(Connection connection, Throwable throwable) {
PingPongTest.this.fail = "Error during processing. " + throwable;
throwable.printStackTrace();
}
listeners.add(new Listener.OnMessageReceived<Connection, Data>() {
AtomicInteger checkTCP = new AtomicInteger(0);
AtomicInteger checkUDP = new AtomicInteger(0);
AtomicInteger checkUDT = new AtomicInteger(0);
AtomicBoolean doneTCP = new AtomicBoolean(false);
AtomicBoolean doneUDP = new AtomicBoolean(false);
AtomicBoolean doneUDT = new AtomicBoolean(false);
@Override
public
void received(Connection connection, Data data) {
if (data.type == TYPE.TCP) {
if (!data.equals(dataTCP)) {
PingPongTest.this.fail = "TCP data is not equal on client.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
if (this.checkTCP.getAndIncrement() <= PingPongTest.this.tries) {
connection.send()
.TCP(dataTCP);
}
else {
System.err.println("TCP done.");
this.doneTCP.set(true);
}
}
else if (data.type == TYPE.UDP) {
if (!data.equals(dataUDP)) {
PingPongTest.this.fail = "UDP data is not equal on client.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
if (this.checkUDP.getAndIncrement() <= PingPongTest.this.tries) {
connection.send()
.UDP(dataUDP);
}
else {
System.err.println("UDP done.");
this.doneUDP.set(true);
}
}
else if (data.type == TYPE.UDT) {
if (!data.equals(dataUDT)) {
PingPongTest.this.fail = "UDT data is not equal on client.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
if (this.checkUDT.getAndIncrement() <= PingPongTest.this.tries) {
connection.send()
.UDT(dataUDT);
}
else {
System.err.println("UDT done.");
this.doneUDT.set(true);
}
}
else {
PingPongTest.this.fail = "Unknown data type on client.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
@Override
public
void received(Connection connection, Data data) {
if (data.type == TYPE.TCP) {
if (!data.equals(dataTCP)) {
PingPongTest.this.fail = "TCP data is not equal on client.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
if (this.checkTCP.getAndIncrement() <= PingPongTest.this.tries) {
connection.send()
.TCP(dataTCP);
}
else {
System.err.println("TCP done.");
this.doneTCP.set(true);
}
}
else if (data.type == TYPE.UDP) {
if (!data.equals(dataUDP)) {
PingPongTest.this.fail = "UDP data is not equal on client.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
if (this.checkUDP.getAndIncrement() <= PingPongTest.this.tries) {
connection.send()
.UDP(dataUDP);
}
else {
System.err.println("UDP done.");
this.doneUDP.set(true);
}
}
else if (data.type == TYPE.UDT) {
if (!data.equals(dataUDT)) {
PingPongTest.this.fail = "UDT data is not equal on client.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
if (this.checkUDT.getAndIncrement() <= PingPongTest.this.tries) {
connection.send()
.UDT(dataUDT);
}
else {
System.err.println("UDT done.");
this.doneUDT.set(true);
}
}
else {
PingPongTest.this.fail = "Unknown data type on client.";
throw new RuntimeException("Fail! " + PingPongTest.this.fail);
}
if (this.doneTCP.get() && this.doneUDP.get() && this.doneUDT.get()) {
System.err.println("Ran TCP, UDP, UDT " + PingPongTest.this.tries + " times each");
stopEndPoints();
}
}
});
if (this.doneTCP.get() && this.doneUDP.get() && this.doneUDT.get()) {
System.err.println("Ran TCP, UDP, UDT " + PingPongTest.this.tries + " times each");
stopEndPoints();
}
}
});
client.connect(5000);

View File

@ -47,7 +47,7 @@ class ReconnectTest extends BaseTest {
addEndPoint(server);
server.listeners()
.add(new Listener<Object>() {
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(final Connection connection) {
@ -68,7 +68,7 @@ class ReconnectTest extends BaseTest {
final Client client = new Client(configuration);
addEndPoint(client);
client.listeners()
.add(new Listener<Object>() {
.add(new Listener.OnDisconnected<Connection>() {
@Override
public
void disconnected(Connection connection) {

View File

@ -21,6 +21,7 @@ package dorkbox.network;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.ListenerBridge;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
import org.junit.Test;
@ -48,47 +49,49 @@ class ReuseTest extends BaseTest {
Server server = new Server(configuration);
addEndPoint(server);
server.listeners()
.add(new Listener<String>() {
@Override
public
void connected(Connection connection) {
connection.send()
.TCP("-- TCP from server");
connection.send()
.UDP("-- UDP from server");
}
@Override
public
void received(Connection connection, String object) {
int incrementAndGet = ReuseTest.this.serverCount.incrementAndGet();
System.err.println("<S " + connection + "> " + incrementAndGet + " : " + object);
}
});
final ListenerBridge listeners = server.listeners();
listeners.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
connection.send()
.TCP("-- TCP from server");
connection.send()
.UDP("-- UDP from server");
}
});
listeners.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {
int incrementAndGet = ReuseTest.this.serverCount.incrementAndGet();
System.err.println("<S " + connection + "> " + incrementAndGet + " : " + object);
}
});
// ----
Client client = new Client(configuration);
addEndPoint(client);
client.listeners()
.add(new Listener<String>() {
@Override
public
void connected(Connection connection) {
connection.send()
.TCP("-- TCP from client");
connection.send()
.UDP("-- UDP from client");
}
@Override
public
void received(Connection connection, String object) {
int incrementAndGet = ReuseTest.this.clientCount.incrementAndGet();
System.err.println("<C " + connection + "> " + incrementAndGet + " : " + object);
}
});
final ListenerBridge listeners1 = client.listeners();
listeners1.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
connection.send()
.TCP("-- TCP from client");
connection.send()
.UDP("-- UDP from client");
}
});
listeners1.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {
int incrementAndGet = ReuseTest.this.clientCount.incrementAndGet();
System.err.println("<C " + connection + "> " + incrementAndGet + " : " + object);
}
});
server.bind(false);
int count = 10;
@ -122,14 +125,16 @@ class ReuseTest extends BaseTest {
Server server = new Server();
addEndPoint(server);
server.listeners()
.add(new Listener<String>() {
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
connection.send()
.TCP("-- LOCAL from server");
}
});
server.listeners()
.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {
@ -143,14 +148,17 @@ class ReuseTest extends BaseTest {
Client client = new Client();
addEndPoint(client);
client.listeners()
.add(new Listener<String>() {
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
connection.send()
.TCP("-- LOCAL from client");
}
});
client.listeners()
.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {

View File

@ -23,6 +23,7 @@ import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.KryoCryptoSerializationManager;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.ListenerBridge;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
import org.junit.Test;
@ -66,13 +67,15 @@ class UnregisteredClassTest extends BaseTest {
addEndPoint(server);
server.bind(false);
server.listeners()
.add(new Listener<Data>() {
.add(new Listener.OnError<Connection>() {
@Override
public
void error(Connection connection, Throwable throwable) {
UnregisteredClassTest.this.fail = "Error during processing. " + throwable;
}
});
server.listeners()
.add(new Listener.OnMessageReceived<Connection, Data>() {
@Override
public
void received(Connection connection, Data data) {
@ -101,70 +104,72 @@ class UnregisteredClassTest extends BaseTest {
Client client = new Client(configuration);
addEndPoint(client);
client.listeners()
.add(new Listener<Data>() {
AtomicInteger checkTCP = new AtomicInteger(0);
AtomicInteger checkUDP = new AtomicInteger(0);
AtomicBoolean doneTCP = new AtomicBoolean(false);
AtomicBoolean doneUDP = new AtomicBoolean(false);
final ListenerBridge listeners = client.listeners();
listeners.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
UnregisteredClassTest.this.fail = null;
connection.send()
.TCP(dataTCP);
connection.send()
.UDP(dataUDP); // UDP ping pong stops if a UDP packet is lost.
}
});
listeners.add(new Listener.OnError<Connection>() {
@Override
public
void error(Connection connection, Throwable throwable) {
UnregisteredClassTest.this.fail = "Error during processing. " + throwable;
System.err.println(UnregisteredClassTest.this.fail);
}
});
listeners.add(new Listener.OnMessageReceived<Connection, Data>() {
AtomicInteger checkTCP = new AtomicInteger(0);
AtomicInteger checkUDP = new AtomicInteger(0);
AtomicBoolean doneTCP = new AtomicBoolean(false);
AtomicBoolean doneUDP = new AtomicBoolean(false);
@Override
public
void connected(Connection connection) {
UnregisteredClassTest.this.fail = null;
connection.send()
.TCP(dataTCP);
connection.send()
.UDP(dataUDP); // UDP ping pong stops if a UDP packet is lost.
}
@Override
public
void received(Connection connection, Data data) {
if (data.isTCP) {
if (!data.equals(dataTCP)) {
UnregisteredClassTest.this.fail = "TCP data is not equal on client.";
throw new RuntimeException("Fail! " + UnregisteredClassTest.this.fail);
}
if (this.checkTCP.getAndIncrement() <= UnregisteredClassTest.this.tries) {
connection.send()
.TCP(data);
UnregisteredClassTest.this.receivedTCP.incrementAndGet();
}
else {
System.err.println("TCP done.");
this.doneTCP.set(true);
}
}
else {
if (!data.equals(dataUDP)) {
UnregisteredClassTest.this.fail = "UDP data is not equal on client.";
throw new RuntimeException("Fail! " + UnregisteredClassTest.this.fail);
}
if (this.checkUDP.getAndIncrement() <= UnregisteredClassTest.this.tries) {
connection.send()
.UDP(data);
UnregisteredClassTest.this.receivedUDP.incrementAndGet();
}
else {
System.err.println("UDP done.");
this.doneUDP.set(true);
}
}
@Override
public
void error(Connection connection, Throwable throwable) {
UnregisteredClassTest.this.fail = "Error during processing. " + throwable;
System.err.println(UnregisteredClassTest.this.fail);
}
@Override
public
void received(Connection connection, Data data) {
if (data.isTCP) {
if (!data.equals(dataTCP)) {
UnregisteredClassTest.this.fail = "TCP data is not equal on client.";
throw new RuntimeException("Fail! " + UnregisteredClassTest.this.fail);
}
if (this.checkTCP.getAndIncrement() <= UnregisteredClassTest.this.tries) {
connection.send()
.TCP(data);
UnregisteredClassTest.this.receivedTCP.incrementAndGet();
}
else {
System.err.println("TCP done.");
this.doneTCP.set(true);
}
}
else {
if (!data.equals(dataUDP)) {
UnregisteredClassTest.this.fail = "UDP data is not equal on client.";
throw new RuntimeException("Fail! " + UnregisteredClassTest.this.fail);
}
if (this.checkUDP.getAndIncrement() <= UnregisteredClassTest.this.tries) {
connection.send()
.UDP(data);
UnregisteredClassTest.this.receivedUDP.incrementAndGet();
}
else {
System.err.println("UDP done.");
this.doneUDP.set(true);
}
}
if (this.doneTCP.get() && this.doneUDP.get()) {
System.err.println("Ran TCP & UDP " + UnregisteredClassTest.this.tries + " times each");
stopEndPoints();
}
}
});
if (this.doneTCP.get() && this.doneUDP.get()) {
System.err.println("Ran TCP & UDP " + UnregisteredClassTest.this.tries + " times each");
stopEndPoints();
}
}
});
client.connect(5000);
waitForThreads();

View File

@ -229,13 +229,16 @@ class RmiGlobalTest extends BaseTest {
server.bind(false);
server.listeners()
.add(new Listener<MessageWithTestObject>() {
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(final Connection connection) {
RmiGlobalTest.runTest(connection, globalRemoteClientObject, CLIENT_GLOBAL_OBJECT_ID);
}
});
server.listeners()
.add(new Listener.OnMessageReceived<Connection, MessageWithTestObject>() {
@Override
public
void received(Connection connection, MessageWithTestObject m) {
@ -260,7 +263,7 @@ class RmiGlobalTest extends BaseTest {
addEndPoint(client);
client.listeners()
.add(new Listener<MessageWithTestObject>() {
.add(new Listener.OnMessageReceived<Connection, MessageWithTestObject>() {
@Override
public
void received(Connection connection, MessageWithTestObject m) {

View File

@ -86,7 +86,7 @@ class RmiSendObjectOverrideMethodTest extends BaseTest {
server.listeners()
.add(new Listener<OtherObjectImpl>() {
.add(new Listener.OnMessageReceived<Connection, OtherObjectImpl>() {
@Override
public
void received(Connection connection, OtherObjectImpl object) {
@ -108,7 +108,7 @@ class RmiSendObjectOverrideMethodTest extends BaseTest {
addEndPoint(client);
client.listeners()
.add(new Listener<Object>() {
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(final Connection connection) {

View File

@ -84,7 +84,7 @@ class RmiSendObjectTest extends BaseTest {
server.listeners()
.add(new Listener<OtherObjectImpl>() {
.add(new Listener.OnMessageReceived<Connection, OtherObjectImpl>() {
@Override
public
void received(Connection connection, OtherObjectImpl object) {
@ -104,7 +104,7 @@ class RmiSendObjectTest extends BaseTest {
addEndPoint(client);
client.listeners()
.add(new Listener<Object>() {
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(final Connection connection) {

View File

@ -41,6 +41,7 @@ import dorkbox.network.Server;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.KryoCryptoSerializationManager;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.ListenerBridge;
import dorkbox.network.util.CryptoSerializationManager;
import dorkbox.util.exceptions.InitializationException;
import dorkbox.util.exceptions.SecurityException;
@ -210,27 +211,29 @@ class RmiTest extends BaseTest {
addEndPoint(server);
server.bind(false);
server.listeners()
.add(new Listener<MessageWithTestObject>() {
@Override
public
void connected(final Connection connection) {
System.err.println("Starting test for: Server -> Client");
RmiTest.runTest(connection, 1);
}
final ListenerBridge listeners = server.listeners();
listeners.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(final Connection connection) {
System.err.println("Starting test for: Server -> Client");
RmiTest.runTest(connection, 1);
}
});
listeners.add(new Listener.OnMessageReceived<Connection, MessageWithTestObject>() {
@Override
public
void received(Connection connection, MessageWithTestObject m) {
TestObject object = m.testObject;
final int id = object.id();
assertEquals(2, id);
System.err.println("Client -> Server Finished!");
@Override
public
void received(Connection connection, MessageWithTestObject m) {
TestObject object = m.testObject;
final int id = object.id();
assertEquals(2, id);
System.err.println("Client -> Server Finished!");
stopEndPoints(2000);
}
stopEndPoints(2000);
}
});
});
// ----
@ -241,7 +244,7 @@ class RmiTest extends BaseTest {
addEndPoint(client);
client.listeners()
.add(new Listener<MessageWithTestObject>() {
.add(new Listener.OnMessageReceived<Connection, MessageWithTestObject>() {
@Override
public
void received(Connection connection, MessageWithTestObject m) {