Removed dead code

This commit is contained in:
Robinson 2023-07-11 11:50:32 +02:00
parent f1ebd076bf
commit 90830128e6
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
5 changed files with 0 additions and 1236 deletions

View File

@ -1,159 +0,0 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.serialization.Serialization;
import dorkbox.util.exceptions.SecurityException;
import dorkbox.util.serialization.SerializationManager;
public
class LargeResizeBufferTest extends BaseTest {
private static final int OBJ_SIZE = 1024 * 100;
private volatile int finalCheckAmount = 0;
private volatile int serverCheck = -1;
private volatile int clientCheck = -1;
@Test
public
void manyLargeMessages() throws SecurityException, IOException {
final int messageCount = 1024;
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.udpPort = udpPort;
configuration.host = host;
register(configuration.serialization);
Server server = new Server(configuration);
addEndPoint(server);
server.bind(false);
server.listeners()
.add(new Listener.OnMessageReceived<Connection, LargeMessage>() {
AtomicInteger received = new AtomicInteger();
AtomicInteger receivedBytes = new AtomicInteger();
@Override
public
void received(Connection connection, LargeMessage object) {
// System.err.println("Server ack message: " + received.get());
connection.send()
.TCP(object);
this.receivedBytes.addAndGet(object.bytes.length);
if (this.received.incrementAndGet() == messageCount) {
System.out.println("Server received all " + messageCount + " messages!");
System.out.println("Server received and sent " + this.receivedBytes.get() + " bytes.");
LargeResizeBufferTest.this.serverCheck = LargeResizeBufferTest.this.finalCheckAmount - this.receivedBytes.get();
System.out.println("Server missed " + LargeResizeBufferTest.this.serverCheck + " bytes.");
stopEndPoints();
}
}
});
Client client = new Client(configuration);
addEndPoint(client);
client.listeners()
.add(new Listener.OnMessageReceived<Connection, LargeMessage>() {
AtomicInteger received = new AtomicInteger();
AtomicInteger receivedBytes = new AtomicInteger();
@Override
public
void received(Connection connection, LargeMessage object) {
this.receivedBytes.addAndGet(object.bytes.length);
int count = this.received.getAndIncrement();
// System.out.println("Client received message: " + count);
if (count == messageCount) {
System.out.println("Client received all " + messageCount + " messages!");
System.out.println("Client received and sent " + this.receivedBytes.get() + " bytes.");
LargeResizeBufferTest.this.clientCheck = LargeResizeBufferTest.this.finalCheckAmount - this.receivedBytes.get();
System.out.println("Client missed " + LargeResizeBufferTest.this.clientCheck + " bytes.");
}
}
});
client.connect(5000);
SecureRandom random = new SecureRandom();
System.err.println(" Client sending " + messageCount + " messages");
for (int i = 0; i < messageCount; i++) {
this.finalCheckAmount += OBJ_SIZE; // keep increasing size
byte[] b = new byte[OBJ_SIZE];
random.nextBytes(b);
// set some of the bytes to be all `244`, just so some compression can occur (to test that as well)
for (int j = 0; j < 400; j++) {
b[j] = (byte) 244;
}
// System.err.println("Sending " + b.length + " bytes");
client.send()
.TCP(new LargeMessage(b));
}
System.err.println("Client has queued " + messageCount + " messages.");
waitForThreads();
if (this.clientCheck > 0) {
fail("Client missed " + this.clientCheck + " bytes.");
}
if (this.serverCheck > 0) {
fail("Server missed " + this.serverCheck + " bytes.");
}
}
private
void register(SerializationManager manager) {
manager.register(byte[].class);
manager.register(LargeMessage.class);
}
public static
class LargeMessage {
public byte[] bytes;
public
LargeMessage() {
}
public
LargeMessage(byte[] bytes) {
this.bytes = bytes;
}
}
}

View File

