Consolidated multiple RMI support classes (local/network)

This commit is contained in:
nathan 2019-01-25 17:04:12 +01:00
parent 12a9293526
commit 7e442f2f8a
6 changed files with 76 additions and 88 deletions

View File

@ -35,10 +35,10 @@ import dorkbox.network.connection.wrapper.ChannelNetworkWrapper;
import dorkbox.network.connection.wrapper.ChannelNull;
import dorkbox.network.connection.wrapper.ChannelWrapper;
import dorkbox.network.rmi.ConnectionNoOpSupport;
import dorkbox.network.rmi.ConnectionRmiImplSupport;
import dorkbox.network.rmi.ConnectionRmiLocalSupport;
import dorkbox.network.rmi.ConnectionRmiNetworkSupport;
import dorkbox.network.rmi.ConnectionRmiSupport;
import dorkbox.network.rmi.RemoteObjectCallback;
import dorkbox.network.rmi.RmiObjectHandler;
import io.netty.bootstrap.DatagramSessionChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
@ -146,17 +146,16 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_
boolean isNetworkChannel = this.channelWrapper instanceof ChannelNetworkWrapper;
if (endPoint.rmiEnabled) {
RmiObjectHandler handler;
if (isNetworkChannel) {
handler = endPoint.rmiNetworkHandler;
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent access, but
// there WILL be issues with thread visibility because a different worker thread can be called for different connections
this.rmiSupport = new ConnectionRmiNetworkSupport(this, endPoint.rmiGlobalBridge);
}
else {
handler = endPoint.rmiLocalHandler;
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent access, but
// there WILL be issues with thread visibility because a different worker thread can be called for different connections
this.rmiSupport = new ConnectionRmiLocalSupport(this, endPoint.rmiGlobalBridge);
}
// because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent access, but
// there WILL be issues with thread visibility because a different worker thread can be called for different connections
this.rmiSupport = new ConnectionRmiImplSupport(this, endPoint.rmiGlobalBridge, handler);
} else {
this.rmiSupport = new ConnectionNoOpSupport();
}

View File

