Removed coroutines from parts of the networking message processing.

old_release Version_5.28
Robinson 2022-07-27 00:20:34 +02:00
parent 5c59a142ff
commit 3ef3d28d24
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
37 changed files with 3108 additions and 405 deletions

482
LICENSE
View File

@ -351,162 +351,6 @@
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- NetworkUtils - Utilities for managing network configurations, IP/MAC address conversion, and ping (via OS native commands)
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/NetworkUtils
Copyright 2022
Dorkbox LLC
Extra license information
- Netty -
[The Apache Software License, Version 2.0]
https://netty.io/
Copyright 2014
The Netty Project
This product contains a modified portion of Netty Network Utils
- Apache Harmony -
[The Apache Software License, Version 2.0]
http://archive.apache.org/dist/harmony/
Copyright 2010
The Apache Software Foundation
This product contains a modified portion of 'Apache Harmony', an open source Java SE
- SLF4J - Simple facade or abstraction for various logging frameworks
[MIT License]
http://www.slf4j.org
Copyright 2022
QOS.ch
- JNA - Simplified native library access for Java.
[The Apache Software License, Version 2.0]
https://github.com/twall/jna
Copyright 2022
Timothy Wall
- JNA-Platform - Mappings for a number of commonly used platform functions
[The Apache Software License, Version 2.0]
https://github.com/twall/jna
Copyright 2022
Timothy Wall
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- Executor - Shell, JVM, and SSH command execution on Linux, MacOS, or Windows for Java 8+
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/Executor
Copyright 2022
Dorkbox LLC
Extra license information
- ZT Process Executor -
[The Apache Software License, Version 2.0]
https://github.com/zeroturnaround/zt-exec
Copyright 2014
ZeroTurnaround LLC
- Apache Commons Exec -
[The Apache Software License, Version 2.0]
https://commons.apache.org/proper/commons-exec/
Copyright 2014
The Apache Software Foundation
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- kotlinx.coroutines - Library support for Kotlin coroutines with multiplatform support
[The Apache Software License, Version 2.0]
https://github.com/Kotlin/kotlinx.coroutines
Copyright 2022
JetBrains s.r.o.
- SLF4J - Simple facade or abstraction for various logging frameworks
[MIT License]
http://www.slf4j.org
Copyright 2022
QOS.ch
- Logback - Logback is a logging framework for Java applications
[The Apache Software License, Version 2.0]
http://logback.qos.ch
Copyright 2022
QOS.ch
- SSHJ - SSHv2 library for Java
[The Apache Software License, Version 2.0]
https://github.com/hierynomus/sshj
Copyright 2022
Jeroen van Erp
SSHJ Contributors
Extra license information
- Apache MINA -
[The Apache Software License, Version 2.0]
https://mina.apache.org/sshd-project/
The Apache Software Foundation
- Apache Commons-Net -
[The Apache Software License, Version 2.0]
https://commons.apache.org/proper/commons-net/
The Apache Software Foundation
- JZlib -
[The Apache Software License, Version 2.0]
http://www.jcraft.com/jzlib
Atsuhiko Yamanaka
JCraft, Inc.
- Bouncy Castle Crypto -
[The Apache Software License, Version 2.0]
http://www.bouncycastle.org
The Legion of the Bouncy Castle Inc
- ed25519-java -
[Public Domain, per Creative Commons CC0]
https://github.com/str4d/ed25519-java
https://github.com/str4d
- Updates - Software Update Management
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/Updates
Copyright 2021
Dorkbox LLC
Extra license information
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- Updates - Software Update Management
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/Updates
Copyright 2021
Dorkbox LLC
Extra license information
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- OS - Information about the system, Java runtime, OS, Window Manager, and Desktop Environment.
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/OS
@ -887,6 +731,169 @@
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- NetworkUtils - Utilities for managing network configurations, IP/MAC address conversion, and ping (via OS native commands)
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/NetworkUtils
Copyright 2022
Dorkbox LLC
Extra license information
- Netty -
[The Apache Software License, Version 2.0]
https://netty.io/
Copyright 2014
The Netty Project
This product contains a modified portion of Netty Network Utils
- Apache Harmony -
[The Apache Software License, Version 2.0]
http://archive.apache.org/dist/harmony/
Copyright 2010
The Apache Software Foundation
This product contains a modified portion of 'Apache Harmony', an open source Java SE
- Apache HTTP Utils -
[The Apache Software License, Version 2.0]
http://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/psl/
Copyright 2010
The Apache Software Foundation
This product contains a modified portion of 'PublicSuffixDomainFilter.java'
- SLF4J - Simple facade or abstraction for various logging frameworks
[MIT License]
http://www.slf4j.org
Copyright 2022
QOS.ch
- JNA - Simplified native library access for Java.
[The Apache Software License, Version 2.0]
https://github.com/twall/jna
Copyright 2022
Timothy Wall
- JNA-Platform - Mappings for a number of commonly used platform functions
[The Apache Software License, Version 2.0]
https://github.com/twall/jna
Copyright 2022
Timothy Wall
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- Executor - Shell, JVM, and SSH command execution on Linux, MacOS, or Windows for Java 8+
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/Executor
Copyright 2022
Dorkbox LLC
Extra license information
- ZT Process Executor -
[The Apache Software License, Version 2.0]
https://github.com/zeroturnaround/zt-exec
Copyright 2014
ZeroTurnaround LLC
- Apache Commons Exec -
[The Apache Software License, Version 2.0]
https://commons.apache.org/proper/commons-exec/
Copyright 2014
The Apache Software Foundation
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- kotlinx.coroutines - Library support for Kotlin coroutines with multiplatform support
[The Apache Software License, Version 2.0]
https://github.com/Kotlin/kotlinx.coroutines
Copyright 2022
JetBrains s.r.o.
- SLF4J - Simple facade or abstraction for various logging frameworks
[MIT License]
http://www.slf4j.org
Copyright 2022
QOS.ch
- Logback - Logback is a logging framework for Java applications
[The Apache Software License, Version 2.0]
http://logback.qos.ch
Copyright 2022
QOS.ch
- SSHJ - SSHv2 library for Java
[The Apache Software License, Version 2.0]
https://github.com/hierynomus/sshj
Copyright 2022
Jeroen van Erp
SSHJ Contributors
Extra license information
- Apache MINA -
[The Apache Software License, Version 2.0]
https://mina.apache.org/sshd-project/
The Apache Software Foundation
- Apache Commons-Net -
[The Apache Software License, Version 2.0]
https://commons.apache.org/proper/commons-net/
The Apache Software Foundation
- JZlib -
[The Apache Software License, Version 2.0]
http://www.jcraft.com/jzlib
Atsuhiko Yamanaka
JCraft, Inc.
- Bouncy Castle Crypto -
[The Apache Software License, Version 2.0]
http://www.bouncycastle.org
The Legion of the Bouncy Castle Inc
- ed25519-java -
[Public Domain, per Creative Commons CC0]
https://github.com/str4d/ed25519-java
https://github.com/str4d
- Updates - Software Update Management
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/Updates
Copyright 2021
Dorkbox LLC
Extra license information
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- Updates - Software Update Management
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/Updates
Copyright 2021
Dorkbox LLC
Extra license information
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- OS - Information about the system, Java runtime, OS, Window Manager, and Desktop Environment.
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/OS
@ -947,6 +954,169 @@
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- NetworkUtils - Utilities for managing network configurations, IP/MAC address conversion, and ping (via OS native commands)
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/NetworkUtils
Copyright 2022
Dorkbox LLC
Extra license information
- Netty -
[The Apache Software License, Version 2.0]
https://netty.io/
Copyright 2014
The Netty Project
This product contains a modified portion of Netty Network Utils
- Apache Harmony -
[The Apache Software License, Version 2.0]
http://archive.apache.org/dist/harmony/
Copyright 2010
The Apache Software Foundation
This product contains a modified portion of 'Apache Harmony', an open source Java SE
- Apache HTTP Utils -
[The Apache Software License, Version 2.0]
http://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk/httpclient5/src/main/java/org/apache/hc/client5/http/psl/
Copyright 2010
The Apache Software Foundation
This product contains a modified portion of 'PublicSuffixDomainFilter.java'
- SLF4J - Simple facade or abstraction for various logging frameworks
[MIT License]
http://www.slf4j.org
Copyright 2022
QOS.ch
- JNA - Simplified native library access for Java.
[The Apache Software License, Version 2.0]
https://github.com/twall/jna
Copyright 2022
Timothy Wall
- JNA-Platform - Mappings for a number of commonly used platform functions
[The Apache Software License, Version 2.0]
https://github.com/twall/jna
Copyright 2022
Timothy Wall
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- Executor - Shell, JVM, and SSH command execution on Linux, MacOS, or Windows for Java 8+
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/Executor
Copyright 2022
Dorkbox LLC
Extra license information
- ZT Process Executor -
[The Apache Software License, Version 2.0]
https://github.com/zeroturnaround/zt-exec
Copyright 2014
ZeroTurnaround LLC
- Apache Commons Exec -
[The Apache Software License, Version 2.0]
https://commons.apache.org/proper/commons-exec/
Copyright 2014
The Apache Software Foundation
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- kotlinx.coroutines - Library support for Kotlin coroutines with multiplatform support
[The Apache Software License, Version 2.0]
https://github.com/Kotlin/kotlinx.coroutines
Copyright 2022
JetBrains s.r.o.
- SLF4J - Simple facade or abstraction for various logging frameworks
[MIT License]
http://www.slf4j.org
Copyright 2022
QOS.ch
- Logback - Logback is a logging framework for Java applications
[The Apache Software License, Version 2.0]
http://logback.qos.ch
Copyright 2022
QOS.ch
- SSHJ - SSHv2 library for Java
[The Apache Software License, Version 2.0]
https://github.com/hierynomus/sshj
Copyright 2022
Jeroen van Erp
SSHJ Contributors
Extra license information
- Apache MINA -
[The Apache Software License, Version 2.0]
https://mina.apache.org/sshd-project/
The Apache Software Foundation
- Apache Commons-Net -
[The Apache Software License, Version 2.0]
https://commons.apache.org/proper/commons-net/
The Apache Software Foundation
- JZlib -
[The Apache Software License, Version 2.0]
http://www.jcraft.com/jzlib
Atsuhiko Yamanaka
JCraft, Inc.
- Bouncy Castle Crypto -
[The Apache Software License, Version 2.0]
http://www.bouncycastle.org
The Legion of the Bouncy Castle Inc
- ed25519-java -
[Public Domain, per Creative Commons CC0]
https://github.com/str4d/ed25519-java
https://github.com/str4d
- Updates - Software Update Management
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/Updates
Copyright 2021
Dorkbox LLC
Extra license information
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- Updates - Software Update Management
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/Updates
Copyright 2021
Dorkbox LLC
Extra license information
- Kotlin -
[The Apache Software License, Version 2.0]
https://github.com/JetBrains/kotlin
Copyright 2020
JetBrains s.r.o. and Kotlin Programming Language contributors
Kotlin Compiler, Test Data+Libraries, and Tools repository contain third-party code, to which different licenses may apply
See: https://github.com/JetBrains/kotlin/blob/master/license/README.md
- NetworkUtils - Utilities for managing network configurations, IP/MAC address conversion, and ping (via OS native commands)
[The Apache Software License, Version 2.0]
https://git.dorkbox.com/dorkbox/NetworkUtils

View File