@ -1,242 +0,0 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.Listeners;
import dorkbox.network.serialization.Serialization;
import dorkbox.util.exceptions.SecurityException;
public
class MultipleThreadTest extends BaseTest {
private final Object lock = new Object();
private volatile boolean stillRunning = false;
private final Object finalRunLock = new Object();
private volatile boolean finalStillRunning = false;
private final int messageCount = 150;
private final int threadCount = 15;
private final int clientCount = 13;
private final List<Client> clients = new ArrayList<Client>(this.clientCount);
int perClientReceiveTotal = (this.messageCount * this.threadCount);
int serverReceiveTotal = perClientReceiveTotal * this.clientCount;
AtomicInteger sent = new AtomicInteger(0);
AtomicInteger totalClientReceived = new AtomicInteger(0);
AtomicInteger receivedServer = new AtomicInteger(1);
ConcurrentHashMap<Integer, DataClass> sentStringsToClientDebug = new ConcurrentHashMap<Integer, DataClass>();
@Test
public
void multipleThreads() throws SecurityException, IOException {
// our clients should receive messageCount * threadCount * clientCount TOTAL messages
final int totalClientReceivedCountExpected = this.clientCount * this.messageCount * this.threadCount;
final int totalServerReceivedCountExpected = this.clientCount * this.messageCount;
System.err.println("CLIENT RECEIVES: " + totalClientReceivedCountExpected);
System.err.println("SERVER RECEIVES: " + totalServerReceivedCountExpected);
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.host = host;
configuration.serialization.register(String[].class);
configuration.serialization.register(DataClass.class);
final Server server = new Server(configuration);
server.disableRemoteKeyValidation();
addEndPoint(server);
server.bind(false);
final Listeners listeners = server.listeners();
listeners.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(final Connection connection) {
System.err.println("Client connected to server.");
// kickoff however many threads we need, and send data to the client.
for (int i = 1; i <= MultipleThreadTest.this.threadCount; i++) {
final int index = i;
new Thread() {
@Override
public
void run() {
for (int i = 1; i <= MultipleThreadTest.this.messageCount; i++) {
int incrementAndGet = MultipleThreadTest.this.sent.getAndIncrement();
DataClass dataClass = new DataClass("Server -> client. Thread #" + index + " message# " + incrementAndGet,
incrementAndGet);
//System.err.println(dataClass.data);
MultipleThreadTest.this.sentStringsToClientDebug.put(incrementAndGet, dataClass);
connection.send()
.TCP(dataClass)
.flush();
}
}
}.start();
}
}
});
listeners.add(new Listener.OnMessageReceived<Connection, DataClass>() {
@Override
public
void received(Connection connection, DataClass object) {
int incrementAndGet = MultipleThreadTest.this.receivedServer.getAndIncrement();
//System.err.println("server #" + incrementAndGet);
if (incrementAndGet % MultipleThreadTest.this.messageCount == 0) {
System.err.println("Server receive DONE for client " + incrementAndGet);
stillRunning = false;
synchronized (MultipleThreadTest.this.lock) {
MultipleThreadTest.this.lock.notifyAll();
}
}
if (incrementAndGet == totalServerReceivedCountExpected) {
System.err.println("Server DONE: " + incrementAndGet);
finalStillRunning = false;
synchronized (MultipleThreadTest.this.finalRunLock) {
MultipleThreadTest.this.finalRunLock.notifyAll();
}
}
}
});
// ----
finalStillRunning = true;
for (int i = 1; i <= this.clientCount; i++) {
final int index = i;
Client client = new Client(configuration);
this.clients.add(client);
addEndPoint(client);
client.listeners()
.add(new Listener.OnMessageReceived<Connection, DataClass>() {
final int clientIndex = index;
final AtomicInteger received = new AtomicInteger(1);
@Override
public
void received(Connection connection, DataClass object) {
totalClientReceived.getAndIncrement();
int clientLocalCounter = this.received.getAndIncrement();
MultipleThreadTest.this.sentStringsToClientDebug.remove(object.index);
//System.err.println(object.data);
// we finished!!
if (clientLocalCounter == perClientReceiveTotal) {
//System.err.println("Client #" + clientIndex + " received " + clientLocalCounter + " Sending back " +
// MultipleThreadTest.this.messageCount + " messages.");
// now spam back messages!
for (int i = 0; i < MultipleThreadTest.this.messageCount; i++) {
connection.send()
.TCP(new DataClass("Client #" + clientIndex + " -> Server message " + i, index));
}
}
}
});
stillRunning = true;
client.connect(5000);
while (stillRunning) {
synchronized (this.lock) {
try {
this.lock.wait(5 * 1000); // 5 secs
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
while (finalStillRunning) {
synchronized (this.finalRunLock) {
try {
this.finalRunLock.wait(5 * 1000); // 5 secs
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// CLIENT will wait until it's done connecting, but SERVER is async.
// the ONLY way to safely work in the server is with LISTENERS. Everything else can FAIL, because of it's async nature.
if (!this.sentStringsToClientDebug.isEmpty()) {
System.err.println("MISSED DATA: " + this.sentStringsToClientDebug.size());
for (Map.Entry<Integer, DataClass> i : this.sentStringsToClientDebug.entrySet()) {
System.err.println(i.getKey() + " : " + i.getValue().data);
}
}
stopEndPoints();
assertEquals(totalClientReceivedCountExpected, totalClientReceived.get());
// offset by 1 since we start at 1
assertEquals(totalServerReceivedCountExpected, receivedServer.get()-1);
}
public static
class DataClass {
public String data;
public Integer index;
public
DataClass() {
}
public
DataClass(String data, Integer index) {
this.data = data;
this.index = index;
}
}
}

View File

@ -1,326 +0,0 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.Listeners;
import dorkbox.network.serialization.Serialization;
import dorkbox.util.exceptions.SecurityException;
import dorkbox.util.serialization.SerializationManager;
public
class PingPongLocalTest extends BaseTest {
int tries = 10000;
private volatile String fail;
@Test
public void pingPongLocal() throws SecurityException, IOException {
this.fail = "Data not received.";
final Data dataLOCAL = new Data();
populateData(dataLOCAL);
Configuration configuration = Configuration.localOnly();
register(configuration.serialization);
Server server = new Server(configuration);
addEndPoint(server);
server.bind(false);
final Listeners listeners = server.listeners();
listeners.add(new Listener.OnError<Connection>() {
@Override
public
void error(Connection connection, Throwable throwable) {
PingPongLocalTest.this.fail = "Error during processing. " + throwable;
}
});
listeners.add(new Listener.OnMessageReceived<Connection, Data>() {
@Override
public
void received(Connection connection, Data data) {
connection.id();
if (!data.equals(dataLOCAL)) {
PingPongLocalTest.this.fail = "data is not equal on server.";
throw new RuntimeException("Fail! " + PingPongLocalTest.this.fail);
}
connection.send()
.TCP(data);
}
});
// ----
Client client = new Client(configuration);
addEndPoint(client);
final Listeners listeners1 = client.listeners();
listeners1.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
PingPongLocalTest.this.fail = null;
connection.send()
.TCP(dataLOCAL);
// connection.sendUDP(dataUDP); // TCP and UDP are the same for a local channel.
}
});
listeners1.add(new Listener.OnError<Connection>() {
@Override
public
void error(Connection connection, Throwable throwable) {
PingPongLocalTest.this.fail = "Error during processing. " + throwable;
System.err.println(PingPongLocalTest.this.fail);
}
});
listeners1.add(new Listener.OnMessageReceived<Connection, Data>() {
AtomicInteger check = new AtomicInteger(0);
@Override
public
void received(Connection connection, Data data) {
if (!data.equals(dataLOCAL)) {
PingPongLocalTest.this.fail = "data is not equal on client.";
throw new RuntimeException("Fail! " + PingPongLocalTest.this.fail);
}
if (this.check.getAndIncrement() <= PingPongLocalTest.this.tries) {
connection.send()
.TCP(data);
}
else {
System.err.println("Ran LOCAL " + PingPongLocalTest.this.tries + " times");
stopEndPoints();
}
}
});
client.connect(5000);
waitForThreads();
if (this.fail != null) {
fail(this.fail);
}
}
private void populateData(Data data) {
StringBuilder buffer = new StringBuilder();
for (int i = 0; i < 3000; i++) {
buffer.append('a');
}
data.string = buffer.toString();
data.strings = new String[] {"abcdefghijklmnopqrstuvwxyz0123456789","",null,"!@#$","<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>"};
data.ints = new int[] {-1234567,1234567,-1,0,1,Integer.MAX_VALUE,Integer.MIN_VALUE};
data.shorts = new short[] {-12345,12345,-1,0,1,Short.MAX_VALUE,Short.MIN_VALUE};
data.floats = new float[] {0,-0,1,-1,123456,-123456,0.1f,0.2f,-0.3f,(float) Math.PI,Float.MAX_VALUE,
Float.MIN_VALUE};
data.doubles = new double[] {0,-0,1,-1,123456,-123456,0.1d,0.2d,-0.3d,Math.PI,Double.MAX_VALUE,Double.MIN_VALUE};
data.longs = new long[] {0,-0,1,-1,123456,-123456,99999999999l,-99999999999l,Long.MAX_VALUE,Long.MIN_VALUE};
data.bytes = new byte[] {-123,123,-1,0,1,Byte.MAX_VALUE,Byte.MIN_VALUE};
data.chars = new char[] {32345,12345,0,1,63,Character.MAX_VALUE,Character.MIN_VALUE};
data.booleans = new boolean[] {true,false};
data.Ints = new Integer[] {-1234567,1234567,-1,0,1,Integer.MAX_VALUE,Integer.MIN_VALUE};
data.Shorts = new Short[] {-12345,12345,-1,0,1,Short.MAX_VALUE,Short.MIN_VALUE};
data.Floats = new Float[] {0f,-0f,1f,-1f,123456f,-123456f,0.1f,0.2f,-0.3f,(float) Math.PI,Float.MAX_VALUE,
Float.MIN_VALUE};
data.Doubles = new Double[] {0d,-0d,1d,-1d,123456d,-123456d,0.1d,0.2d,-0.3d,Math.PI,Double.MAX_VALUE,
Double.MIN_VALUE};
data.Longs = new Long[] {0l,-0l,1l,-1l,123456l,-123456l,99999999999l,-99999999999l,Long.MAX_VALUE,
Long.MIN_VALUE};
data.Bytes = new Byte[] {-123,123,-1,0,1,Byte.MAX_VALUE,Byte.MIN_VALUE};
data.Chars = new Character[] {32345,12345,0,1,63,Character.MAX_VALUE,Character.MIN_VALUE};
data.Booleans = new Boolean[] {true,false};
}
private void register(SerializationManager manager) {
manager.register(int[].class);
manager.register(short[].class);
manager.register(float[].class);
manager.register(double[].class);
manager.register(long[].class);
manager.register(byte[].class);
manager.register(char[].class);
manager.register(boolean[].class);
manager.register(String[].class);
manager.register(Integer[].class);
manager.register(Short[].class);
manager.register(Float[].class);
manager.register(Double[].class);
manager.register(Long[].class);
manager.register(Byte[].class);
manager.register(Character[].class);
manager.register(Boolean[].class);
manager.register(Data.class);
}
static public class Data {
public String string;
public String[] strings;
public int[] ints;
public short[] shorts;
public float[] floats;
public double[] doubles;
public long[] longs;
public byte[] bytes;
public char[] chars;
public boolean[] booleans;
public Integer[] Ints;
public Short[] Shorts;
public Float[] Floats;
public Double[] Doubles;
public Long[] Longs;
public Byte[] Bytes;
public Character[] Chars;
public Boolean[] Booleans;
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + Arrays.hashCode(this.Booleans);
result = prime * result + Arrays.hashCode(this.Bytes);
result = prime * result + Arrays.hashCode(this.Chars);
result = prime * result + Arrays.hashCode(this.Doubles);
result = prime * result + Arrays.hashCode(this.Floats);
result = prime * result + Arrays.hashCode(this.Ints);
result = prime * result + Arrays.hashCode(this.Longs);
result = prime * result + Arrays.hashCode(this.Shorts);
result = prime * result + Arrays.hashCode(this.booleans);
result = prime * result + Arrays.hashCode(this.bytes);
result = prime * result + Arrays.hashCode(this.chars);
result = prime * result + Arrays.hashCode(this.doubles);
result = prime * result + Arrays.hashCode(this.floats);
result = prime * result + Arrays.hashCode(this.ints);
result = prime * result + Arrays.hashCode(this.longs);
result = prime * result + Arrays.hashCode(this.shorts);
result = prime * result + (this.string == null ? 0 : this.string.hashCode());
result = prime * result + Arrays.hashCode(this.strings);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Data other = (Data) obj;
if (!Arrays.equals(this.Booleans, other.Booleans)) {
return false;
}
if (!Arrays.equals(this.Bytes, other.Bytes)) {
return false;
}
if (!Arrays.equals(this.Chars, other.Chars)) {
return false;
}
if (!Arrays.equals(this.Doubles, other.Doubles)) {
return false;
}
if (!Arrays.equals(this.Floats, other.Floats)) {
return false;
}
if (!Arrays.equals(this.Ints, other.Ints)) {
return false;
}
if (!Arrays.equals(this.Longs, other.Longs)) {
return false;
}
if (!Arrays.equals(this.Shorts, other.Shorts)) {
return false;
}
if (!Arrays.equals(this.booleans, other.booleans)) {
return false;
}
if (!Arrays.equals(this.bytes, other.bytes)) {
return false;
}
if (!Arrays.equals(this.chars, other.chars)) {
return false;
}
if (!Arrays.equals(this.doubles, other.doubles)) {
return false;
}
if (!Arrays.equals(this.floats, other.floats)) {
return false;
}
if (!Arrays.equals(this.ints, other.ints)) {
return false;
}
if (!Arrays.equals(this.longs, other.longs)) {
return false;
}
if (!Arrays.equals(this.shorts, other.shorts)) {
return false;
}
if (this.string == null) {
if (other.string != null) {
return false;
}
} else if (!this.string.equals(other.string)) {
return false;
}
if (!Arrays.equals(this.strings, other.strings)) {
return false;
}
return true;
}
@Override
public String toString() {
return "Data";
}
}
}

View File

@ -1,213 +0,0 @@
/*
* Copyright 2014 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.network.other
import com.conversantmedia.util.concurrent.MultithreadConcurrentQueue
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.minlog.Log
import dorkbox.network.serialization.ClassRegistration
import dorkbox.network.serialization.ClassRegistration0
import dorkbox.network.serialization.ClassRegistration1
import dorkbox.network.serialization.ClassRegistration2
import dorkbox.network.serialization.ClassRegistration3
import dorkbox.network.serialization.KryoExtra
import dorkbox.util.serialization.SerializationDefaults
import kotlinx.atomicfu.atomic
class PooledSerialization {
companion object {
init {
Log.set(Log.LEVEL_ERROR)
}
}
private var initialized = atomic(false)
private val classesToRegister = mutableListOf<ClassRegistration>()
private var kryoPoolSize = 16
private val kryoInUse = atomic(0)
@Volatile
private var kryoPool = MultithreadConcurrentQueue<KryoExtra>(kryoPoolSize)
/**
* If you customize anything, you will want to register custom types before init() is called!
*/
fun init() {
// NOTE: there are problems if our serializer is THE SAME serializer used by the network stack!
// We are explicitly differet types to prevent that form happening
initialized.value = true
}
private fun initKryo(): KryoExtra {
val kryo = KryoExtra()
SerializationDefaults.register(kryo)
classesToRegister.forEach { registration ->
registration.register(kryo)
}
return kryo
}
/**
* Registers the class using the lowest, next available integer ID and the [default serializer][Kryo.getDefaultSerializer].
* If the class is already registered, the existing entry is updated with the new serializer.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
* Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this
* method.
*
* The order must be the same at deserialization as it was for serialization.
*
* This must happen before the creation of the client/server
*/
fun <T> register(clazz: Class<T>): PooledSerialization {
require(!initialized.value) { "Serialization 'register(class)' cannot happen after initialization!" }
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
// with object types... EVEN IF THERE IS A SERIALIZER
require(!clazz.isInterface) { "Cannot register '${clazz}' with specified ID for serialization. It must be an implementation." }
classesToRegister.add(ClassRegistration3(clazz))
return this
}
/**
* Registers the class using the specified ID. If the ID is already in use by the same type, the old entry is overwritten. If the ID
* is already in use by a different type, an exception is thrown.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
* IDs must be the same at deserialization as they were for serialization.
*
* This must happen before the creation of the client/server
*
* @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but
* these IDs can be repurposed.
*/
fun <T> register(clazz: Class<T>, id: Int): PooledSerialization {
require(!initialized.value) { "Serialization 'register(Class, int)' cannot happen after initialization!" }
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
// with object types... EVEN IF THERE IS A SERIALIZER
require(!clazz.isInterface) { "Cannot register '${clazz}' with specified ID for serialization. It must be an implementation." }
classesToRegister.add(ClassRegistration1(clazz, id))
return this
}
/**
* Registers the class using the lowest, next available integer ID and the specified serializer. If the class is already registered,
* the existing entry is updated with the new serializer.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
*
* Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this
* method. The order must be the same at deserialization as it was for serialization.
*/
@Synchronized
fun <T> register(clazz: Class<T>, serializer: Serializer<T>): PooledSerialization {
require(!initialized.value) { "Serialization 'register(Class, Serializer)' cannot happen after initialization!" }
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
// with object types... EVEN IF THERE IS A SERIALIZER
require(!clazz.isInterface) { "Cannot register '${clazz.name}' with a serializer. It must be an implementation." }
classesToRegister.add(ClassRegistration0(clazz, serializer))
return this
}
/**
* Registers the class using the specified ID and serializer. If the ID is already in use by the same type, the old entry is
* overwritten. If the ID is already in use by a different type, an exception is thrown.
*
*
* Registering a primitive also affects the corresponding primitive wrapper.
*
*
* IDs must be the same at deserialization as they were for serialization.
*
* @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but
* these IDs can be repurposed.
*/
@Synchronized
fun <T> register(clazz: Class<T>, serializer: Serializer<T>, id: Int): PooledSerialization {
require(!initialized.value) { "Serialization 'register(Class, Serializer, int)' cannot happen after initialization!" }
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
// with object types... EVEN IF THERE IS A SERIALIZER
require(!clazz.isInterface) { "Cannot register '${clazz.name}'. It must be an implementation." }
classesToRegister.add(ClassRegistration2(clazz, serializer, id))
return this
}
/**
* @return takes a kryo instance from the pool, or creates one if the pool was empty
*/
fun takeKryo(): KryoExtra {
kryoInUse.getAndIncrement()
// ALWAYS get as many as needed. We recycle them (with an auto-growing pool) to prevent too many getting created
return kryoPool.poll() ?: initKryo()
}
/**
* Returns a kryo instance to the pool for re-use later on
*/
fun returnKryo(kryo: KryoExtra) {
val kryoCount = kryoInUse.getAndDecrement()
if (kryoCount > kryoPoolSize) {
// this is CLEARLY a problem, as we have more kryos in use that our pool can support.
// This happens when we send messages REALLY fast.
//
// We fix this by increasing the size of the pool, so kryos aren't thrown away (and create a GC hit)
synchronized(kryoInUse) {
// we have a double check here on purpose. only 1 will work
if (kryoCount > kryoPoolSize) {
val oldPool = kryoPool
val oldSize = kryoPoolSize
val newSize = kryoPoolSize * 2
kryoPoolSize = newSize
kryoPool = MultithreadConcurrentQueue<KryoExtra>(kryoPoolSize)
// take all of the old kryos and put them in the new one
val array = arrayOfNulls<KryoExtra>(oldSize)
val count = oldPool.remove(array)
for (i in 0 until count) {
kryoPool.offer(array[i])
}
}
}
}
kryoPool.offer(kryo)
}
}