@ -35,8 +35,6 @@ import dorkbox.network.connection.wrapper.ChannelLocalWrapper;
import dorkbox.network.connection.wrapper.ChannelNetworkWrapper;
import dorkbox.network.connection.wrapper.ChannelWrapper;
import dorkbox.network.rmi.RmiBridge;
import dorkbox.network.rmi.RmiObjectLocalHandler;
import dorkbox.network.rmi.RmiObjectNetworkHandler;
import dorkbox.network.serialization.NetworkSerializationManager;
import dorkbox.network.serialization.Serialization;
import dorkbox.network.store.NullSettingsStore;
@ -170,8 +168,6 @@ class EndPoint extends Shutdownable {
final boolean rmiEnabled;
// we only want one instance of these created. These will be called appropriately
final RmiObjectLocalHandler rmiLocalHandler;
final RmiObjectNetworkHandler rmiNetworkHandler;
final RmiBridge rmiGlobalBridge;
@ -283,13 +279,9 @@ class EndPoint extends Shutdownable {
connectionManager = new ConnectionManager(type.getSimpleName(), connection0(null, null).getClass());
if (rmiEnabled) {
rmiLocalHandler = new RmiObjectLocalHandler(logger);
rmiNetworkHandler = new RmiObjectNetworkHandler(logger);
rmiGlobalBridge = new RmiBridge(logger, true);
}
else {
rmiLocalHandler = null;
rmiNetworkHandler = null;
rmiGlobalBridge = null;
}

View File

@ -36,32 +36,32 @@ import dorkbox.util.collections.LockFreeHashMap;
import dorkbox.util.collections.LockFreeIntMap;
import dorkbox.util.generics.ClassHelper;
public
public abstract
class ConnectionRmiImplSupport implements ConnectionRmiSupport {
private final RmiBridge rmiGlobalBridge;
private final RmiBridge rmiLocalBridge;
private final RmiObjectHandler rmiHandler;
private final Map<Integer, RemoteObject> proxyIdCache;
private final List<OnMessageReceived<Connection, InvokeMethodResult>> proxyListeners;
final ConnectionImpl connection;
private final LockFreeIntMap<RemoteObjectCallback> rmiRegistrationCallbacks;
private final Logger logger;
private volatile int rmiCallbackId = 0;
public
ConnectionRmiImplSupport(final ConnectionImpl connection, final RmiBridge rmiGlobalBridge, final RmiObjectHandler rmiHandler) {
final ConnectionImpl connection;
protected final Logger logger;
protected
ConnectionRmiImplSupport(final ConnectionImpl connection, final RmiBridge rmiGlobalBridge) {
this.connection = connection;
if (rmiGlobalBridge == null || rmiHandler == null) {
if (rmiGlobalBridge == null ) {
throw new NullPointerException("RMI cannot be null if using RMI support!");
}
this.rmiGlobalBridge = rmiGlobalBridge;
this.rmiHandler = rmiHandler;
logger = rmiGlobalBridge.logger;
@ -78,6 +78,12 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
rmiRegistrationCallbacks = new LockFreeIntMap<RemoteObjectCallback>();
}
abstract InvokeMethod getInvokeMethod(final NetworkSerializationManager serialization, final ConnectionImpl connection, final InvokeMethod invokeMethod);
abstract void registration(final ConnectionImpl connection, final RmiRegistration message);
abstract Object normalMessages(final Object message);
public
void close() {
// proxy listeners are cleared in the removeAll() call (which happens BEFORE close)
@ -147,7 +153,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
if (message instanceof InvokeMethod) {
NetworkSerializationManager serialization = connection.getEndPoint().getSerialization();
InvokeMethod invokeMethod = rmiHandler.getInvokeMethod(serialization, connection, (InvokeMethod) message);
InvokeMethod invokeMethod = getInvokeMethod(serialization, connection, (InvokeMethod) message);
int objectID = invokeMethod.objectID;
@ -182,7 +188,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
return true;
}
else if (message instanceof RmiRegistration) {
rmiHandler.registration(this, connection, (RmiRegistration) message);
registration(connection, (RmiRegistration) message);
return true;
}
@ -195,7 +201,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
* For local connections, we have to switch it appropriately in the LocalRmiProxy
*/
public
RmiRegistration createNewRmiObject(final NetworkSerializationManager serialization, final Class<?> interfaceClass, final Class<?> implementationClass, final int callbackId, final Logger logger) {
RmiRegistration createNewRmiObject(final NetworkSerializationManager serialization, final Class<?> interfaceClass, final Class<?> implementationClass, final int callbackId) {
KryoExtra kryo = null;
Object object = null;
int rmiId = 0;
@ -290,7 +296,7 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
}
public
void runCallback(final Class<?> interfaceClass, final int callbackId, final Object remoteObject, final Logger logger) {
void runCallback(final Class<?> interfaceClass, final int callbackId, final Object remoteObject) {
RemoteObjectCallback callback = rmiRegistrationCallbacks.remove(callbackId);
try {
@ -391,6 +397,6 @@ class ConnectionRmiImplSupport implements ConnectionRmiSupport {
public
Object fixupRmi(final ConnectionImpl connection, final Object message) {
// "local RMI" objects have to be modified, this part does that
return rmiHandler.normalMessages(this, message);
return normalMessages(message);
}
}

View File

@ -1,17 +1,16 @@
/*
* Copyright 2018 dorkbox, llc.
* Copyright 2019 dorkbox, llc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.rmi;
@ -20,8 +19,6 @@ import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.util.IdentityMap;
@ -39,30 +36,30 @@ import dorkbox.network.serialization.NetworkSerializationManager;
* This is for a LOCAL connection (same-JVM)
*/
public
class RmiObjectLocalHandler implements RmiObjectHandler {
class ConnectionRmiLocalSupport extends ConnectionRmiImplSupport {
private static final boolean ENABLE_PROXY_OBJECTS = RmiBridge.ENABLE_PROXY_OBJECTS;
private static final Field[] NO_REMOTE_FIELDS = new Field[0];
private static final AtomicReferenceFieldUpdater<RmiObjectLocalHandler, IdentityMap> implToProxyREF = AtomicReferenceFieldUpdater.newUpdater(
RmiObjectLocalHandler.class,
private static final AtomicReferenceFieldUpdater<ConnectionRmiLocalSupport, IdentityMap> implToProxyREF = AtomicReferenceFieldUpdater.newUpdater(
ConnectionRmiLocalSupport.class,
IdentityMap.class,
"implToProxy");
private static final AtomicReferenceFieldUpdater<RmiObjectLocalHandler, IdentityMap> remoteObjectREF = AtomicReferenceFieldUpdater.newUpdater(
RmiObjectLocalHandler.class,
private static final AtomicReferenceFieldUpdater<ConnectionRmiLocalSupport, IdentityMap> remoteObjectREF = AtomicReferenceFieldUpdater.newUpdater(
ConnectionRmiLocalSupport.class,
IdentityMap.class,
"objectHasRemoteObjects");
private volatile IdentityMap<Object, Object> implToProxy = new IdentityMap<Object, Object>();
private volatile IdentityMap<Object, Field[]> objectHasRemoteObjects = new IdentityMap<Object, Field[]>();
private final Logger logger;
public
RmiObjectLocalHandler(final Logger logger) {
this.logger = logger;
ConnectionRmiLocalSupport(final ConnectionImpl connection, final RmiBridge rmiGlobalBridge) {
super(connection, rmiGlobalBridge);
}
@Override
public
InvokeMethod getInvokeMethod(final NetworkSerializationManager serialization, final ConnectionImpl connection, final InvokeMethod invokeMethod) {
int methodClassID = invokeMethod.cachedMethod.methodClassID;
@ -121,7 +118,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler {
@Override
public
void registration(final ConnectionRmiImplSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration registration) {
void registration(final ConnectionImpl connection, final RmiRegistration registration) {
// manage creating/getting/notifying this RMI object
// these fields are ALWAYS present!
@ -141,7 +138,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler {
Class<?> rmiImpl = serialization.getRmiImpl(registration.interfaceClass);
RmiRegistration registrationResult = rmiSupport.createNewRmiObject(serialization, interfaceClass, rmiImpl, callbackId, logger);
RmiRegistration registrationResult = createNewRmiObject(serialization, interfaceClass, rmiImpl, callbackId);
connection.send(registrationResult);
// connection transport is flushed in calling method (don't need to do it here)
}
@ -149,7 +146,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler {
// Check if we are getting an already existing REMOTE object. This check is always AFTER the check to create a new object
else {
// GET a LOCAL rmi object, if none get a specific, GLOBAL rmi object (objects that are not bound to a single connection).
Object implementationObject = rmiSupport.getImplementationObject(registration.rmiId);
Object implementationObject = getImplementationObject(registration.rmiId);
connection.send(new RmiRegistration(interfaceClass, registration.rmiId, callbackId, implementationObject));
// connection transport is flushed in calling method (don't need to do it here)
}
@ -175,7 +172,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler {
else {
// override the implementation object with the proxy. This is required because RMI must be the same between "network" and "local"
// connections -- even if this "slows down" the speed/performance of what "local" connections offer.
proxyObject = rmiSupport.getProxyObject(registration.rmiId, interfaceClass);
proxyObject = getProxyObject(registration.rmiId, interfaceClass);
if (proxyObject != null && registration.remoteObject != null) {
// have to save A and B so we can correctly switch as necessary
@ -187,10 +184,10 @@ class RmiObjectLocalHandler implements RmiObjectHandler {
}
}
rmiSupport.runCallback(interfaceClass, callbackId, proxyObject, logger);
runCallback(interfaceClass, callbackId, proxyObject);
}
else {
rmiSupport.runCallback(interfaceClass, callbackId, registration.remoteObject, logger);
runCallback(interfaceClass, callbackId, registration.remoteObject);
}
}
}
@ -198,7 +195,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler {
@SuppressWarnings("unchecked")
@Override
public
Object normalMessages(final ConnectionRmiImplSupport rmiSupport, final Object message) {
Object normalMessages(final Object message) {
// else, this was "just a local message"
// because we NORMALLY pass around just the object (there is no serialization going on...) we have to explicitly check to see
@ -246,7 +243,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler {
RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(o);
int id = handler.rmiObjectId;
field.set(message, rmiSupport.getImplementationObject(id));
field.set(message, getImplementationObject(id));
fields.add(field);
}
else {
@ -299,7 +296,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler {
RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(o);
int id = handler.rmiObjectId;
field.set(message, rmiSupport.getImplementationObject(id));
field.set(message, getImplementationObject(id));
}
else {
// is a field supposed to be a proxy?

View File

@ -1,33 +1,31 @@
/*
* Copyright 2018 dorkbox, llc.
* Copyright 2019 dorkbox, llc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.rmi;
import org.slf4j.Logger;
import dorkbox.network.connection.ConnectionImpl;
import dorkbox.network.serialization.NetworkSerializationManager;
/**
*
*/
public
class RmiObjectNetworkHandler implements RmiObjectHandler {
private final Logger logger;
class ConnectionRmiNetworkSupport extends ConnectionRmiImplSupport {
public
RmiObjectNetworkHandler(final Logger logger) {
this.logger = logger;
ConnectionRmiNetworkSupport(final ConnectionImpl connection, final RmiBridge rmiGlobalBridge) {
super(connection, rmiGlobalBridge);
}
public
@ -38,7 +36,7 @@ class RmiObjectNetworkHandler implements RmiObjectHandler {
@Override
public
void registration(final ConnectionRmiImplSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration registration) {
void registration(final ConnectionImpl connection, final RmiRegistration registration) {
// manage creating/getting/notifying this RMI object
// these fields are ALWAYS present!
@ -60,7 +58,7 @@ class RmiObjectNetworkHandler implements RmiObjectHandler {
// For network connections, the interface class kryo ID == implementation class kryo ID, so they switch automatically.
RmiRegistration registrationResult = rmiSupport.createNewRmiObject(serialization, interfaceClass, rmiImpl, callbackId, logger);
RmiRegistration registrationResult = createNewRmiObject(serialization, interfaceClass, rmiImpl, callbackId);
connection.send(registrationResult);
// connection transport is flushed in calling method (don't need to do it here)
}
@ -70,7 +68,7 @@ class RmiObjectNetworkHandler implements RmiObjectHandler {
// THIS IS ON THE REMOTE CONNECTION (where the object implementation will really exist)
//
// GET a LOCAL rmi object, if none get a specific, GLOBAL rmi object (objects that are not bound to a single connection).
Object implementationObject = rmiSupport.getImplementationObject(registration.rmiId);
Object implementationObject = getImplementationObject(registration.rmiId);
connection.send(new RmiRegistration(interfaceClass, registration.rmiId, callbackId, implementationObject));
// connection transport is flushed in calling method (don't need to do it here)
}
@ -82,13 +80,13 @@ class RmiObjectNetworkHandler implements RmiObjectHandler {
// this is the response.
// THIS IS ON THE LOCAL CONNECTION SIDE, which is the side that called 'getRemoteObject()' This can be Server or Client.
rmiSupport.runCallback(interfaceClass, callbackId, registration.remoteObject, logger);
runCallback(interfaceClass, callbackId, registration.remoteObject);
}
}
@Override
public
Object normalMessages(final ConnectionRmiImplSupport connection, final Object message) {
Object normalMessages(final Object message) {
return message;
}
}

View File

@ -14,8 +14,6 @@
*/
package dorkbox.network.rmi;
import org.slf4j.Logger;
import dorkbox.network.connection.ConnectionImpl;
public
@ -34,8 +32,6 @@ interface ConnectionRmiSupport {
<T> int getRegisteredId(final T object);
void runCallback(final Class<?> interfaceClass, final int callbackId, final Object remoteObject, final Logger logger);
RemoteObject getProxyObject(final int rmiId, final Class<?> iFace);
Object getImplementationObject(final int objectId);