From 2a4ae9939024f6a8fa16374a2c51f03c76e0f8e0 Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 25 Jan 2019 16:17:15 +0100 Subject: [PATCH] RMI Optimizations, merged/renamed primary serialization interface. --- src/dorkbox/network/Configuration.java | 12 +- .../network/connection/ConnectionImpl.java | 76 +-- ...CryptoConnection.java => Connection_.java} | 11 +- src/dorkbox/network/connection/EndPoint.java | 5 +- .../network/connection/EndPointClient.java | 13 +- src/dorkbox/network/connection/KryoExtra.java | 48 +- .../connection/RegistrationWrapper.java | 4 +- .../network/connection/RmiConnection.java | 48 -- .../network/connection/Shutdownable.java | 14 +- .../ConnectionRegistrationImpl.java | 30 +- .../remote/RegistrationRemoteHandler.java | 4 +- .../network/pipeline/tcp/KryoDecoder.java | 10 +- .../pipeline/tcp/KryoDecoderCrypto.java | 12 +- .../network/pipeline/tcp/KryoEncoder.java | 8 +- .../pipeline/tcp/KryoEncoderCrypto.java | 12 +- .../network/pipeline/udp/KryoDecoderUdp.java | 6 +- .../pipeline/udp/KryoDecoderUdpCrypto.java | 12 +- .../network/pipeline/udp/KryoEncoderUdp.java | 9 +- .../pipeline/udp/KryoEncoderUdpCrypto.java | 13 +- .../network/rmi/ConnectionRmiSupport.java | 131 +++--- .../network/rmi/ConnectionSupport.java | 7 +- .../rmi/InvocationHandlerSerializer.java | 4 +- .../network/rmi/InvokeMethodSerializer.java | 2 +- .../network/rmi/RemoteObjectSerializer.java | 4 +- src/dorkbox/network/rmi/RmiBridge.java | 12 +- src/dorkbox/network/rmi/RmiNopConnection.java | 131 ++++++ src/dorkbox/network/rmi/RmiObjectHandler.java | 4 +- .../network/rmi/RmiObjectLocalHandler.java | 16 +- .../network/rmi/RmiObjectNetworkHandler.java | 8 +- ...LocalHandler.java => RmiProxyHandler.java} | 35 +- .../network/rmi/RmiProxyNetworkHandler.java | 440 ------------------ .../rmi/RmiRegistrationSerializer.java | 4 +- .../serialization/ClassRegistration.java | 3 +- .../serialization/ClassSerializer.java | 3 +- .../serialization/ClassSerializer1.java | 3 +- .../serialization/ClassSerializer2.java | 3 +- .../serialization/ClassSerializerRmi.java | 3 +- .../CryptoSerializationManager.java | 48 -- ....java => NetworkSerializationManager.java} | 37 +- .../network/serialization/Serialization.java | 50 +- test/dorkbox/network/rmi/RmiGlobalTest.java | 3 +- .../rmi/RmiObjectIdExhaustionTest.java | 3 +- test/dorkbox/network/rmi/RmiTest.java | 3 +- 43 files changed, 420 insertions(+), 884 deletions(-) rename src/dorkbox/network/connection/{CryptoConnection.java => Connection_.java} (85%) delete mode 100644 src/dorkbox/network/connection/RmiConnection.java create mode 100644 src/dorkbox/network/rmi/RmiNopConnection.java rename src/dorkbox/network/rmi/{RmiProxyLocalHandler.java => RmiProxyHandler.java} (93%) delete mode 100644 src/dorkbox/network/rmi/RmiProxyNetworkHandler.java delete mode 100644 src/dorkbox/network/serialization/CryptoSerializationManager.java rename src/dorkbox/network/serialization/{RmiSerializationManager.java => NetworkSerializationManager.java} (79%) diff --git a/src/dorkbox/network/Configuration.java b/src/dorkbox/network/Configuration.java index 642de464..faf8dbb0 100644 --- a/src/dorkbox/network/Configuration.java +++ b/src/dorkbox/network/Configuration.java @@ -15,10 +15,8 @@ */ package dorkbox.network; -import java.util.concurrent.Executor; - import dorkbox.network.connection.EndPoint; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.network.store.SettingsStore; public @@ -59,13 +57,7 @@ class Configuration { /** * Specify the serialization manager to use. If null, it uses the default. */ - public CryptoSerializationManager serialization = null; - - /** - * Sets the executor used to invoke methods when an invocation is received from a remote endpoint. By default, no executor is set and - * invocations occur on the network thread, which should not be blocked for long. - */ - public Executor rmiExecutor = null; + public NetworkSerializationManager serialization = null; /** * The number of threads used for the worker threads by the end point. By default, this is the CPU_COUNT/2 or 1, whichever is larger. diff --git a/src/dorkbox/network/connection/ConnectionImpl.java b/src/dorkbox/network/connection/ConnectionImpl.java index c0498cc7..bac8eae9 100644 --- a/src/dorkbox/network/connection/ConnectionImpl.java +++ b/src/dorkbox/network/connection/ConnectionImpl.java @@ -36,10 +36,8 @@ import dorkbox.network.connection.wrapper.ChannelNull; import dorkbox.network.connection.wrapper.ChannelWrapper; import dorkbox.network.rmi.ConnectionRmiSupport; import dorkbox.network.rmi.ConnectionSupport; -import dorkbox.network.rmi.RemoteObject; import dorkbox.network.rmi.RemoteObjectCallback; import dorkbox.network.rmi.RmiObjectHandler; -import dorkbox.network.rmi.TimeoutException; import io.netty.bootstrap.DatagramSessionChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler.Sharable; @@ -66,7 +64,7 @@ import io.netty.util.concurrent.Promise; @SuppressWarnings("unused") @Sharable public -class ConnectionImpl extends ChannelInboundHandlerAdapter implements CryptoConnection, Connection, Listeners, ConnectionBridge { +class ConnectionImpl extends ChannelInboundHandlerAdapter implements Connection_, Listeners, ConnectionBridge { public static boolean isTcpChannel(Class channelClass) { return channelClass == OioSocketChannel.class || @@ -130,7 +128,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements CryptoConne // RMI support for this connection - private final ConnectionSupport rmiSupport; + final ConnectionSupport rmiSupport; /** * All of the parameters can be null, when metaChannel wants to get the base class type @@ -158,7 +156,7 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements CryptoConne // because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent access, but // there WILL be issues with thread visibility because a different worker thread can be called for different connections - this.rmiSupport = new ConnectionRmiSupport(endPoint.rmiGlobalBridge, handler); + this.rmiSupport = new ConnectionRmiSupport(this, endPoint.rmiGlobalBridge, handler); } else { this.rmiSupport = new ConnectionSupport(); } @@ -1012,7 +1010,11 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements CryptoConne // // - + @Override + public + ConnectionSupport rmiSupport() { + return rmiSupport; + } @Override public final @@ -1026,7 +1028,6 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements CryptoConne rmiSupport.getRemoteObject(this, objectId, callback); } - /** * Manages the RMI stuff for a connection. * @@ -1044,66 +1045,5 @@ class ConnectionImpl extends ChannelInboundHandlerAdapter implements CryptoConne return rmiSupport.fixupRmi(this, message); } - /** - * This will remove the invoke and invoke response listeners for this remote object - */ - public - void removeRmiListeners(final int objectID, final Listener listener) { - rmiSupport.removeAllListeners(); //? this is called from close(), when the "RMI" object is closed. TODO: REMOVE THIS? - } - public final - void runRmiCallback(final Class interfaceClass, final int callbackId, final Object remoteObject) { - rmiSupport.runCallback(interfaceClass, callbackId, remoteObject, logger); - } - - /** - * Used by RMI by the LOCAL side when setting up the to fetch an object for the REMOTE side - * - * @return the registered ID for a specific object, or RmiBridge.INVALID_RMI if there was no ID. - */ - @Override - public - int getRegisteredId(final T object) { - return rmiSupport.getRegisteredId(object); - } - - /** - * Warning. This is an advanced method. You should probably be using {@link Connection#createRemoteObject(Class, RemoteObjectCallback)} - *

- *

- * Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked - * remotely. - *

- * Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link - * RemoteObject#setResponseTimeout(int) response timeout}. - *

- * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be - * called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be - * called on the update thread. - *

- * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side will - * have the proxy object replaced with the registered object. - * - * @see RemoteObject - * - * @param rmiId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID - * @param iFace this is the RMI interface - */ - @Override - public - RemoteObject getProxyObject(final int rmiId, final Class iFace) { - return rmiSupport.getProxyObject(this, rmiId, iFace); - } - - /** - * This is used by RMI for the REMOTE side, to get the implementation - * - * @param objectId this is the RMI object ID - */ - @Override - public - Object getImplementationObject(final int objectId) { - return rmiSupport.getImplementationObject(objectId); - } } diff --git a/src/dorkbox/network/connection/CryptoConnection.java b/src/dorkbox/network/connection/Connection_.java similarity index 85% rename from src/dorkbox/network/connection/CryptoConnection.java rename to src/dorkbox/network/connection/Connection_.java index b8277007..9e2155af 100644 --- a/src/dorkbox/network/connection/CryptoConnection.java +++ b/src/dorkbox/network/connection/Connection_.java @@ -17,11 +17,18 @@ package dorkbox.network.connection; import org.bouncycastle.crypto.params.ParametersWithIV; +import dorkbox.network.rmi.ConnectionSupport; + /** - * Supporting methods for encrypting data to a remote endpoint + * Supporting methods for encrypting data to a remote endpoint and RMI */ public -interface CryptoConnection extends RmiConnection, Connection { +interface Connection_ extends Connection { + + /** + * @return the RMI support for this connection + */ + ConnectionSupport rmiSupport(); /** * This is the per-message sequence number. diff --git a/src/dorkbox/network/connection/EndPoint.java b/src/dorkbox/network/connection/EndPoint.java index fb94849b..dae6c5d4 100644 --- a/src/dorkbox/network/connection/EndPoint.java +++ b/src/dorkbox/network/connection/EndPoint.java @@ -37,6 +37,7 @@ import dorkbox.network.connection.wrapper.ChannelWrapper; import dorkbox.network.rmi.RmiBridge; import dorkbox.network.rmi.RmiObjectLocalHandler; import dorkbox.network.rmi.RmiObjectNetworkHandler; +import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.network.serialization.Serialization; import dorkbox.network.store.NullSettingsStore; import dorkbox.network.store.SettingsStore; @@ -158,7 +159,7 @@ class EndPoint extends Shutdownable { protected final Configuration config; protected final ConnectionManager connectionManager; - protected final dorkbox.network.serialization.CryptoSerializationManager serializationManager; + protected final NetworkSerializationManager serializationManager; protected final RegistrationWrapper registrationWrapper; final ECPrivateKeyParameters privateKey; @@ -363,7 +364,7 @@ class EndPoint extends Shutdownable { * Returns the serialization wrapper if there is an object type that needs to be added outside of the basics. */ public - dorkbox.network.serialization.CryptoSerializationManager getSerialization() { + NetworkSerializationManager getSerialization() { return serializationManager; } diff --git a/src/dorkbox/network/connection/EndPointClient.java b/src/dorkbox/network/connection/EndPointClient.java index 6ab7217a..f80b15f9 100644 --- a/src/dorkbox/network/connection/EndPointClient.java +++ b/src/dorkbox/network/connection/EndPointClient.java @@ -314,9 +314,7 @@ class EndPointClient extends EndPoint { closeConnections(true); shutdownAllChannels(); - // shutdownEventLoops(); - - + // shutdownEventLoops(); we don't do this here! connection = null; isConnected.set(false); @@ -330,4 +328,13 @@ class EndPointClient extends EndPoint { // make sure we're not waiting on registration stopRegistration(); } + + @Override + protected + void shutdownChannelsPre() { + closeConnection(); + + // this calls connectionManager.stop() + super.shutdownChannelsPre(); + } } diff --git a/src/dorkbox/network/connection/KryoExtra.java b/src/dorkbox/network/connection/KryoExtra.java index 499724aa..6990ef1f 100644 --- a/src/dorkbox/network/connection/KryoExtra.java +++ b/src/dorkbox/network/connection/KryoExtra.java @@ -28,7 +28,9 @@ import com.esotericsoftware.kryo.util.MapReferenceResolver; import dorkbox.network.pipeline.ByteBufInput; import dorkbox.network.pipeline.ByteBufOutput; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.rmi.ConnectionSupport; +import dorkbox.network.rmi.RmiNopConnection; +import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.util.bytes.BigEndian; import dorkbox.util.bytes.OptimizeUtilsByteArray; import dorkbox.util.bytes.OptimizeUtilsByteBuf; @@ -43,7 +45,9 @@ import net.jpountz.lz4.LZ4FastDecompressor; */ @SuppressWarnings("Duplicates") public -class KryoExtra extends Kryo { +class KryoExtra extends Kryo { + private static final Connection_ NOP_CONNECTION = new RmiNopConnection(); + // snappycomp : 7.534 micros/op; 518.5 MB/s (output: 55.1%) // snappyuncomp : 1.391 micros/op; 2808.1 MB/s // lz4comp : 6.210 micros/op; 629.0 MB/s (output: 55.4%) @@ -54,8 +58,8 @@ class KryoExtra extends Kryo { private final ByteBufInput reader = new ByteBufInput(); private final ByteBufOutput writer = new ByteBufOutput(); - // volatile to provide object visibility for entire class - public volatile RmiConnection connection; + // volatile to provide object visibility for entire class. This is unique per connection + public volatile ConnectionSupport rmiSupport; private final GCMBlockCipher aesEngine = new GCMBlockCipher(new AESFastEngine()); @@ -85,10 +89,10 @@ class KryoExtra extends Kryo { private byte[] decompressOutput; private ByteBuf decompressBuf; - private CryptoSerializationManager serializationManager; + private NetworkSerializationManager serializationManager; public - KryoExtra(final CryptoSerializationManager serializationManager) { + KryoExtra(final NetworkSerializationManager serializationManager) { super(new DefaultClassResolver(), new MapReferenceResolver(), new DefaultStreamFactory()); this.serializationManager = serializationManager; @@ -96,8 +100,8 @@ class KryoExtra extends Kryo { public synchronized void write(final ByteBuf buffer, final Object message) throws IOException { - // connection will always be NULL during connection initialization - this.connection = null; + // these will always be NULL during connection initialization + this.rmiSupport = null; // write the object to the NORMAL output buffer! writer.setBuffer(buffer); @@ -107,8 +111,8 @@ class KryoExtra extends Kryo { public synchronized Object read(final ByteBuf buffer) throws IOException { - // connection will always be NULL during connection initialization - this.connection = null; + // these will always be NULL during connection initialization + this.rmiSupport = null; //////////////// // Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it! @@ -127,16 +131,16 @@ class KryoExtra extends Kryo { */ public synchronized void writeCompressed(final ByteBuf buffer, final Object message) throws IOException { - writeCompressed(null, buffer, message); + writeCompressed(NOP_CONNECTION, buffer, message); } /** * This is NOT ENCRYPTED (and is only done on the loopback connection!) */ public synchronized - void writeCompressed(final C connection, final ByteBuf buffer, final Object message) throws IOException { + void writeCompressed(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException { // required by RMI and some serializers to determine which connection wrote (or has info about) this object - this.connection = connection; + this.rmiSupport = connection.rmiSupport(); ByteBuf objectOutputBuffer = this.tempBuffer; objectOutputBuffer.clear(); // always have to reset everything @@ -228,16 +232,16 @@ class KryoExtra extends Kryo { */ public Object readCompressed(final ByteBuf buffer, int length) throws IOException { - return readCompressed(null, buffer, length); + return readCompressed(NOP_CONNECTION, buffer, length); } /** * This is NOT ENCRYPTED (and is only done on the loopback connection!) */ public - Object readCompressed(final C connection, final ByteBuf buffer, int length) throws IOException { + Object readCompressed(final Connection_ connection, final ByteBuf buffer, int length) throws IOException { // required by RMI and some serializers to determine which connection wrote (or has info about) this object - this.connection = connection; + this.rmiSupport = connection.rmiSupport(); //////////////// @@ -320,9 +324,9 @@ class KryoExtra extends Kryo { } public synchronized - void writeCrypto(final C connection, final ByteBuf buffer, final Object message) throws IOException { + void writeCrypto(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException { // required by RMI and some serializers to determine which connection wrote (or has info about) this object - this.connection = connection; + this.rmiSupport = connection.rmiSupport(); ByteBuf objectOutputBuffer = this.tempBuffer; objectOutputBuffer.clear(); // always have to reset everything @@ -455,10 +459,9 @@ class KryoExtra extends Kryo { } public - Object readCrypto(final C connection, final ByteBuf buffer, int length) throws IOException { + Object readCrypto(final Connection_ connection, final ByteBuf buffer, int length) throws IOException { // required by RMI and some serializers to determine which connection wrote (or has info about) this object - this.connection = connection; - + this.rmiSupport = connection.rmiSupport(); //////////////// // Note: we CANNOT write BACK to the buffer as "temp" storage, since there could be additional data on it! @@ -591,7 +594,8 @@ class KryoExtra extends Kryo { } public - CryptoSerializationManager getSerializationManager() { + NetworkSerializationManager getSerializationManager() { return serializationManager; } + } diff --git a/src/dorkbox/network/connection/RegistrationWrapper.java b/src/dorkbox/network/connection/RegistrationWrapper.java index 35e507bb..cb64606e 100644 --- a/src/dorkbox/network/connection/RegistrationWrapper.java +++ b/src/dorkbox/network/connection/RegistrationWrapper.java @@ -32,7 +32,7 @@ import dorkbox.network.pipeline.udp.KryoDecoderUdp; import dorkbox.network.pipeline.udp.KryoDecoderUdpCrypto; import dorkbox.network.pipeline.udp.KryoEncoderUdp; import dorkbox.network.pipeline.udp.KryoEncoderUdpCrypto; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.util.RandomUtil; import dorkbox.util.collections.IntMap.Values; import dorkbox.util.collections.LockFreeIntMap; @@ -83,7 +83,7 @@ class RegistrationWrapper { } public - CryptoSerializationManager getSerialization() { + NetworkSerializationManager getSerialization() { return endPoint.getSerialization(); } diff --git a/src/dorkbox/network/connection/RmiConnection.java b/src/dorkbox/network/connection/RmiConnection.java deleted file mode 100644 index fc659d0a..00000000 --- a/src/dorkbox/network/connection/RmiConnection.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.connection; - -import dorkbox.network.rmi.RemoteObject; - -/** - * Supporting methods for RMI connections - */ -public -interface RmiConnection { - - /** - * Used by RMI for the LOCAL side, to get the proxy object as an interface - * - * @param objectId this is the RMI object ID - * @param iFace must be the interface the proxy will bind to - */ - RemoteObject getProxyObject(final int objectId, final Class iFace); - - /** - * This is used by RMI for the REMOTE side, to get the implementation - * - * @param objectId this is the RMI object ID - */ - Object getImplementationObject(final int objectId); - - /** - * Used by RMI - * - * @return the registered ID for a specific object. This is used by the "local" side when setting up the to fetch an object for the - * "remote" side for RMI - */ - int getRegisteredId(final T object); -} diff --git a/src/dorkbox/network/connection/Shutdownable.java b/src/dorkbox/network/connection/Shutdownable.java index 069296f4..ac39f9d2 100644 --- a/src/dorkbox/network/connection/Shutdownable.java +++ b/src/dorkbox/network/connection/Shutdownable.java @@ -193,8 +193,8 @@ class Shutdownable { for (ChannelFuture f : shutdownChannelList) { Channel channel = f.channel(); if (channel.isOpen()) { - channel.close() - .awaitUninterruptibly(maxShutdownWaitTimeInMilliSeconds); + // from the git example on how to shutdown a channel + channel.close().syncUninterruptibly(); Thread.yield(); } } @@ -216,16 +216,18 @@ class Shutdownable { } for (EventLoopGroup loopGroup : loopGroups) { - shutdownThreadList.add(loopGroup.shutdownGracefully(maxShutdownWaitTimeInMilliSeconds, - maxShutdownWaitTimeInMilliSeconds * 2, - TimeUnit.MILLISECONDS)); + Future future = loopGroup.shutdownGracefully(maxShutdownWaitTimeInMilliSeconds / 2, maxShutdownWaitTimeInMilliSeconds, TimeUnit.MILLISECONDS); + shutdownThreadList.add(future); Thread.yield(); } // now wait for them to finish! // It can take a few seconds to shut down the executor. This will affect unit testing, where connections are quickly created/stopped for (Future f : shutdownThreadList) { - f.syncUninterruptibly(); + try { + f.await(maxShutdownWaitTimeInMilliSeconds); + } catch (InterruptedException ignored) { + } Thread.yield(); } } diff --git a/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java b/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java index 86acea8b..a66df550 100644 --- a/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java +++ b/src/dorkbox/network/connection/registration/ConnectionRegistrationImpl.java @@ -19,13 +19,13 @@ import org.bouncycastle.crypto.params.ParametersWithIV; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.ConnectionPoint; -import dorkbox.network.connection.CryptoConnection; +import dorkbox.network.connection.Connection_; import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.Listeners; import dorkbox.network.connection.bridge.ConnectionBridge; import dorkbox.network.connection.idle.IdleBridge; import dorkbox.network.connection.idle.IdleSender; -import dorkbox.network.rmi.RemoteObject; +import dorkbox.network.rmi.ConnectionSupport; import dorkbox.network.rmi.RemoteObjectCallback; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -36,7 +36,7 @@ import io.netty.channel.ChannelHandlerContext; * This is to prevent race conditions where onMessage() can happen BEFORE a "connection" is "connected" */ public -class ConnectionRegistrationImpl implements CryptoConnection, ChannelHandler { +class ConnectionRegistrationImpl implements Connection_, ChannelHandler { public final ConnectionImpl connection; public @@ -71,24 +71,6 @@ class ConnectionRegistrationImpl implements CryptoConnection, ChannelHandler { return connection.getCryptoParameters(); } - @Override - public - RemoteObject getProxyObject(final int objectID, final Class iFace) { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - Object getImplementationObject(final int objectId) { - throw new IllegalArgumentException("not implemented"); - } - - @Override - public - int getRegisteredId(final T object) { - return 0; - } - @Override public boolean hasRemoteKeyChanged() { @@ -184,4 +166,10 @@ class ConnectionRegistrationImpl implements CryptoConnection, ChannelHandler { void getRemoteObject(final int objectId, final RemoteObjectCallback callback) { throw new IllegalArgumentException("not implemented"); } + + @Override + public + ConnectionSupport rmiSupport() { + return null; + } } diff --git a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java index 8133c1d6..61eb8848 100644 --- a/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java +++ b/src/dorkbox/network/connection/registration/remote/RegistrationRemoteHandler.java @@ -32,7 +32,7 @@ import dorkbox.network.connection.registration.Registration; import dorkbox.network.connection.registration.RegistrationHandler; import dorkbox.network.pipeline.tcp.KryoDecoder; import dorkbox.network.pipeline.tcp.KryoDecoderCrypto; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.util.crypto.CryptoECC; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -72,7 +72,7 @@ class RegistrationRemoteHandler extends RegistrationHandler { private static final String IDLE_HANDLER = "idleHandler"; - protected final CryptoSerializationManager serializationManager; + protected final NetworkSerializationManager serializationManager; RegistrationRemoteHandler(final String name, final RegistrationWrapper registrationWrapper, final EventLoopGroup workerEventLoop) { super(name, registrationWrapper, workerEventLoop); diff --git a/src/dorkbox/network/pipeline/tcp/KryoDecoder.java b/src/dorkbox/network/pipeline/tcp/KryoDecoder.java index 24642f16..df022aa3 100644 --- a/src/dorkbox/network/pipeline/tcp/KryoDecoder.java +++ b/src/dorkbox/network/pipeline/tcp/KryoDecoder.java @@ -18,7 +18,7 @@ package dorkbox.network.pipeline.tcp; import java.io.IOException; import java.util.List; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.util.bytes.OptimizeUtilsByteBuf; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -26,17 +26,17 @@ import io.netty.handler.codec.ByteToMessageDecoder; public class KryoDecoder extends ByteToMessageDecoder { - private final CryptoSerializationManager serializationManager; + private final NetworkSerializationManager serializationManager; public - KryoDecoder(final CryptoSerializationManager serializationManager) { + KryoDecoder(final NetworkSerializationManager serializationManager) { super(); this.serializationManager = serializationManager; } @SuppressWarnings("unused") protected - Object readObject(CryptoSerializationManager serializationManager, ChannelHandlerContext context, ByteBuf in, int length) throws Exception { + Object readObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, ByteBuf in, int length) throws Exception { // no connection here because we haven't created one yet. When we do, we replace this handler with a new one. return serializationManager.read(in, length); } @@ -81,7 +81,7 @@ class KryoDecoder extends ByteToMessageDecoder { // we can't test against a single "max size", since objects can back-up on the buffer. // we must ABSOLUTELY follow a "max size" rule when encoding objects, however. - final CryptoSerializationManager serializationManager = this.serializationManager; + final NetworkSerializationManager serializationManager = this.serializationManager; // Make sure if there's enough bytes in the buffer. if (length > readableBytes) { diff --git a/src/dorkbox/network/pipeline/tcp/KryoDecoderCrypto.java b/src/dorkbox/network/pipeline/tcp/KryoDecoderCrypto.java index bee51e7d..582c2394 100644 --- a/src/dorkbox/network/pipeline/tcp/KryoDecoderCrypto.java +++ b/src/dorkbox/network/pipeline/tcp/KryoDecoderCrypto.java @@ -15,8 +15,8 @@ */ package dorkbox.network.pipeline.tcp; -import dorkbox.network.connection.CryptoConnection; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.connection.Connection_; +import dorkbox.network.serialization.NetworkSerializationManager; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -26,20 +26,20 @@ public class KryoDecoderCrypto extends KryoDecoder { public - KryoDecoderCrypto(final CryptoSerializationManager serializationManager) { + KryoDecoderCrypto(final NetworkSerializationManager serializationManager) { super(serializationManager); } @Override protected - Object readObject(final CryptoSerializationManager serializationManager, + Object readObject(final NetworkSerializationManager serializationManager, final ChannelHandlerContext context, final ByteBuf in, final int length) throws Exception { try { - CryptoConnection connection = (CryptoConnection) context.pipeline() - .last(); + Connection_ connection = (Connection_) context.pipeline() + .last(); return serializationManager.readWithCrypto(connection, in, length); } catch (Exception e) { throw e; diff --git a/src/dorkbox/network/pipeline/tcp/KryoEncoder.java b/src/dorkbox/network/pipeline/tcp/KryoEncoder.java index 16c1b79a..0e1fd105 100644 --- a/src/dorkbox/network/pipeline/tcp/KryoEncoder.java +++ b/src/dorkbox/network/pipeline/tcp/KryoEncoder.java @@ -17,7 +17,7 @@ package dorkbox.network.pipeline.tcp; import java.io.IOException; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.util.bytes.OptimizeUtilsByteBuf; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; @@ -29,11 +29,11 @@ public class KryoEncoder extends MessageToByteEncoder { // maximum size of length field. Un-optimized will always be 4, but optimized version can take from 1 - 4 (for 0-Integer.MAX_VALUE). private static final int reservedLengthIndex = 4; - private final CryptoSerializationManager serializationManager; + private final NetworkSerializationManager serializationManager; // When this is a UDP encode, there are ALREADY size limits placed on the buffer, so any extra checks are unnecessary public - KryoEncoder(final CryptoSerializationManager serializationManager) { + KryoEncoder(final NetworkSerializationManager serializationManager) { super(true); // just use direct buffers anyways. When using Heap buffers, they because chunked and the backing array is invalid. this.serializationManager = serializationManager; } @@ -41,7 +41,7 @@ class KryoEncoder extends MessageToByteEncoder { // the crypto writer will override this @SuppressWarnings("unused") protected - void writeObject(final CryptoSerializationManager kryoWrapper, + void writeObject(final NetworkSerializationManager kryoWrapper, final ChannelHandlerContext context, final Object msg, final ByteBuf buffer) throws IOException { diff --git a/src/dorkbox/network/pipeline/tcp/KryoEncoderCrypto.java b/src/dorkbox/network/pipeline/tcp/KryoEncoderCrypto.java index f5ef41f1..136136b3 100644 --- a/src/dorkbox/network/pipeline/tcp/KryoEncoderCrypto.java +++ b/src/dorkbox/network/pipeline/tcp/KryoEncoderCrypto.java @@ -17,8 +17,8 @@ package dorkbox.network.pipeline.tcp; import java.io.IOException; -import dorkbox.network.connection.CryptoConnection; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.connection.Connection_; +import dorkbox.network.serialization.NetworkSerializationManager; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -28,19 +28,19 @@ public class KryoEncoderCrypto extends KryoEncoder { public - KryoEncoderCrypto(final CryptoSerializationManager serializationManager) { + KryoEncoderCrypto(final NetworkSerializationManager serializationManager) { super(serializationManager); } @Override protected - void writeObject(final CryptoSerializationManager serializationManager, + void writeObject(final NetworkSerializationManager serializationManager, final ChannelHandlerContext context, final Object msg, final ByteBuf buffer) throws IOException { - CryptoConnection connection = (CryptoConnection) context.pipeline() - .last(); + Connection_ connection = (Connection_) context.pipeline() + .last(); serializationManager.writeWithCrypto(connection, buffer, msg); } } diff --git a/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java b/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java index e104079e..8026e501 100644 --- a/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java +++ b/src/dorkbox/network/pipeline/udp/KryoDecoderUdp.java @@ -20,7 +20,7 @@ import java.util.List; import org.slf4j.LoggerFactory; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.serialization.NetworkSerializationManager; import io.netty.buffer.ByteBuf; import io.netty.channel.AddressedEnvelope; import io.netty.channel.Channel; @@ -33,10 +33,10 @@ import io.netty.handler.timeout.IdleStateEvent; @Sharable public class KryoDecoderUdp extends MessageToMessageDecoder { - private final CryptoSerializationManager serializationManager; + private final NetworkSerializationManager serializationManager; public - KryoDecoderUdp(CryptoSerializationManager serializationManager) { + KryoDecoderUdp(NetworkSerializationManager serializationManager) { this.serializationManager = serializationManager; } diff --git a/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCrypto.java b/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCrypto.java index ecf697b4..973da936 100644 --- a/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCrypto.java +++ b/src/dorkbox/network/pipeline/udp/KryoDecoderUdpCrypto.java @@ -20,8 +20,8 @@ import java.util.List; import org.slf4j.LoggerFactory; -import dorkbox.network.connection.CryptoConnection; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.connection.Connection_; +import dorkbox.network.serialization.NetworkSerializationManager; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler.Sharable; @@ -35,10 +35,10 @@ import io.netty.handler.timeout.IdleStateEvent; public class KryoDecoderUdpCrypto extends MessageToMessageDecoder { - private final CryptoSerializationManager serializationManager; + private final NetworkSerializationManager serializationManager; public - KryoDecoderUdpCrypto(CryptoSerializationManager serializationManager) { + KryoDecoderUdpCrypto(NetworkSerializationManager serializationManager) { this.serializationManager = serializationManager; } @@ -70,8 +70,8 @@ class KryoDecoderUdpCrypto extends MessageToMessageDecoder { public void decode(ChannelHandlerContext context, DatagramPacket in, List out) throws Exception { try { - CryptoConnection last = (CryptoConnection) context.pipeline() - .last(); + Connection_ last = (Connection_) context.pipeline() + .last(); ByteBuf data = in.content(); Object object = serializationManager.readWithCrypto(last, data, data.readableBytes()); out.add(object); diff --git a/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java b/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java index 94501e34..19f76f35 100644 --- a/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java +++ b/src/dorkbox/network/pipeline/udp/KryoEncoderUdp.java @@ -22,7 +22,7 @@ import java.util.List; import org.slf4j.LoggerFactory; import dorkbox.network.connection.EndPoint; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.serialization.NetworkSerializationManager; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -35,11 +35,11 @@ class KryoEncoderUdp extends MessageToMessageEncoder { private static final int maxSize = EndPoint.udpMaxSize; - private final CryptoSerializationManager serializationManager; + private final NetworkSerializationManager serializationManager; public - KryoEncoderUdp(final CryptoSerializationManager serializationManager) { + KryoEncoderUdp(final NetworkSerializationManager serializationManager) { super(); this.serializationManager = serializationManager; } @@ -80,8 +80,7 @@ class KryoEncoderUdp extends MessageToMessageEncoder { } // the crypto writer will override this - void writeObject(CryptoSerializationManager serializationManager, ChannelHandlerContext context, Object msg, ByteBuf buffer) - throws IOException { + void writeObject(NetworkSerializationManager serializationManager, ChannelHandlerContext context, Object msg, ByteBuf buffer) throws IOException { // no connection here because we haven't created one yet. When we do, we replace this handler with a new one. serializationManager.write(buffer, msg); } diff --git a/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCrypto.java b/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCrypto.java index 83f4ebae..22861fc8 100644 --- a/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCrypto.java +++ b/src/dorkbox/network/pipeline/udp/KryoEncoderUdpCrypto.java @@ -17,8 +17,8 @@ package dorkbox.network.pipeline.udp; import java.io.IOException; -import dorkbox.network.connection.CryptoConnection; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.connection.Connection_; +import dorkbox.network.serialization.NetworkSerializationManager; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -28,16 +28,15 @@ public class KryoEncoderUdpCrypto extends KryoEncoderUdp { public - KryoEncoderUdpCrypto(CryptoSerializationManager serializationManager) { + KryoEncoderUdpCrypto(NetworkSerializationManager serializationManager) { super(serializationManager); } @Override - void writeObject(CryptoSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) - throws IOException { + void writeObject(NetworkSerializationManager serializationManager, ChannelHandlerContext ctx, Object msg, ByteBuf buffer) throws IOException { - CryptoConnection last = (CryptoConnection) ctx.pipeline() - .last(); + Connection_ last = (Connection_) ctx.pipeline() + .last(); serializationManager.writeWithCrypto(last, buffer, msg); } } diff --git a/src/dorkbox/network/rmi/ConnectionRmiSupport.java b/src/dorkbox/network/rmi/ConnectionRmiSupport.java index 4cfa7ba4..57375c82 100644 --- a/src/dorkbox/network/rmi/ConnectionRmiSupport.java +++ b/src/dorkbox/network/rmi/ConnectionRmiSupport.java @@ -17,7 +17,7 @@ import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.KryoExtra; import dorkbox.network.connection.Listener; import dorkbox.network.connection.Listener.OnMessageReceived; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.util.collections.LockFreeHashMap; import dorkbox.util.collections.LockFreeIntMap; import dorkbox.util.generics.ClassHelper; @@ -31,13 +31,17 @@ class ConnectionRmiSupport extends ConnectionSupport { private final Map proxyIdCache; private final List> proxyListeners; + final ConnectionImpl connection; + private final LockFreeIntMap rmiRegistrationCallbacks; private final Logger logger; private volatile int rmiCallbackId = 0; public - ConnectionRmiSupport(final RmiBridge rmiGlobalBridge, final RmiObjectHandler rmiHandler) { + ConnectionRmiSupport(final ConnectionImpl connection, final RmiBridge rmiGlobalBridge, final RmiObjectHandler rmiHandler) { + this.connection = connection; + if (rmiGlobalBridge == null || rmiHandler == null) { throw new NullPointerException("RMI cannot be null if using RMI support!"); } @@ -67,6 +71,9 @@ class ConnectionRmiSupport extends ConnectionSupport { rmiRegistrationCallbacks.clear(); } + /** + * This will remove the invoke and invoke response listeners for this remote object + */ public void removeAllListeners() { proxyListeners.clear(); @@ -123,12 +130,10 @@ class ConnectionRmiSupport extends ConnectionSupport { public boolean manage(final ConnectionImpl connection, final Object message) { if (message instanceof InvokeMethod) { - CryptoSerializationManager serialization = connection.getEndPoint().getSerialization(); + NetworkSerializationManager serialization = connection.getEndPoint().getSerialization(); InvokeMethod invokeMethod = rmiHandler.getInvokeMethod(serialization, connection, (InvokeMethod) message); - - int objectID = invokeMethod.objectID; // have to make sure to get the correct object (global vs local) @@ -142,27 +147,17 @@ class ConnectionRmiSupport extends ConnectionSupport { return true; // maybe false? } - // Executor executor2 = RmiBridge.this.executor; - // if (executor2 == null) { - try { - RmiBridge.invoke(connection, target, invokeMethod, logger); - } catch (IOException e) { - logger.error("Unable to invoke method.", e); + try { + InvokeMethodResult result = RmiBridge.invoke(connection, target, invokeMethod, logger); + if (result != null) { + // System.err.println("Sending: " + invokeMethod.responseID); + connection.send(result).flush(); } - // } - // else { - // executor2.execute(new Runnable() { - // @Override - // public - // void run() { - // try { - // RmiBridge.invoke(connection, target, invokeMethod, logger); - // } catch (IOException e) { - // logger.error("Unable to invoke method.", e); - // } - // } - // }); - // } + + } catch (IOException e) { + logger.error("Unable to invoke method.", e); + } + return true; } else if (message instanceof InvokeMethodResult) { @@ -185,11 +180,7 @@ class ConnectionRmiSupport extends ConnectionSupport { * For local connections, we have to switch it appropriately in the LocalRmiProxy */ public - RmiRegistration createNewRmiObject(final CryptoSerializationManager serialization, - final Class interfaceClass, - final Class implementationClass, - final int callbackId, - final Logger logger) { + RmiRegistration createNewRmiObject(final NetworkSerializationManager serialization, final Class interfaceClass, final Class implementationClass, final int callbackId, final Logger logger) { KryoExtra kryo = null; Object object = null; int rmiId = 0; @@ -295,6 +286,11 @@ class ConnectionRmiSupport extends ConnectionSupport { } } + /** + * Used by RMI by the LOCAL side when setting up the to fetch an object for the REMOTE side + * + * @return the registered ID for a specific object, or RmiBridge.INVALID_RMI if there was no ID. + */ public int getRegisteredId(final T object) { // always check global before checking local, because less contention on the synchronization @@ -308,17 +304,43 @@ class ConnectionRmiSupport extends ConnectionSupport { } } + /** + * This is used by RMI for the REMOTE side, to get the implementation + * + * @param objectId this is the RMI object ID + */ public - Object getImplementationObject(final int objectID) { - if (RmiBridge.isGlobal(objectID)) { - return rmiGlobalBridge.getRegisteredObject(objectID); + Object getImplementationObject(final int objectId) { + if (RmiBridge.isGlobal(objectId)) { + return rmiGlobalBridge.getRegisteredObject(objectId); } else { - return rmiLocalBridge.getRegisteredObject(objectID); + return rmiLocalBridge.getRegisteredObject(objectId); } } + /** + * Warning. This is an advanced method. You should probably be using {@link Connection#createRemoteObject(Class, RemoteObjectCallback)} + *

+ *

+ * Returns a proxy object that implements the specified interface, and the methods invoked on the proxy object will be invoked + * remotely. + *

+ * Methods that return a value will throw {@link TimeoutException} if the response is not received with the {@link + * RemoteObject#setResponseTimeout(int) response timeout}. + *

+ * If {@link RemoteObject#setAsync(boolean) non-blocking} is false (the default), then methods that return a value must not be + * called from the update thread for the connection. An exception will be thrown if this occurs. Methods with a void return value can be + * called on the update thread. + *

+ * If a proxy returned from this method is part of an object graph sent over the network, the object graph on the receiving side will + * have the proxy object replaced with the registered object. + * + * @see RemoteObject + * @param rmiId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID + * @param iFace this is the RMI interface + */ public - RemoteObject getProxyObject(final ConnectionImpl connection, final int rmiId, final Class iFace) { + RemoteObject getProxyObject(final int rmiId, final Class iFace) { if (iFace == null) { throw new IllegalArgumentException("iface cannot be null."); } @@ -335,7 +357,7 @@ class ConnectionRmiSupport extends ConnectionSupport { // duplicates are fine, as they represent the same object (as specified by the ID) on the remote side. // the ACTUAL proxy is created in the connection impl. - RmiProxyNetworkHandler proxyObject = new RmiProxyNetworkHandler(connection, rmiId, iFace); + RmiProxyHandler proxyObject = new RmiProxyHandler(this.connection, this, rmiId, iFace); proxyListeners.add(proxyObject.getListener()); // This is the interface inheritance by the proxy object @@ -351,43 +373,6 @@ class ConnectionRmiSupport extends ConnectionSupport { return remoteObject; } - public - RemoteObject getLocalProxyObject(final ConnectionImpl connection, final int rmiId, final Class iFace, final Object object) { - if (iFace == null) { - throw new IllegalArgumentException("iface cannot be null."); - } - if (!iFace.isInterface()) { - throw new IllegalArgumentException("iface must be an interface."); - } - if (object == null) { - throw new IllegalArgumentException("object cannot be null."); - } - - - // we want to have a connection specific cache of IDs - // because this is PER CONNECTION, there is no need for synchronize(), since there will not be any issues with concurrent - // access, but there WILL be issues with thread visibility because a different worker thread can be called for different connections - RemoteObject remoteObject = proxyIdCache.get(rmiId); - - if (remoteObject == null) { - // duplicates are fine, as they represent the same object (as specified by the ID) on the remote side. - - // the ACTUAL proxy is created in the connection impl. - RmiProxyLocalHandler proxyObject = new RmiProxyLocalHandler(connection, rmiId, iFace, object); - proxyListeners.add(proxyObject.getListener()); - - Class[] temp = new Class[2]; - temp[0] = RemoteObject.class; - temp[1] = iFace; - - remoteObject = (RemoteObject) Proxy.newProxyInstance(RmiBridge.class.getClassLoader(), temp, proxyObject); - - proxyIdCache.put(rmiId, remoteObject); - } - - return remoteObject; - } - public Object fixupRmi(final ConnectionImpl connection, final Object message) { // "local RMI" objects have to be modified, this part does that diff --git a/src/dorkbox/network/rmi/ConnectionSupport.java b/src/dorkbox/network/rmi/ConnectionSupport.java index b6d5a862..123605d9 100644 --- a/src/dorkbox/network/rmi/ConnectionSupport.java +++ b/src/dorkbox/network/rmi/ConnectionSupport.java @@ -45,12 +45,7 @@ class ConnectionSupport { } public - RemoteObject getProxyObject(final ConnectionImpl connection, final int rmiId, final Class iFace) { - return null; - } - - public - RemoteObject getLocalProxyObject(final ConnectionImpl connection, final int rmiId, final Class iFace, final Object object) { + RemoteObject getProxyObject(final int rmiId, final Class iFace) { return null; } diff --git a/src/dorkbox/network/rmi/InvocationHandlerSerializer.java b/src/dorkbox/network/rmi/InvocationHandlerSerializer.java index a6872a02..758fc493 100644 --- a/src/dorkbox/network/rmi/InvocationHandlerSerializer.java +++ b/src/dorkbox/network/rmi/InvocationHandlerSerializer.java @@ -38,7 +38,7 @@ class InvocationHandlerSerializer extends Serializer { @Override public void write(Kryo kryo, Output output, Object object) { - RmiProxyNetworkHandler handler = (RmiProxyNetworkHandler) Proxy.getInvocationHandler(object); + RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(object); output.writeInt(handler.rmiObjectId, true); } @@ -49,7 +49,7 @@ class InvocationHandlerSerializer extends Serializer { int objectID = input.readInt(true); KryoExtra kryoExtra = (KryoExtra) kryo; - Object object = kryoExtra.connection.getImplementationObject(objectID); + Object object = kryoExtra.rmiSupport.getImplementationObject(objectID); if (object == null) { logger.error("Unknown object ID in RMI ObjectSpace: {}", objectID); diff --git a/src/dorkbox/network/rmi/InvokeMethodSerializer.java b/src/dorkbox/network/rmi/InvokeMethodSerializer.java index f0ef8b3d..f72b8b16 100644 --- a/src/dorkbox/network/rmi/InvokeMethodSerializer.java +++ b/src/dorkbox/network/rmi/InvokeMethodSerializer.java @@ -127,7 +127,7 @@ class InvokeMethodSerializer extends Serializer { argStartIndex = 1; args = new Object[serializers.length + 1]; - args[0] = ((KryoExtra) kryo).connection; + args[0] = ((ConnectionRmiSupport) ((KryoExtra) kryo).rmiSupport).connection; } else { method = cachedMethod.method; diff --git a/src/dorkbox/network/rmi/RemoteObjectSerializer.java b/src/dorkbox/network/rmi/RemoteObjectSerializer.java index d7eb67c8..e5f9a291 100644 --- a/src/dorkbox/network/rmi/RemoteObjectSerializer.java +++ b/src/dorkbox/network/rmi/RemoteObjectSerializer.java @@ -63,7 +63,7 @@ class RemoteObjectSerializer extends Serializer { public void write(Kryo kryo, Output output, Object object) { KryoExtra kryoExtra = (KryoExtra) kryo; - int id = kryoExtra.connection.getRegisteredId(object); + int id = kryoExtra.rmiSupport.getRegisteredId(object); // output.writeInt(id, true); } @@ -76,6 +76,6 @@ class RemoteObjectSerializer extends Serializer { // We have to lookup the iface, since the proxy object requires it Class iface = rmiImplToIface.get(implementationType); - return kryoExtra.connection.getProxyObject(objectID, iface); + return kryoExtra.rmiSupport.getProxyObject(objectID, iface); } } diff --git a/src/dorkbox/network/rmi/RmiBridge.java b/src/dorkbox/network/rmi/RmiBridge.java index eaaa7a59..e027829b 100644 --- a/src/dorkbox/network/rmi/RmiBridge.java +++ b/src/dorkbox/network/rmi/RmiBridge.java @@ -42,7 +42,7 @@ import org.slf4j.Logger; import dorkbox.network.connection.Connection; import dorkbox.network.connection.EndPoint; -import dorkbox.network.serialization.RmiSerializationManager; +import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.util.Property; import dorkbox.util.collections.LockFreeIntBiMap; @@ -51,7 +51,7 @@ import dorkbox.util.collections.LockFreeIntBiMap; * object transformation (because there is no serialization occurring) using a series of weak hashmaps. *

*

- * Objects are {@link RmiSerializationManager#registerRmi(Class, Class)}, and endpoint connections can then {@link + * Objects are {@link NetworkSerializationManager#registerRmi(Class, Class)}, and endpoint connections can then {@link * Connection#createRemoteObject(Class, RemoteObjectCallback)} for the registered objects. *

* It costs at least 2 bytes more to use remote method invocation than just sending the parameters. If the method has a return value which @@ -148,7 +148,7 @@ class RmiBridge { */ @SuppressWarnings("NumericCastThatLosesPrecision") protected static - void invoke(final Connection connection, final Object target, final InvokeMethod invokeMethod, final Logger logger) throws IOException { + InvokeMethodResult invoke(final Connection connection, final Object target, final InvokeMethod invokeMethod, final Logger logger) throws IOException { CachedMethod cachedMethod = invokeMethod.cachedMethod; if (logger.isTraceEnabled()) { @@ -211,7 +211,7 @@ class RmiBridge { } if (responseID == 0) { - return; + return null; } InvokeMethodResult invokeMethodResult = new InvokeMethodResult(); @@ -228,10 +228,8 @@ class RmiBridge { invokeMethodResult.result = result; } - // System.err.println("Sending: " + invokeMethod.responseID); - connection.send(invokeMethodResult).flush(); - // logger.error("{} sent data: {} with id ({})", connection, result, invokeMethod.responseID); + return invokeMethodResult; } private diff --git a/src/dorkbox/network/rmi/RmiNopConnection.java b/src/dorkbox/network/rmi/RmiNopConnection.java new file mode 100644 index 00000000..7c7e93af --- /dev/null +++ b/src/dorkbox/network/rmi/RmiNopConnection.java @@ -0,0 +1,131 @@ +package dorkbox.network.rmi; + +import org.bouncycastle.crypto.params.ParametersWithIV; + +import dorkbox.network.connection.ConnectionPoint; +import dorkbox.network.connection.Connection_; +import dorkbox.network.connection.EndPoint; +import dorkbox.network.connection.Listeners; +import dorkbox.network.connection.bridge.ConnectionBridge; +import dorkbox.network.connection.idle.IdleBridge; +import dorkbox.network.connection.idle.IdleSender; + +/** + * + */ +public +class RmiNopConnection implements Connection_ { + @Override + public + boolean hasRemoteKeyChanged() { + return false; + } + + @Override + public + String getRemoteHost() { + return null; + } + + @Override + public + boolean isLoopback() { + return false; + } + + @Override + public + EndPoint getEndPoint() { + return null; + } + + @Override + public + int id() { + return 0; + } + + @Override + public + String idAsHex() { + return null; + } + + @Override + public + boolean hasUDP() { + return false; + } + + @Override + public + ConnectionBridge send() { + return null; + } + + @Override + public + ConnectionPoint send(final Object message) { + return null; + } + + @Override + public + IdleBridge sendOnIdle(final IdleSender sender) { + return null; + } + + @Override + public + IdleBridge sendOnIdle(final Object message) { + return null; + } + + @Override + public + Listeners listeners() { + return null; + } + + @Override + public + void close() { + + } + + @Override + public + void closeAsap() { + + } + + @Override + public + void createRemoteObject(final Class interfaceClass, final RemoteObjectCallback callback) { + + } + + @Override + public + void getRemoteObject(final int objectId, final RemoteObjectCallback callback) { + + } + + @Override + public + ConnectionSupport rmiSupport() { + return null; + } + + @Override + public + long getNextGcmSequence() { + return 0; + } + + @Override + public + ParametersWithIV getCryptoParameters() { + return null; + } +} diff --git a/src/dorkbox/network/rmi/RmiObjectHandler.java b/src/dorkbox/network/rmi/RmiObjectHandler.java index 98dcc973..ab818094 100644 --- a/src/dorkbox/network/rmi/RmiObjectHandler.java +++ b/src/dorkbox/network/rmi/RmiObjectHandler.java @@ -16,12 +16,12 @@ package dorkbox.network.rmi; import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.serialization.NetworkSerializationManager; public interface RmiObjectHandler { - InvokeMethod getInvokeMethod(final CryptoSerializationManager serialization, final ConnectionImpl connection, final InvokeMethod invokeMethod); + InvokeMethod getInvokeMethod(final NetworkSerializationManager serialization, final ConnectionImpl connection, final InvokeMethod invokeMethod); void registration(final ConnectionRmiSupport rmiSupport, final ConnectionImpl connection, final RmiRegistration message); diff --git a/src/dorkbox/network/rmi/RmiObjectLocalHandler.java b/src/dorkbox/network/rmi/RmiObjectLocalHandler.java index 6d2251a2..a63785dd 100644 --- a/src/dorkbox/network/rmi/RmiObjectLocalHandler.java +++ b/src/dorkbox/network/rmi/RmiObjectLocalHandler.java @@ -29,7 +29,7 @@ import com.esotericsoftware.kryo.util.IdentityMap; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.KryoExtra; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.serialization.NetworkSerializationManager; /** * This is for a local-connection (same-JVM) RMI method invocation @@ -64,7 +64,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler { } public - InvokeMethod getInvokeMethod(final CryptoSerializationManager serialization, final ConnectionImpl connection, final InvokeMethod invokeMethod) { + InvokeMethod getInvokeMethod(final NetworkSerializationManager serialization, final ConnectionImpl connection, final InvokeMethod invokeMethod) { int methodClassID = invokeMethod.cachedMethod.methodClassID; int methodIndex = invokeMethod.cachedMethod.methodIndex; // have to replace the cached methods with the correct (remote) version, otherwise the wrong methods CAN BE invoked. @@ -137,7 +137,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler { // have to convert the iFace -> Impl EndPoint endPoint = connection.getEndPoint(); - CryptoSerializationManager serialization = endPoint.getSerialization(); + NetworkSerializationManager serialization = endPoint.getSerialization(); Class rmiImpl = serialization.getRmiImpl(registration.interfaceClass); @@ -175,7 +175,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler { else { // override the implementation object with the proxy. This is required because RMI must be the same between "network" and "local" // connections -- even if this "slows down" the speed/performance of what "local" connections offer. - proxyObject = rmiSupport.getLocalProxyObject(connection, registration.rmiId, interfaceClass, registration.remoteObject); + proxyObject = rmiSupport.getProxyObject(registration.rmiId, interfaceClass); if (proxyObject != null && registration.remoteObject != null) { // have to save A and B so we can correctly switch as necessary @@ -187,10 +187,10 @@ class RmiObjectLocalHandler implements RmiObjectHandler { } } - connection.runRmiCallback(interfaceClass, callbackId, proxyObject); + rmiSupport.runCallback(interfaceClass, callbackId, proxyObject, logger); } else { - connection.runRmiCallback(interfaceClass, callbackId, registration.remoteObject); + rmiSupport.runCallback(interfaceClass, callbackId, registration.remoteObject, logger); } } } @@ -243,7 +243,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler { o = field.get(message); if (o instanceof RemoteObject) { - RmiProxyLocalHandler handler = (RmiProxyLocalHandler) Proxy.getInvocationHandler(o); + RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(o); int id = handler.rmiObjectId; field.set(message, rmiSupport.getImplementationObject(id)); @@ -296,7 +296,7 @@ class RmiObjectLocalHandler implements RmiObjectHandler { o = field.get(message); if (o instanceof RemoteObject) { - RmiProxyNetworkHandler handler = (RmiProxyNetworkHandler) Proxy.getInvocationHandler(o); + RmiProxyHandler handler = (RmiProxyHandler) Proxy.getInvocationHandler(o); int id = handler.rmiObjectId; field.set(message, rmiSupport.getImplementationObject(id)); diff --git a/src/dorkbox/network/rmi/RmiObjectNetworkHandler.java b/src/dorkbox/network/rmi/RmiObjectNetworkHandler.java index 4827fca8..43ca9abb 100644 --- a/src/dorkbox/network/rmi/RmiObjectNetworkHandler.java +++ b/src/dorkbox/network/rmi/RmiObjectNetworkHandler.java @@ -18,7 +18,7 @@ package dorkbox.network.rmi; import org.slf4j.Logger; import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.serialization.CryptoSerializationManager; +import dorkbox.network.serialization.NetworkSerializationManager; public class RmiObjectNetworkHandler implements RmiObjectHandler { @@ -31,7 +31,7 @@ class RmiObjectNetworkHandler implements RmiObjectHandler { } public - InvokeMethod getInvokeMethod(final CryptoSerializationManager serialization, final ConnectionImpl connection, final InvokeMethod invokeMethod) { + InvokeMethod getInvokeMethod(final NetworkSerializationManager serialization, final ConnectionImpl connection, final InvokeMethod invokeMethod) { // everything is fine, there is nothing necessary to fix return invokeMethod; } @@ -54,7 +54,7 @@ class RmiObjectNetworkHandler implements RmiObjectHandler { // CREATE a new ID, and register the ID and new object (must create a new one) in the object maps // have to lookup the implementation class - CryptoSerializationManager serialization = connection.getEndPoint().getSerialization(); + NetworkSerializationManager serialization = connection.getEndPoint().getSerialization(); Class rmiImpl = serialization.getRmiImpl(interfaceClass); @@ -82,7 +82,7 @@ class RmiObjectNetworkHandler implements RmiObjectHandler { // this is the response. // THIS IS ON THE LOCAL CONNECTION SIDE, which is the side that called 'getRemoteObject()' This can be Server or Client. - connection.runRmiCallback(interfaceClass, callbackId, registration.remoteObject); + rmiSupport.runCallback(interfaceClass, callbackId, registration.remoteObject, logger); } } diff --git a/src/dorkbox/network/rmi/RmiProxyLocalHandler.java b/src/dorkbox/network/rmi/RmiProxyHandler.java similarity index 93% rename from src/dorkbox/network/rmi/RmiProxyLocalHandler.java rename to src/dorkbox/network/rmi/RmiProxyHandler.java index bfe2e19a..1542b986 100644 --- a/src/dorkbox/network/rmi/RmiProxyLocalHandler.java +++ b/src/dorkbox/network/rmi/RmiProxyHandler.java @@ -51,7 +51,7 @@ import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.KryoExtra; import dorkbox.network.connection.Listener; -import dorkbox.network.serialization.RmiSerializationManager; +import dorkbox.network.serialization.NetworkSerializationManager; /** * Handles network communication when methods are invoked on a proxy. @@ -60,8 +60,9 @@ import dorkbox.network.serialization.RmiSerializationManager; *

* If there are no checked exceptions thrown, then we don't have to explicitly set 'transmitExceptions' to false */ +@SuppressWarnings("Duplicates") public -class RmiProxyLocalHandler implements InvocationHandler { +class RmiProxyHandler implements InvocationHandler { private final Logger logger; private final ReentrantLock lock = new ReentrantLock(); @@ -71,8 +72,9 @@ class RmiProxyLocalHandler implements InvocationHandler { private final boolean[] pendingResponses = new boolean[64]; private final ConnectionImpl connection; + private final ConnectionRmiSupport rmiSupport; public final int rmiObjectId; // this is the RMI id - public final int ID; // this is the KRYO id + public final int classId; // this is the KRYO class id private final String proxyString; @@ -97,25 +99,26 @@ class RmiProxyLocalHandler implements InvocationHandler { /** * @param connection this is really the network client -- there is ONLY ever 1 connection + * @param rmiSupport is used to provide RMI support * @param rmiId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID * @param iFace this is the RMI interface - * @param object */ public - RmiProxyLocalHandler(final ConnectionImpl connection, final int rmiId, final Class iFace, final Object object) { + RmiProxyHandler(final ConnectionImpl connection, final ConnectionRmiSupport rmiSupport, final int rmiId, final Class iFace) { super(); this.connection = connection; + this.rmiSupport = rmiSupport; this.rmiObjectId = rmiId; this.proxyString = ""; - EndPoint endPointConnection = this.connection.getEndPoint(); - final RmiSerializationManager serializationManager = endPointConnection.getSerialization(); + EndPoint endPoint = this.connection.getEndPoint(); + final NetworkSerializationManager serializationManager = endPoint.getSerialization(); KryoExtra kryoExtra = null; try { kryoExtra = serializationManager.takeKryo(); - this.ID = kryoExtra.getRegistration(iFace).getId(); + this.classId = kryoExtra.getRegistration(iFace).getId(); } finally { if (kryoExtra != null) { serializationManager.returnKryo(kryoExtra); @@ -135,16 +138,16 @@ class RmiProxyLocalHandler implements InvocationHandler { } synchronized (this) { - if (RmiProxyLocalHandler.this.pendingResponses[responseID]) { - RmiProxyLocalHandler.this.responseTable[responseID] = invokeMethodResult; + if (RmiProxyHandler.this.pendingResponses[responseID]) { + RmiProxyHandler.this.responseTable[responseID] = invokeMethodResult; } } - RmiProxyLocalHandler.this.lock.lock(); + RmiProxyHandler.this.lock.lock(); try { - RmiProxyLocalHandler.this.responseCondition.signalAll(); + RmiProxyHandler.this.responseCondition.signalAll(); } finally { - RmiProxyLocalHandler.this.lock.unlock(); + RmiProxyHandler.this.lock.unlock(); } } }; @@ -165,7 +168,7 @@ class RmiProxyLocalHandler implements InvocationHandler { String name = method.getName(); if (name.equals("close")) { - connection.removeRmiListeners(rmiObjectId, getListener()); + rmiSupport.removeAllListeners(); return null; } else if (name.equals("setResponseTimeout")) { @@ -233,7 +236,7 @@ class RmiProxyLocalHandler implements InvocationHandler { // which method do we access? We always want to access the IMPLEMENTATION (if available!) CachedMethod[] cachedMethods = connection.getEndPoint() .getSerialization() - .getMethods(ID); + .getMethods(classId); for (int i = 0, n = cachedMethods.length; i < n; i++) { CachedMethod cachedMethod = cachedMethods[i]; @@ -435,7 +438,7 @@ class RmiProxyLocalHandler implements InvocationHandler { if (getClass() != obj.getClass()) { return false; } - RmiProxyLocalHandler other = (RmiProxyLocalHandler) obj; + RmiProxyHandler other = (RmiProxyHandler) obj; return this.rmiObjectId == other.rmiObjectId; } } diff --git a/src/dorkbox/network/rmi/RmiProxyNetworkHandler.java b/src/dorkbox/network/rmi/RmiProxyNetworkHandler.java deleted file mode 100644 index efb874e3..00000000 --- a/src/dorkbox/network/rmi/RmiProxyNetworkHandler.java +++ /dev/null @@ -1,440 +0,0 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Copyright (c) 2008, Nathan Sweet - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following - * conditions are met: - * - * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. - * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials provided with the distribution. - * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived - * from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, - * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING - * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ -package dorkbox.network.rmi; - - -import java.io.IOException; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import dorkbox.network.connection.Connection; -import dorkbox.network.connection.ConnectionImpl; -import dorkbox.network.connection.EndPoint; -import dorkbox.network.connection.KryoExtra; -import dorkbox.network.connection.Listener; -import dorkbox.network.serialization.RmiSerializationManager; - -/** - * Handles network communication when methods are invoked on a proxy. - *

- * If the method return type is 'void', then we don't have to explicitly set 'transmitReturnValue' to false - *

- * If there are no checked exceptions thrown, then we don't have to explicitly set 'transmitExceptions' to false - */ -public -class RmiProxyNetworkHandler implements InvocationHandler { - private final Logger logger; - - private final ReentrantLock lock = new ReentrantLock(); - private final Condition responseCondition = this.lock.newCondition(); - - private final InvokeMethodResult[] responseTable = new InvokeMethodResult[64]; - private final boolean[] pendingResponses = new boolean[64]; - - private final ConnectionImpl connection; - public final int rmiObjectId; // this is the RMI id - public final int ID; // this is the KRYO id - - - private final String proxyString; - private final - Listener.OnMessageReceived responseListener; - - private int timeoutMillis = 3000; - private boolean isAsync = false; - - // if the return type is 'void', then this has no meaning. - private boolean transmitReturnValue = false; - - // if there are no checked exceptions thrown, then this has no meaning - private boolean transmitExceptions = false; - - private boolean enableToString; - - private boolean udp; - - private Byte lastResponseID; - private byte nextResponseId = (byte) 1; - - /** - * @param connection this is really the network client -- there is ONLY ever 1 connection - * @param rmiId this is the remote object ID (assigned by RMI). This is NOT the kryo registration ID - * @param iFace this is the RMI interface - */ - public - RmiProxyNetworkHandler(final ConnectionImpl connection, final int rmiId, final Class iFace) { - super(); - - this.connection = connection; - this.rmiObjectId = rmiId; - this.proxyString = ""; - - EndPoint endPointConnection = this.connection.getEndPoint(); - final RmiSerializationManager serializationManager = endPointConnection.getSerialization(); - - KryoExtra kryoExtra = null; - try { - kryoExtra = serializationManager.takeKryo(); - this.ID = kryoExtra.getRegistration(iFace).getId(); - } finally { - if (kryoExtra != null) { - serializationManager.returnKryo(kryoExtra); - } - } - - this.logger = LoggerFactory.getLogger(connection.getEndPoint().getName() + ":" + this.getClass().getSimpleName()); - - this.responseListener = new Listener.OnMessageReceived() { - @Override - public - void received(Connection connection, InvokeMethodResult invokeMethodResult) { - byte responseID = invokeMethodResult.responseId; - - if (invokeMethodResult.rmiObjectId != rmiId) { - return; - } - - synchronized (this) { - if (RmiProxyNetworkHandler.this.pendingResponses[responseID]) { - RmiProxyNetworkHandler.this.responseTable[responseID] = invokeMethodResult; - } - } - - RmiProxyNetworkHandler.this.lock.lock(); - try { - RmiProxyNetworkHandler.this.responseCondition.signalAll(); - } finally { - RmiProxyNetworkHandler.this.lock.unlock(); - } - } - }; - } - - public - Listener.OnMessageReceived getListener() { - return responseListener; - } - - @SuppressWarnings({"AutoUnboxing", "AutoBoxing", "NumericCastThatLosesPrecision", "IfCanBeSwitch"}) - @Override - public - Object invoke(final Object proxy, final Method method, final Object[] args) throws Exception { - final Class declaringClass = method.getDeclaringClass(); - if (declaringClass == RemoteObject.class) { - // manage all of the RemoteObject proxy methods - - String name = method.getName(); - if (name.equals("close")) { - connection.removeRmiListeners(rmiObjectId, getListener()); - return null; - } - else if (name.equals("setResponseTimeout")) { - this.timeoutMillis = (Integer) args[0]; - return null; - } - else if (name.equals("getResponseTimeout")) { - return this.timeoutMillis; - } - else if (name.equals("setAsync")) { - this.isAsync = (Boolean) args[0]; - return null; - } - else if (name.equals("setTransmitReturnValue")) { - this.transmitReturnValue = (Boolean) args[0]; - return null; - } - else if (name.equals("setTransmitExceptions")) { - this.transmitExceptions = (Boolean) args[0]; - return null; - } - else if (name.equals("setTCP")) { - this.udp = false; - return null; - } - else if (name.equals("setUDP")) { - this.udp = true; - return null; - } - else if (name.equals("enableToString")) { - this.enableToString = (Boolean) args[0]; - return null; - } - else if (name.equals("waitForLastResponse")) { - if (this.lastResponseID == null) { - throw new IllegalStateException("There is no last response to wait for."); - } - return waitForResponse(this.lastResponseID); - } - else if (name.equals("getLastResponseID")) { - if (this.lastResponseID == null) { - throw new IllegalStateException("There is no last response ID."); - } - return this.lastResponseID; - } - else if (name.equals("waitForResponse")) { - if (!this.transmitReturnValue && !this.transmitExceptions && this.isAsync) { - throw new IllegalStateException("This RemoteObject is currently set to ignore all responses."); - } - return waitForResponse((Byte) args[0]); - } - - // Should never happen, for debugging purposes only! - throw new Exception("Invocation handler could not find RemoteObject method for " + name); - } - else if (!this.enableToString && declaringClass == Object.class && method.getName() - .equals("toString")) { - return proxyString; - } - - InvokeMethod invokeMethod = new InvokeMethod(); - invokeMethod.objectID = this.rmiObjectId; - invokeMethod.args = args; - - // which method do we access? We always want to access the IMPLEMENTATION (if available!) - CachedMethod[] cachedMethods = connection.getEndPoint() - .getSerialization() - .getMethods(ID); - - for (int i = 0, n = cachedMethods.length; i < n; i++) { - CachedMethod cachedMethod = cachedMethods[i]; - Method checkMethod = cachedMethod.method; - - if (checkMethod.equals(method)) { - invokeMethod.cachedMethod = cachedMethod; - break; - } - } - - if (invokeMethod.cachedMethod == null) { - String msg = "Method not found: " + method; - logger.error(msg); - return msg; - } - - - byte responseID = (byte) 0; - Class returnType = method.getReturnType(); - - // If the method return type is 'void', then we don't have to explicitly set 'transmitReturnValue' to false - boolean shouldReturnValue = returnType != void.class || this.transmitReturnValue; - - // If there are no checked exceptions thrown, then we don't have to explicitly set 'transmitExceptions' to false - boolean shouldTransmitExceptions = (method.getExceptionTypes().length != 0 || method.getGenericExceptionTypes().length != 0) || this.transmitExceptions; - - // If we are async (but still have a return type or throw checked exceptions) then we ignore the response - // If we are 'void' return type and do not throw checked exceptions then we ignore the response - boolean ignoreResponse = (this.isAsync || returnType == void.class) && !(shouldReturnValue || shouldTransmitExceptions); - - if (ignoreResponse) { - invokeMethod.responseData = (byte) 0; // 0 means do not respond. - } - else { - synchronized (this) { - // Increment the response counter and put it into the low bits of the responseID. - responseID = this.nextResponseId++; - if (this.nextResponseId > RmiBridge.responseIdMask) { - this.nextResponseId = (byte) 1; - } - this.pendingResponses[responseID] = true; - } - // Pack other data into the high bits. - byte responseData = responseID; - if (shouldReturnValue) { - responseData |= (byte) RmiBridge.returnValueMask; - } - if (shouldTransmitExceptions) { - responseData |= (byte) RmiBridge.returnExceptionMask; - } - invokeMethod.responseData = responseData; - } - - byte lastResponseID = (byte) (invokeMethod.responseData & RmiBridge.responseIdMask); - this.lastResponseID = lastResponseID; - - // Sends our invokeMethod to the remote connection, which the RmiBridge listens for - if (this.udp) { - // flush is necessary in case this is called outside of a network worker thread - this.connection.UDP(invokeMethod).flush(); - } - else { - // flush is necessary in case this is called outside of a network worker thread - this.connection.send(invokeMethod).flush(); - } - - if (logger.isTraceEnabled()) { - String argString = ""; - if (args != null) { - argString = Arrays.deepToString(args); - argString = argString.substring(1, argString.length() - 1); - } - logger.trace(this.connection + " sent: " + method.getDeclaringClass() - .getSimpleName() + - "#" + method.getName() + "(" + argString + ")"); - } - - // MUST use 'waitForLastResponse()' or 'waitForResponse'('getLastResponseID()') to get the response - // If we are async then we return immediately - // If we are 'void' return type and do not throw checked exceptions then we return immediately - boolean respondImmediately = this.isAsync || (returnType == void.class) && !(shouldReturnValue || shouldTransmitExceptions); - if (respondImmediately) { - if (returnType.isPrimitive()) { - if (returnType == int.class) { - return 0; - } - if (returnType == boolean.class) { - return Boolean.FALSE; - } - if (returnType == float.class) { - return 0.0f; - } - if (returnType == char.class) { - return (char) 0; - } - if (returnType == long.class) { - return 0L; - } - if (returnType == short.class) { - return (short) 0; - } - if (returnType == byte.class) { - return (byte) 0; - } - if (returnType == double.class) { - return 0.0d; - } - } - return null; - } - - try { - Object result = waitForResponse(lastResponseID); - if (result instanceof Exception) { - throw (Exception) result; - } - else { - return result; - } - } catch (TimeoutException ex) { - throw new TimeoutException("Response timed out: " + method.getDeclaringClass() - .getName() + "." + method.getName()); - } finally { - synchronized (this) { - this.pendingResponses[responseID] = false; - this.responseTable[responseID] = null; - } - } - } - - /** - * A timeout of 0 means that we want to disable waiting, otherwise - it waits in milliseconds - */ - private - Object waitForResponse(final byte responseID) throws IOException { - // if timeout == 0, we wait "forever" - long remaining; - long endTime; - - if (this.timeoutMillis != 0) { - remaining = this.timeoutMillis; - endTime = System.currentTimeMillis() + remaining; - } else { - // not forever, but close enough - remaining = Long.MAX_VALUE; - endTime = Long.MAX_VALUE; - } - - // wait for the specified time - while (remaining > 0) { - InvokeMethodResult invokeMethodResult; - synchronized (this) { - invokeMethodResult = this.responseTable[responseID]; - } - - if (invokeMethodResult != null) { - this.lastResponseID = null; - return invokeMethodResult.result; - } - else { - this.lock.lock(); - try { - this.responseCondition.await(remaining, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread() - .interrupt(); - throw new IOException("Response timed out.", e); - } finally { - this.lock.unlock(); - } - } - - remaining = endTime - System.currentTimeMillis(); - } - - // only get here if we timeout - throw new TimeoutException("Response timed out."); - } - - @Override - public - int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + this.rmiObjectId; - return result; - } - - @Override - public - boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - RmiProxyNetworkHandler other = (RmiProxyNetworkHandler) obj; - return this.rmiObjectId == other.rmiObjectId; - } -} diff --git a/src/dorkbox/network/rmi/RmiRegistrationSerializer.java b/src/dorkbox/network/rmi/RmiRegistrationSerializer.java index e6c82b58..5d7e7d4d 100644 --- a/src/dorkbox/network/rmi/RmiRegistrationSerializer.java +++ b/src/dorkbox/network/rmi/RmiRegistrationSerializer.java @@ -45,7 +45,7 @@ class RmiRegistrationSerializer extends Serializer { if (object.remoteObject != null) { KryoExtra kryoExtra = (KryoExtra) kryo; - id = kryoExtra.connection.getRegisteredId(object.remoteObject); + id = kryoExtra.rmiSupport.getRegisteredId(object.remoteObject); } else { // can be < 0 or >= RmiBridge.INVALID_RMI (Integer.MAX_VALUE) id = -1; @@ -72,7 +72,7 @@ class RmiRegistrationSerializer extends Serializer { Object remoteObject = null; if (remoteObjectId >= 0 && remoteObjectId < RmiBridge.INVALID_RMI) { KryoExtra kryoExtra = (KryoExtra) kryo; - remoteObject = kryoExtra.connection.getProxyObject(remoteObjectId, iface); + remoteObject = kryoExtra.rmiSupport.getProxyObject(remoteObjectId, iface); } RmiRegistration rmiRegistration = new RmiRegistration(iface, rmiId, callbackId, remoteObject); diff --git a/src/dorkbox/network/serialization/ClassRegistration.java b/src/dorkbox/network/serialization/ClassRegistration.java index aae00eeb..f480f10e 100644 --- a/src/dorkbox/network/serialization/ClassRegistration.java +++ b/src/dorkbox/network/serialization/ClassRegistration.java @@ -20,7 +20,6 @@ import org.slf4j.Logger; import com.esotericsoftware.kryo.Registration; import com.esotericsoftware.kryo.Serializer; -import dorkbox.network.connection.CryptoConnection; import dorkbox.network.connection.KryoExtra; import dorkbox.network.rmi.RemoteObjectSerializer; @@ -33,7 +32,7 @@ class ClassRegistration { this.clazz = clazz; } - void register(final KryoExtra kryo, final RemoteObjectSerializer remoteObjectSerializer) { + void register(final KryoExtra kryo, final RemoteObjectSerializer remoteObjectSerializer) { Registration registration; if (clazz.isInterface()) { diff --git a/src/dorkbox/network/serialization/ClassSerializer.java b/src/dorkbox/network/serialization/ClassSerializer.java index 9a5e4258..470012b4 100644 --- a/src/dorkbox/network/serialization/ClassSerializer.java +++ b/src/dorkbox/network/serialization/ClassSerializer.java @@ -19,7 +19,6 @@ import org.slf4j.Logger; import com.esotericsoftware.kryo.Serializer; -import dorkbox.network.connection.CryptoConnection; import dorkbox.network.connection.KryoExtra; import dorkbox.network.rmi.RemoteObjectSerializer; @@ -29,7 +28,7 @@ class ClassSerializer extends ClassRegistration { this.serializer = serializer; } - void register(final KryoExtra kryo, final RemoteObjectSerializer remoteObjectSerializer) { + void register(final KryoExtra kryo, final RemoteObjectSerializer remoteObjectSerializer) { id = kryo.register(clazz, serializer).getId(); } diff --git a/src/dorkbox/network/serialization/ClassSerializer1.java b/src/dorkbox/network/serialization/ClassSerializer1.java index 9f27cb48..ed68c1d3 100644 --- a/src/dorkbox/network/serialization/ClassSerializer1.java +++ b/src/dorkbox/network/serialization/ClassSerializer1.java @@ -17,7 +17,6 @@ package dorkbox.network.serialization; import org.slf4j.Logger; -import dorkbox.network.connection.CryptoConnection; import dorkbox.network.connection.KryoExtra; import dorkbox.network.rmi.RemoteObjectSerializer; @@ -27,7 +26,7 @@ class ClassSerializer1 extends ClassRegistration { this.id = id; } - void register(final KryoExtra kryo, final RemoteObjectSerializer remoteObjectSerializer) { + void register(final KryoExtra kryo, final RemoteObjectSerializer remoteObjectSerializer) { kryo.register(clazz, id); } diff --git a/src/dorkbox/network/serialization/ClassSerializer2.java b/src/dorkbox/network/serialization/ClassSerializer2.java index 40cf3a2e..97a87736 100644 --- a/src/dorkbox/network/serialization/ClassSerializer2.java +++ b/src/dorkbox/network/serialization/ClassSerializer2.java @@ -19,7 +19,6 @@ import org.slf4j.Logger; import com.esotericsoftware.kryo.Serializer; -import dorkbox.network.connection.CryptoConnection; import dorkbox.network.connection.KryoExtra; import dorkbox.network.rmi.RemoteObjectSerializer; @@ -30,7 +29,7 @@ class ClassSerializer2 extends ClassRegistration { this.id = id; } - void register(final KryoExtra kryo, final RemoteObjectSerializer remoteObjectSerializer) { + void register(final KryoExtra kryo, final RemoteObjectSerializer remoteObjectSerializer) { kryo.register(clazz, serializer, id); } diff --git a/src/dorkbox/network/serialization/ClassSerializerRmi.java b/src/dorkbox/network/serialization/ClassSerializerRmi.java index 5932d1dd..c9e80c76 100644 --- a/src/dorkbox/network/serialization/ClassSerializerRmi.java +++ b/src/dorkbox/network/serialization/ClassSerializerRmi.java @@ -17,7 +17,6 @@ package dorkbox.network.serialization; import org.slf4j.Logger; -import dorkbox.network.connection.CryptoConnection; import dorkbox.network.connection.KryoExtra; import dorkbox.network.rmi.RemoteObjectSerializer; @@ -29,7 +28,7 @@ class ClassSerializerRmi extends ClassRegistration { this.implClass = implClass; } - void register(final KryoExtra kryo, final RemoteObjectSerializer remoteObjectSerializer) { + void register(final KryoExtra kryo, final RemoteObjectSerializer remoteObjectSerializer) { this.id = kryo.register(clazz, remoteObjectSerializer).getId(); this.serializer = remoteObjectSerializer; } diff --git a/src/dorkbox/network/serialization/CryptoSerializationManager.java b/src/dorkbox/network/serialization/CryptoSerializationManager.java deleted file mode 100644 index cf865faf..00000000 --- a/src/dorkbox/network/serialization/CryptoSerializationManager.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2010 dorkbox, llc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dorkbox.network.serialization; - -import java.io.IOException; - -import dorkbox.network.connection.CryptoConnection; -import io.netty.buffer.ByteBuf; - -/** - * Threads reading/writing, it messes up a single instance. it is possible to use a single kryo with the use of synchronize, however - that - * defeats the point of multi-threaded - */ -public -interface CryptoSerializationManager extends RmiSerializationManager { - - /** - * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. - *

- * There is a small speed penalty if there were no kryo's available to use. - */ - void writeWithCrypto(C connection, ByteBuf buffer, Object message) throws IOException; - - /** - * Reads an object from the buffer. - *

- * Crypto + sequence number - * - * @param connection - * can be NULL - * @param length - * should ALWAYS be the length of the expected object! - */ - Object readWithCrypto(C connection, ByteBuf buffer, int length) throws IOException; -} diff --git a/src/dorkbox/network/serialization/RmiSerializationManager.java b/src/dorkbox/network/serialization/NetworkSerializationManager.java similarity index 79% rename from src/dorkbox/network/serialization/RmiSerializationManager.java rename to src/dorkbox/network/serialization/NetworkSerializationManager.java index 32700885..56bbc3f4 100644 --- a/src/dorkbox/network/serialization/RmiSerializationManager.java +++ b/src/dorkbox/network/serialization/NetworkSerializationManager.java @@ -15,16 +15,39 @@ */ package dorkbox.network.serialization; +import java.io.IOException; + import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.Serializer; +import dorkbox.network.connection.Connection_; import dorkbox.network.connection.KryoExtra; import dorkbox.network.rmi.CachedMethod; import dorkbox.util.serialization.SerializationManager; +import io.netty.buffer.ByteBuf; public -interface RmiSerializationManager extends SerializationManager { +interface NetworkSerializationManager extends SerializationManager { + + /** + * Waits until a kryo is available to write, using CAS operations to prevent having to synchronize. + *

+ * There is a small speed penalty if there were no kryo's available to use. + */ + void writeWithCrypto(Connection_ connection, ByteBuf buffer, Object message) throws IOException; + + /** + * Reads an object from the buffer. + *

+ * Crypto + sequence number + * + * @param connection + * can be NULL + * @param length + * should ALWAYS be the length of the expected object! + */ + Object readWithCrypto(Connection_ connection, ByteBuf buffer, int length) throws IOException; /** * Registers the class using the lowest, next available integer ID and the {@link Kryo#getDefaultSerializer(Class) default serializer}. @@ -36,7 +59,7 @@ interface RmiSerializationManager extends SerializationManager { * method. The order must be the same at deserialization as it was for serialization. */ @Override - RmiSerializationManager register(Class clazz); + NetworkSerializationManager register(Class clazz); /** * Registers the class using the specified ID. If the ID is already in use by the same type, the old entry is overwritten. If the ID @@ -50,7 +73,7 @@ interface RmiSerializationManager extends SerializationManager { * these IDs can be repurposed. */ @Override - RmiSerializationManager register(Class clazz, int id); + NetworkSerializationManager register(Class clazz, int id); /** * Registers the class using the lowest, next available integer ID and the specified serializer. If the class is already registered, @@ -62,7 +85,7 @@ interface RmiSerializationManager extends SerializationManager { * method. The order must be the same at deserialization as it was for serialization. */ @Override - RmiSerializationManager register(Class clazz, Serializer serializer); + NetworkSerializationManager register(Class clazz, Serializer serializer); /** * Registers the class using the specified ID and serializer. If the ID is already in use by the same type, the old entry is @@ -76,7 +99,7 @@ interface RmiSerializationManager extends SerializationManager { * these IDs can be repurposed. */ @Override - RmiSerializationManager register(Class clazz, Serializer serializer, int id); + NetworkSerializationManager register(Class clazz, Serializer serializer, int id); /** * Necessary to register classes for RMI, only called once when the RMI bridge is created. @@ -117,7 +140,7 @@ interface RmiSerializationManager extends SerializationManager { * Enable a "remote client" to access methods and create objects (RMI) for this endpoint. This is NOT bi-directional, and this endpoint cannot access or \ * create remote objects on the "remote client". *

- * Calling this method with a null parameter for the implementation class is the same as calling {@link RmiSerializationManager#registerRmi(Class)} + * Calling this method with a null parameter for the implementation class is the same as calling {@link NetworkSerializationManager#registerRmi(Class)} *

* There is additional overhead to using RMI. *

@@ -129,7 +152,7 @@ interface RmiSerializationManager extends SerializationManager { * * @throws IllegalArgumentException if the iface/impl have previously been overridden */ - RmiSerializationManager registerRmi(Class ifaceClass, Class implClass); + NetworkSerializationManager registerRmi(Class ifaceClass, Class implClass); /** * Gets the cached methods for the specified class ID diff --git a/src/dorkbox/network/serialization/Serialization.java b/src/dorkbox/network/serialization/Serialization.java index 2810f95b..561c4e0d 100644 --- a/src/dorkbox/network/serialization/Serialization.java +++ b/src/dorkbox/network/serialization/Serialization.java @@ -37,11 +37,10 @@ import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.CollectionSerializer; import com.esotericsoftware.kryo.serializers.FieldSerializer; import com.esotericsoftware.kryo.util.IdentityMap; -import com.esotericsoftware.kryo.util.IntMap; import com.esotericsoftware.kryo.util.MapReferenceResolver; import com.esotericsoftware.kryo.util.Util; -import dorkbox.network.connection.CryptoConnection; +import dorkbox.network.connection.Connection_; import dorkbox.network.connection.KryoExtra; import dorkbox.network.connection.ping.PingMessage; import dorkbox.network.rmi.CachedMethod; @@ -57,6 +56,7 @@ import dorkbox.network.rmi.RmiUtils; import dorkbox.objectPool.ObjectPool; import dorkbox.objectPool.PoolableObject; import dorkbox.util.Property; +import dorkbox.util.collections.IntMap; import dorkbox.util.serialization.ArraysAsListSerializer; import dorkbox.util.serialization.EccPrivateKeySerializer; import dorkbox.util.serialization.EccPublicKeySerializer; @@ -77,7 +77,7 @@ import io.netty.buffer.Unpooled; */ @SuppressWarnings({"StaticNonFinalField"}) public -class Serialization implements CryptoSerializationManager { +class Serialization implements NetworkSerializationManager { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Serialization.class.getSimpleName()); @@ -115,9 +115,9 @@ class Serialization implements CryptoSerializationMa * Kryo#newDefaultSerializer(Class) */ public static - Serialization DEFAULT(final boolean references, final boolean registrationRequired, final SerializerFactory factory) { + Serialization DEFAULT(final boolean references, final boolean registrationRequired, final SerializerFactory factory) { - final Serialization serialization = new Serialization(references, + final Serialization serialization = new Serialization(references, registrationRequired, factory); @@ -147,7 +147,7 @@ class Serialization implements CryptoSerializationMa } private boolean initialized = false; - private final ObjectPool> kryoPool; + private final ObjectPool kryoPool; private final boolean registrationRequired; @@ -207,13 +207,13 @@ class Serialization implements CryptoSerializationMa this.registrationRequired = registrationRequired; - this.kryoPool = ObjectPool.NonBlockingSoftReference(new PoolableObject>() { + this.kryoPool = ObjectPool.NonBlockingSoftReference(new PoolableObject() { @Override public - KryoExtra create() { + KryoExtra create() { synchronized (Serialization.this) { // we HAVE to pre-allocate the KRYOs - KryoExtra kryo = new KryoExtra(Serialization.this); + KryoExtra kryo = new KryoExtra(Serialization.this); kryo.getFieldSerializerConfig().setUseAsm(useAsm); kryo.setRegistrationRequired(registrationRequired); @@ -261,7 +261,7 @@ class Serialization implements CryptoSerializationMa */ @Override public synchronized - RmiSerializationManager register(Class clazz) { + NetworkSerializationManager register(Class clazz) { if (initialized) { logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class) call."); } @@ -290,7 +290,7 @@ class Serialization implements CryptoSerializationMa */ @Override public synchronized - RmiSerializationManager register(Class clazz, int id) { + NetworkSerializationManager register(Class clazz, int id) { if (initialized) { logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, int) call."); } @@ -315,7 +315,7 @@ class Serialization implements CryptoSerializationMa */ @Override public synchronized - RmiSerializationManager register(Class clazz, Serializer serializer) { + NetworkSerializationManager register(Class clazz, Serializer serializer) { if (initialized) { logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, Serializer) call."); } @@ -342,7 +342,7 @@ class Serialization implements CryptoSerializationMa */ @Override public synchronized - RmiSerializationManager register(Class clazz, Serializer serializer, int id) { + NetworkSerializationManager register(Class clazz, Serializer serializer, int id) { if (initialized) { logger.warn("Serialization manager already initialized. Ignoring duplicate register(Class, Serializer, int) call."); } @@ -372,7 +372,7 @@ class Serialization implements CryptoSerializationMa */ @Override public synchronized - RmiSerializationManager registerRmi(Class ifaceClass, Class implClass) { + NetworkSerializationManager registerRmi(Class ifaceClass, Class implClass) { if (initialized) { logger.warn("Serialization manager already initialized. Ignoring duplicate registerRmiImplementation(Class, Class) call."); return this; @@ -442,7 +442,7 @@ class Serialization implements CryptoSerializationMa // initialize the kryo pool with at least 1 kryo instance. This ALSO makes sure that all of our class registration is done // correctly and (if not) we are are notified on the initial thread (instead of on the network update thread) - KryoExtra kryo = kryoPool.take(); + KryoExtra kryo = kryoPool.take(); try { // now MERGE all of the registrations (since we can have registrations overwrite newer/specific registrations @@ -502,7 +502,7 @@ class Serialization implements CryptoSerializationMa kryo.setRegistrationRequired(false); try { - kryo.writeCompressed(null, buffer, registrationDetails); + kryo.writeCompressed(buffer, registrationDetails); } catch (Exception e) { logger.error("Unable to write compressed data for registration details"); } @@ -562,7 +562,7 @@ class Serialization implements CryptoSerializationMa try { kryo.setRegistrationRequired(false); @SuppressWarnings("unchecked") - Object[][] classRegistrations = (Object[][]) kryo.readCompressed(null, byteBuf, otherRegistrationData.length); + Object[][] classRegistrations = (Object[][]) kryo.readCompressed(byteBuf, otherRegistrationData.length); int lengthOrg = mergedRegistrations.length; @@ -671,7 +671,7 @@ class Serialization implements CryptoSerializationMa @Override public final void write(final ByteBuf buffer, final Object message) throws IOException { - final KryoExtra kryo = kryoPool.take(); + final KryoExtra kryo = kryoPool.take(); try { if (wireWriteLogger.isTraceEnabled()) { int start = buffer.writerIndex(); @@ -698,7 +698,7 @@ class Serialization implements CryptoSerializationMa @Override public final Object read(final ByteBuf buffer, final int length) throws IOException { - final KryoExtra kryo = kryoPool.take(); + final KryoExtra kryo = kryoPool.take(); try { if (wireReadLogger.isTraceEnabled()) { int start = buffer.readerIndex(); @@ -723,7 +723,7 @@ class Serialization implements CryptoSerializationMa @Override public void writeFullClassAndObject(final Output output, final Object value) throws IOException { - KryoExtra kryo = kryoPool.take(); + KryoExtra kryo = kryoPool.take(); boolean prev = false; try { @@ -748,7 +748,7 @@ class Serialization implements CryptoSerializationMa @Override public Object readFullClassAndObject(final Input input) throws IOException { - KryoExtra kryo = kryoPool.take(); + KryoExtra kryo = kryoPool.take(); boolean prev = false; try { @@ -774,8 +774,8 @@ class Serialization implements CryptoSerializationMa */ @Override public final - void writeWithCrypto(final C connection, final ByteBuf buffer, final Object message) throws IOException { - final KryoExtra kryo = kryoPool.take(); + void writeWithCrypto(final Connection_ connection, final ByteBuf buffer, final Object message) throws IOException { + final KryoExtra kryo = kryoPool.take(); try { // we only need to encrypt when NOT on loopback, since encrypting on loopback is a waste of CPU if (connection.isLoopback()) { @@ -818,8 +818,8 @@ class Serialization implements CryptoSerializationMa @SuppressWarnings("Duplicates") @Override public final - Object readWithCrypto(final C connection, final ByteBuf buffer, final int length) throws IOException { - final KryoExtra kryo = kryoPool.take(); + Object readWithCrypto(final Connection_ connection, final ByteBuf buffer, final int length) throws IOException { + final KryoExtra kryo = kryoPool.take(); try { // we only need to encrypt when NOT on loopback, since encrypting on loopback is a waste of CPU if (connection.isLoopback()) { diff --git a/test/dorkbox/network/rmi/RmiGlobalTest.java b/test/dorkbox/network/rmi/RmiGlobalTest.java index 9a34cdfb..2536d10c 100644 --- a/test/dorkbox/network/rmi/RmiGlobalTest.java +++ b/test/dorkbox/network/rmi/RmiGlobalTest.java @@ -50,6 +50,7 @@ import dorkbox.network.connection.Connection; import dorkbox.network.connection.ConnectionImpl; import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.Listener; +import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.network.serialization.Serialization; import dorkbox.util.exceptions.SecurityException; @@ -182,7 +183,7 @@ class RmiGlobalTest extends BaseTest { public static - void register(dorkbox.network.serialization.CryptoSerializationManager manager) { + void register(NetworkSerializationManager manager) { manager.register(Object.class); // Needed for Object#toString, hashCode, etc. manager.register(MessageWithTestCow.class); manager.register(UnsupportedOperationException.class); diff --git a/test/dorkbox/network/rmi/RmiObjectIdExhaustionTest.java b/test/dorkbox/network/rmi/RmiObjectIdExhaustionTest.java index ddd5d25a..8a8cb913 100644 --- a/test/dorkbox/network/rmi/RmiObjectIdExhaustionTest.java +++ b/test/dorkbox/network/rmi/RmiObjectIdExhaustionTest.java @@ -25,6 +25,7 @@ import dorkbox.network.Server; import dorkbox.network.connection.Connection; import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.Listener; +import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.network.serialization.Serialization; import dorkbox.util.exceptions.SecurityException; @@ -35,7 +36,7 @@ class RmiObjectIdExhaustionTest extends BaseTest { private AtomicInteger objectCounter = new AtomicInteger(1); public static - void register(dorkbox.network.serialization.CryptoSerializationManager manager) { + void register(NetworkSerializationManager manager) { manager.register(Object.class); // Needed for Object#toString, hashCode, etc. manager.register(MessageWithTestCow.class); manager.register(UnsupportedOperationException.class); diff --git a/test/dorkbox/network/rmi/RmiTest.java b/test/dorkbox/network/rmi/RmiTest.java index 90f37453..2185c213 100644 --- a/test/dorkbox/network/rmi/RmiTest.java +++ b/test/dorkbox/network/rmi/RmiTest.java @@ -50,6 +50,7 @@ import dorkbox.network.connection.Connection; import dorkbox.network.connection.EndPoint; import dorkbox.network.connection.Listener; import dorkbox.network.connection.Listeners; +import dorkbox.network.serialization.NetworkSerializationManager; import dorkbox.network.serialization.Serialization; import dorkbox.util.exceptions.SecurityException; @@ -169,7 +170,7 @@ class RmiTest extends BaseTest { } public static - void register(dorkbox.network.serialization.CryptoSerializationManager manager) { + void register(NetworkSerializationManager manager) { manager.register(Object.class); // Needed for Object#toString, hashCode, etc. manager.register(TestCow.class); manager.register(MessageWithTestCow.class);