View File

@ -1,296 +0,0 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.Listeners;
import dorkbox.util.exceptions.SecurityException;
// NOTE: UDP is unreliable, EVEN ON LOOPBACK! So this can fail with UDP. TCP will never fail.
public
class ReconnectTest extends BaseTest {
private final AtomicInteger receivedCount = new AtomicInteger(0);
private static final Logger logger = LoggerFactory.getLogger(ReconnectTest.class.getSimpleName());
@Test
public
void socketReuseUDP() throws IOException, SecurityException {
socketReuse(false, true);
}
@Test
public
void socketReuseTCP() throws IOException, SecurityException {
socketReuse(true, false);
}
@Test
public
void socketReuseTCPUDP() throws IOException, SecurityException {
socketReuse(true, true);
}
private
void socketReuse(final boolean useTCP, final boolean useUDP) throws SecurityException, IOException {
receivedCount.set(0);
Configuration configuration = new Configuration();
configuration.host = host;
if (useTCP) {
configuration.tcpPort = tcpPort;
}
if (useUDP) {
configuration.udpPort = udpPort;
}
AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>();
Server server = new Server(configuration);
addEndPoint(server);
final Listeners listeners = server.listeners();
listeners.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
if (useTCP) {
connection.send()
.TCP("-- TCP from server");
}
if (useUDP) {
connection.send()
.UDP("-- UDP from server");
}
}
});
listeners.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {
int incrementAndGet = ReconnectTest.this.receivedCount.incrementAndGet();
logger.error("----- <S " + connection + "> " + incrementAndGet + " : " + object);
latch.get().countDown();
}
});
server.bind(false);
// ----
Client client = new Client(configuration);
addEndPoint(client);
final Listeners listeners1 = client.listeners();
listeners1.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
if (useTCP) {
connection.send()
.TCP("-- TCP from client");
}
if (useUDP) {
connection.send()
.UDP("-- UDP from client");
}
}
});
listeners1.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {
int incrementAndGet = ReconnectTest.this.receivedCount.incrementAndGet();
logger.error("----- <C " + connection + "> " + incrementAndGet + " : " + object);
latch.get().countDown();
}
});
int latchCount = 2;
int count = 100;
int initialCount = 2;
if (useTCP && useUDP) {
initialCount += 2;
latchCount += 2;
}
try {
for (int i = 1; i < count + 1; i++) {
logger.error(".....");
latch.set(new CountDownLatch(latchCount));
try {
client.connect(5000);
} catch (IOException e) {
e.printStackTrace();
}
int retryCount = 20;
int lastRetryCount;
int target = i * initialCount;
boolean failed = false;
synchronized (receivedCount) {
while (this.receivedCount.get() != target) {
lastRetryCount = this.receivedCount.get();
try {
latch.get().await(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
// check to see if we changed at all...
if (lastRetryCount == this.receivedCount.get()) {
if (retryCount-- < 0) {
logger.error("Aborting unit test... wrong count!");
if (useUDP) {
// If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets!
// it results in severe UDP packet loss and contention.
//
// http://www.isoc.org/INET97/proceedings/F3/F3_1.HTM
// also, a google search on just "INET97/proceedings/F3/F3_1.HTM" turns up interesting problems.
// Usually it's with ISPs.
logger.error("NOTE: UDP can fail, even on loopback! See: http://www.isoc.org/INET97/proceedings/F3/F3_1.HTM");
}
failed = true;
break;
}
} else {
retryCount = 20;
}
}
}
client.close();
logger.error(".....");
if (failed) {
break;
}
}
int specified = count * initialCount;
int received = this.receivedCount.get();
if (specified != received) {
logger.error("NOTE: UDP can fail, even on loopback! See: http://www.isoc.org/INET97/proceedings/F3/F3_1.HTM");
}
assertEquals(specified, received);
} finally {
stopEndPoints();
waitForThreads(10);
}
}
@Test
public
void localReuse() throws SecurityException, IOException {
receivedCount.set(0);
Server server = new Server();
addEndPoint(server);
server.listeners()
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
connection.send()
.self("-- LOCAL from server");
}
});
server.listeners()
.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {
int incrementAndGet = ReconnectTest.this.receivedCount.incrementAndGet();
System.out.println("----- <S " + connection + "> " + incrementAndGet + " : " + object);
}
});
// ----
Client client = new Client();
addEndPoint(client);
client.listeners()
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
connection.send()
.self("-- LOCAL from client");
}
});
client.listeners()
.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {
int incrementAndGet = ReconnectTest.this.receivedCount.incrementAndGet();
System.out.println("----- <C " + connection + "> " + incrementAndGet + " : " + object);
}
});
server.bind(false);
int count = 10;
for (int i = 1; i < count + 1; i++) {
client.connect(5000);
int target = i * 2;
while (this.receivedCount.get() != target) {
System.out.println("----- Waiting...");
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
}
client.close();
}
assertEquals(count * 2, this.receivedCount.get());
stopEndPoints();
waitForThreads(10);
}
}