@ -95,7 +95,7 @@ Maven Info
<dependency>
<groupId>com.dorkbox</groupId>
<artifactId>Network</artifactId>
<version>5.27</version>
<version>5.28</version>
</dependency>
</dependencies>
```
@ -105,7 +105,7 @@ Gradle Info
```
dependencies {
...
implementation("com.dorkbox:Network:5.27")
implementation("com.dorkbox:Network:5.28")
}
```

View File

@ -37,7 +37,7 @@ object Extras {
// set for the project
const val description = "High-performance, event-driven/reactive network stack for Java 11+"
const val group = "com.dorkbox"
const val version = "5.27"
const val version = "5.28"
// set as project.ext
const val name = "Network"
@ -147,14 +147,14 @@ dependencies {
api("com.dorkbox:ByteUtilities:1.5")
api("com.dorkbox:Collections:1.1")
api("com.dorkbox:MinLog:2.4")
api("com.dorkbox:NetworkDNS:2.7")
api("com.dorkbox:NetworkDNS:2.7.1")
api("com.dorkbox:NetworkUtils:2.18")
api("com.dorkbox:ObjectPool:4.0")
api("com.dorkbox:OS:1.0")
api("com.dorkbox:Serializers:2.7")
api("com.dorkbox:Storage:1.1")
api("com.dorkbox:Updates:1.1")
api("com.dorkbox:Utilities:1.28")
api("com.dorkbox:Utilities:1.29")
// we include ALL of aeron, in case we need to debug aeron behavior

View File

@ -0,0 +1,342 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import org.junit.Test;
import dorkbox.network.PingPongTest.TYPE;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.idle.IdleBridge;
import dorkbox.network.serialization.Serialization;
import dorkbox.util.exceptions.SecurityException;
import dorkbox.util.serialization.SerializationManager;
@SuppressWarnings("Duplicates")
public class ChunkedDataIdleTest extends BaseTest {
private volatile boolean success = false;
enum ConnectionType {
TCP,
UDP
}
// have to test sending objects
@Test
public void SendTcp() throws SecurityException, IOException {
final Data mainData = new Data();
populateData(mainData);
System.err.println("-- TCP");
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.host = host;
register(configuration.serialization);
sendObject(mainData, configuration, ConnectionType.TCP);
}
// have to test sending objects
@Test
public
void SendUdp() throws SecurityException, IOException {
final Data mainData = new Data();
populateData(mainData);
System.err.println("-- UDP");
Configuration configuration = new Configuration();
// configuration.tcpPort = tcpPort;
configuration.udpPort = udpPort;
configuration.host = host;
register(configuration.serialization);
sendObject(mainData, configuration, ConnectionType.UDP);
}
// have to test sending objects
@Test
public void SendTcpUdp_Udp() throws SecurityException, IOException {
final Data mainData = new Data();
populateData(mainData);
System.err.println("-- UDP (with TCP connection alive)");
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.udpPort = udpPort;
configuration.host = host;
register(configuration.serialization);
sendObject(mainData, configuration, ConnectionType.UDP);
}
// have to test sending objects
@Test
public void SendTcpUdp_Tcp() throws SecurityException, IOException {
final Data mainData = new Data();
populateData(mainData);
System.err.println("-- TCP/UDP");
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.udpPort = udpPort;
configuration.host = host;
register(configuration.serialization);
sendObject(mainData, configuration, ConnectionType.TCP);
}
private void sendObject(final Data mainData, Configuration configuration, final ConnectionType type)
throws SecurityException, IOException {
success = false;
Server server = new Server(configuration);
addEndPoint(server);
server.setIdleTimeout(10);
server.bind(false);
server.listeners().add(new Listener.OnConnected<Connection>() {
@Override
public void connected (Connection connection) {
Data data = new Data();
populateData(data);
IdleBridge sendOnIdle = connection.sendOnIdle(data);
switch (type) {
case TCP: sendOnIdle.TCP(); break;
case UDP: sendOnIdle.UDP(); break;
}
}
});
// ----
Client client = new Client(configuration);
client.setIdleTimeout(10);
addEndPoint(client);
client.listeners().add(new Listener.OnMessageReceived<Connection, Data>() {
@Override
public void received(Connection connection, Data object) {
if (mainData.equals(object)) {
ChunkedDataIdleTest.this.success = true;
}
System.err.println("finished!");
stopEndPoints();
}
});
client.connect(5000);
waitForThreads();
if (!this.success) {
fail();
}
}
private void populateData(Data data) {
StringBuilder buffer = new StringBuilder();
for (int i = 0; i < 3000; i++) {
buffer.append('a');
}
data.string = buffer.toString();
data.strings = new String[] {"abcdefghijklmnopqrstuvwxyz0123456789", "", null, "!@#$", "<22><><EFBFBD><EFBFBD><EFBFBD>"};
data.ints = new int[] {-1234567, 1234567, -1, 0, 1, Integer.MAX_VALUE, Integer.MIN_VALUE};
data.shorts = new short[] {-12345, 12345, -1, 0, 1, Short.MAX_VALUE, Short.MIN_VALUE};
data.floats = new float[] {0, -0, 1, -1, 123456, -123456, 0.1f, 0.2f, -0.3f, (float)Math.PI, Float.MAX_VALUE, Float.MIN_VALUE};
data.doubles = new double[] {0, -0, 1, -1, 123456, -123456, 0.1d, 0.2d, -0.3d, Math.PI, Double.MAX_VALUE, Double.MIN_VALUE};
data.longs = new long[] {0, -0, 1, -1, 123456, -123456, 99999999999L, -99999999999L, Long.MAX_VALUE, Long.MIN_VALUE};
data.bytes = new byte[] {-123, 123, -1, 0, 1, Byte.MAX_VALUE, Byte.MIN_VALUE};
data.chars = new char[] {32345, 12345, 0, 1, 63, Character.MAX_VALUE, Character.MIN_VALUE};
data.booleans = new boolean[] {true, false};
data.Ints = new Integer[] {-1234567, 1234567, -1, 0, 1, Integer.MAX_VALUE, Integer.MIN_VALUE};
data.Shorts = new Short[] {-12345, 12345, -1, 0, 1, Short.MAX_VALUE, Short.MIN_VALUE};
data.Floats = new Float[] {0f, -0f, 1f, -1f, 123456f, -123456f, 0.1f, 0.2f, -0.3f, (float)Math.PI, Float.MAX_VALUE, Float.MIN_VALUE};
data.Doubles = new Double[] {0d, -0d, 1d, -1d, 123456d, -123456d, 0.1d, 0.2d, -0.3d, Math.PI, Double.MAX_VALUE, Double.MIN_VALUE};
data.Longs = new Long[] {0L, -0L, 1L, -1L, 123456L, -123456L, 99999999999L, -99999999999L, Long.MAX_VALUE, Long.MIN_VALUE};
data.Bytes = new Byte[] {-123, 123, -1, 0, 1, Byte.MAX_VALUE, Byte.MIN_VALUE};
data.Chars = new Character[] {32345, 12345, 0, 1, 63, Character.MAX_VALUE, Character.MIN_VALUE};
data.Booleans = new Boolean[] {true, false};
}
private void register (SerializationManager manager) {
manager.register(int[].class);
manager.register(short[].class);
manager.register(float[].class);
manager.register(double[].class);
manager.register(long[].class);
manager.register(byte[].class);
manager.register(char[].class);
manager.register(boolean[].class);
manager.register(String[].class);
manager.register(Integer[].class);
manager.register(Short[].class);
manager.register(Float[].class);
manager.register(Double[].class);
manager.register(Long[].class);
manager.register(Byte[].class);
manager.register(Character[].class);
manager.register(Boolean[].class);
manager.register(Data.class);
manager.register(TYPE.class);
}
@SuppressWarnings("WeakerAccess")
static public class Data {
public String string;
public String[] strings;
public int[] ints;
public short[] shorts;
public float[] floats;
public double[] doubles;
public long[] longs;
public byte[] bytes;
public char[] chars;
public boolean[] booleans;
public Integer[] Ints;
public Short[] Shorts;
public Float[] Floats;
public Double[] Doubles;
public Long[] Longs;
public Byte[] Bytes;
public Character[] Chars;
public Boolean[] Booleans;
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + Arrays.hashCode(this.Booleans);
result = prime * result + Arrays.hashCode(this.Bytes);
result = prime * result + Arrays.hashCode(this.Chars);
result = prime * result + Arrays.hashCode(this.Doubles);
result = prime * result + Arrays.hashCode(this.Floats);
result = prime * result + Arrays.hashCode(this.Ints);
result = prime * result + Arrays.hashCode(this.Longs);
result = prime * result + Arrays.hashCode(this.Shorts);
result = prime * result + Arrays.hashCode(this.booleans);
result = prime * result + Arrays.hashCode(this.bytes);
result = prime * result + Arrays.hashCode(this.chars);
result = prime * result + Arrays.hashCode(this.doubles);
result = prime * result + Arrays.hashCode(this.floats);
result = prime * result + Arrays.hashCode(this.ints);
result = prime * result + Arrays.hashCode(this.longs);
result = prime * result + Arrays.hashCode(this.shorts);
result = prime * result + (this.string == null ? 0 : this.string.hashCode());
result = prime * result + Arrays.hashCode(this.strings);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Data other = (Data) obj;
if (!Arrays.equals(this.Booleans, other.Booleans)) {
return false;
}
if (!Arrays.equals(this.Bytes, other.Bytes)) {
return false;
}
if (!Arrays.equals(this.Chars, other.Chars)) {
return false;
}
if (!Arrays.equals(this.Doubles, other.Doubles)) {
return false;
}
if (!Arrays.equals(this.Floats, other.Floats)) {
return false;
}
if (!Arrays.equals(this.Ints, other.Ints)) {
return false;
}
if (!Arrays.equals(this.Longs, other.Longs)) {
return false;
}
if (!Arrays.equals(this.Shorts, other.Shorts)) {
return false;
}
if (!Arrays.equals(this.booleans, other.booleans)) {
return false;
}
if (!Arrays.equals(this.bytes, other.bytes)) {
return false;
}
if (!Arrays.equals(this.chars, other.chars)) {
return false;
}
if (!Arrays.equals(this.doubles, other.doubles)) {
return false;
}
if (!Arrays.equals(this.floats, other.floats)) {
return false;
}
if (!Arrays.equals(this.ints, other.ints)) {
return false;
}
if (!Arrays.equals(this.longs, other.longs)) {
return false;
}
if (!Arrays.equals(this.shorts, other.shorts)) {
return false;
}
if (this.string == null) {
if (other.string != null) {
return false;
}
} else if (!this.string.equals(other.string)) {
return false;
}
if (!Arrays.equals(this.strings, other.strings)) {
return false;
}
return true;
}
@Override
public String toString () {
return "Data";
}
}
}

97
not-fixed/ClientSendTest.java Executable file
View File

@ -0,0 +1,97 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.serialization.Serialization;
import dorkbox.util.exceptions.SecurityException;
import dorkbox.util.serialization.SerializationManager;
public
class ClientSendTest extends BaseTest {
private AtomicBoolean checkPassed = new AtomicBoolean(false);
@Test
public
void sendDataFromClientClass() throws SecurityException, IOException {
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.host = host;
register(configuration.serialization);
Server server = new Server(configuration);
addEndPoint(server);
server.bind(false);
server.listeners()
.add(new Listener.OnMessageReceived<Connection, AMessage>() {
@Override
public
void received(Connection connection, AMessage object) {
System.err.println("Server received message from client. Bouncing back.");
connection.send()
.TCP(object);
}
});
Client client = new Client(configuration);
addEndPoint(client);
client.connect(5000);
client.listeners()
.add(new Listener.OnMessageReceived<Connection, AMessage>() {
@Override
public
void received(Connection connection, AMessage object) {
ClientSendTest.this.checkPassed.set(true);
stopEndPoints();
}
});
client.send()
.TCP(new AMessage());
waitForThreads();
if (!this.checkPassed.get()) {
fail("Client and server failed to send messages!");
}
}
private static
void register(SerializationManager manager) {
manager.register(AMessage.class);
}
public static
class AMessage {
public
AMessage() {
}
}
}

220
not-fixed/ConnectionTest.java Executable file
View File

@ -0,0 +1,220 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.EndPoint;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.Listener.OnConnected;
import dorkbox.network.connection.Listener.OnDisconnected;
import dorkbox.network.serialization.Serialization;
import dorkbox.util.exceptions.SecurityException;
import dorkbox.util.serialization.SerializationManager;
public
class ConnectionTest extends BaseTest {
private AtomicInteger successCount;
@Test
public
void connectLocal() throws SecurityException, IOException {
System.out.println("---- " + "Local");
successCount = new AtomicInteger(0);
Configuration configuration = new Configuration();
configuration.localChannelName = EndPoint.LOCAL_CHANNEL;
register(configuration.serialization);
startServer(configuration);
startClient(configuration);
waitForThreads(10);
Assert.assertEquals(6, successCount.get());
}
@Test
public
void connectTcp() throws SecurityException, IOException {
System.out.println("---- " + "TCP");
successCount = new AtomicInteger(0);
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
register(configuration.serialization);
startServer(configuration);
configuration.host = host;
startClient(configuration);
waitForThreads(10);
Assert.assertEquals(6, successCount.get());
}
@Test
public
void connectUdp() throws SecurityException, IOException {
System.out.println("---- " + "UDP");
successCount = new AtomicInteger(0);
Configuration configuration = new Configuration();
configuration.udpPort = udpPort;
register(configuration.serialization);
startServer(configuration);
configuration.host = host;
startClient(configuration);
waitForThreads(10);
Assert.assertEquals(6, successCount.get());
}
@Test
public
void connectTcpUdp() throws SecurityException, IOException {
System.out.println("---- " + "TCP UDP");
successCount = new AtomicInteger(0);
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.udpPort = udpPort;
register(configuration.serialization);
startServer(configuration);
configuration.host = host;
startClient(configuration);
waitForThreads(10);
Assert.assertEquals(6, successCount.get());
}
private
Server startServer(final Configuration configuration) throws SecurityException {
final Server server = new Server(configuration);
addEndPoint(server);
server.bind(false);
server.listeners()
.add(new OnConnected<Connection>() {
@Override
public
void connected(final Connection connection) {
successCount.getAndIncrement();
}
})
.add(new OnDisconnected<Connection>() {
@Override
public
void disconnected(Connection connection) {
successCount.getAndIncrement();
}
})
.add(new Listener.OnMessageReceived<Connection, Object>() {
@Override
public void received(Connection connection, Object message) {
System.err.println("Received message from client: " + message.getClass().getSimpleName());
successCount.getAndIncrement();
if (configuration.tcpPort > 0) {
connection.send()
.TCP(message);
}
else {
connection.send()
.UDP(message);
}
}
});
return server;
}
private
Client startClient(final Configuration configuration) throws SecurityException, IOException {
Client client;
if (configuration != null) {
client = new Client(configuration);
}
else {
client = new Client();
}
addEndPoint(client);
client.listeners()
.add(new OnConnected<Connection>() {
@Override
public
void connected(final Connection connection) {
successCount.getAndIncrement();
}
})
.add(new OnDisconnected<Connection>() {
@Override
public
void disconnected(Connection connection) {
successCount.getAndIncrement();
}
})
.add(new Listener.OnMessageReceived<Connection, Object>() {
@Override
public
void received(Connection connection, Object message) {
System.err.println("Received message from server: " + message.getClass()
.getSimpleName());
System.err.println("Now disconnecting!");
successCount.getAndIncrement();
stopEndPoints();
}
});
client.connect(5000);
if (configuration.tcpPort > 0) {
client.send()
.TCP(new BMessage());
}
else {
client.send()
.UDP(new BMessage());
}
return client;
}
private
void register(SerializationManager manager) {
manager.register(BMessage.class);
}
public static
class BMessage {
public
BMessage() {
}
}
}

86
not-fixed/DiscoverHostTest.java Executable file
View File

@ -0,0 +1,86 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import static org.junit.Assert.fail;
import java.io.IOException;
import org.junit.Test;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.pipeline.discovery.BroadcastResponse;
import dorkbox.util.exceptions.SecurityException;
public
class DiscoverHostTest extends BaseTest {
volatile boolean connected = false;
@Test
public
void broadcast() throws SecurityException, IOException {
Configuration configuration = new Configuration();
// configuration.tcpPort = tcpPort;
configuration.udpPort = udpPort;
configuration.host = host;
Server server = new Server(configuration);
addEndPoint(server);
server.bind(false);
// ----
BroadcastResponse host = Broadcast.discoverHost(udpPort, 2000);
if (host == null) {
stopEndPoints();
fail("No servers found. Maybe you are behind a VPN service or your network is mis-configured?");
return;
}
// run it twice...
host = Broadcast.discoverHost(udpPort, 2000);
if (host == null) {
stopEndPoints();
fail("No servers found. Maybe you are behind a VPN service or your network is mis-configured?");
return;
}
Client client = new Client(configuration);
addEndPoint(client);
client.listeners()
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
DiscoverHostTest.this.connected = true;
stopEndPoints();
}
});
client.connect(2000);
waitForThreads(20);
if (!this.connected) {
fail("Unable to connect to server.");
}
}
}

440
not-fixed/IdleTest.java Executable file
View File

@ -0,0 +1,440 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import org.junit.Test;
import dorkbox.network.PingPongTest.TYPE;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.serialization.Serialization;
import dorkbox.util.exceptions.SecurityException;
import dorkbox.util.serialization.SerializationManager;
@SuppressWarnings({"rawtypes"})
public
class IdleTest extends BaseTest {
private volatile boolean success = false;
enum ConnectionType {
TCP,
UDP
}
@Test
public
void InputStreamSenderTCP() throws SecurityException, IOException {
final int largeDataSize = 12345;
System.err.println("-- TCP");
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.host = host;
configuration.serialization = Serialization.DEFAULT(false, false, null);
streamSpecificType(largeDataSize, configuration, ConnectionType.TCP);
}
@Test
public
void InputStreamSenderUDP() throws SecurityException, IOException {
final int largeDataSize = 12345;
System.err.println("-- UDP");
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.udpPort = udpPort;
configuration.host = host;
configuration.serialization = Serialization.DEFAULT(false, false, null);
streamSpecificType(largeDataSize, configuration, ConnectionType.UDP);
}
// have to test sending objects
@Test
public
void ObjectSenderTCP() throws SecurityException, IOException {
final Data mainData = new Data();
populateData(mainData);
System.err.println("-- TCP");
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.host = host;
register(configuration.serialization);
sendObject(mainData, configuration, ConnectionType.TCP);
}
// have to test sending objects
@Test
public
void ObjectSenderUDP() throws SecurityException, IOException {
final Data mainData = new Data();
populateData(mainData);
System.err.println("-- UDP");
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.udpPort = udpPort;
configuration.host = host;
register(configuration.serialization);
sendObject(mainData, configuration, ConnectionType.TCP);
}
private
void sendObject(final Data mainData, Configuration configuration, final ConnectionType type)
throws SecurityException, IOException {
Server server = new Server(configuration);
addEndPoint(server);
server.setIdleTimeout(100);
server.bind(false);
server.listeners()
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
IdleBridge sendOnIdle = connection.sendOnIdle(mainData);
switch (type) {
case TCP:
sendOnIdle.TCP();
break;
case UDP:
sendOnIdle.UDP();
break;
}
}
});
// ----
Client client = new Client(configuration);
addEndPoint(client);
client.listeners()
.add(new Listener.OnMessageReceived<Connection, Data>() {
@Override
public
void received(Connection connection, Data object) {
if (mainData.equals(object)) {
IdleTest.this.success = true;
}
System.err.println("finished!");
stopEndPoints();
}
});
client.connect(5000);
waitForThreads();
if (!this.success) {
fail();
}
}
private
void streamSpecificType(final int largeDataSize, Configuration configuration, final ConnectionType type)
throws SecurityException, IOException {
Server server = new Server(configuration);
addEndPoint(server);
server.setIdleTimeout(100);
server.bind(false);
server.listeners()
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
ByteArrayOutputStream output = new ByteArrayOutputStream(largeDataSize);
for (int i = 0; i < largeDataSize; i++) {
output.write(i);
}
ByteArrayInputStream input = new ByteArrayInputStream(output.toByteArray());
IdleListener<Connection, byte[]> listener = null;
switch (type) {
case TCP:
listener = new IdleListenerTCP<Connection, byte[]>();
break;
case UDP:
listener = new IdleListenerUDP<Connection, byte[]>();
break;
}
// Send data in 512 byte chunks.
IdleBridge sendOnIdle = connection.sendOnIdle(new InputStreamSender<Connection>(listener, input, 512) {
@Override
protected
void start() {
// Normally would send an object so the receiving side knows how to handle the chunks we are about to send.
System.err.println("starting");
}
@Override
protected
byte[] onNext(byte[] bytes) {
//System.out.println("sending " + bytes.length);
return bytes; // Normally would wrap the byte[] with an object so the receiving side knows how to handle it.
}
});
switch (type) {
case TCP:
sendOnIdle.TCP();
break;
case UDP:
sendOnIdle.UDP();
break;
}
}
});
// ----
Client client = new Client(configuration);
addEndPoint(client);
client.listeners()
.add(new Listener.OnMessageReceived<Connection, byte[]>() {
int total;
@Override
public
void received(Connection connection, byte[] object) {
int length = object.length;
//System.err.println("received " + length);
this.total += length;
if (this.total == largeDataSize) {
IdleTest.this.success = true;
System.err.println("finished!");
stopEndPoints();
}
}
});
client.connect(5000);
waitForThreads();
if (!this.success) {
fail();
}
}
private static
void populateData(Data data) {
StringBuilder buffer = new StringBuilder(3001);
for (int i = 0; i < 3000; i++) {
buffer.append('a');
}
data.string = buffer.toString();
data.strings = new String[] {"abcdefghijklmnopqrstuvwxyz0123456789", "", null, "!@#$", "<22><><EFBFBD><EFBFBD><EFBFBD>"};
data.ints = new int[] {-1234567, 1234567, -1, 0, 1, Integer.MAX_VALUE, Integer.MIN_VALUE};
data.shorts = new short[] {-12345, 12345, -1, 0, 1, Short.MAX_VALUE, Short.MIN_VALUE};
data.floats = new float[] {0, -0, 1, -1, 123456, -123456, 0.1f, 0.2f, -0.3f, (float) Math.PI, Float.MAX_VALUE, Float.MIN_VALUE};
data.doubles = new double[] {0, -0, 1, -1, 123456, -123456, 0.1d, 0.2d, -0.3d, Math.PI, Double.MAX_VALUE, Double.MIN_VALUE};
data.longs = new long[] {0, -0, 1, -1, 123456, -123456, 99999999999l, -99999999999l, Long.MAX_VALUE, Long.MIN_VALUE};
data.bytes = new byte[] {-123, 123, -1, 0, 1, Byte.MAX_VALUE, Byte.MIN_VALUE};
data.chars = new char[] {32345, 12345, 0, 1, 63, Character.MAX_VALUE, Character.MIN_VALUE};
data.booleans = new boolean[] {true, false};
data.Ints = new Integer[] {-1234567, 1234567, -1, 0, 1, Integer.MAX_VALUE, Integer.MIN_VALUE};
data.Shorts = new Short[] {-12345, 12345, -1, 0, 1, Short.MAX_VALUE, Short.MIN_VALUE};
data.Floats = new Float[] {0f, -0f, 1f, -1f, 123456f, -123456f, 0.1f, 0.2f, -0.3f, (float) Math.PI, Float.MAX_VALUE,
Float.MIN_VALUE};
data.Doubles = new Double[] {0d, -0d, 1d, -1d, 123456d, -123456d, 0.1d, 0.2d, -0.3d, Math.PI, Double.MAX_VALUE, Double.MIN_VALUE};
data.Longs = new Long[] {0l, -0l, 1l, -1l, 123456l, -123456l, 99999999999l, -99999999999l, Long.MAX_VALUE, Long.MIN_VALUE};
data.Bytes = new Byte[] {-123, 123, -1, 0, 1, Byte.MAX_VALUE, Byte.MIN_VALUE};
data.Chars = new Character[] {32345, 12345, 0, 1, 63, Character.MAX_VALUE, Character.MIN_VALUE};
data.Booleans = new Boolean[] {true, false};
}
private static
void register(SerializationManager manager) {
manager.register(int[].class);
manager.register(short[].class);
manager.register(float[].class);
manager.register(double[].class);
manager.register(long[].class);
manager.register(byte[].class);
manager.register(char[].class);
manager.register(boolean[].class);
manager.register(String[].class);
manager.register(Integer[].class);
manager.register(Short[].class);
manager.register(Float[].class);
manager.register(Double[].class);
manager.register(Long[].class);
manager.register(Byte[].class);
manager.register(Character[].class);
manager.register(Boolean[].class);
manager.register(Data.class);
manager.register(TYPE.class);
}
public static
class Data {
public String string;
public String[] strings;
public int[] ints;
public short[] shorts;
public float[] floats;
public double[] doubles;
public long[] longs;
public byte[] bytes;
public char[] chars;
public boolean[] booleans;
public Integer[] Ints;
public Short[] Shorts;
public Float[] Floats;
public Double[] Doubles;
public Long[] Longs;
public Byte[] Bytes;
public Character[] Chars;
public Boolean[] Booleans;
@Override
public
int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + Arrays.hashCode(this.Booleans);
result = prime * result + Arrays.hashCode(this.Bytes);
result = prime * result + Arrays.hashCode(this.Chars);
result = prime * result + Arrays.hashCode(this.Doubles);
result = prime * result + Arrays.hashCode(this.Floats);
result = prime * result + Arrays.hashCode(this.Ints);
result = prime * result + Arrays.hashCode(this.Longs);
result = prime * result + Arrays.hashCode(this.Shorts);
result = prime * result + Arrays.hashCode(this.booleans);
result = prime * result + Arrays.hashCode(this.bytes);
result = prime * result + Arrays.hashCode(this.chars);
result = prime * result + Arrays.hashCode(this.doubles);
result = prime * result + Arrays.hashCode(this.floats);
result = prime * result + Arrays.hashCode(this.ints);
result = prime * result + Arrays.hashCode(this.longs);
result = prime * result + Arrays.hashCode(this.shorts);
result = prime * result + (this.string == null ? 0 : this.string.hashCode());
result = prime * result + Arrays.hashCode(this.strings);
return result;
}
@Override
public
boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Data other = (Data) obj;
if (!Arrays.equals(this.Booleans, other.Booleans)) {
return false;
}
if (!Arrays.equals(this.Bytes, other.Bytes)) {
return false;
}
if (!Arrays.equals(this.Chars, other.Chars)) {
return false;
}
if (!Arrays.equals(this.Doubles, other.Doubles)) {
return false;
}
if (!Arrays.equals(this.Floats, other.Floats)) {
return false;
}
if (!Arrays.equals(this.Ints, other.Ints)) {
return false;
}
if (!Arrays.equals(this.Longs, other.Longs)) {
return false;
}
if (!Arrays.equals(this.Shorts, other.Shorts)) {
return false;
}
if (!Arrays.equals(this.booleans, other.booleans)) {
return false;
}
if (!Arrays.equals(this.bytes, other.bytes)) {
return false;
}
if (!Arrays.equals(this.chars, other.chars)) {
return false;
}
if (!Arrays.equals(this.doubles, other.doubles)) {
return false;
}
if (!Arrays.equals(this.floats, other.floats)) {
return false;
}
if (!Arrays.equals(this.ints, other.ints)) {
return false;
}
if (!Arrays.equals(this.longs, other.longs)) {
return false;
}
if (!Arrays.equals(this.shorts, other.shorts)) {
return false;
}
if (this.string == null) {
if (other.string != null) {
return false;
}
}
else if (!this.string.equals(other.string)) {
return false;
}
if (!Arrays.equals(this.strings, other.strings)) {
return false;
}
return true;
}
@Override
public
String toString() {
return "Data";
}
}
}

View File

@ -0,0 +1,159 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.serialization.Serialization;
import dorkbox.util.exceptions.SecurityException;
import dorkbox.util.serialization.SerializationManager;
public
class LargeResizeBufferTest extends BaseTest {
private static final int OBJ_SIZE = 1024 * 100;
private volatile int finalCheckAmount = 0;
private volatile int serverCheck = -1;
private volatile int clientCheck = -1;
@Test
public
void manyLargeMessages() throws SecurityException, IOException {
final int messageCount = 1024;
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.udpPort = udpPort;
configuration.host = host;
register(configuration.serialization);
Server server = new Server(configuration);
addEndPoint(server);
server.bind(false);
server.listeners()
.add(new Listener.OnMessageReceived<Connection, LargeMessage>() {
AtomicInteger received = new AtomicInteger();
AtomicInteger receivedBytes = new AtomicInteger();
@Override
public
void received(Connection connection, LargeMessage object) {
// System.err.println("Server ack message: " + received.get());
connection.send()
.TCP(object);
this.receivedBytes.addAndGet(object.bytes.length);
if (this.received.incrementAndGet() == messageCount) {
System.out.println("Server received all " + messageCount + " messages!");
System.out.println("Server received and sent " + this.receivedBytes.get() + " bytes.");
LargeResizeBufferTest.this.serverCheck = LargeResizeBufferTest.this.finalCheckAmount - this.receivedBytes.get();
System.out.println("Server missed " + LargeResizeBufferTest.this.serverCheck + " bytes.");
stopEndPoints();
}
}
});
Client client = new Client(configuration);
addEndPoint(client);
client.listeners()
.add(new Listener.OnMessageReceived<Connection, LargeMessage>() {
AtomicInteger received = new AtomicInteger();
AtomicInteger receivedBytes = new AtomicInteger();
@Override
public
void received(Connection connection, LargeMessage object) {
this.receivedBytes.addAndGet(object.bytes.length);
int count = this.received.getAndIncrement();
// System.out.println("Client received message: " + count);
if (count == messageCount) {
System.out.println("Client received all " + messageCount + " messages!");
System.out.println("Client received and sent " + this.receivedBytes.get() + " bytes.");
LargeResizeBufferTest.this.clientCheck = LargeResizeBufferTest.this.finalCheckAmount - this.receivedBytes.get();
System.out.println("Client missed " + LargeResizeBufferTest.this.clientCheck + " bytes.");
}
}
});
client.connect(5000);
SecureRandom random = new SecureRandom();
System.err.println(" Client sending " + messageCount + " messages");
for (int i = 0; i < messageCount; i++) {
this.finalCheckAmount += OBJ_SIZE; // keep increasing size
byte[] b = new byte[OBJ_SIZE];
random.nextBytes(b);
// set some of the bytes to be all `244`, just so some compression can occur (to test that as well)
for (int j = 0; j < 400; j++) {
b[j] = (byte) 244;
}
// System.err.println("Sending " + b.length + " bytes");
client.send()
.TCP(new LargeMessage(b));
}
System.err.println("Client has queued " + messageCount + " messages.");
waitForThreads();
if (this.clientCheck > 0) {
fail("Client missed " + this.clientCheck + " bytes.");
}
if (this.serverCheck > 0) {
fail("Server missed " + this.serverCheck + " bytes.");
}
}
private
void register(SerializationManager manager) {
manager.register(byte[].class);
manager.register(LargeMessage.class);
}
public static
class LargeMessage {
public byte[] bytes;
public
LargeMessage() {
}
public
LargeMessage(byte[] bytes) {
this.bytes = bytes;
}
}
}

197
not-fixed/Misc.kt Executable file
View File

@ -0,0 +1,197 @@
package dorkbox.network.other
import kotlin.math.ceil
/**
*
*/
object Misc {
private fun annotations() {
// internal val classesWithRmiFields = IdentityMap<Class<*>, Array<Field>>()
// // get all classes that have fields with @Rmi field annotation.
// // THESE classes must be customized with our special RmiFieldSerializer serializer so that the @Rmi field is properly handled
//
// // SPECIFICALLY, these fields must also be an IFACE for the field type!
//
// // NOTE: The @Rmi field type will already have to be a registered type with kryo!
// // we can use this information on WHERE to scan for classes.
// val filesToScan = mutableSetOf<File>()
//
// classesToRegister.forEach { registration ->
// val clazz = registration.clazz
//
// // can't do anything if codeSource is null!
// val codeSource = clazz.protectionDomain.codeSource ?: return@forEach
// // file:/Users/home/java/libs/xyz-123.jar
// // file:/projects/classes
// val jarOrClassPath = codeSource.location.toString()
//
// if (jarOrClassPath.endsWith(".jar")) {
// val fileName: String = URLDecoder.decode(jarOrClassPath.substring("file:".length), Charset.defaultCharset())
// filesToScan.add(File(fileName).absoluteFile)
// } else {
// val classPath: String = URLDecoder.decode(jarOrClassPath.substring("file:".length), Charset.defaultCharset())
// filesToScan.add(File(classPath).absoluteFile)
// }
// }
//
// val toTypedArray = filesToScan.toTypedArray()
// if (logger.isTraceEnabled) {
// toTypedArray.forEach {
// logger.trace { "Adding location to annotation scanner: $it"}
// }
// }
//
//
//
// // now scan these jars/directories
// val fieldsWithRmiAnnotation = AnnotationDetector.scanFiles(*toTypedArray)
// .forAnnotations(Rmi::class.java)
// .on(ElementType.FIELD)
// .collect { cursor -> Pair(cursor.type, cursor.field!!) }
//
// // have to make sure that the field type is specified as an interface (and not an implementation)
// fieldsWithRmiAnnotation.forEach { pair ->
// require(pair.second.type.isInterface) { "@Rmi annotated fields must be an interface!" }
// }
//
// if (fieldsWithRmiAnnotation.isNotEmpty()) {
// logger.info { "Verifying scanned classes containing @Rmi field annotations" }
// }
//
// // have to put this in a map, so we can quickly lookup + get the fields later on.
// // NOTE: a single class can have MULTIPLE fields with @Rmi annotations!
// val rmiAnnotationMap = IdentityMap<Class<*>, MutableList<Field>>()
// fieldsWithRmiAnnotation.forEach {
// var fields = rmiAnnotationMap[it.first]
// if (fields == null) {
// fields = mutableListOf()
// }
//
// fields.add(it.second)
// rmiAnnotationMap.put(it.first, fields)
// }
//
// // now make it an array for fast lookup for the [parent class] -> [annotated fields]
// rmiAnnotationMap.forEach {
// classesWithRmiFields.put(it.key, it.value.toTypedArray())
// }
//
// // this will set up the class registration information
// initKryo()
//
// // now everything is REGISTERED, possibly with custom serializers, we have to go back and change them to use our RmiFieldSerializer
// fieldsWithRmiAnnotation.forEach FIELD_SCAN@{ pair ->
// // the parent class must be an IMPL. The reason is that THIS FIELD will be sent as a RMI object, and this can only
// // happen on objects that exist
//
// // NOTE: it IS necessary for the rmi-client to be aware of the @Rmi annotation (because it also has to have the correct serialization)
//
// // also, it is possible for the class that has the @Rmi field to be a NORMAL object (and not an RMI object)
// // this means we found the registration for the @Rmi field annotation
//
// val parentRmiRegistration = classesToRegister.firstOrNull { it is ClassRegistrationForRmi && it.implClass == pair.first}
//
//
// // if we have a parent-class registration, this means we are the rmi-server
// //
// // AND BECAUSE OF THIS
// //
// // we must also have the field type registered as RMI
// if (parentRmiRegistration != null) {
// // rmi-server
//
// // is the field type registered also?
// val fieldRmiRegistration = classesToRegister.firstOrNull { it.clazz == pair.second.type}
// require(fieldRmiRegistration is ClassRegistrationForRmi) { "${pair.second.type} is not registered for RMI! Unable to continue"}
//
// logger.trace { "Found @Rmi field annotation '${pair.second.type}' in class '${pair.first}'" }
// } else {
// // rmi-client
//
// // NOTE: rmi-server MUST have the field IMPL registered (ie: via RegisterRmi)
// // rmi-client will have the serialization updated from the rmi-server during connection handshake
// }
// }
}
/**
* Split array into chunks, max of 256 chunks.
* byte[0] = chunk ID
* byte[1] = total chunks (0-255) (where 0->1, 2->3, 127->127 because this is indexed by a byte)
*/
private fun divideArray(source: ByteArray, chunksize: Int): Array<ByteArray>? {
val fragments = ceil(source.size / chunksize.toDouble()).toInt()
if (fragments > 127) {
// cannot allow more than 127
return null
}
// pre-allocate the memory
val splitArray = Array(fragments) { ByteArray(chunksize + 2) }
var start = 0
for (i in splitArray.indices) {
var length = if (start + chunksize > source.size) {
source.size - start
} else {
chunksize
}
splitArray[i] = ByteArray(length + 2)
splitArray[i][0] = i.toByte() // index
splitArray[i][1] = fragments.toByte() // total number of fragments
System.arraycopy(source, start, splitArray[i], 2, length)
start += chunksize
}
return splitArray
}
}
// fun initClassRegistration(channel: Channel, registration: Registration): Boolean {
// val details = serialization.getKryoRegistrationDetails()
// val length = details.size
// if (length > Serialization.CLASS_REGISTRATION_VALIDATION_FRAGMENT_SIZE) {
// // it is too large to send in a single packet
//
// // child arrays have index 0 also as their 'index' and 1 is the total number of fragments
// val fragments = divideArray(details, Serialization.CLASS_REGISTRATION_VALIDATION_FRAGMENT_SIZE)
// if (fragments == null) {
// logger.error("Too many classes have been registered for Serialization. Please report this issue")
// return false
// }
// val allButLast = fragments.size - 1
// for (i in 0 until allButLast) {
// val fragment = fragments[i]
// val fragmentedRegistration = Registration.hello(registration.oneTimePad, config.settingsStore.getPublicKey())
// fragmentedRegistration.payload = fragment
//
// // tell the server we are fragmented
// fragmentedRegistration.upgradeType = UpgradeType.FRAGMENTED
//
// // tell the server we are upgraded (it will bounce back telling us to connect)
// fragmentedRegistration.upgraded = true
// channel.writeAndFlush(fragmentedRegistration)
// }
//
// // now tell the server we are done with the fragments
// val fragmentedRegistration = Registration.hello(registration.oneTimePad, config.settingsStore.getPublicKey())
// fragmentedRegistration.payload = fragments[allButLast]
//
// // tell the server we are fragmented
// fragmentedRegistration.upgradeType = UpgradeType.FRAGMENTED
//
// // tell the server we are upgraded (it will bounce back telling us to connect)
// fragmentedRegistration.upgraded = true
// channel.writeAndFlush(fragmentedRegistration)
// } else {
// registration.payload = details
//
// // tell the server we are upgraded (it will bounce back telling us to connect)
// registration.upgraded = true
// channel.writeAndFlush(registration)
// }
// return true
// }

242
not-fixed/MultipleThreadTest.java Executable file
View File

@ -0,0 +1,242 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.Listeners;
import dorkbox.network.serialization.Serialization;
import dorkbox.util.exceptions.SecurityException;
public
class MultipleThreadTest extends BaseTest {
private final Object lock = new Object();
private volatile boolean stillRunning = false;
private final Object finalRunLock = new Object();
private volatile boolean finalStillRunning = false;
private final int messageCount = 150;
private final int threadCount = 15;
private final int clientCount = 13;
private final List<Client> clients = new ArrayList<Client>(this.clientCount);
int perClientReceiveTotal = (this.messageCount * this.threadCount);
int serverReceiveTotal = perClientReceiveTotal * this.clientCount;
AtomicInteger sent = new AtomicInteger(0);
AtomicInteger totalClientReceived = new AtomicInteger(0);
AtomicInteger receivedServer = new AtomicInteger(1);
ConcurrentHashMap<Integer, DataClass> sentStringsToClientDebug = new ConcurrentHashMap<Integer, DataClass>();
@Test
public
void multipleThreads() throws SecurityException, IOException {
// our clients should receive messageCount * threadCount * clientCount TOTAL messages
final int totalClientReceivedCountExpected = this.clientCount * this.messageCount * this.threadCount;
final int totalServerReceivedCountExpected = this.clientCount * this.messageCount;
System.err.println("CLIENT RECEIVES: " + totalClientReceivedCountExpected);
System.err.println("SERVER RECEIVES: " + totalServerReceivedCountExpected);
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.host = host;
configuration.serialization.register(String[].class);
configuration.serialization.register(DataClass.class);
final Server server = new Server(configuration);
server.disableRemoteKeyValidation();
addEndPoint(server);
server.bind(false);
final Listeners listeners = server.listeners();
listeners.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(final Connection connection) {
System.err.println("Client connected to server.");
// kickoff however many threads we need, and send data to the client.
for (int i = 1; i <= MultipleThreadTest.this.threadCount; i++) {
final int index = i;
new Thread() {
@Override
public
void run() {
for (int i = 1; i <= MultipleThreadTest.this.messageCount; i++) {
int incrementAndGet = MultipleThreadTest.this.sent.getAndIncrement();
DataClass dataClass = new DataClass("Server -> client. Thread #" + index + " message# " + incrementAndGet,
incrementAndGet);
//System.err.println(dataClass.data);
MultipleThreadTest.this.sentStringsToClientDebug.put(incrementAndGet, dataClass);
connection.send()
.TCP(dataClass)
.flush();
}
}
}.start();
}
}
});
listeners.add(new Listener.OnMessageReceived<Connection, DataClass>() {
@Override
public
void received(Connection connection, DataClass object) {
int incrementAndGet = MultipleThreadTest.this.receivedServer.getAndIncrement();
//System.err.println("server #" + incrementAndGet);
if (incrementAndGet % MultipleThreadTest.this.messageCount == 0) {
System.err.println("Server receive DONE for client " + incrementAndGet);
stillRunning = false;
synchronized (MultipleThreadTest.this.lock) {
MultipleThreadTest.this.lock.notifyAll();
}
}
if (incrementAndGet == totalServerReceivedCountExpected) {
System.err.println("Server DONE: " + incrementAndGet);
finalStillRunning = false;
synchronized (MultipleThreadTest.this.finalRunLock) {
MultipleThreadTest.this.finalRunLock.notifyAll();
}
}
}
});
// ----
finalStillRunning = true;
for (int i = 1; i <= this.clientCount; i++) {
final int index = i;
Client client = new Client(configuration);
this.clients.add(client);
addEndPoint(client);
client.listeners()
.add(new Listener.OnMessageReceived<Connection, DataClass>() {
final int clientIndex = index;
final AtomicInteger received = new AtomicInteger(1);
@Override
public
void received(Connection connection, DataClass object) {
totalClientReceived.getAndIncrement();
int clientLocalCounter = this.received.getAndIncrement();
MultipleThreadTest.this.sentStringsToClientDebug.remove(object.index);
//System.err.println(object.data);
// we finished!!
if (clientLocalCounter == perClientReceiveTotal) {
//System.err.println("Client #" + clientIndex + " received " + clientLocalCounter + " Sending back " +
// MultipleThreadTest.this.messageCount + " messages.");
// now spam back messages!
for (int i = 0; i < MultipleThreadTest.this.messageCount; i++) {
connection.send()
.TCP(new DataClass("Client #" + clientIndex + " -> Server message " + i, index));
}
}
}
});
stillRunning = true;
client.connect(5000);
while (stillRunning) {
synchronized (this.lock) {
try {
this.lock.wait(5 * 1000); // 5 secs
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
while (finalStillRunning) {
synchronized (this.finalRunLock) {
try {
this.finalRunLock.wait(5 * 1000); // 5 secs
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// CLIENT will wait until it's done connecting, but SERVER is async.
// the ONLY way to safely work in the server is with LISTENERS. Everything else can FAIL, because of it's async nature.
if (!this.sentStringsToClientDebug.isEmpty()) {
System.err.println("MISSED DATA: " + this.sentStringsToClientDebug.size());
for (Map.Entry<Integer, DataClass> i : this.sentStringsToClientDebug.entrySet()) {
System.err.println(i.getKey() + " : " + i.getValue().data);
}
}
stopEndPoints();
assertEquals(totalClientReceivedCountExpected, totalClientReceived.get());
// offset by 1 since we start at 1
assertEquals(totalServerReceivedCountExpected, receivedServer.get()-1);
}
public static
class DataClass {
public String data;
public Integer index;
public
DataClass() {
}
public
DataClass(String data, Integer index) {
this.data = data;
this.index = index;
}
}
}

326
not-fixed/PingPongLocalTest.java Executable file
View File

@ -0,0 +1,326 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.Listeners;
import dorkbox.network.serialization.Serialization;
import dorkbox.util.exceptions.SecurityException;
import dorkbox.util.serialization.SerializationManager;
public
class PingPongLocalTest extends BaseTest {
int tries = 10000;
private volatile String fail;
@Test
public void pingPongLocal() throws SecurityException, IOException {
this.fail = "Data not received.";
final Data dataLOCAL = new Data();
populateData(dataLOCAL);
Configuration configuration = Configuration.localOnly();
register(configuration.serialization);
Server server = new Server(configuration);
addEndPoint(server);
server.bind(false);
final Listeners listeners = server.listeners();
listeners.add(new Listener.OnError<Connection>() {
@Override
public
void error(Connection connection, Throwable throwable) {
PingPongLocalTest.this.fail = "Error during processing. " + throwable;
}
});
listeners.add(new Listener.OnMessageReceived<Connection, Data>() {
@Override
public
void received(Connection connection, Data data) {
connection.id();
if (!data.equals(dataLOCAL)) {
PingPongLocalTest.this.fail = "data is not equal on server.";
throw new RuntimeException("Fail! " + PingPongLocalTest.this.fail);
}
connection.send()
.TCP(data);
}
});
// ----
Client client = new Client(configuration);
addEndPoint(client);
final Listeners listeners1 = client.listeners();
listeners1.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
PingPongLocalTest.this.fail = null;
connection.send()
.TCP(dataLOCAL);
// connection.sendUDP(dataUDP); // TCP and UDP are the same for a local channel.
}
});
listeners1.add(new Listener.OnError<Connection>() {
@Override
public
void error(Connection connection, Throwable throwable) {
PingPongLocalTest.this.fail = "Error during processing. " + throwable;
System.err.println(PingPongLocalTest.this.fail);
}
});
listeners1.add(new Listener.OnMessageReceived<Connection, Data>() {
AtomicInteger check = new AtomicInteger(0);
@Override
public
void received(Connection connection, Data data) {
if (!data.equals(dataLOCAL)) {
PingPongLocalTest.this.fail = "data is not equal on client.";
throw new RuntimeException("Fail! " + PingPongLocalTest.this.fail);
}
if (this.check.getAndIncrement() <= PingPongLocalTest.this.tries) {
connection.send()
.TCP(data);
}
else {
System.err.println("Ran LOCAL " + PingPongLocalTest.this.tries + " times");
stopEndPoints();
}
}
});
client.connect(5000);
waitForThreads();
if (this.fail != null) {
fail(this.fail);
}
}
private void populateData(Data data) {
StringBuilder buffer = new StringBuilder();
for (int i = 0; i < 3000; i++) {
buffer.append('a');
}
data.string = buffer.toString();
data.strings = new String[] {"abcdefghijklmnopqrstuvwxyz0123456789","",null,"!@#$","<22><><EFBFBD><EFBFBD><EFBFBD>"};
data.ints = new int[] {-1234567,1234567,-1,0,1,Integer.MAX_VALUE,Integer.MIN_VALUE};
data.shorts = new short[] {-12345,12345,-1,0,1,Short.MAX_VALUE,Short.MIN_VALUE};
data.floats = new float[] {0,-0,1,-1,123456,-123456,0.1f,0.2f,-0.3f,(float) Math.PI,Float.MAX_VALUE,
Float.MIN_VALUE};
data.doubles = new double[] {0,-0,1,-1,123456,-123456,0.1d,0.2d,-0.3d,Math.PI,Double.MAX_VALUE,Double.MIN_VALUE};
data.longs = new long[] {0,-0,1,-1,123456,-123456,99999999999l,-99999999999l,Long.MAX_VALUE,Long.MIN_VALUE};
data.bytes = new byte[] {-123,123,-1,0,1,Byte.MAX_VALUE,Byte.MIN_VALUE};
data.chars = new char[] {32345,12345,0,1,63,Character.MAX_VALUE,Character.MIN_VALUE};
data.booleans = new boolean[] {true,false};
data.Ints = new Integer[] {-1234567,1234567,-1,0,1,Integer.MAX_VALUE,Integer.MIN_VALUE};
data.Shorts = new Short[] {-12345,12345,-1,0,1,Short.MAX_VALUE,Short.MIN_VALUE};
data.Floats = new Float[] {0f,-0f,1f,-1f,123456f,-123456f,0.1f,0.2f,-0.3f,(float) Math.PI,Float.MAX_VALUE,
Float.MIN_VALUE};
data.Doubles = new Double[] {0d,-0d,1d,-1d,123456d,-123456d,0.1d,0.2d,-0.3d,Math.PI,Double.MAX_VALUE,
Double.MIN_VALUE};
data.Longs = new Long[] {0l,-0l,1l,-1l,123456l,-123456l,99999999999l,-99999999999l,Long.MAX_VALUE,
Long.MIN_VALUE};
data.Bytes = new Byte[] {-123,123,-1,0,1,Byte.MAX_VALUE,Byte.MIN_VALUE};
data.Chars = new Character[] {32345,12345,0,1,63,Character.MAX_VALUE,Character.MIN_VALUE};
data.Booleans = new Boolean[] {true,false};
}
private void register(SerializationManager manager) {
manager.register(int[].class);
manager.register(short[].class);
manager.register(float[].class);
manager.register(double[].class);
manager.register(long[].class);
manager.register(byte[].class);
manager.register(char[].class);
manager.register(boolean[].class);
manager.register(String[].class);
manager.register(Integer[].class);
manager.register(Short[].class);
manager.register(Float[].class);
manager.register(Double[].class);
manager.register(Long[].class);
manager.register(Byte[].class);
manager.register(Character[].class);
manager.register(Boolean[].class);
manager.register(Data.class);
}
static public class Data {
public String string;
public String[] strings;
public int[] ints;
public short[] shorts;
public float[] floats;
public double[] doubles;
public long[] longs;
public byte[] bytes;
public char[] chars;
public boolean[] booleans;
public Integer[] Ints;
public Short[] Shorts;
public Float[] Floats;
public Double[] Doubles;
public Long[] Longs;
public Byte[] Bytes;
public Character[] Chars;
public Boolean[] Booleans;
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + Arrays.hashCode(this.Booleans);
result = prime * result + Arrays.hashCode(this.Bytes);
result = prime * result + Arrays.hashCode(this.Chars);
result = prime * result + Arrays.hashCode(this.Doubles);
result = prime * result + Arrays.hashCode(this.Floats);
result = prime * result + Arrays.hashCode(this.Ints);
result = prime * result + Arrays.hashCode(this.Longs);
result = prime * result + Arrays.hashCode(this.Shorts);
result = prime * result + Arrays.hashCode(this.booleans);
result = prime * result + Arrays.hashCode(this.bytes);
result = prime * result + Arrays.hashCode(this.chars);
result = prime * result + Arrays.hashCode(this.doubles);
result = prime * result + Arrays.hashCode(this.floats);
result = prime * result + Arrays.hashCode(this.ints);
result = prime * result + Arrays.hashCode(this.longs);
result = prime * result + Arrays.hashCode(this.shorts);
result = prime * result + (this.string == null ? 0 : this.string.hashCode());
result = prime * result + Arrays.hashCode(this.strings);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Data other = (Data) obj;
if (!Arrays.equals(this.Booleans, other.Booleans)) {
return false;
}
if (!Arrays.equals(this.Bytes, other.Bytes)) {
return false;
}
if (!Arrays.equals(this.Chars, other.Chars)) {
return false;
}
if (!Arrays.equals(this.Doubles, other.Doubles)) {
return false;
}
if (!Arrays.equals(this.Floats, other.Floats)) {
return false;
}
if (!Arrays.equals(this.Ints, other.Ints)) {
return false;
}
if (!Arrays.equals(this.Longs, other.Longs)) {
return false;
}
if (!Arrays.equals(this.Shorts, other.Shorts)) {
return false;
}
if (!Arrays.equals(this.booleans, other.booleans)) {
return false;
}
if (!Arrays.equals(this.bytes, other.bytes)) {
return false;
}
if (!Arrays.equals(this.chars, other.chars)) {
return false;
}
if (!Arrays.equals(this.doubles, other.doubles)) {
return false;
}
if (!Arrays.equals(this.floats, other.floats)) {
return false;
}
if (!Arrays.equals(this.ints, other.ints)) {
return false;
}
if (!Arrays.equals(this.longs, other.longs)) {
return false;
}
if (!Arrays.equals(this.shorts, other.shorts)) {
return false;
}
if (this.string == null) {
if (other.string != null) {
return false;
}
} else if (!this.string.equals(other.string)) {
return false;
}
if (!Arrays.equals(this.strings, other.strings)) {
return false;
}
return true;
}
@Override
public String toString() {
return "Data";
}
}
}

296
not-fixed/ReconnectTest.java Executable file
View File

@ -0,0 +1,296 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
*
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener;
import dorkbox.network.connection.Listeners;
import dorkbox.util.exceptions.SecurityException;
// NOTE: UDP is unreliable, EVEN ON LOOPBACK! So this can fail with UDP. TCP will never fail.
public
class ReconnectTest extends BaseTest {
private final AtomicInteger receivedCount = new AtomicInteger(0);
private static final Logger logger = LoggerFactory.getLogger(ReconnectTest.class.getSimpleName());
@Test
public
void socketReuseUDP() throws IOException, SecurityException {
socketReuse(false, true);
}
@Test
public
void socketReuseTCP() throws IOException, SecurityException {
socketReuse(true, false);
}
@Test
public
void socketReuseTCPUDP() throws IOException, SecurityException {
socketReuse(true, true);
}
private
void socketReuse(final boolean useTCP, final boolean useUDP) throws SecurityException, IOException {
receivedCount.set(0);
Configuration configuration = new Configuration();
configuration.host = host;
if (useTCP) {
configuration.tcpPort = tcpPort;
}
if (useUDP) {
configuration.udpPort = udpPort;
}
AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>();
Server server = new Server(configuration);
addEndPoint(server);
final Listeners listeners = server.listeners();
listeners.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
if (useTCP) {
connection.send()
.TCP("-- TCP from server");
}
if (useUDP) {
connection.send()
.UDP("-- UDP from server");
}
}
});
listeners.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {
int incrementAndGet = ReconnectTest.this.receivedCount.incrementAndGet();
logger.error("----- <S " + connection + "> " + incrementAndGet + " : " + object);
latch.get().countDown();
}
});
server.bind(false);
// ----
Client client = new Client(configuration);
addEndPoint(client);
final Listeners listeners1 = client.listeners();
listeners1.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
if (useTCP) {
connection.send()
.TCP("-- TCP from client");
}
if (useUDP) {
connection.send()
.UDP("-- UDP from client");
}
}
});
listeners1.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {
int incrementAndGet = ReconnectTest.this.receivedCount.incrementAndGet();
logger.error("----- <C " + connection + "> " + incrementAndGet + " : " + object);
latch.get().countDown();
}
});
int latchCount = 2;
int count = 100;
int initialCount = 2;
if (useTCP && useUDP) {
initialCount += 2;
latchCount += 2;
}
try {
for (int i = 1; i < count + 1; i++) {
logger.error(".....");
latch.set(new CountDownLatch(latchCount));
try {
client.connect(5000);
} catch (IOException e) {
e.printStackTrace();
}
int retryCount = 20;
int lastRetryCount;
int target = i * initialCount;
boolean failed = false;
synchronized (receivedCount) {
while (this.receivedCount.get() != target) {
lastRetryCount = this.receivedCount.get();
try {
latch.get().await(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
// check to see if we changed at all...
if (lastRetryCount == this.receivedCount.get()) {
if (retryCount-- < 0) {
logger.error("Aborting unit test... wrong count!");
if (useUDP) {
// If TCP and UDP both fill the pipe, THERE WILL BE FRAGMENTATION and dropped UDP packets!
// it results in severe UDP packet loss and contention.
//
// http://www.isoc.org/INET97/proceedings/F3/F3_1.HTM
// also, a google search on just "INET97/proceedings/F3/F3_1.HTM" turns up interesting problems.
// Usually it's with ISPs.
logger.error("NOTE: UDP can fail, even on loopback! See: http://www.isoc.org/INET97/proceedings/F3/F3_1.HTM");
}
failed = true;
break;
}
} else {
retryCount = 20;
}
}
}
client.close();
logger.error(".....");
if (failed) {
break;
}
}
int specified = count * initialCount;
int received = this.receivedCount.get();
if (specified != received) {
logger.error("NOTE: UDP can fail, even on loopback! See: http://www.isoc.org/INET97/proceedings/F3/F3_1.HTM");
}
assertEquals(specified, received);
} finally {
stopEndPoints();
waitForThreads(10);
}
}
@Test
public
void localReuse() throws SecurityException, IOException {
receivedCount.set(0);
Server server = new Server();
addEndPoint(server);
server.listeners()
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
connection.send()
.self("-- LOCAL from server");
}
});
server.listeners()
.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {
int incrementAndGet = ReconnectTest.this.receivedCount.incrementAndGet();
System.out.println("----- <S " + connection + "> " + incrementAndGet + " : " + object);
}
});
// ----
Client client = new Client();
addEndPoint(client);
client.listeners()
.add(new Listener.OnConnected<Connection>() {
@Override
public
void connected(Connection connection) {
connection.send()
.self("-- LOCAL from client");
}
});
client.listeners()
.add(new Listener.OnMessageReceived<Connection, String>() {
@Override
public
void received(Connection connection, String object) {
int incrementAndGet = ReconnectTest.this.receivedCount.incrementAndGet();
System.out.println("----- <C " + connection + "> " + incrementAndGet + " : " + object);
}
});
server.bind(false);
int count = 10;
for (int i = 1; i < count + 1; i++) {
client.connect(5000);
int target = i * 2;
while (this.receivedCount.get() != target) {
System.out.println("----- Waiting...");
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
}
client.close();
}
assertEquals(count * 2, this.receivedCount.get());
stopEndPoints();
waitForThreads(10);
}
}

104
not-fixed/SocketOpenTest.java Executable file
View File

@ -0,0 +1,104 @@
/*
* Copyright 2018 dorkbox, llc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import dorkbox.network.connection.Connection;
import dorkbox.network.connection.Listener.OnConnected;
import dorkbox.util.exceptions.SecurityException;
/**
*
*/
public
class SocketOpenTest extends BaseTest {
CountDownLatch latch = new CountDownLatch(1);
@Test
public
void socketConnect() throws SecurityException {
Configuration configuration = new Configuration();
configuration.tcpPort = tcpPort;
configuration.host = host;
Server server = new Server(configuration);
addEndPoint(server);
server.listeners()
.add(new OnConnected<Connection>() {
@Override
public
void connected(final Connection connection) {
latch.countDown();
}
});
server.bind(false);
boolean connectedSocket = false;
// since we check the socket, if we are NOT connected to a socket, then we're done.
Socket sock = null;
try {
sock = new Socket(host, tcpPort);
if (sock.isConnected()) {
// connected to server
connectedSocket = true;
sock.close();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (sock != null) {
sock.close();
}
} catch (IOException ignored) {
}
}
Assert.assertTrue(connectedSocket);
Client client = new Client(configuration);
addEndPoint(client);
try {
client.connect(5000);
} catch (IOException e) {
e.printStackTrace();
}
try {
latch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Assert.fail("Interrupted while waiting for latch...");
}
stopEndPoints();
waitForThreads(10);
}
}

View File

@ -42,11 +42,11 @@ import dorkbox.network.handshake.ClientHandshake
import dorkbox.network.ping.Ping
import dorkbox.network.ping.PingManager
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.lang.Thread.sleep
import java.net.Inet4Address
import java.net.Inet6Address
import java.net.InetAddress
@ -130,7 +130,7 @@ open class Client<CONNECTION : Connection>(
/**
* Gets the version number.
*/
const val version = "5.27"
const val version = "5.28"
/**
* Checks to see if a client (using the specified configuration) is running.
@ -214,8 +214,7 @@ open class Client<CONNECTION : Connection>(
remoteAddress: InetAddress,
connectionTimeoutSec: Int = 30,
reliable: Boolean = true)
= runBlocking {
{
val remoteAddressString = when (remoteAddress) {
is Inet4Address -> IPv4.toString(remoteAddress)
is Inet6Address -> IPv6.toString(remoteAddress, true)
@ -246,7 +245,7 @@ open class Client<CONNECTION : Connection>(
ipcPublicationId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_SUB,
ipcSubscriptionId: Int = AeronDriver.IPC_HANDSHAKE_STREAM_ID_PUB,
connectionTimeoutSec: Int = 30)
= runBlocking {
{
// Default IPC ports are flipped because they are in the perspective of the SERVER
@ -290,7 +289,7 @@ open class Client<CONNECTION : Connection>(
remoteAddress: String = "",
connectionTimeoutSec: Int = 30,
reliable: Boolean = true)
= runBlocking {
{
when {
// this is default IPC settings
@ -399,7 +398,7 @@ open class Client<CONNECTION : Connection>(
* @throws ClientException if there are misc errors
*/
@Suppress("DuplicatedCode")
private suspend fun connect(
private fun connect(
remoteAddress: InetAddress? = null,
remoteAddressString: String,
// Default IPC ports are flipped because they are in the perspective of the SERVER
@ -489,12 +488,15 @@ open class Client<CONNECTION : Connection>(
} catch (e: ClientRetryException) {
handshake.reset()
// maybe the aeron driver isn't running?
// maybe the aeron driver isn't running? (or isn't running correctly?)
aeronDriver.closeIfSingle()
aeronDriver.start()
// short delay, since it failed we want to limit the retry rate to something slower than "as fast as the CPU can do it"
// we also want to go at SLIGHTLY slower that the aeron driver timeout frequency, this way - if there are connection or handshake issues, the server has the chance to expire the connections.
delay(aeronDriver.driverTimeout()+1)
// If we go TOO FAST, then the server will EVENTUALLY have aeron errors (since it can't keep up per client). We literally
// want to have 1 in-flight handshake, per connection attempt, during the aeron connection timeout
sleep(aeronDriver.driverTimeout()+1)
if (logger.isTraceEnabled) {
logger.trace(e) { "Unable to connect to '$remoteAddressString', retrying..." }
} else {
@ -529,7 +531,7 @@ open class Client<CONNECTION : Connection>(
}
}
private suspend fun buildIpcHandshake(ipcSubscriptionId: Int, ipcPublicationId: Int, connectionTimeoutSec: Int, reliable: Boolean): MediaDriverConnection {
private fun buildIpcHandshake(ipcSubscriptionId: Int, ipcPublicationId: Int, connectionTimeoutSec: Int, reliable: Boolean): MediaDriverConnection {
if (remoteAddress == null) {
logger.info { "IPC enabled" }
} else {
@ -581,7 +583,7 @@ open class Client<CONNECTION : Connection>(
return udpConnection
}
private suspend fun buildUdpHandshake(connectionTimeoutSec: Int, reliable: Boolean): MediaDriverConnection {
private fun buildUdpHandshake(connectionTimeoutSec: Int, reliable: Boolean): MediaDriverConnection {
val test = UdpMediaDriverClientConnection(
address = remoteAddress!!,
publicationPort = config.subscriptionPort,
@ -598,7 +600,7 @@ open class Client<CONNECTION : Connection>(
}
// the handshake process might have to restart this connection process.
private suspend fun connect0(handshake: ClientHandshake<CONNECTION>, handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int) {
private fun connect0(handshake: ClientHandshake<CONNECTION>, handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int) {
// this will block until the connection timeout, and throw an exception if we were unable to connect with the server
val isUsingIPC = handshakeConnection is IpcMediaDriverConnection

View File

@ -18,21 +18,23 @@ package dorkbox.network
import dorkbox.netUtil.IPv4
import dorkbox.netUtil.IPv6
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.CoroutineBackoffIdleStrategy
import dorkbox.network.aeron.CoroutineIdleStrategy
import dorkbox.network.aeron.CoroutineSleepingMillisIdleStrategy
import dorkbox.network.connection.Connection
import dorkbox.network.serialization.Serialization
import dorkbox.os.OS
import dorkbox.storage.Storage
import dorkbox.util.NamedThreadFactory
import io.aeron.driver.Configuration
import io.aeron.driver.ThreadingMode
import io.aeron.exceptions.DriverTimeoutException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import mu.KLogger
import org.agrona.SystemUtil
import org.agrona.concurrent.AgentTerminationException
import org.agrona.concurrent.BackoffIdleStrategy
import org.agrona.concurrent.IdleStrategy
import org.agrona.concurrent.SleepingMillisIdleStrategy
import java.io.File
import java.net.BindException
import java.util.concurrent.*
@ -42,7 +44,7 @@ class ServerConfiguration : dorkbox.network.Configuration() {
/**
* Gets the version number.
*/
const val version = "5.27"
const val version = "5.28"
}
/**
@ -91,6 +93,16 @@ class ServerConfiguration : dorkbox.network.Configuration() {
field = value
}
/**
* Specifies the Java thread that will poll the underlying network for incoming messages
*/
var networkInterfaceEventDispatcher = Executors.newSingleThreadExecutor(
NamedThreadFactory( "Network Event Dispatcher", Thread.currentThread().threadGroup, Thread.NORM_PRIORITY, true))
set(value) {
require(!contextDefined) { errorMessage }
field = value
}
/**
* Allows the user to change how endpoint settings and public key information are saved.
*/
@ -120,7 +132,7 @@ class ServerConfiguration : dorkbox.network.Configuration() {
require(enableIpc || enableIPv4 || enableIPv6) { "At least one of IPC/IPv4/IPv6 must be enabled!" }
if (enableIpc) {
require(!aeronDirectoryForceUnique) { "IPC enabled and forcing a unique Aeron directory are incompatible (IPC requires shared Aeron directories)!" }
require(!uniqueAeronDirectory) { "IPC enabled and forcing a unique Aeron directory are incompatible (IPC requires shared Aeron directories)!" }
} else {
if (enableIPv4 && !enableIPv6) {
require(IPv4.isAvailable) { "IPC/IPv6 are disabled and IPv4 is enabled, but there is no IPv4 interface available!" }
@ -267,7 +279,7 @@ open class Configuration {
*
* Normally, events should be dispatched asynchronously across a thread pool, but in certain circumstances you may want to constrain this to a single thread dispatcher or other, custom dispatcher.
*/
var dispatch = CoroutineScope(networkDispatcher)
var dispatch = CoroutineScope(Dispatchers.Default)
/**
* Allows the user to change how endpoint settings and public key information are saved.
@ -310,7 +322,7 @@ open class Configuration {
* The main difference in strategies is how responsive to changes should the idler be when idle for a little bit of time and
* how much CPU should be consumed when no work is being done. There is an inherent tradeoff to consider.
*/
var pollIdleStrategy: CoroutineIdleStrategy = CoroutineBackoffIdleStrategy(maxSpins = 100, maxYields = 10, minParkPeriodMs = 1, maxParkPeriodMs = 100)
var pollIdleStrategy: IdleStrategy = BackoffIdleStrategy(100, 10, 1, 100)
set(value) {
require(!contextDefined) { errorMessage }
field = value
@ -327,7 +339,7 @@ open class Configuration {
* The main difference in strategies is how responsive to changes should the idler be when idle for a little bit of time and
* how much CPU should be consumed when no work is being done. There is an inherent tradeoff to consider.
*/
var sendIdleStrategy: CoroutineIdleStrategy = CoroutineSleepingMillisIdleStrategy(sleepPeriodMs = 100)
var sendIdleStrategy: IdleStrategy = SleepingMillisIdleStrategy(100)
set(value) {
require(!contextDefined) { errorMessage }
field = value
@ -375,7 +387,7 @@ open class Configuration {
/**
* Should we force the Aeron location to be unique for every instance? This is mutually exclusive with IPC.
*/
var aeronDirectoryForceUnique = false
var uniqueAeronDirectory = false
set(value) {
require(!contextDefined) { errorMessage }
field = value
@ -547,7 +559,7 @@ open class Configuration {
open fun validate() {
// have to do some basic validation of our configuration
require(!(enableIpc && aeronDirectoryForceUnique)) { "IPC enabled and forcing a unique Aeron directory are incompatible (IPC requires shared Aeron directories)!" }
require(!(enableIpc && uniqueAeronDirectory)) { "IPC enabled and forcing a unique Aeron directory are incompatible (IPC requires shared Aeron directories)!" }
require(publicationPort > 0) { "configuration port must be > 0" }
require(publicationPort < 65535) { "configuration port must be < 65535" }

View File

@ -34,8 +34,6 @@ import kotlinx.atomicfu.atomic
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.net.InetAddress
import java.util.concurrent.*
@ -124,7 +122,7 @@ open class Server<CONNECTION : Connection>(
/**
* Gets the version number.
*/
const val version = "5.27"
const val version = "5.28"
/**
* Checks to see if a server (using the specified configuration) is running.
@ -154,8 +152,11 @@ open class Server<CONNECTION : Connection>(
/**
* These are run in lock-step to shutdown/close the server. Afterwards, bind() can be called again
*/
private val shutdownPollMutex = Mutex(locked = true)
private val shutdownEventMutex = Mutex(locked = true)
@Volatile
private var shutdownPollLatch = CountDownLatch(1)
@Volatile
private var shutdownEventLatch = CountDownLatch(1)
/**
@ -221,28 +222,31 @@ open class Server<CONNECTION : Connection>(
* Binds the server to AERON configuration
*/
@Suppress("DuplicatedCode")
fun bind() = runBlocking {
fun bind() {
if (bindAlreadyCalled.getAndSet(true)) {
logger.error { "Unable to bind when the server is already running!" }
return@runBlocking
return
}
try {
initEndpointState()
} catch (e: Exception) {
logger.error(e) { "Unable to initialize the endpoint state" }
return@runBlocking
return
}
shutdownPollLatch = CountDownLatch(1)
shutdownEventLatch = CountDownLatch(1)
config as ServerConfiguration
// we are done with initial configuration, now initialize aeron and the general state of this endpoint
// this forces the current thread to WAIT until poll system has started
val mutex = Mutex(locked = true)
val pollStartupLatch = CountDownLatch(1)
val server = this@Server
val ipcPoller: AeronPoller = ServerHandshakePollers.IPC(aeronDriver, config, server)
val ipcPoller: AeronPoller = ServerHandshakePollers.ipc(aeronDriver, config, server)
// if we are binding to WILDCARD, then we have to do something special if BOTH IPv4 and IPv6 are enabled!
val isWildcard = listenIPv4Address == IPv4.WILDCARD || listenIPv6Address == IPv6.WILDCARD
@ -265,11 +269,9 @@ open class Server<CONNECTION : Connection>(
}
actionDispatch.launch {
try {
mutex.unlock()
} catch (ignored: Exception) {}
val networkEventProcessor = Runnable {
pollStartupLatch.countDown()
val pollIdleStrategy = config.pollIdleStrategy
try {
@ -301,7 +303,10 @@ open class Server<CONNECTION : Connection>(
removeConnection(connection)
// this will call removeConnection again, but that is ok
connection.close()
runBlocking {
// this is blocking, because the connection MUST be removed in the same thread that is processing events
connection.close()
}
// have to manually notify the server-listenerManager that this connection was closed
// if the connection was MANUALLY closed (via calling connection.close()), then the connection-listenermanager is
@ -319,7 +324,7 @@ open class Server<CONNECTION : Connection>(
}
// we want to process **actual** close cleanup events on this thread as well, otherwise we will have threading problems
shutdownPollMutex.withLock { }
shutdownPollLatch.await()
// we have to manually cleanup the connections and call server-notifyDisconnect because otherwise this will never get called
val jobs = mutableListOf<Job>()
@ -330,7 +335,7 @@ open class Server<CONNECTION : Connection>(
connections.clear()
cons.forEach { connection ->
logger.error { "[${connection.id}] connection cleanup and close" }
logger.info { "[${connection.id}/${connection.streamId}] Connection cleanup and close" }
// make sure the connection is closed (close can only happen once, so a duplicate call does nothing!)
connection.close()
@ -349,7 +354,9 @@ open class Server<CONNECTION : Connection>(
// when we close a client or a server, we want to make sure that ALL notifications are finished.
// when it's just a connection getting closed, we don't care about this. We only care when it's "global" shutdown
jobs.forEach { it.join() }
runBlocking {
jobs.forEach { it.join() }
}
} catch (e: Exception) {
logger.error(e) { "Unexpected error during server message polling!" }
} finally {
@ -369,13 +376,14 @@ open class Server<CONNECTION : Connection>(
// finish closing -- this lets us make sure that we don't run into race conditions on the thread that calls close()
try {
shutdownEventMutex.unlock()
shutdownEventLatch.countDown()
} catch (ignored: Exception) {}
}
}
config.networkInterfaceEventDispatcher.submit(networkEventProcessor)
// wait for the polling coroutine to startup before letting bind() return
mutex.withLock { }
// wait for the polling thread to startup before letting bind() return
pollStartupLatch.await()
}
/**
@ -413,14 +421,10 @@ open class Server<CONNECTION : Connection>(
//
// Aeron + the Media Driver will have already been shutdown at this point.
if (bindAlreadyCalled.getAndSet(false)) {
runBlocking {
// These are run in lock-step
try {
shutdownPollMutex.unlock()
} catch (ignored: Exception) {}
shutdownEventMutex.withLock { }
}
// These are run in lock-step
shutdownPollLatch.countDown()
shutdownEventLatch.await()
}
}

View File

@ -155,12 +155,12 @@ class AeronContext(
}
// this is incompatible with IPC, and will not be set if IPC is enabled
if (config.aeronDirectoryForceUnique && isRunning) {
if (config.uniqueAeronDirectory && isRunning) {
val savedParent = aeronDir.parentFile
var retry = 0
val retryMax = 100
while (config.aeronDirectoryForceUnique && isRunning) {
while (config.uniqueAeronDirectory && isRunning) {
if (retry++ > retryMax) {
throw IllegalArgumentException("Unable to force unique aeron Directory. Tried $retryMax times and all tries were in use.")
}
@ -185,7 +185,7 @@ class AeronContext(
}
}
logger.info { "Aeron directory: '${context.aeronDirectory()}'" }
logger.info { "Aeron directory: '$aeronDir'" }
this.context = context
}

View File

@ -11,9 +11,7 @@ import io.aeron.Publication
import io.aeron.Subscription
import io.aeron.driver.MediaDriver
import io.aeron.samples.SamplesUtil
import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.atomicfu.atomic
import mu.KLogger
import mu.KotlinLogging
import org.agrona.DirectBuffer
@ -61,7 +59,10 @@ class AeronDriver(
private const val AERON_PUBLICATION_LINGER_TIMEOUT = 5_000L // in MS
// prevents multiple instances, within the same JVM, from starting at the exact same time.
private val mutex = Mutex()
// since this is "global" and cannot be run in parallel, we DO NOT use coroutines!
private val lock = arrayOf(0)
private val mediaDriverUsageCount = atomic(0)
private val aeronClientUsageCount = atomic(0)
private fun setConfigDefaults(config: Configuration, logger: KLogger) {
// explicitly don't set defaults if we already have the context defined!
@ -249,8 +250,8 @@ class AeronDriver(
*
* @return true if we are successfully connected to the aeron client
*/
suspend fun start(): Boolean {
mutex.withLock {
fun start(): Boolean {
synchronized(lock) {
val mediaDriverLoaded = mediaDriverWasAlreadyRunning || mediaDriver != null
val isLoaded = mediaDriverLoaded && aeron != null && aeron?.isClosed == false
if (isLoaded) {
@ -266,7 +267,7 @@ class AeronDriver(
// SOMETIMES aeron is in the middle of shutting down, and this prevents us from trying to connect to
// that instance
logger.debug { "Aeron Media driver already running. Double checking status..." }
delay(context.driverTimeout/2)
sleep(context.driverTimeout/2)
running = isRunning()
}
@ -279,10 +280,11 @@ class AeronDriver(
try {
mediaDriver = MediaDriver.launch(context.context)
logger.debug { "Started the Aeron Media driver." }
mediaDriverUsageCount.getAndIncrement()
break
} catch (e: Exception) {
logger.warn(e) { "Unable to start the Aeron Media driver at ${context.driverDirectory}. Retrying $count more times..." }
delay(context.driverTimeout)
sleep(context.driverTimeout)
}
}
} else {
@ -326,11 +328,12 @@ class AeronDriver(
// this might succeed if we can connect to the media driver
aeron = Aeron.connect(aeronDriverContext)
logger.debug { "Connected to Aeron driver." }
aeronClientUsageCount.getAndIncrement()
return true
}
suspend fun addPublication(publicationUri: ChannelUriStringBuilder, streamId: Int): Publication {
fun addPublication(publicationUri: ChannelUriStringBuilder, streamId: Int): Publication {
val uri = publicationUri.build()
// reasons we cannot add a pub/sub to aeron
@ -363,7 +366,7 @@ class AeronDriver(
return publication
}
suspend fun addSubscription(subscriptionUri: ChannelUriStringBuilder, streamId: Int): Subscription {
fun addSubscription(subscriptionUri: ChannelUriStringBuilder, streamId: Int): Subscription {
val uri = subscriptionUri.build()
// reasons we cannot add a pub/sub to aeron
@ -391,7 +394,7 @@ class AeronDriver(
val subscription = aeron1.addSubscription(uri, streamId)
if (subscription == null) {
// there was an error connecting to the aeron client or media driver.
val ex = ClientRetryException("Error adding a subscript to the remote endpoint")
val ex = ClientRetryException("Error adding a subscription to the remote endpoint")
ListenerManager.cleanAllStackTrace(ex)
throw ex
}
@ -408,16 +411,30 @@ class AeronDriver(
return context.isRunning()
}
/**
* A safer way to try to close the media driver if in the ENTIRE JVM, our process is the only one using aeron with it's specific configuration
*
* NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
* same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
*/
fun closeIfSingle() {
if (aeronClientUsageCount.value == 1 && mediaDriverUsageCount.value == 1) {
close()
}
}
/**
* A safer way to try to close the media driver
*
* NOTE: We must be *super* careful trying to delete directories, because if we have multiple AERON/MEDIA DRIVERS connected to the
* same directory, deleting the directory will cause any other aeron connection to fail! (which makes sense).
*/
suspend fun close() {
mutex.withLock {
fun close() {
synchronized(lock) {
try {
aeron?.close()
aeronClientUsageCount.getAndDecrement()
} catch (e: Exception) {
logger.error(e) { "Error stopping aeron." }
}
@ -445,6 +462,7 @@ class AeronDriver(
// if we are the ones that started the media driver, then we must be the ones to close it
try {
mediaDriverUsageCount.getAndDecrement()
mediaDriver!!.close()
} catch (e: Exception) {
logger.error(e) { "Error closing the Aeron media driver" }
@ -475,8 +493,6 @@ class AeronDriver(
// make sure the context is also closed.
context.close()
context_ = null
try {
val deletedAeron = context.driverDirectory.deleteRecursively()
if (!deletedAeron) {
@ -485,6 +501,8 @@ class AeronDriver(
} catch (e: Exception) {
logger.error(e) { "Error deleting Aeron directory at: ${context.driverDirectory}"}
}
context_ = null
}
}

View File

@ -49,14 +49,14 @@ class BacklogStat
*/
fun snapshot(): Map<StreamCompositeKey, StreamBacklog> {
val streams: MutableMap<StreamCompositeKey, StreamBacklog> = HashMap()
counters.forEach { counterId: Int, typeId: Int, keyBuffer: DirectBuffer, label: String? ->
counters.forEach { counterId: Int, typeId: Int, keyBuffer: DirectBuffer, _: String? ->
if (typeId >= PublisherLimit.PUBLISHER_LIMIT_TYPE_ID && typeId <= ReceiverPos.RECEIVER_POS_TYPE_ID || typeId == SenderLimit.SENDER_LIMIT_TYPE_ID || typeId == PerImageIndicator.PER_IMAGE_TYPE_ID || typeId == PublisherPos.PUBLISHER_POS_TYPE_ID) {
val key = StreamCompositeKey(
keyBuffer.getInt(StreamCounter.SESSION_ID_OFFSET),
keyBuffer.getInt(StreamCounter.STREAM_ID_OFFSET),
keyBuffer.getStringAscii(StreamCounter.CHANNEL_OFFSET)
)
val streamBacklog = streams.computeIfAbsent(key) { ignore: StreamCompositeKey? -> StreamBacklog() }
val streamBacklog = streams.computeIfAbsent(key) { _: StreamCompositeKey? -> StreamBacklog() }
val registrationId = keyBuffer.getLong(StreamCounter.REGISTRATION_ID_OFFSET)
val value = counters.getCounterValue(counterId)
when (typeId) {
@ -215,14 +215,14 @@ class BacklogStat
/**
* {@inheritDoc}
*/
override fun equals(o: Any?): Boolean {
if (this === o) {
override fun equals(other: Any?): Boolean {
if (this === other) {
return true
}
if (o !is StreamCompositeKey) {
if (other !is StreamCompositeKey) {
return false
}
val that = o
val that = other
return sessionId == that.sessionId && streamId == that.streamId && channel == that.channel
}

View File

@ -20,8 +20,8 @@ import dorkbox.network.connection.ListenerManager
import dorkbox.network.exceptions.ClientRetryException
import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.ChannelUriStringBuilder
import kotlinx.coroutines.delay
import mu.KLogger
import java.lang.Thread.sleep
import java.util.concurrent.*
/**
@ -50,7 +50,7 @@ internal open class IpcMediaDriverConnection(
* @throws ClientRetryException if we need to retry to connect
* @throws ClientTimedOutException if we cannot connect to the server in the designated time
*/
override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
override fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
// Create a publication at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
@ -86,7 +86,7 @@ internal open class IpcMediaDriverConnection(
break
}
delay(500L)
sleep(500L)
}
if (!success) {
@ -109,7 +109,7 @@ internal open class IpcMediaDriverConnection(
break
}
delay(500L)
sleep(500L)
}
if (!success) {
@ -131,7 +131,7 @@ internal open class IpcMediaDriverConnection(
*
* serverAddress is ignored for IPC
*/
override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
override fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
// Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()

View File

@ -30,8 +30,10 @@ abstract class MediaDriverConnection(val publicationPort: Int, val subscriptionP
lateinit var publication: Publication
abstract suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger)
abstract suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean = false)
// We don't use 'suspend' for these, because we have to pump events from a NORMAL thread. If there are any suspend points, there is
// the potential for a live-lock due to coroutine scheduling
abstract fun buildClient(aeronDriver: AeronDriver, logger: KLogger)
abstract fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean = false)
abstract val clientInfo : String
abstract val serverInfo : String

View File

@ -22,8 +22,8 @@ import dorkbox.network.exceptions.ClientException
import dorkbox.network.exceptions.ClientRetryException
import dorkbox.network.exceptions.ClientTimedOutException
import io.aeron.ChannelUriStringBuilder
import kotlinx.coroutines.delay
import mu.KLogger
import java.lang.Thread.sleep
import java.net.Inet4Address
import java.net.InetAddress
import java.util.concurrent.*
@ -68,7 +68,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
* @throws ClientTimedOutException if we cannot connect to the server in the designated time
*/
@Suppress("DuplicatedCode")
override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
override fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
val aeronAddressString = if (address is Inet4Address) {
address.hostAddress
} else {
@ -96,7 +96,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
// Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
val subscriptionUri = uri()
.controlEndpoint("$aeronAddressString:$subscriptionPort")
// .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC)
// .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC) // this allows us to listen for message from a different network without port forwarding
if (logger.isTraceEnabled) {
logger.trace("client sub URI: $ipType ${subscriptionUri.build()}")
@ -118,7 +118,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
break
}
delay(500L)
sleep(500L)
}
if (!success) {
@ -140,7 +140,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
break
}
delay(500L)
sleep(500L)
}
if (!success) {
@ -168,7 +168,7 @@ internal class UdpMediaDriverClientConnection(val address: InetAddress,
}
}
override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
override fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
throw ClientException("Server info not implemented in Client MediaDriver Connection")
}
override val serverInfo: String

View File

@ -62,21 +62,22 @@ internal open class UdpMediaDriverServerConnection(val listenAddress: InetAddres
}
@Suppress("DuplicatedCode")
override suspend fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
override fun buildClient(aeronDriver: AeronDriver, logger: KLogger) {
throw ServerException("Client info not implemented in Server MediaDriver Connection")
}
override suspend fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
override fun buildServer(aeronDriver: AeronDriver, logger: KLogger, pairConnection: Boolean) {
val connectionString = aeronConnectionString(listenAddress)
// Create a publication with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
// Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.
val publicationUri = uri()
.controlEndpoint("$connectionString:$publicationPort")
.controlEndpoint("$connectionString:$publicationPort")
// .controlMode(CommonContext.MDC_CONTROL_MODE_DYNAMIC) // this allows us to listen for message from a different network without port forwarding
// Create a subscription with a control port (for dynamic MDC) at the given address and port, using the given stream ID.
val subscriptionUri = uri()
.endpoint("$connectionString:$subscriptionPort")
.endpoint("$connectionString:$subscriptionPort")

View File

@ -29,9 +29,8 @@ import io.aeron.Subscription
import io.aeron.logbuffer.Header
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.getAndUpdate
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.agrona.DirectBuffer
import java.lang.Thread.sleep
import java.net.InetAddress
import java.util.concurrent.*
@ -91,7 +90,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
private val isClosed = atomic(false)
// enableNotifyDisconnect : we don't always want to enable notifications on disconnect
internal var closeAction: suspend (enableNotifyDisconnect: Boolean) -> Unit = {}
internal var closeAction: (enableNotifyDisconnect: Boolean) -> Unit = {}
// only accessed on a single thread!
private var connectionLastCheckTimeNanos = 0L
@ -216,7 +215,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
*
* @return true if the message was successfully sent, false otherwise. Exceptions are caught and NOT rethrown!
*/
suspend fun send(message: Any): Boolean {
fun send(message: Any): Boolean {
messagesInProgress.getAndIncrement()
val success = endPoint.send(message, publication, this)
messagesInProgress.getAndDecrement()
@ -224,17 +223,6 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
return success
}
/**
* Safely sends objects to a destination.
*
* @return true if the message was successfully sent by aeron
*/
fun sendBlocking(message: Any): Boolean {
return runBlocking {
send(message)
}
}
/**
* Sends a "ping" packet to measure **ROUND TRIP** time to the remote connection.
*
@ -336,22 +324,15 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
/**
* Closes the connection, and removes all connection specific listeners
*/
suspend fun close() {
fun close() {
close(enableRemove = true,
enableNotifyDisconnect = true)
}
internal fun closeBlocking() {
runBlocking {
close()
}
}
/**
* Closes the connection, and removes all connection specific listeners
*/
internal suspend fun close(enableRemove: Boolean, enableNotifyDisconnect: Boolean) {
internal fun close(enableRemove: Boolean, enableNotifyDisconnect: Boolean) {
// there are 2 ways to call close.
// MANUALLY
// When a connection is disconnected via a timeout/expire.
@ -372,7 +353,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
// we do not want to close until AFTER all publications have been sent. Calling this WITHOUT waiting will instantly stop everything
// we want a timeout-check, otherwise this will run forever
while (messagesInProgress.value != 0 && System.nanoTime() - closeTimeoutTime < timoutInNanos) {
delay(50)
sleep(50)
}
// on close, we want to make sure this file is DELETED!
@ -385,7 +366,7 @@ open class Connection(connectionParameters: ConnectionParams<*>) {
if (logFile.delete()) {
break
}
delay(100)
sleep(100)
}
if (logFile.exists()) {

View File

@ -40,7 +40,7 @@ import javax.crypto.spec.SecretKeySpec
/**
* Management for all of the crypto stuff used
* Management for all the crypto stuff used
*/
internal class CryptoManagement(val logger: KLogger,
private val settingsStore: SettingsStore,
@ -87,7 +87,7 @@ internal class CryptoManagement(val logger: KLogger,
try {
// seed our RNG based off of this and create our ECC keys
val seedBytes = Entropy["There are no ECC keys for the ${type.simpleName} yet"]
logger.info("Now generating ECC ($curve25519) keys. Please wait!")
logger.debug("Now generating ECC ($curve25519) keys. Please wait!")
secureRandom.nextBytes(seedBytes)

View File

@ -21,7 +21,6 @@ import dorkbox.network.Server
import dorkbox.network.ServerConfiguration
import dorkbox.network.aeron.AeronDriver
import dorkbox.network.aeron.BacklogStat
import dorkbox.network.aeron.CoroutineIdleStrategy
import dorkbox.network.connection.streaming.StreamingControl
import dorkbox.network.connection.streaming.StreamingData
import dorkbox.network.connection.streaming.StreamingManager
@ -47,12 +46,12 @@ import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import mu.KLogger
import mu.KotlinLogging
import org.agrona.DirectBuffer
import org.agrona.MutableDirectBuffer
import org.agrona.concurrent.IdleStrategy
import java.util.concurrent.*
fun CoroutineScope.eventLoop(block: suspend CoroutineScope.() -> Unit): Job {
// UNDISPATCHED means that this coroutine will start as an event loop, instead of concurrently in a different thread
@ -136,8 +135,8 @@ internal constructor(val type: Class<*>,
private val handshakeKryo: KryoExtra<CONNECTION>
internal val sendIdleStrategy: CoroutineIdleStrategy
internal val pollIdleStrategy: CoroutineIdleStrategy
internal val sendIdleStrategy: IdleStrategy
internal val pollIdleStrategy: IdleStrategy
/**
* Crypto and signature management
@ -147,7 +146,7 @@ internal constructor(val type: Class<*>,
private val shutdown = atomic(false)
@Volatile
private var shutdownMutex = Mutex(locked = true)
private var shutdownLatch = CountDownLatch(1)
/**
* Returns the storage used by this endpoint. This is the backing data structure for key/value pairs, and can be a database, file, etc
@ -208,7 +207,7 @@ internal constructor(val type: Class<*>,
/**
* Only starts the media driver if we are NOT already running!
*/
suspend fun init() {
fun init() {
aeronDriver.start()
}
@ -216,9 +215,9 @@ internal constructor(val type: Class<*>,
/**
* @throws Exception if there is a problem starting the media driver
*/
internal suspend fun initEndpointState() {
internal fun initEndpointState() {
shutdown.getAndSet(false)
shutdownMutex = Mutex(locked = true)
shutdownLatch = CountDownLatch(1)
init()
}
@ -377,7 +376,7 @@ internal constructor(val type: Class<*>,
* @return true if the message was successfully sent by aeron
*/
@Suppress("DuplicatedCode")
internal suspend fun writeHandshakeMessage(publication: Publication, aeronLogInfo: String, message: HandshakeMessage) {
internal fun writeHandshakeMessage(publication: Publication, aeronLogInfo: String, message: HandshakeMessage) {
// The handshake sessionId IS NOT globally unique
logger.trace { "[$aeronLogInfo - ${message.connectKey}] send HS: $message" }
@ -570,7 +569,7 @@ internal constructor(val type: Class<*>,
* @return true if the message was successfully sent by aeron, false otherwise. Exceptions are caught and NOT rethrown!
*/
@Suppress("DuplicatedCode", "UNCHECKED_CAST")
internal suspend fun send(message: Any, publication: Publication, connection: Connection): Boolean {
internal fun send(message: Any, publication: Publication, connection: Connection): Boolean {
// The handshake sessionId IS NOT globally unique
logger.trace {
"[${publication.sessionId()}] send: ${message.javaClass.simpleName} : $message"
@ -620,7 +619,7 @@ internal constructor(val type: Class<*>,
}
// the actual bits that send data on the network.
internal suspend fun sendData(publication: Publication, internalBuffer: MutableDirectBuffer, offset: Int, objectSize: Int, connection: CONNECTION): Boolean {
internal fun sendData(publication: Publication, internalBuffer: MutableDirectBuffer, offset: Int, objectSize: Int, connection: CONNECTION): Boolean {
var result: Long
while (true) {
result = publication.offer(internalBuffer, offset, objectSize)
@ -724,8 +723,8 @@ internal constructor(val type: Class<*>,
/**
* Waits for this endpoint to be closed
*/
suspend fun waitForClose() {
shutdownMutex.withLock { }
fun waitForClose() {
shutdownLatch.await()
}
final override fun close() {
@ -733,13 +732,11 @@ internal constructor(val type: Class<*>,
logger.info { "Shutting down..." }
runBlocking {
aeronDriver.close()
// the server has to be able to call server.notifyDisconnect() on a list of connections. If we remove the connections
// inside of connection.close(), then the server does not have a list of connections to call the global notifyDisconnect()
val enableRemove = type == Client::class.java
connections.forEach {
logger.info { "Closing connection: ${it.id}" }
logger.info { "[${it.id}/${it.streamId}] Closing connection" }
it.close(enableRemove, true)
}
@ -754,9 +751,11 @@ internal constructor(val type: Class<*>,
close0()
aeronDriver.close()
// if we are waiting for shutdown, cancel the waiting thread (since we have shutdown now)
try {
shutdownMutex.unlock()
shutdownLatch.countDown()
} catch (ignored: Exception) {}
logger.info { "Done shutting down..." }

View File

@ -21,6 +21,7 @@ import dorkbox.os.OS
import dorkbox.util.classes.ClassHelper
import dorkbox.util.classes.ClassHierarchy
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import mu.KLogger
@ -361,14 +362,16 @@ internal class ListenerManager<CONNECTION: Connection>(private val logger: KLogg
/**
* Invoked when a connection is first initialized, but BEFORE it's connected to the remote address.
*/
suspend fun notifyInit(connection: CONNECTION) {
onInitList.value.forEach {
try {
it(connection)
} catch (t: Throwable) {
// NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace
cleanStackTrace(t)
logger.error("Connection ${connection.id} error", t)
fun notifyInit(connection: CONNECTION) {
runBlocking {
onInitList.value.forEach {
try {
it(connection)
} catch (t: Throwable) {
// NOTE: when we remove stuff, we ONLY want to remove the "tail" of the stacktrace, not ALL parts of the stacktrace
cleanStackTrace(t)
logger.error("Connection ${connection.id} error", t)
}
}
}
}

View File

@ -204,7 +204,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
}
}
private suspend fun sendFailMessageAndThrow(
private fun sendFailMessageAndThrow(
e: Exception,
streamSessionId: Long,
publication: Publication,
@ -244,7 +244,7 @@ internal class StreamingManager<CONNECTION : Connection>(private val logger: KLo
* @param internalBuffer this is the ORIGINAL object data that is to be "chunked" and sent across the wire
* @return true if ALL the message chunks were successfully sent by aeron, false otherwise. Exceptions are caught and rethrown!
*/
suspend fun send(
fun send(
publication: Publication,
internalBuffer: MutableDirectBuffer,
objectSize: Int,

View File

@ -26,9 +26,9 @@ import dorkbox.network.exceptions.ServerException
import io.aeron.FragmentAssembler
import io.aeron.logbuffer.FragmentHandler
import io.aeron.logbuffer.Header
import kotlinx.coroutines.delay
import mu.KLogger
import org.agrona.DirectBuffer
import java.lang.Thread.sleep
import java.util.concurrent.*
internal class ClientHandshake<CONNECTION: Connection>(
@ -158,7 +158,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
}
// called from the connect thread
suspend fun hello(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int) : ClientConnectionInfo {
fun hello(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int) : ClientConnectionInfo {
failedException = null
connectKey = getSafeConnectKey()
val publicKey = endPoint.storage.getPublicKey()!!
@ -226,7 +226,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
}
// called from the connect thread
suspend fun done(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int) {
fun done(handshakeConnection: MediaDriverConnection, connectionTimeoutSec: Int) {
val registrationMessage = HandshakeMessage.doneFromClient(connectKey)
val aeronLogInfo = "${handshakeConnection.sessionId}/${handshakeConnection.streamId}"
@ -264,7 +264,7 @@ internal class ClientHandshake<CONNECTION: Connection>(
startTime = System.nanoTime()
}
delay(100)
sleep(100L)
// 0 means we idle. >0 means reset and don't idle (because there are likely more)
pollIdleStrategy.idle(pollCount)

View File

@ -29,6 +29,7 @@ import dorkbox.network.exceptions.AllocationException
import io.aeron.Publication
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import mu.KLogger
import net.jodah.expiringmap.ExpirationPolicy
import net.jodah.expiringmap.ExpiringMap
@ -54,7 +55,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
.expirationListener<Long, CONNECTION> { clientConnectKey, connection ->
// this blocks until it fully runs (which is ok. this is fast)
logger.error { "[${clientConnectKey} Connection (${connection.id}) Timed out waiting for registration response from client" }
connection.closeBlocking()
connection.close()
}
.build<Long, CONNECTION>()
@ -69,7 +70,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
* @return true if we should continue parsing the incoming message, false if we should abort
*/
// note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD. ONLY RESPONSES ARE ON ACTION DISPATCH!
private suspend fun validateMessageTypeAndDoPending(
private fun validateMessageTypeAndDoPending(
server: Server<CONNECTION>,
actionDispatch: CoroutineScope,
handshakePublication: Publication,
@ -114,11 +115,14 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
// called on connection.close()
existingConnection.closeAction = { enableNotifyDisconnect ->
// clean up the resources associated with this connection when it's closed
logger.debug { "[$existingAeronLogInfo] Freeing resources" }
logger.debug { "[$existingAeronLogInfo] freeing resources" }
existingConnection.cleanup(connectionsPerIpCounts, sessionIdAllocator, streamIdAllocator)
if (enableNotifyDisconnect) {
existingConnection.doNotifyDisconnect()
// this always has to be on event dispatch, otherwise we can have weird logic loops if we reconnect within a disconnect callback
actionDispatch.launch {
existingConnection.doNotifyDisconnect()
}
}
}
@ -151,7 +155,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
* @return true if we should continue parsing the incoming message, false if we should abort
*/
// note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD
private suspend fun validateUdpConnectionInfo(
private fun validateUdpConnectionInfo(
server: Server<CONNECTION>,
handshakePublication: Publication,
config: ServerConfiguration,
@ -209,7 +213,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
// note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD
suspend fun processIpcHandshakeMessageServer(
fun processIpcHandshakeMessageServer(
server: Server<CONNECTION>,
handshakePublication: Publication,
message: HandshakeMessage,
@ -356,7 +360,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
}
// note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD
suspend fun processUdpHandshakeMessageServer(
fun processUdpHandshakeMessageServer(
server: Server<CONNECTION>,
handshakePublication: Publication,
remoteIpAndPort: String,
@ -498,7 +502,7 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
connectionsPerIpCounts.decrementSlow(clientAddress)
sessionIdAllocator.free(connectionSessionId)
streamIdAllocator.free(connectionStreamId)
connection.closeBlocking()
connection.close()
logger.error { "[$aeronLogInfo] Connection $clientAddressString was not permitted!" }
@ -572,11 +576,13 @@ internal class ServerHandshake<CONNECTION : Connection>(private val logger: KLog
*
* note: CANNOT be called in action dispatch. ALWAYS ON SAME THREAD
*/
suspend fun clear() {
pendingConnections.forEach { (k, v) ->
v.close()
}
fun clear() {
runBlocking {
pendingConnections.forEach { (_, v) ->
v.close()
}
pendingConnections.clear()
pendingConnections.clear()
}
}
}

View File

@ -12,7 +12,6 @@ import dorkbox.network.connection.Connection
import io.aeron.FragmentAssembler
import io.aeron.Image
import io.aeron.logbuffer.Header
import kotlinx.coroutines.runBlocking
import org.agrona.DirectBuffer
internal object ServerHandshakePollers {
@ -24,7 +23,7 @@ internal object ServerHandshakePollers {
}
}
suspend fun <CONNECTION : Connection> IPC(
fun <CONNECTION : Connection> ipc(
aeronDriver: AeronDriver,
config: ServerConfiguration,
server: Server<CONNECTION>): AeronPoller
@ -56,7 +55,6 @@ internal object ServerHandshakePollers {
val message = server.readHandshakeMessage(buffer, offset, length, header, aeronLogInfo)
runBlocking {
// VALIDATE:: a Registration object is the only acceptable message during the connection phase
if (message !is HandshakeMessage) {
logger.error { "[$aeronLogInfo] Connection from IPC not allowed! Invalid connection request" }
@ -73,7 +71,6 @@ internal object ServerHandshakePollers {
aeronDriver, aeronLogInfo,
connectionFunc, logger
)
}
}
}
@ -97,7 +94,7 @@ internal object ServerHandshakePollers {
suspend fun <CONNECTION : Connection> ip4(
fun <CONNECTION : Connection> ip4(
aeronDriver: AeronDriver,
config: ServerConfiguration,
server: Server<CONNECTION>): AeronPoller
@ -145,30 +142,28 @@ internal object ServerHandshakePollers {
val message = server.readHandshakeMessage(buffer, offset, length, header, aeronLogInfo)
runBlocking {
// VALIDATE:: a Registration object is the only acceptable message during the connection phase
if (message !is HandshakeMessage) {
logger.error {
// split
val splitPoint = remoteIpAndPort.lastIndexOf(':')
val clientAddressString = remoteIpAndPort.substring(0, splitPoint)
// VALIDATE:: a Registration object is the only acceptable message during the connection phase
if (message !is HandshakeMessage) {
logger.error {
// split
val splitPoint = remoteIpAndPort.lastIndexOf(':')
val clientAddressString = remoteIpAndPort.substring(0, splitPoint)
"[$aeronLogInfo] Connection from $clientAddressString not allowed! Invalid connection request"
}
try {
server.writeHandshakeMessage(publication, aeronLogInfo,
HandshakeMessage.error("Invalid connection request"))
} catch (e: Exception) {
logger.error(e) { "[$aeronLogInfo] Handshake error!" }
}
} else {
handshake.processUdpHandshakeMessageServer(
server, publication, remoteIpAndPort, message,
aeronDriver, aeronLogInfo, false,
connectionFunc, logger
)
"[$aeronLogInfo] Connection from $clientAddressString not allowed! Invalid connection request"
}
try {
server.writeHandshakeMessage(publication, aeronLogInfo,
HandshakeMessage.error("Invalid connection request"))
} catch (e: Exception) {
logger.error(e) { "[$aeronLogInfo] Handshake error!" }
}
} else {
handshake.processUdpHandshakeMessageServer(
server, publication, remoteIpAndPort, message,
aeronDriver, aeronLogInfo, false,
connectionFunc, logger
)
}
}
@ -190,7 +185,7 @@ internal object ServerHandshakePollers {
return poller
}
suspend fun <CONNECTION : Connection> ip6(
fun <CONNECTION : Connection> ip6(
aeronDriver: AeronDriver,
config: ServerConfiguration,
server: Server<CONNECTION>): AeronPoller
@ -237,31 +232,29 @@ internal object ServerHandshakePollers {
val message = server.readHandshakeMessage(buffer, offset, length, header, aeronLogInfo)
runBlocking {
// VALIDATE:: a Registration object is the only acceptable message during the connection phase
if (message !is HandshakeMessage) {
logger.error {
// split
val splitPoint = remoteIpAndPort.lastIndexOf(':')
val clientAddressString = remoteIpAndPort.substring(0, splitPoint)
"[$sessionId] Connection from $clientAddressString not allowed! Invalid connection request"
}
// VALIDATE:: a Registration object is the only acceptable message during the connection phase
if (message !is HandshakeMessage) {
logger.error {
// split
val splitPoint = remoteIpAndPort.lastIndexOf(':')
val clientAddressString = remoteIpAndPort.substring(0, splitPoint)
"[$sessionId] Connection from $clientAddressString not allowed! Invalid connection request"
}
try {
server.writeHandshakeMessage(publication, aeronLogInfo,
HandshakeMessage.error("Invalid connection request"))
} catch (e: Exception) {
logger.error(e) { "[$aeronLogInfo] Handshake error!" }
}
}
else {
handshake.processUdpHandshakeMessageServer(
server, publication, remoteIpAndPort, message,
aeronDriver, aeronLogInfo, false,
connectionFunc, logger
)
try {
server.writeHandshakeMessage(publication, aeronLogInfo,
HandshakeMessage.error("Invalid connection request"))
} catch (e: Exception) {
logger.error(e) { "[$aeronLogInfo] Handshake error!" }
}
}
else {
handshake.processUdpHandshakeMessageServer(
server, publication, remoteIpAndPort, message,
aeronDriver, aeronLogInfo, false,
connectionFunc, logger
)
}
}
override fun poll(): Int {
@ -282,7 +275,7 @@ internal object ServerHandshakePollers {
return poller
}
suspend fun <CONNECTION : Connection> ip6Wildcard(
fun <CONNECTION : Connection> ip6Wildcard(
aeronDriver: AeronDriver,
config: ServerConfiguration,
server: Server<CONNECTION>
@ -329,29 +322,27 @@ internal object ServerHandshakePollers {
val message = server.readHandshakeMessage(buffer, offset, length, header, aeronLogInfo)
runBlocking {
// VALIDATE:: a Registration object is the only acceptable message during the connection phase
if (message !is HandshakeMessage) {
logger.error {
// split
val splitPoint = remoteIpAndPort.lastIndexOf(':')
val clientAddressString = remoteIpAndPort.substring(0, splitPoint)
"[$aeronLogInfo] Connection from $clientAddressString not allowed! Invalid connection request"
}
try {
server.writeHandshakeMessage(publication, aeronLogInfo,
HandshakeMessage.error("Invalid connection request"))
} catch (e: Exception) {
logger.error(e) { "[$aeronLogInfo] Handshake error!" }
}
} else {
handshake.processUdpHandshakeMessageServer(
server, publication, remoteIpAndPort, message,
aeronDriver, aeronLogInfo, true,
connectionFunc, logger
)
// VALIDATE:: a Registration object is the only acceptable message during the connection phase
if (message !is HandshakeMessage) {
logger.error {
// split
val splitPoint = remoteIpAndPort.lastIndexOf(':')
val clientAddressString = remoteIpAndPort.substring(0, splitPoint)
"[$aeronLogInfo] Connection from $clientAddressString not allowed! Invalid connection request"
}
try {
server.writeHandshakeMessage(publication, aeronLogInfo,
HandshakeMessage.error("Invalid connection request"))
} catch (e: Exception) {
logger.error(e) { "[$aeronLogInfo] Handshake error!" }
}
} else {
handshake.processUdpHandshakeMessageServer(
server, publication, remoteIpAndPort, message,
aeronDriver, aeronLogInfo, true,
connectionFunc, logger
)
}
}

4
storage.db Normal file
View File

@ -0,0 +1,4 @@
#Storage Version: 0
#Tue Mar 08 23:18:08 CET 2022
__VERSION__=0
_salt=2JWC63svaJ4K3R6XYESDTeMVeMhcVgVtJ5dJ2rjgtAe8

View File

@ -111,6 +111,7 @@ object AeronClient {
configuration.subscriptionPort = 2000
configuration.publicationPort = 2001
configuration.enableIpc = false
configuration.uniqueAeronDirectory = true
val client = Client<Connection>(configuration)

View File

@ -71,7 +71,7 @@ class DisconnectReconnectTest : BaseTest() {
fun reconnectClientViaClientClose() {
run {
val configuration = serverConfig() {
aeronDirectoryForceUnique = true
uniqueAeronDirectory = true
}
val server: Server<Connection> = Server(configuration)
@ -81,7 +81,7 @@ class DisconnectReconnectTest : BaseTest() {
run {
val config = clientConfig() {
aeronDirectoryForceUnique = true
uniqueAeronDirectory = true
}
val client: Client<Connection> = Client(config)
@ -324,7 +324,7 @@ class DisconnectReconnectTest : BaseTest() {
run {
val config = serverConfig()
config.enableIpc = false
config.aeronDirectoryForceUnique = true
config.uniqueAeronDirectory = true
server = Server(config)
addEndPoint(server)
@ -339,7 +339,7 @@ class DisconnectReconnectTest : BaseTest() {
run {
val config = clientConfig()
config.enableIpc = false
config.aeronDirectoryForceUnique = true
config.uniqueAeronDirectory = true
client = Client(config)
addEndPoint(client)

View File

@ -163,8 +163,8 @@ class ListenerTest : BaseTest() {
Assert.assertTrue(overrideCheck.value)
Assert.assertTrue(serverOnMessage.value)
Assert.assertEquals(serverConnectionOnMessage.value, limitCheck)
Assert.assertEquals(serverDisconnectMessage.value, 1)
Assert.assertEquals(limitCheck, serverConnectionOnMessage.value)
Assert.assertEquals(1, serverDisconnectMessage.value)
Assert.assertTrue(serverConnect.value)
Assert.assertTrue(serverDisconnect.value)
Assert.assertTrue(clientConnect.value)

View File

@ -70,7 +70,7 @@ object TestClient {
config.settingsStore = Storage.Memory() // don't want to persist anything on disk!
config.enableRemoteSignatureValidation = false
config.enableIpc = false
config.aeronDirectoryForceUnique = true
config.uniqueAeronDirectory = true
val client = Client<Connection>(config)