Changed how internal storage operates (major performance improvement), added more types of storage. Set the default to PropertyStore (so settings are saved in a property file)
This commit is contained in:
parent
50783a7ae9
commit
dc30c18e97
|
@ -24,9 +24,9 @@ import dorkbox.network.aeron.CoroutineSleepingMillisIdleStrategy
|
|||
import dorkbox.network.exceptions.ClientException
|
||||
import dorkbox.network.exceptions.ServerException
|
||||
import dorkbox.network.serialization.Serialization
|
||||
import dorkbox.network.storage.SettingsStore
|
||||
import dorkbox.network.storage.StorageType
|
||||
import dorkbox.network.storage.types.PropertyStore
|
||||
import dorkbox.os.OS
|
||||
import dorkbox.util.storage.StorageSystem
|
||||
import io.aeron.driver.Configuration
|
||||
import io.aeron.driver.MediaDriver
|
||||
import io.aeron.driver.ThreadingMode
|
||||
|
@ -130,11 +130,6 @@ open class Configuration {
|
|||
internal const val errorMessage = "Cannot set a property after the configuration context has been created!"
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal property that prohibits changing values after this configuration has been validated
|
||||
*/
|
||||
internal var isValidated = false
|
||||
|
||||
/**
|
||||
* Enables the ability to use the IPv4 network stack.
|
||||
*/
|
||||
|
@ -224,11 +219,17 @@ open class Configuration {
|
|||
}
|
||||
|
||||
/**
|
||||
* Allows the end user to change how endpoint settings are stored.
|
||||
* Allows the user to change how endpoint settings and public key information are saved.
|
||||
*
|
||||
* For example, a custom database instead of the default, in-memory storage. Another built-in option is StorageSystem.Disk()
|
||||
* For example, a custom database instead of the default, in-memory storage.
|
||||
*
|
||||
* Included types are:
|
||||
* * ChronicleMapStore.type(file) -- high performance, but non-transactional and not recommended to be shared
|
||||
* * LmdbStore.type(file) -- high performance, ACID, and can be shared
|
||||
* * MemoryStore.type() -- v. high performance, but not persistent
|
||||
* * PropertyStore.type(file) -- slow performance on write, but can easily be edited by user (similar to how openSSH server key info is)
|
||||
*/
|
||||
var settingsStore: SettingsStore = SettingsStore(StorageSystem.Memory())
|
||||
var settingsStore: StorageType = PropertyStore.type("settings.db")
|
||||
set(value) {
|
||||
require(context == null) { errorMessage }
|
||||
field = value
|
||||
|
@ -398,8 +399,15 @@ open class Configuration {
|
|||
/**
|
||||
* Internal property that tells us if this configuration has already been configured and used to create and start the Media Driver
|
||||
*/
|
||||
@Volatile
|
||||
internal var context: MediaDriver.Context? = null
|
||||
|
||||
/**
|
||||
* Internal property that tells us if this configuration has already been used in an endpoint
|
||||
*/
|
||||
@Volatile
|
||||
internal var previouslyUsed = false
|
||||
|
||||
/**
|
||||
* Depending on the OS, different base locations for the Aeron log directory are preferred.
|
||||
*/
|
||||
|
|
|
@ -34,7 +34,6 @@ import dorkbox.network.serialization.Serialization
|
|||
import dorkbox.network.storage.SettingsStore
|
||||
import dorkbox.util.NamedThreadFactory
|
||||
import dorkbox.util.exceptions.SecurityException
|
||||
import dorkbox.util.storage.StorageSystem
|
||||
import io.aeron.Aeron
|
||||
import io.aeron.Publication
|
||||
import io.aeron.driver.MediaDriver
|
||||
|
@ -99,7 +98,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
|
|||
private val shutdown = atomic(false)
|
||||
|
||||
@Volatile
|
||||
private var shutdownLatch: SuspendWaiter = SuspendWaiter()
|
||||
private var shutdownWaiter: SuspendWaiter = SuspendWaiter()
|
||||
|
||||
// we only want one instance of these created. These will be called appropriately
|
||||
val settingsStore: SettingsStore
|
||||
|
@ -107,6 +106,9 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
|
|||
internal val rmiGlobalSupport = RmiManagerGlobal<CONNECTION>(logger, actionDispatch, config.serialization)
|
||||
|
||||
init {
|
||||
require(!config.previouslyUsed) { "${type.simpleName} configuration cannot be reused!" }
|
||||
config.validate()
|
||||
|
||||
runBlocking {
|
||||
// our default onError handler. All error messages go though this
|
||||
listenerManager.onError { throwable ->
|
||||
|
@ -123,15 +125,8 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
|
|||
sendIdleStrategy = config.sendIdleStrategy
|
||||
handshakeKryo = serialization.initHandshakeKryo()
|
||||
|
||||
// we have to be able to specify WHAT property store we want to use, since it can change!
|
||||
settingsStore = config.settingsStore
|
||||
settingsStore.init()
|
||||
|
||||
when (val builder = settingsStore.builder) {
|
||||
is StorageSystem.DiskBuilder -> logger.info("Disk storage system initialized at: '${builder.file}'")
|
||||
is StorageSystem.MemoryBuilder -> logger.info("Memory storage system initialized")
|
||||
else -> logger.info("${builder::class.java.simpleName} storage system initialized")
|
||||
}
|
||||
// we have to be able to specify the property store
|
||||
settingsStore = config.settingsStore.create(logger)
|
||||
|
||||
crypto = CryptoManagement(logger, settingsStore, type, config)
|
||||
|
||||
|
@ -177,7 +172,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
|
|||
|
||||
internal fun initEndpointState(): Aeron {
|
||||
shutdown.getAndSet(false)
|
||||
shutdownLatch = SuspendWaiter()
|
||||
shutdownWaiter = SuspendWaiter()
|
||||
return aeron
|
||||
}
|
||||
|
||||
|
@ -634,7 +629,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
|
|||
* Waits for this endpoint to be closed
|
||||
*/
|
||||
suspend fun waitForClose() {
|
||||
shutdownLatch.doWait()
|
||||
shutdownWaiter.doWait()
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -663,9 +658,10 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
|
|||
|
||||
aeron.close()
|
||||
(aeron.context().threadFactory() as NamedThreadFactory).group.destroy()
|
||||
AeronConfig.stopDriver(mediaDriver, logger)
|
||||
|
||||
runBlocking {
|
||||
AeronConfig.stopDriver(mediaDriver, logger)
|
||||
|
||||
connections.forEach {
|
||||
it.close()
|
||||
}
|
||||
|
@ -681,7 +677,7 @@ internal constructor(val type: Class<*>, internal val config: Configuration) : A
|
|||
close0()
|
||||
|
||||
// if we are waiting for shutdown, cancel the waiting thread (since we have shutdown now)
|
||||
shutdownLatch.cancel()
|
||||
shutdownWaiter.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
213
src/dorkbox/network/other/PooledSerialization.kt
Normal file
213
src/dorkbox/network/other/PooledSerialization.kt
Normal file
|
@ -0,0 +1,213 @@
|
|||
/*
|
||||
* Copyright 2014 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package dorkbox.network.other
|
||||
|
||||
import com.conversantmedia.util.concurrent.MultithreadConcurrentQueue
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.minlog.Log
|
||||
import dorkbox.network.serialization.ClassRegistration
|
||||
import dorkbox.network.serialization.ClassRegistration0
|
||||
import dorkbox.network.serialization.ClassRegistration1
|
||||
import dorkbox.network.serialization.ClassRegistration2
|
||||
import dorkbox.network.serialization.ClassRegistration3
|
||||
import dorkbox.network.serialization.KryoExtra
|
||||
import dorkbox.util.serialization.SerializationDefaults
|
||||
import kotlinx.atomicfu.atomic
|
||||
|
||||
class PooledSerialization {
|
||||
companion object {
|
||||
init {
|
||||
Log.set(Log.LEVEL_ERROR)
|
||||
}
|
||||
}
|
||||
|
||||
private var initialized = atomic(false)
|
||||
private val classesToRegister = mutableListOf<ClassRegistration>()
|
||||
|
||||
private var kryoPoolSize = 16
|
||||
private val kryoInUse = atomic(0)
|
||||
|
||||
@Volatile
|
||||
private var kryoPool = MultithreadConcurrentQueue<KryoExtra>(kryoPoolSize)
|
||||
|
||||
/**
|
||||
* If you customize anything, you will want to register custom types before init() is called!
|
||||
*/
|
||||
fun init() {
|
||||
// NOTE: there are problems if our serializer is THE SAME serializer used by the network stack!
|
||||
// We are explicitly differet types to prevent that form happening
|
||||
|
||||
initialized.value = true
|
||||
}
|
||||
|
||||
private fun initKryo(): KryoExtra {
|
||||
val kryo = KryoExtra()
|
||||
|
||||
SerializationDefaults.register(kryo)
|
||||
|
||||
classesToRegister.forEach { registration ->
|
||||
registration.register(kryo)
|
||||
}
|
||||
|
||||
return kryo
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Registers the class using the lowest, next available integer ID and the [default serializer][Kryo.getDefaultSerializer].
|
||||
* If the class is already registered, the existing entry is updated with the new serializer.
|
||||
*
|
||||
*
|
||||
* Registering a primitive also affects the corresponding primitive wrapper.
|
||||
*
|
||||
* Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this
|
||||
* method.
|
||||
*
|
||||
* The order must be the same at deserialization as it was for serialization.
|
||||
*
|
||||
* This must happen before the creation of the client/server
|
||||
*/
|
||||
fun <T> register(clazz: Class<T>): PooledSerialization {
|
||||
require(!initialized.value) { "Serialization 'register(class)' cannot happen after initialization!" }
|
||||
|
||||
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
|
||||
// with object types... EVEN IF THERE IS A SERIALIZER
|
||||
require(!clazz.isInterface) { "Cannot register '${clazz}' with specified ID for serialization. It must be an implementation." }
|
||||
|
||||
classesToRegister.add(ClassRegistration3(clazz))
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers the class using the specified ID. If the ID is already in use by the same type, the old entry is overwritten. If the ID
|
||||
* is already in use by a different type, an exception is thrown.
|
||||
*
|
||||
*
|
||||
* Registering a primitive also affects the corresponding primitive wrapper.
|
||||
*
|
||||
* IDs must be the same at deserialization as they were for serialization.
|
||||
*
|
||||
* This must happen before the creation of the client/server
|
||||
*
|
||||
* @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but
|
||||
* these IDs can be repurposed.
|
||||
*/
|
||||
fun <T> register(clazz: Class<T>, id: Int): PooledSerialization {
|
||||
require(!initialized.value) { "Serialization 'register(Class, int)' cannot happen after initialization!" }
|
||||
|
||||
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
|
||||
// with object types... EVEN IF THERE IS A SERIALIZER
|
||||
require(!clazz.isInterface) { "Cannot register '${clazz}' with specified ID for serialization. It must be an implementation." }
|
||||
|
||||
classesToRegister.add(ClassRegistration1(clazz, id))
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers the class using the lowest, next available integer ID and the specified serializer. If the class is already registered,
|
||||
* the existing entry is updated with the new serializer.
|
||||
*
|
||||
*
|
||||
* Registering a primitive also affects the corresponding primitive wrapper.
|
||||
*
|
||||
*
|
||||
* Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when using this
|
||||
* method. The order must be the same at deserialization as it was for serialization.
|
||||
*/
|
||||
@Synchronized
|
||||
fun <T> register(clazz: Class<T>, serializer: Serializer<T>): PooledSerialization {
|
||||
require(!initialized.value) { "Serialization 'register(Class, Serializer)' cannot happen after initialization!" }
|
||||
|
||||
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
|
||||
// with object types... EVEN IF THERE IS A SERIALIZER
|
||||
require(!clazz.isInterface) { "Cannot register '${clazz.name}' with a serializer. It must be an implementation." }
|
||||
|
||||
classesToRegister.add(ClassRegistration0(clazz, serializer))
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers the class using the specified ID and serializer. If the ID is already in use by the same type, the old entry is
|
||||
* overwritten. If the ID is already in use by a different type, an exception is thrown.
|
||||
*
|
||||
*
|
||||
* Registering a primitive also affects the corresponding primitive wrapper.
|
||||
*
|
||||
*
|
||||
* IDs must be the same at deserialization as they were for serialization.
|
||||
*
|
||||
* @param id Must be >= 0. Smaller IDs are serialized more efficiently. IDs 0-8 are used by default for primitive types and String, but
|
||||
* these IDs can be repurposed.
|
||||
*/
|
||||
@Synchronized
|
||||
fun <T> register(clazz: Class<T>, serializer: Serializer<T>, id: Int): PooledSerialization {
|
||||
require(!initialized.value) { "Serialization 'register(Class, Serializer, int)' cannot happen after initialization!" }
|
||||
|
||||
// The reason it must be an implementation, is because the reflection serializer DOES NOT WORK with field types, but rather
|
||||
// with object types... EVEN IF THERE IS A SERIALIZER
|
||||
require(!clazz.isInterface) { "Cannot register '${clazz.name}'. It must be an implementation." }
|
||||
|
||||
classesToRegister.add(ClassRegistration2(clazz, serializer, id))
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* @return takes a kryo instance from the pool, or creates one if the pool was empty
|
||||
*/
|
||||
fun takeKryo(): KryoExtra {
|
||||
kryoInUse.getAndIncrement()
|
||||
|
||||
// ALWAYS get as many as needed. We recycle them (with an auto-growing pool) to prevent too many getting created
|
||||
return kryoPool.poll() ?: initKryo()
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a kryo instance to the pool for re-use later on
|
||||
*/
|
||||
fun returnKryo(kryo: KryoExtra) {
|
||||
val kryoCount = kryoInUse.getAndDecrement()
|
||||
if (kryoCount > kryoPoolSize) {
|
||||
// this is CLEARLY a problem, as we have more kryos in use that our pool can support.
|
||||
// This happens when we send messages REALLY fast.
|
||||
//
|
||||
// We fix this by increasing the size of the pool, so kryos aren't thrown away (and create a GC hit)
|
||||
|
||||
synchronized(kryoInUse) {
|
||||
// we have a double check here on purpose. only 1 will work
|
||||
if (kryoCount > kryoPoolSize) {
|
||||
val oldPool = kryoPool
|
||||
val oldSize = kryoPoolSize
|
||||
val newSize = kryoPoolSize * 2
|
||||
|
||||
kryoPoolSize = newSize
|
||||
kryoPool = MultithreadConcurrentQueue<KryoExtra>(kryoPoolSize)
|
||||
|
||||
|
||||
// take all of the old kryos and put them in the new one
|
||||
val array = arrayOfNulls<KryoExtra>(oldSize)
|
||||
val count = oldPool.remove(array)
|
||||
|
||||
for (i in 0 until count) {
|
||||
kryoPool.offer(array[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
kryoPool.offer(kryo)
|
||||
}
|
||||
}
|
|
@ -1,83 +0,0 @@
|
|||
/*
|
||||
* Copyright 2020 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package dorkbox.network.storage
|
||||
|
||||
import dorkbox.util.storage.StorageKey
|
||||
|
||||
// must have empty constructor
|
||||
class DB_Server {
|
||||
companion object {
|
||||
/**
|
||||
* The storage key used to save all server connections
|
||||
*/
|
||||
val STORAGE_KEY = StorageKey("servers")
|
||||
}
|
||||
|
||||
|
||||
// salt + IP address is used for equals!
|
||||
var ipAddress: ByteArray? = null
|
||||
var salt: ByteArray? = null
|
||||
|
||||
var privateKey: ByteArray? = null
|
||||
var publicKey: ByteArray? = null
|
||||
|
||||
override fun hashCode(): Int {
|
||||
val prime = 31
|
||||
var result = 1
|
||||
result = prime * result + (ipAddress?.contentHashCode() ?: 0)
|
||||
return result
|
||||
}
|
||||
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (this === other) {
|
||||
return true
|
||||
}
|
||||
if (other == null) {
|
||||
return false
|
||||
}
|
||||
if (javaClass != other.javaClass) {
|
||||
return false
|
||||
}
|
||||
val other1 = other as DB_Server
|
||||
|
||||
if (ipAddress == null) {
|
||||
if (other1.ipAddress != null) {
|
||||
return false
|
||||
}
|
||||
} else if (other1.ipAddress == null) {
|
||||
return false
|
||||
} else if (!ipAddress!!.contentEquals(other1.ipAddress!!)) {
|
||||
return false
|
||||
}
|
||||
|
||||
if (salt == null) {
|
||||
if (other1.salt != null) {
|
||||
return false
|
||||
}
|
||||
} else if (other1.salt == null) {
|
||||
return false
|
||||
}
|
||||
|
||||
return salt!!.contentEquals(other1.salt!!)
|
||||
}
|
||||
|
||||
override fun toString(): String {
|
||||
val bytes = ipAddress
|
||||
return if (bytes != null) {
|
||||
"DB_Server " + bytes.contentToString()
|
||||
} else "DB_Server [no-ip-set]"
|
||||
}
|
||||
}
|
30
src/dorkbox/network/storage/GenericStore.kt
Normal file
30
src/dorkbox/network/storage/GenericStore.kt
Normal file
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Copyright 2020 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package dorkbox.network.storage
|
||||
|
||||
interface GenericStore: AutoCloseable {
|
||||
/**
|
||||
* similar to map.get()
|
||||
*/
|
||||
operator fun get(key: Any): ByteArray?
|
||||
|
||||
/**
|
||||
* similar to map.set()
|
||||
*
|
||||
* Setting to NULL removes the value
|
||||
*/
|
||||
operator fun set(key: Any, bytes: ByteArray?)
|
||||
}
|
|
@ -1,23 +0,0 @@
|
|||
package dorkbox.network.storage
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import java.net.Inet4Address
|
||||
import java.net.InetAddress
|
||||
|
||||
// NOTE: This only serializes the IP address, not the hostname!
|
||||
class Inet4AddressIpSerializer : Serializer<Inet4Address>() {
|
||||
init {
|
||||
isImmutable = true
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, `object`: Inet4Address) {
|
||||
output.write(`object`.address, 0, 4)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<out Inet4Address>): Inet4Address {
|
||||
return InetAddress.getByAddress("", input.readBytes(4)) as Inet4Address
|
||||
}
|
||||
}
|
|
@ -1,29 +0,0 @@
|
|||
package dorkbox.network.storage
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.KryoException
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import java.net.Inet6Address
|
||||
import java.net.InetAddress
|
||||
import java.net.UnknownHostException
|
||||
|
||||
// NOTE: This only serializes the IP address, not the hostname!
|
||||
class Inet6AddressIpSerializer : Serializer<Inet6Address>() {
|
||||
init {
|
||||
isImmutable = true
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, `object`: Inet6Address) {
|
||||
output.write(`object`.address, 0, 16)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<out Inet6Address>): Inet6Address {
|
||||
return try {
|
||||
InetAddress.getByAddress("", input.readBytes(16)) as Inet6Address
|
||||
} catch (e: UnknownHostException) {
|
||||
throw KryoException(e)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -15,204 +15,117 @@
|
|||
*/
|
||||
package dorkbox.network.storage
|
||||
|
||||
import com.esotericsoftware.kryo.serializers.MapSerializer
|
||||
import dorkbox.netUtil.IPv4
|
||||
import dorkbox.netUtil.IPv6
|
||||
import dorkbox.network.connection.CryptoManagement
|
||||
import dorkbox.util.bytes.ByteArrayWrapper
|
||||
import dorkbox.util.exceptions.SecurityException
|
||||
import dorkbox.util.storage.Storage
|
||||
import dorkbox.util.storage.StorageBuilder
|
||||
import dorkbox.util.storage.StorageSystem
|
||||
import org.agrona.collections.Object2NullableObjectHashMap
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.net.Inet4Address
|
||||
import java.net.Inet6Address
|
||||
import mu.KLogger
|
||||
import java.net.InetAddress
|
||||
import java.security.SecureRandom
|
||||
|
||||
/**
|
||||
* This class provides a way for the network stack to use the server's database, instead of a property file (which it uses when stand-alone)
|
||||
*
|
||||
* A static "create" method, with any number of parameters, is required to create this class (which is done via reflection)
|
||||
* This class provides a way for the network stack to use a database of some sort.
|
||||
*/
|
||||
open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
||||
private lateinit var storage: Storage
|
||||
private lateinit var servers: Object2NullableObjectHashMap<InetAddress, DB_Server>
|
||||
|
||||
@Suppress("unused")
|
||||
class SettingsStore(val logger: KLogger, val store: GenericStore) : AutoCloseable {
|
||||
companion object {
|
||||
/**
|
||||
* Address 0.0.0.0 or ::0 may be used as a source address for this host on this network.
|
||||
*
|
||||
* Because we assigned BOTH to the same thing, it doesn't matter which one we use
|
||||
* Because we assigned BOTH to the same thing, it doesn't REALLY matter which one we use, so we use BOTH!
|
||||
*/
|
||||
private val ipv4Host = IPv4.WILDCARD
|
||||
private val ipv6Host = IPv6.WILDCARD
|
||||
internal val local4Buffer = IPv4.WILDCARD
|
||||
internal val local6Buffer = IPv6.WILDCARD
|
||||
|
||||
/**
|
||||
* Initialize using the provided serialization manager.
|
||||
*/
|
||||
fun init() {
|
||||
if (builder is StorageSystem.DiskBuilder) {
|
||||
// NOTE: there are problems if our serializer is THE SAME serializer used by the network stack!
|
||||
// make sure our custom types are registered!
|
||||
builder.serializationManager.register(Object2NullableObjectHashMap::class.java, MapSerializer())
|
||||
builder.serializationManager.register(ByteArrayWrapper::class.java)
|
||||
builder.serializationManager.register(DB_Server::class.java)
|
||||
|
||||
// NOTE: These only serialize the IP address, not the hostname!
|
||||
builder.serializationManager.register(Inet4Address::class.java, Inet4AddressIpSerializer())
|
||||
builder.serializationManager.register(Inet6Address::class.java, Inet6AddressIpSerializer())
|
||||
internal const val saltKey = "_salt"
|
||||
internal const val privateKey = "_private"
|
||||
}
|
||||
|
||||
this.storage = builder.build()
|
||||
init {
|
||||
// have to init salt
|
||||
if (store[saltKey] == null) {
|
||||
val secureRandom = SecureRandom()
|
||||
|
||||
servers = this.storage.get(DB_Server.STORAGE_KEY, Object2NullableObjectHashMap())
|
||||
// server salt is used to salt usernames and other various connection handshake parameters
|
||||
val bytes = ByteArray(32) // same size as our public/private key info
|
||||
secureRandom.nextBytes(bytes)
|
||||
|
||||
// this will always be null and is here to help people that copy/paste code
|
||||
var localServer = servers[ipv4Host]
|
||||
if (localServer == null) {
|
||||
localServer = DB_Server()
|
||||
servers[ipv4Host] = localServer
|
||||
|
||||
// have to always specify what we are saving
|
||||
this.storage.put(DB_Server.STORAGE_KEY, servers)
|
||||
}
|
||||
|
||||
if (servers[ipv6Host] == null) {
|
||||
servers[ipv6Host] = localServer
|
||||
|
||||
// have to always specify what we are saving
|
||||
this.storage.put(DB_Server.STORAGE_KEY, servers)
|
||||
// have to explicitly set it (so it will save)
|
||||
store[saltKey] = bytes
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to register the different serialization registrations for this store
|
||||
*/
|
||||
fun getSerializationRegistrations(): List<Class<out Any>> {
|
||||
// make sure our custom types are registered
|
||||
// only register if not ALREADY initialized, since we can initialize in the server and in the client. This creates problems if
|
||||
// running inside the same JVM
|
||||
return listOf(Object2NullableObjectHashMap::class.java,
|
||||
ByteArrayWrapper::class.java,
|
||||
DB_Server::class.java
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the private key of the server
|
||||
*
|
||||
* @throws SecurityException
|
||||
*/
|
||||
@Synchronized
|
||||
fun getPrivateKey(): ByteArray? {
|
||||
checkAccess(CryptoManagement::class.java)
|
||||
return servers[ipv4Host]!!.privateKey
|
||||
return store[local4Buffer]
|
||||
}
|
||||
|
||||
/**
|
||||
* Saves the private key of the server
|
||||
*
|
||||
* @throws SecurityException
|
||||
*/
|
||||
@Synchronized
|
||||
fun savePrivateKey(serverPrivateKey: ByteArray) {
|
||||
checkAccess(CryptoManagement::class.java)
|
||||
servers[ipv4Host]!!.privateKey = serverPrivateKey
|
||||
|
||||
// have to always specify what we are saving
|
||||
storage.put(DB_Server.STORAGE_KEY, servers)
|
||||
store[privateKey] = serverPrivateKey
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the public key of the server
|
||||
*
|
||||
* @throws SecurityException
|
||||
*/
|
||||
@Synchronized
|
||||
fun getPublicKey(): ByteArray? {
|
||||
return servers[ipv4Host]!!.publicKey
|
||||
return store[local4Buffer]
|
||||
}
|
||||
|
||||
/**
|
||||
* Saves the public key of the server
|
||||
*
|
||||
* @throws SecurityException
|
||||
*/
|
||||
@Synchronized
|
||||
fun savePublicKey(serverPublicKey: ByteArray) {
|
||||
checkAccess(CryptoManagement::class.java)
|
||||
servers[ipv4Host]!!.publicKey = serverPublicKey
|
||||
|
||||
// have to always specify what we are saving
|
||||
storage.put(DB_Server.STORAGE_KEY, servers)
|
||||
store[local4Buffer] = serverPublicKey
|
||||
store[local6Buffer] = serverPublicKey
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the server salt
|
||||
*/
|
||||
@Synchronized
|
||||
fun getSalt(): ByteArray {
|
||||
val localServer = servers[ipv4Host]
|
||||
var salt = localServer!!.salt
|
||||
|
||||
// we don't care who gets the server salt
|
||||
if (salt == null) {
|
||||
val secureRandom = SecureRandom()
|
||||
|
||||
// server salt is used to salt usernames and other various connection handshake parameters
|
||||
val bytes = ByteArray(256)
|
||||
secureRandom.nextBytes(bytes)
|
||||
salt = bytes
|
||||
localServer.salt = bytes
|
||||
|
||||
// have to always specify what we are saving
|
||||
storage.put(DB_Server.STORAGE_KEY, servers)
|
||||
}
|
||||
|
||||
return salt
|
||||
return store[saltKey]!!
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a previously registered computer by host IP address
|
||||
*/
|
||||
@Synchronized
|
||||
fun getRegisteredServerKey(hostAddress: InetAddress): ByteArray? {
|
||||
return servers[hostAddress]?.publicKey
|
||||
return store[hostAddress]
|
||||
}
|
||||
|
||||
/**
|
||||
* Saves a connected computer by host IP address and public key
|
||||
* Saves a registered computer by host IP address and public key
|
||||
*/
|
||||
@Synchronized
|
||||
fun addRegisteredServerKey(hostAddress: InetAddress, publicKey: ByteArray) {
|
||||
// checkAccess(RegistrationWrapper.class);
|
||||
var db_server = servers[hostAddress]
|
||||
if (db_server == null) {
|
||||
db_server = DB_Server()
|
||||
}
|
||||
|
||||
db_server.publicKey = publicKey
|
||||
servers[hostAddress] = db_server
|
||||
|
||||
// have to always specify what we are saving
|
||||
storage.put(DB_Server.STORAGE_KEY, servers)
|
||||
store[hostAddress] = publicKey
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes a registered computer by host IP address
|
||||
*
|
||||
* @return true if successful, false if there were problems (or it didn't exist)
|
||||
*/
|
||||
@Synchronized
|
||||
fun removeRegisteredServerKey(hostAddress: InetAddress): Boolean {
|
||||
// checkAccess(RegistrationWrapper.class);
|
||||
val db_server = servers.remove(hostAddress)
|
||||
|
||||
// have to always specify what we are saving
|
||||
storage.put(DB_Server.STORAGE_KEY, servers)
|
||||
|
||||
return db_server != null
|
||||
fun removeRegisteredServerKey(hostAddress: InetAddress) {
|
||||
store[hostAddress] = null
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Take the proper steps to close the storage system.
|
||||
*/
|
||||
override fun close() {
|
||||
storage.close()
|
||||
store.close()
|
||||
}
|
||||
|
||||
|
||||
|
@ -226,7 +139,7 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
* OPTIMIZED METHOD
|
||||
*/
|
||||
@Throws(SecurityException::class)
|
||||
protected fun checkAccess(callingClass: Class<*>) {
|
||||
internal fun checkAccess(callingClass: Class<*>) {
|
||||
val callerClass = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE).walk { s ->
|
||||
s.map(StackWalker.StackFrame::getDeclaringClass).skip(2).findFirst()
|
||||
}.get()
|
||||
|
@ -234,7 +147,6 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
// starts with will allow for anonymous inner classes.
|
||||
if (callerClass !== callingClass) {
|
||||
val message = "Security violation by: $callerClass"
|
||||
val logger = LoggerFactory.getLogger(SettingsStore::class.java)
|
||||
logger.error(message)
|
||||
throw SecurityException(message)
|
||||
}
|
||||
|
@ -250,7 +162,7 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
* OPTIMIZED METHOD
|
||||
*/
|
||||
@Throws(SecurityException::class)
|
||||
protected fun checkAccess(callingClass1: Class<*>, callingClass2: Class<*>) {
|
||||
internal fun checkAccess(callingClass1: Class<*>, callingClass2: Class<*>) {
|
||||
val callerClass = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE).walk { s ->
|
||||
s.map(StackWalker.StackFrame::getDeclaringClass)
|
||||
.skip(2)
|
||||
|
@ -261,7 +173,6 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
val ok = callerClass === callingClass1 || callerClass === callingClass2
|
||||
if (!ok) {
|
||||
val message = "Security violation by: $callerClass"
|
||||
val logger = LoggerFactory.getLogger(SettingsStore::class.java)
|
||||
logger.error(message)
|
||||
throw SecurityException(message)
|
||||
}
|
||||
|
@ -277,7 +188,7 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
* OPTIMIZED METHOD
|
||||
*/
|
||||
@Throws(SecurityException::class)
|
||||
protected fun checkAccess(callingClass1: Class<*>, callingClass2: Class<*>, callingClass3: Class<*>) {
|
||||
internal fun checkAccess(callingClass1: Class<*>, callingClass2: Class<*>, callingClass3: Class<*>) {
|
||||
val callerClass = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE).walk { s ->
|
||||
s.map(StackWalker.StackFrame::getDeclaringClass)
|
||||
.skip(2)
|
||||
|
@ -288,7 +199,6 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
val ok = callerClass === callingClass1 || callerClass === callingClass2 || callerClass === callingClass3
|
||||
if (!ok) {
|
||||
val message = "Security violation by: $callerClass"
|
||||
val logger = LoggerFactory.getLogger(SettingsStore::class.java)
|
||||
logger.error(message)
|
||||
throw SecurityException(message)
|
||||
}
|
||||
|
@ -300,8 +210,9 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
*
|
||||
* (ie, not just any class can call certain admin actions.
|
||||
*/
|
||||
@Suppress("DuplicatedCode")
|
||||
@Throws(SecurityException::class)
|
||||
protected fun checkAccess(vararg callingClasses: Class<*>) {
|
||||
internal fun checkAccess(vararg callingClasses: Class<*>) {
|
||||
val callerClass = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE).walk { s ->
|
||||
s.map(StackWalker.StackFrame::getDeclaringClass)
|
||||
.skip(2)
|
||||
|
@ -319,7 +230,6 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
|
||||
if (!ok) {
|
||||
val message = "Security violation by: $callerClass"
|
||||
val logger = LoggerFactory.getLogger(SettingsStore::class.java)
|
||||
logger.error(message)
|
||||
throw SecurityException(message)
|
||||
}
|
||||
|
@ -336,7 +246,7 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
*
|
||||
* @return true if allowed access.
|
||||
*/
|
||||
protected fun checkAccessNoExit(callingClass: Class<*>): Boolean {
|
||||
internal fun checkAccessNoExit(callingClass: Class<*>): Boolean {
|
||||
val callerClass = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE).walk { s ->
|
||||
s.map(StackWalker.StackFrame::getDeclaringClass)
|
||||
.skip(2)
|
||||
|
@ -346,7 +256,6 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
// starts with will allow for anonymous inner classes.
|
||||
if (callerClass !== callingClass) {
|
||||
val message = "Security violation by: $callerClass"
|
||||
val logger = LoggerFactory.getLogger(SettingsStore::class.java)
|
||||
logger.error(message)
|
||||
return false
|
||||
}
|
||||
|
@ -364,7 +273,7 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
*
|
||||
* @return true if allowed access.
|
||||
*/
|
||||
protected fun checkAccessNoExit(callingClass1: Class<*>, callingClass2: Class<*>): Boolean {
|
||||
internal fun checkAccessNoExit(callingClass1: Class<*>, callingClass2: Class<*>): Boolean {
|
||||
val callerClass = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE).walk { s ->
|
||||
s.map(StackWalker.StackFrame::getDeclaringClass)
|
||||
.skip(2)
|
||||
|
@ -372,10 +281,9 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
}.get()
|
||||
|
||||
// starts with will allow for anonymous inner classes.
|
||||
var ok = callerClass === callingClass1 || callerClass === callingClass2
|
||||
val ok = callerClass === callingClass1 || callerClass === callingClass2
|
||||
if (!ok) {
|
||||
val message = "Security violation by: $callerClass"
|
||||
val logger = LoggerFactory.getLogger(SettingsStore::class.java)
|
||||
logger.error(message)
|
||||
return false
|
||||
}
|
||||
|
@ -393,7 +301,7 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
*
|
||||
* @return true if allowed access.
|
||||
*/
|
||||
protected fun checkAccessNoExit(callingClass1: Class<*>, callingClass2: Class<*>, callingClass3: Class<*>): Boolean {
|
||||
internal fun checkAccessNoExit(callingClass1: Class<*>, callingClass2: Class<*>, callingClass3: Class<*>): Boolean {
|
||||
// val callerClass = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE).callerClass
|
||||
val callerClass = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE).walk { s ->
|
||||
s.map(StackWalker.StackFrame::getDeclaringClass)
|
||||
|
@ -405,7 +313,6 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
val ok = callerClass === callingClass1 || callerClass === callingClass2 || callerClass === callingClass3
|
||||
if (!ok) {
|
||||
val message = "Security violation by: $callerClass"
|
||||
val logger = LoggerFactory.getLogger(SettingsStore::class.java)
|
||||
logger.error(message)
|
||||
return false
|
||||
}
|
||||
|
@ -420,7 +327,8 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
*
|
||||
* @return true if allowed access.
|
||||
*/
|
||||
protected fun checkAccessNoExit(vararg callingClasses: Class<*>): Boolean {
|
||||
@Suppress("DuplicatedCode")
|
||||
internal fun checkAccessNoExit(vararg callingClasses: Class<*>): Boolean {
|
||||
// val callerClass = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE).callerClass
|
||||
val callerClass = StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE).walk { s ->
|
||||
s.map(StackWalker.StackFrame::getDeclaringClass)
|
||||
|
@ -439,7 +347,6 @@ open class SettingsStore(internal val builder: StorageBuilder) : AutoCloseable {
|
|||
|
||||
if (!ok) {
|
||||
val message = "Security violation by: $callerClass"
|
||||
val logger = LoggerFactory.getLogger(SettingsStore::class.java)
|
||||
logger.error(message)
|
||||
return false
|
||||
}
|
||||
|
|
26
src/dorkbox/network/storage/StorageType.kt
Normal file
26
src/dorkbox/network/storage/StorageType.kt
Normal file
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Copyright 2020 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package dorkbox.network.storage
|
||||
|
||||
import mu.KLogger
|
||||
import mu.KotlinLogging
|
||||
|
||||
interface StorageType {
|
||||
/**
|
||||
* Creates the custom storage type
|
||||
*/
|
||||
fun create(logger: KLogger = KotlinLogging.logger("StorageType")): SettingsStore
|
||||
}
|
90
src/dorkbox/network/storage/types/ChronicleMapStore.kt
Normal file
90
src/dorkbox/network/storage/types/ChronicleMapStore.kt
Normal file
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
* Copyright 2020 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package dorkbox.network.storage.types
|
||||
|
||||
import dorkbox.network.storage.GenericStore
|
||||
import dorkbox.network.storage.SettingsStore
|
||||
import dorkbox.network.storage.StorageType
|
||||
import mu.KLogger
|
||||
import net.openhft.chronicle.map.ChronicleMap
|
||||
import java.io.File
|
||||
import java.net.InetAddress
|
||||
|
||||
|
||||
class ChronicleMapStore(val dbFile: File, val logger: KLogger): GenericStore {
|
||||
companion object {
|
||||
fun type(dbFile: String) : StorageType {
|
||||
return type(File(dbFile))
|
||||
}
|
||||
|
||||
fun type(dbFile: File) = object : StorageType {
|
||||
override fun create(logger: KLogger): SettingsStore {
|
||||
return SettingsStore(logger, ChronicleMapStore(dbFile.absoluteFile, logger))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val map = ChronicleMap.of(ByteArray::class.java, ByteArray::class.java)
|
||||
.name("machine-keys")
|
||||
.entries(1_000_000)
|
||||
.constantValueSizeBySample(ByteArray(32))
|
||||
.averageKeySize(16.0)
|
||||
.createPersistedTo(dbFile)
|
||||
|
||||
// byte 0 is SALT
|
||||
private val saltBuffer = ByteArray(1) {0}
|
||||
|
||||
// byte 1 is private key
|
||||
private val privateKeyBuffer = ByteArray(1) {1}
|
||||
|
||||
init {
|
||||
logger.info("ChronicleMap storage initialized at: '$dbFile'")
|
||||
}
|
||||
|
||||
private fun getBytes(key: Any): ByteArray {
|
||||
return when (key) {
|
||||
is InetAddress -> key.address
|
||||
SettingsStore.saltKey -> saltBuffer
|
||||
SettingsStore.privateKey -> privateKeyBuffer
|
||||
else -> throw IllegalArgumentException("Unable to manage property: $key")
|
||||
}
|
||||
}
|
||||
|
||||
override fun get(key: Any): ByteArray? {
|
||||
return map[getBytes(key)]
|
||||
}
|
||||
|
||||
/**
|
||||
* Setting to NULL removes it
|
||||
*/
|
||||
@Suppress("DuplicatedCode")
|
||||
override fun set(key: Any, bytes: ByteArray?) {
|
||||
val keyBytes = getBytes(key)
|
||||
|
||||
if (bytes == null) {
|
||||
map.remove(keyBytes)
|
||||
}
|
||||
else {
|
||||
map[keyBytes] = bytes
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
map.close()
|
||||
}
|
||||
}
|
||||
|
||||
|
121
src/dorkbox/network/storage/types/LmdbStore.kt
Normal file
121
src/dorkbox/network/storage/types/LmdbStore.kt
Normal file
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Copyright 2020 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package dorkbox.network.storage.types
|
||||
|
||||
import dorkbox.network.storage.GenericStore
|
||||
import dorkbox.network.storage.SettingsStore
|
||||
import dorkbox.network.storage.StorageType
|
||||
import mu.KLogger
|
||||
import org.lmdbjava.ByteArrayProxy
|
||||
import org.lmdbjava.Dbi
|
||||
import org.lmdbjava.DbiFlags
|
||||
import org.lmdbjava.Env
|
||||
import org.lmdbjava.EnvFlags
|
||||
import java.io.File
|
||||
import java.net.InetAddress
|
||||
|
||||
|
||||
class LmdbStore(val dbFile: File, val logger: KLogger): GenericStore {
|
||||
companion object {
|
||||
fun type(dbFile: String) : StorageType {
|
||||
return type(File(dbFile))
|
||||
}
|
||||
|
||||
fun type(dbFile: File) = object : StorageType {
|
||||
override fun create(logger: KLogger): SettingsStore {
|
||||
return SettingsStore(logger, LmdbStore(dbFile.absoluteFile, logger))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val env: Env<ByteArray>
|
||||
private val db: Dbi<ByteArray>
|
||||
|
||||
init {
|
||||
val prep = Env.create(ByteArrayProxy.PROXY_BA).setMapSize(1048760).setMaxDbs(1)
|
||||
|
||||
env = if (dbFile.isDirectory) {
|
||||
prep.open(dbFile)
|
||||
}
|
||||
else {
|
||||
// The database lock file is the path with "-lock" appended.
|
||||
prep.open(dbFile, EnvFlags.MDB_NOSUBDIR)
|
||||
}
|
||||
|
||||
db = env.openDbi("machine-keys", DbiFlags.MDB_CREATE)
|
||||
|
||||
logger.info("LMDB storage initialized at: '$dbFile'")
|
||||
}
|
||||
|
||||
// byte 0 is SALT
|
||||
private val saltBuffer = ByteArray(1) {0}
|
||||
|
||||
// byte 1 is private key
|
||||
private val privateKeyBuffer = ByteArray(1) {1}
|
||||
|
||||
private fun getBytes(key: Any): ByteArray {
|
||||
return when (key) {
|
||||
is InetAddress -> key.address
|
||||
SettingsStore.saltKey -> saltBuffer
|
||||
SettingsStore.privateKey -> privateKeyBuffer
|
||||
else -> throw IllegalArgumentException("Unable to manage property: $key")
|
||||
}
|
||||
}
|
||||
|
||||
override fun get(key: Any): ByteArray? {
|
||||
val keyBytes = getBytes(key)
|
||||
|
||||
return env.txnRead().use { txn ->
|
||||
db.get(txn, keyBytes) ?: return null
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Setting to NULL removes it
|
||||
*/
|
||||
@Suppress("DuplicatedCode")
|
||||
override fun set(key: Any, bytes: ByteArray?) {
|
||||
val keyBytes = getBytes(key)
|
||||
|
||||
if (bytes == null) {
|
||||
env.txnWrite().use { txn ->
|
||||
db.delete(txn, keyBytes)
|
||||
|
||||
// An explicit commit is required, otherwise Txn.close() rolls it back.
|
||||
txn.commit()
|
||||
}
|
||||
}
|
||||
else {
|
||||
env.txnWrite().use { txn ->
|
||||
try {
|
||||
db.put(txn, keyBytes, bytes)
|
||||
} catch (e: Exception) {
|
||||
logger.error("Unable to save to LMDB!", e)
|
||||
}
|
||||
|
||||
// An explicit commit is required, otherwise Txn.close() rolls it back.
|
||||
txn.commit()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
db.close()
|
||||
env.close()
|
||||
}
|
||||
}
|
||||
|
||||
|
51
src/dorkbox/network/storage/types/MemoryStore.kt
Normal file
51
src/dorkbox/network/storage/types/MemoryStore.kt
Normal file
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Copyright 2020 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package dorkbox.network.storage.types
|
||||
|
||||
import dorkbox.network.storage.GenericStore
|
||||
import dorkbox.network.storage.SettingsStore
|
||||
import dorkbox.network.storage.StorageType
|
||||
import mu.KLogger
|
||||
import org.agrona.collections.Object2ObjectHashMap
|
||||
|
||||
object MemoryStore {
|
||||
fun type() = object : StorageType {
|
||||
override fun create(logger: KLogger): SettingsStore {
|
||||
return SettingsStore(logger, MemoryAccess(logger))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class MemoryAccess(val logger: KLogger): GenericStore {
|
||||
private val map = Object2ObjectHashMap<Any, ByteArray>()
|
||||
|
||||
init {
|
||||
logger.info("Memory storage initialized")
|
||||
}
|
||||
|
||||
override fun get(key: Any): ByteArray? {
|
||||
return map[key]
|
||||
}
|
||||
|
||||
override fun set(key: Any, bytes: ByteArray?) {
|
||||
map[key] = bytes
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
}
|
||||
}
|
||||
|
163
src/dorkbox/network/storage/types/PropertyStore.kt
Normal file
163
src/dorkbox/network/storage/types/PropertyStore.kt
Normal file
|
@ -0,0 +1,163 @@
|
|||
/*
|
||||
* Copyright 2020 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package dorkbox.network.storage.types
|
||||
|
||||
import dorkbox.netUtil.IP
|
||||
import dorkbox.network.storage.GenericStore
|
||||
import dorkbox.network.storage.SettingsStore
|
||||
import dorkbox.network.storage.StorageType
|
||||
import dorkbox.util.Sys
|
||||
import dorkbox.util.properties.SortedProperties
|
||||
import mu.KLogger
|
||||
import org.agrona.collections.Object2ObjectHashMap
|
||||
import java.io.File
|
||||
import java.io.FileInputStream
|
||||
import java.io.FileOutputStream
|
||||
import java.io.IOException
|
||||
import java.net.InetAddress
|
||||
import java.util.*
|
||||
|
||||
class PropertyStore(val dbFile: File, val logger: KLogger): GenericStore {
|
||||
companion object {
|
||||
fun type(dbFile: String) : StorageType {
|
||||
return LmdbStore.type(File(dbFile))
|
||||
}
|
||||
|
||||
fun type(dbFile: File) = object : StorageType {
|
||||
override fun create(logger: KLogger): SettingsStore {
|
||||
return SettingsStore(logger, PropertyStore (dbFile, logger))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Volatile
|
||||
var lastModifiedTime = 0L
|
||||
val loadedProps = Object2ObjectHashMap<Any, ByteArray>()
|
||||
|
||||
init {
|
||||
load()
|
||||
logger.info("Property file storage initialized at: '$dbFile'")
|
||||
}
|
||||
|
||||
|
||||
private fun load() {
|
||||
// if we cannot load, then we create a properties file.
|
||||
if (!dbFile.canRead() && !dbFile.createNewFile()) {
|
||||
throw IOException("Cannot create file")
|
||||
}
|
||||
|
||||
val input = FileInputStream(dbFile)
|
||||
|
||||
try {
|
||||
val properties = Properties()
|
||||
properties.load(input)
|
||||
lastModifiedTime = dbFile.lastModified()
|
||||
|
||||
properties.entries.forEach {
|
||||
val key = it.key as String
|
||||
val value = it.value as String
|
||||
|
||||
when (key) {
|
||||
"_salt" -> loadedProps["_salt"] = Sys.hexToBytes(value)
|
||||
"_private" -> loadedProps["_private"] = Sys.hexToBytes(value)
|
||||
else -> {
|
||||
val address: InetAddress? = IP.fromString(key)
|
||||
if (address != null) {
|
||||
loadedProps[address] = Sys.hexToBytes(value)
|
||||
} else {
|
||||
logger.error("Unable to parse property file: $dbFile $key $value")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
properties.clear()
|
||||
} catch (e: IOException) {
|
||||
logger.error("Cannot load properties!", e)
|
||||
e.printStackTrace()
|
||||
} finally {
|
||||
input.close()
|
||||
}
|
||||
}
|
||||
|
||||
override operator fun get(key: Any): ByteArray? {
|
||||
// we want to check the last modified time when getting, because if we edit the on-disk file, we want to those changes
|
||||
val lastModifiedTime = dbFile.lastModified()
|
||||
if (this.lastModifiedTime != lastModifiedTime) {
|
||||
// we want to reload the info
|
||||
load()
|
||||
}
|
||||
|
||||
|
||||
val any = loadedProps[key]
|
||||
if (any != null) {
|
||||
return any
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
/**
|
||||
* Setting to NULL removes it
|
||||
*/
|
||||
override operator fun set(key: Any, bytes: ByteArray?) {
|
||||
if (bytes == null) {
|
||||
loadedProps.remove(key)
|
||||
} else {
|
||||
loadedProps[key] = bytes
|
||||
}
|
||||
|
||||
// every time we set info, we want to save it to disk (so the file on disk will ALWAYS be current, and so we can modify it as we choose)
|
||||
save()
|
||||
}
|
||||
|
||||
fun save() {
|
||||
var fos: FileOutputStream? = null
|
||||
try {
|
||||
fos = FileOutputStream(dbFile, false)
|
||||
|
||||
val properties = SortedProperties()
|
||||
|
||||
loadedProps.forEach { (key, value) ->
|
||||
when (key) {
|
||||
"_salt" -> properties[key] = Sys.bytesToHex(value)
|
||||
"_private" -> properties[key] = Sys.bytesToHex(value)
|
||||
is InetAddress -> properties[IP.toString(key)] = Sys.bytesToHex(value)
|
||||
else -> logger.error("Unable to parse property [$key] $value")
|
||||
}
|
||||
}
|
||||
|
||||
properties.store(fos, "Server salt, public/private keys, and remote computer public Keys")
|
||||
fos.flush()
|
||||
properties.clear()
|
||||
lastModifiedTime = dbFile.lastModified()
|
||||
} catch (e: IOException) {
|
||||
logger.error("Properties cannot save to: $dbFile", e)
|
||||
} finally {
|
||||
if (fos != null) {
|
||||
try {
|
||||
fos.close()
|
||||
} catch (ignored: IOException) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
save()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -24,6 +24,7 @@ import ch.qos.logback.core.ConsoleAppender
|
|||
import dorkbox.network.Client
|
||||
import dorkbox.network.Configuration
|
||||
import dorkbox.network.connection.Connection
|
||||
import dorkbox.network.storage.types.MemoryStore
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.slf4j.LoggerFactory
|
||||
import sun.misc.Unsafe
|
||||
|
@ -95,6 +96,7 @@ object AeronClient {
|
|||
@JvmStatic
|
||||
fun main(args: Array<String>) {
|
||||
val configuration = Configuration()
|
||||
configuration.settingsStore = MemoryStore.type() // don't want to persist anything on disk!
|
||||
configuration.subscriptionPort = 2000
|
||||
configuration.publicationPort = 2001
|
||||
val client = Client<Connection>(configuration)
|
||||
|
|
|
@ -24,6 +24,7 @@ import ch.qos.logback.core.ConsoleAppender
|
|||
import dorkbox.network.Server
|
||||
import dorkbox.network.ServerConfiguration
|
||||
import dorkbox.network.connection.Connection
|
||||
import dorkbox.network.storage.types.MemoryStore
|
||||
import org.slf4j.LoggerFactory
|
||||
import sun.misc.Unsafe
|
||||
import java.lang.reflect.Field
|
||||
|
@ -74,6 +75,7 @@ object AeronServer {
|
|||
@JvmStatic
|
||||
fun main(args: Array<String>) {
|
||||
val configuration = ServerConfiguration()
|
||||
configuration.settingsStore = MemoryStore.type() // don't want to persist anything on disk!
|
||||
configuration.listenIpAddress = "127.0.0.1"
|
||||
configuration.subscriptionPort = 2000
|
||||
configuration.publicationPort = 2001
|
||||
|
|
|
@ -45,6 +45,7 @@ import dorkbox.network.Configuration
|
|||
import dorkbox.network.Server
|
||||
import dorkbox.network.ServerConfiguration
|
||||
import dorkbox.network.connection.EndPoint
|
||||
import dorkbox.network.storage.types.MemoryStore
|
||||
import dorkbox.os.OS
|
||||
import dorkbox.util.entropy.Entropy
|
||||
import dorkbox.util.entropy.SimpleEntropy
|
||||
|
@ -72,6 +73,7 @@ abstract class BaseTest {
|
|||
const val LOOPBACK = "loopback"
|
||||
fun clientConfig(): Configuration {
|
||||
val configuration = Configuration()
|
||||
configuration.settingsStore = MemoryStore.type() // don't want to persist anything on disk!
|
||||
configuration.subscriptionPort = 2000
|
||||
configuration.publicationPort = 2001
|
||||
|
||||
|
@ -80,6 +82,7 @@ abstract class BaseTest {
|
|||
|
||||
fun serverConfig(): ServerConfiguration {
|
||||
val configuration = ServerConfiguration()
|
||||
configuration.settingsStore = MemoryStore.type() // don't want to persist anything on disk!
|
||||
|
||||
configuration.subscriptionPort = 2000
|
||||
configuration.publicationPort = 2001
|
||||
|
|
96
test/dorkboxTest/network/StorageTest.kt
Normal file
96
test/dorkboxTest/network/StorageTest.kt
Normal file
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
* Copyright 2020 dorkbox, llc
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package dorkboxTest.network
|
||||
|
||||
import dorkbox.network.Server
|
||||
import dorkbox.network.connection.Connection
|
||||
import dorkbox.network.storage.types.ChronicleMapStore
|
||||
import dorkbox.network.storage.types.LmdbStore
|
||||
import dorkbox.network.storage.types.MemoryStore
|
||||
import dorkbox.network.storage.types.PropertyStore
|
||||
import org.junit.Assert
|
||||
import org.junit.Test
|
||||
import java.io.File
|
||||
|
||||
class StorageTest : BaseTest() {
|
||||
@Test
|
||||
fun memoryTest() {
|
||||
val salt1 = MemoryStore.type().create().use { it.getSalt() }
|
||||
val salt2 = Server<Connection>(serverConfig()).use { it.settingsStore.getSalt() }
|
||||
val salt3 = Server<Connection>(serverConfig()).use { it.settingsStore.getSalt() }
|
||||
|
||||
Assert.assertFalse(salt1.contentEquals(salt2))
|
||||
Assert.assertFalse(salt1.contentEquals(salt3))
|
||||
Assert.assertFalse(salt2.contentEquals(salt3))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun lmdbTest() {
|
||||
val file = File("test.db").absoluteFile
|
||||
val fileLock = File("test.db-lock").absoluteFile
|
||||
|
||||
val salt1 = LmdbStore.type(file).create().use { it.getSalt() }
|
||||
val salt2 = LmdbStore.type(file).create().use { it.getSalt() }
|
||||
|
||||
Assert.assertArrayEquals(salt1, salt2)
|
||||
file.delete()
|
||||
fileLock.delete()
|
||||
|
||||
val salt3 = Server<Connection>(serverConfig().apply { settingsStore = LmdbStore.type(file) }).use { it.settingsStore.getSalt() }
|
||||
val salt4 = Server<Connection>(serverConfig().apply { settingsStore = LmdbStore.type(file) }).use { it.settingsStore.getSalt() }
|
||||
|
||||
Assert.assertArrayEquals(salt3, salt4)
|
||||
Assert.assertFalse(salt1.contentEquals(salt4))
|
||||
file.delete()
|
||||
fileLock.delete()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun propFileTest() {
|
||||
val file = File("test.db").absoluteFile
|
||||
|
||||
val salt1 = PropertyStore.type(file).create().use { it.getSalt() }
|
||||
val salt2 = PropertyStore.type(file).create().use { it.getSalt() }
|
||||
|
||||
Assert.assertArrayEquals(salt1, salt2)
|
||||
file.delete()
|
||||
|
||||
val salt3 = Server<Connection>(serverConfig().apply { settingsStore = PropertyStore.type(file) }).use { it.settingsStore.getSalt() }
|
||||
val salt4 = Server<Connection>(serverConfig().apply { settingsStore = PropertyStore.type(file) }).use { it.settingsStore.getSalt() }
|
||||
|
||||
Assert.assertArrayEquals(salt3, salt4)
|
||||
Assert.assertFalse(salt1.contentEquals(salt4))
|
||||
file.delete()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun chronicleMapTest() {
|
||||
val file = File("test.db").absoluteFile
|
||||
|
||||
val salt1 = ChronicleMapStore.type(file).create().use { it.getSalt() }
|
||||
val salt2 = ChronicleMapStore.type(file).create().use { it.getSalt() }
|
||||
|
||||
Assert.assertArrayEquals(salt1, salt2)
|
||||
file.delete()
|
||||
|
||||
val salt3 = Server<Connection>(serverConfig().apply { settingsStore = ChronicleMapStore.type(file) }).use { it.settingsStore.getSalt() }
|
||||
val salt4 = Server<Connection>(serverConfig().apply { settingsStore = ChronicleMapStore.type(file) }).use { it.settingsStore.getSalt() }
|
||||
|
||||
Assert.assertArrayEquals(salt3, salt4)
|
||||
Assert.assertFalse(salt1.contentEquals(salt4))
|
||||
file.delete()
|
||||
}
|
||||
}
|
486
test/dorkboxTest/network/lmdb/TutorialTest.kt
Normal file
486
test/dorkboxTest/network/lmdb/TutorialTest.kt
Normal file
|
@ -0,0 +1,486 @@
|
|||
/*-
|
||||
* #%L
|
||||
* LmdbJava
|
||||
* %%
|
||||
* Copyright (C) 2016 - 2020 The LmdbJava Open Source Project
|
||||
* %%
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* #L%
|
||||
*/
|
||||
package dorkboxTest.network.lmdb
|
||||
|
||||
import org.agrona.MutableDirectBuffer
|
||||
import org.agrona.concurrent.UnsafeBuffer
|
||||
import org.hamcrest.CoreMatchers
|
||||
import org.hamcrest.MatcherAssert
|
||||
import org.junit.Assert
|
||||
import org.junit.Ignore
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import org.lmdbjava.ByteBufferProxy
|
||||
import org.lmdbjava.DbiFlags
|
||||
import org.lmdbjava.DirectBufferProxy
|
||||
import org.lmdbjava.Env
|
||||
import org.lmdbjava.GetOp
|
||||
import org.lmdbjava.KeyRange
|
||||
import org.lmdbjava.SeekOp
|
||||
import org.lmdbjava.Verifier
|
||||
import java.io.File
|
||||
import java.io.IOException
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* Welcome to LmdbJava!
|
||||
*
|
||||
*
|
||||
*
|
||||
* This short tutorial will walk you through using LmdbJava step-by-step.
|
||||
*
|
||||
*
|
||||
*
|
||||
* If you are using a 64-bit Windows, Linux or OS X machine, you can simply run
|
||||
* this tutorial by adding the LmdbJava JAR to your classpath. It includes the
|
||||
* required system libraries. If you are using another 64-bit platform, you'll
|
||||
* need to install the LMDB system library yourself. 32-bit platforms are not
|
||||
* supported.
|
||||
*/
|
||||
@Ignore
|
||||
class TutorialTest {
|
||||
|
||||
private val folder = TemporaryFolder()
|
||||
|
||||
@Rule
|
||||
fun tmp(): TemporaryFolder {
|
||||
return folder
|
||||
}
|
||||
|
||||
/**
|
||||
* In this first tutorial we will use LmdbJava with some basic defaults.
|
||||
*
|
||||
* @throws IOException if a path was unavailable for memory mapping
|
||||
*/
|
||||
@Test
|
||||
@Throws(IOException::class)
|
||||
fun tutorial1() {
|
||||
// We need a storage directory first.
|
||||
// The path cannot be on a remote file system.
|
||||
val path = tmp().newFolder()
|
||||
|
||||
// We always need an Env. An Env owns a physical on-disk storage file. One
|
||||
// Env can store many different databases (ie sorted maps).
|
||||
val env = Env.create() // LMDB also needs to know how large our DB might be. Over-estimating is OK.
|
||||
.setMapSize(10485760) // LMDB also needs to know how many DBs (Dbi) we want to store in this Env.
|
||||
.setMaxDbs(1) // Now let's open the Env. The same path can be concurrently opened and
|
||||
// used in different processes, but do not open the same path twice in
|
||||
// the same process at the same time.
|
||||
.open(path)
|
||||
|
||||
// We need a Dbi for each DB. A Dbi roughly equates to a sorted map. The
|
||||
// MDB_CREATE flag causes the DB to be created if it doesn't already exist.
|
||||
val db = env.openDbi(DB_NAME, DbiFlags.MDB_CREATE)
|
||||
|
||||
// We want to store some data, so we will need a direct ByteBuffer.
|
||||
// Note that LMDB keys cannot exceed maxKeySize bytes (511 bytes by default).
|
||||
// Values can be larger.
|
||||
val key = ByteBuffer.allocateDirect(env.maxKeySize)
|
||||
val `val` = ByteBuffer.allocateDirect(700)
|
||||
key.put("greeting".toByteArray(StandardCharsets.UTF_8)).flip()
|
||||
`val`.put("Hello world".toByteArray(StandardCharsets.UTF_8)).flip()
|
||||
val valSize = `val`.remaining()
|
||||
|
||||
// Now store it. Dbi.put() internally begins and commits a transaction (Txn).
|
||||
db.put(key, `val`)
|
||||
|
||||
// To fetch any data from LMDB we need a Txn. A Txn is very important in
|
||||
// LmdbJava because it offers ACID characteristics and internally holds a
|
||||
// read-only key buffer and read-only value buffer. These read-only buffers
|
||||
// are always the same two Java objects, but point to different LMDB-managed
|
||||
// memory as we use Dbi (and Cursor) methods. These read-only buffers remain
|
||||
// valid only until the Txn is released or the next Dbi or Cursor call. If
|
||||
// you need data afterwards, you should copy the bytes to your own buffer.
|
||||
env.txnRead().use { txn ->
|
||||
val found = db[txn, key]
|
||||
Assert.assertNotNull(found)
|
||||
|
||||
// The fetchedVal is read-only and points to LMDB memory
|
||||
val fetchedVal = txn.`val`()
|
||||
MatcherAssert.assertThat(fetchedVal.remaining(), CoreMatchers.`is`(valSize))
|
||||
|
||||
// Let's double-check the fetched value is correct
|
||||
MatcherAssert.assertThat(StandardCharsets.UTF_8.decode(fetchedVal).toString(), CoreMatchers.`is`("Hello world"))
|
||||
}
|
||||
|
||||
// We can also delete. The simplest way is to let Dbi allocate a new Txn...
|
||||
db.delete(key)
|
||||
env.txnRead().use { txn -> Assert.assertNull(db[txn, key]) }
|
||||
env.close()
|
||||
}
|
||||
|
||||
/**
|
||||
* In this second tutorial we'll learn more about LMDB's ACID Txns.
|
||||
*
|
||||
* @throws IOException if a path was unavailable for memory mapping
|
||||
* @throws InterruptedException if executor shutdown interrupted
|
||||
*/
|
||||
@Test
|
||||
@Throws(IOException::class, InterruptedException::class)
|
||||
fun tutorial2() {
|
||||
val env = createSimpleEnv(tmp().newFolder())
|
||||
val db = env.openDbi(DB_NAME, DbiFlags.MDB_CREATE)
|
||||
val key = ByteBuffer.allocateDirect(env.maxKeySize)
|
||||
val `val` = ByteBuffer.allocateDirect(700)
|
||||
|
||||
// Let's write and commit "key1" via a Txn. A Txn can include multiple Dbis.
|
||||
// Note write Txns block other write Txns, due to writes being serialized.
|
||||
// It's therefore important to avoid unnecessarily long-lived write Txns.
|
||||
env.txnWrite().use { txn ->
|
||||
key.put("key1".toByteArray(StandardCharsets.UTF_8)).flip()
|
||||
`val`.put("lmdb".toByteArray(StandardCharsets.UTF_8)).flip()
|
||||
db.put(txn, key, `val`)
|
||||
|
||||
// We can read data too, even though this is a write Txn.
|
||||
val found = db[txn, key]
|
||||
Assert.assertNotNull(found)
|
||||
|
||||
// An explicit commit is required, otherwise Txn.close() rolls it back.
|
||||
txn.commit()
|
||||
}
|
||||
|
||||
// Open a read-only Txn. It only sees data that existed at Txn creation time.
|
||||
val rtx = env.txnRead()
|
||||
|
||||
// Our read Txn can fetch key1 without problem, as it existed at Txn creation.
|
||||
var found = db[rtx, key]
|
||||
Assert.assertNotNull(found)
|
||||
|
||||
// Note that our main test thread holds the Txn. Only one Txn per thread is
|
||||
// typically permitted (the exception is a read-only Env with MDB_NOTLS).
|
||||
//
|
||||
// Let's write out a "key2" via a new write Txn in a different thread.
|
||||
val es = Executors.newCachedThreadPool()
|
||||
es.execute {
|
||||
env.txnWrite().use { txn ->
|
||||
key.clear()
|
||||
key.put("key2".toByteArray(StandardCharsets.UTF_8)).flip()
|
||||
db.put(txn, key, `val`)
|
||||
txn.commit()
|
||||
}
|
||||
}
|
||||
es.shutdown()
|
||||
es.awaitTermination(10, TimeUnit.SECONDS)
|
||||
|
||||
// Even though key2 has been committed, our read Txn still can't see it.
|
||||
found = db[rtx, key]
|
||||
Assert.assertNull(found)
|
||||
|
||||
// To see key2, we could create a new Txn. But a reset/renew is much faster.
|
||||
// Reset/renew is also important to avoid long-lived read Txns, as these
|
||||
// prevent the re-use of free pages by write Txns (ie the DB will grow).
|
||||
rtx.reset()
|
||||
// ... potentially long operation here ...
|
||||
rtx.renew()
|
||||
found = db[rtx, key]
|
||||
Assert.assertNotNull(found)
|
||||
|
||||
// Don't forget to close the read Txn now we're completely finished. We could
|
||||
// have avoided this if we used a try-with-resources block, but we wanted to
|
||||
// play around with multiple concurrent Txns to demonstrate the "I" in ACID.
|
||||
rtx.close()
|
||||
env.close()
|
||||
}
|
||||
|
||||
/**
|
||||
* In this third tutorial we'll have a look at the Cursor. Up until now we've
|
||||
* just used Dbi, which is good enough for simple cases but unsuitable if you
|
||||
* don't know the key to fetch, or want to iterate over all the data etc.
|
||||
*
|
||||
* @throws IOException if a path was unavailable for memory mapping
|
||||
*/
|
||||
@Test
|
||||
@Throws(IOException::class)
|
||||
fun tutorial3() {
|
||||
val env = createSimpleEnv(tmp().newFolder())
|
||||
val db = env.openDbi(DB_NAME, DbiFlags.MDB_CREATE)
|
||||
val key = ByteBuffer.allocateDirect(env.maxKeySize)
|
||||
val `val` = ByteBuffer.allocateDirect(700)
|
||||
env.txnWrite().use { txn ->
|
||||
// A cursor always belongs to a particular Dbi.
|
||||
val c = db.openCursor(txn)
|
||||
|
||||
// We can put via a Cursor. Note we're adding keys in a strange order,
|
||||
// as we want to show you that LMDB returns them in sorted order.
|
||||
key.put("zzz".toByteArray(StandardCharsets.UTF_8)).flip()
|
||||
`val`.put("lmdb".toByteArray(StandardCharsets.UTF_8)).flip()
|
||||
c.put(key, `val`)
|
||||
key.clear()
|
||||
key.put("aaa".toByteArray(StandardCharsets.UTF_8)).flip()
|
||||
c.put(key, `val`)
|
||||
key.clear()
|
||||
key.put("ccc".toByteArray(StandardCharsets.UTF_8)).flip()
|
||||
c.put(key, `val`)
|
||||
|
||||
// We can read from the Cursor by key.
|
||||
c[key, GetOp.MDB_SET]
|
||||
MatcherAssert.assertThat(StandardCharsets.UTF_8.decode(c.key()).toString(), CoreMatchers.`is`("ccc"))
|
||||
|
||||
// Let's see that LMDB provides the keys in appropriate order....
|
||||
c.seek(SeekOp.MDB_FIRST)
|
||||
MatcherAssert.assertThat(StandardCharsets.UTF_8.decode(c.key()).toString(), CoreMatchers.`is`("aaa"))
|
||||
|
||||
c.seek(SeekOp.MDB_LAST)
|
||||
MatcherAssert.assertThat(StandardCharsets.UTF_8.decode(c.key()).toString(), CoreMatchers.`is`("zzz"))
|
||||
|
||||
c.seek(SeekOp.MDB_PREV)
|
||||
MatcherAssert.assertThat(StandardCharsets.UTF_8.decode(c.key()).toString(), CoreMatchers.`is`("ccc"))
|
||||
|
||||
// Cursors can also delete the current key.
|
||||
c.delete()
|
||||
|
||||
c.close()
|
||||
txn.commit()
|
||||
}
|
||||
|
||||
// A read-only Cursor can survive its original Txn being closed. This is
|
||||
// useful if you want to close the original Txn (eg maybe you created the
|
||||
// Cursor during the constructor of a singleton with a throw-away Txn). Of
|
||||
// course, you cannot use the Cursor if its Txn is closed or currently reset.
|
||||
val tx1 = env.txnRead()
|
||||
val c = db.openCursor(tx1)
|
||||
tx1.close()
|
||||
|
||||
// The Cursor becomes usable again by "renewing" it with an active read Txn.
|
||||
val tx2 = env.txnRead()
|
||||
c.renew(tx2)
|
||||
c.seek(SeekOp.MDB_FIRST)
|
||||
|
||||
// As usual with read Txns, we can reset and renew them. The Cursor does
|
||||
// not need any special handling if we do this.
|
||||
tx2.reset()
|
||||
|
||||
// ... potentially long operation here ...
|
||||
tx2.renew()
|
||||
c.seek(SeekOp.MDB_LAST)
|
||||
tx2.close()
|
||||
env.close()
|
||||
}
|
||||
|
||||
/**
|
||||
* In this fourth tutorial we'll take a quick look at the iterators. These are
|
||||
* a more Java idiomatic form of using the Cursors we looked at in tutorial 3.
|
||||
*
|
||||
* @throws IOException if a path was unavailable for memory mapping
|
||||
*/
|
||||
@Test
|
||||
@Throws(IOException::class)
|
||||
fun tutorial4() {
|
||||
val env = createSimpleEnv(tmp().newFolder())
|
||||
val db = env.openDbi(DB_NAME, DbiFlags.MDB_CREATE)
|
||||
env.txnWrite().use { txn ->
|
||||
val key = ByteBuffer.allocateDirect(env.maxKeySize)
|
||||
val `val` = ByteBuffer.allocateDirect(700)
|
||||
|
||||
// Insert some data. Note that ByteBuffer order defaults to Big Endian.
|
||||
// LMDB does not persist the byte order, but it's critical to sort keys.
|
||||
// If your numeric keys don't sort as expected, review buffer byte order.
|
||||
`val`.putInt(100)
|
||||
key.putInt(1)
|
||||
db.put(txn, key, `val`)
|
||||
key.clear()
|
||||
key.putInt(2)
|
||||
db.put(txn, key, `val`)
|
||||
key.clear()
|
||||
|
||||
// Each iterable uses a cursor and must be closed when finished. Iterate
|
||||
// forward in terms of key ordering starting with the first key.
|
||||
db.iterate(txn, KeyRange.all()).use { ci ->
|
||||
for (kv in ci) {
|
||||
MatcherAssert.assertThat(kv.key(), CoreMatchers.notNullValue())
|
||||
MatcherAssert.assertThat(kv.`val`(), CoreMatchers.notNullValue())
|
||||
}
|
||||
}
|
||||
|
||||
// Iterate backward in terms of key ordering starting with the last key.
|
||||
db.iterate(txn, KeyRange.allBackward()).use { ci ->
|
||||
for (kv in ci) {
|
||||
MatcherAssert.assertThat(kv.key(), CoreMatchers.notNullValue())
|
||||
MatcherAssert.assertThat(kv.`val`(), CoreMatchers.notNullValue())
|
||||
}
|
||||
}
|
||||
|
||||
// There are many ways to control the desired key range via KeyRange, such
|
||||
// as arbitrary start and stop values, direction etc. We've adopted Guava's
|
||||
// terminology for our range classes (see KeyRangeType for further details).
|
||||
key.putInt(1)
|
||||
val range = KeyRange.atLeastBackward(key)
|
||||
db.iterate(txn, range).use { ci ->
|
||||
for (kv in ci) {
|
||||
MatcherAssert.assertThat(kv.key(), CoreMatchers.notNullValue())
|
||||
MatcherAssert.assertThat(kv.`val`(), CoreMatchers.notNullValue())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
env.close()
|
||||
}
|
||||
|
||||
/**
|
||||
* In this fifth tutorial we'll explore multiple values sharing a single key.
|
||||
*
|
||||
* @throws IOException if a path was unavailable for memory mapping
|
||||
*/
|
||||
@Test
|
||||
@Throws(IOException::class)
|
||||
fun tutorial5() {
|
||||
val env = createSimpleEnv(tmp().newFolder())
|
||||
|
||||
// This time we're going to tell the Dbi it can store > 1 value per key.
|
||||
// There are other flags available if we're storing integers etc.
|
||||
val db = env.openDbi(DB_NAME, DbiFlags.MDB_CREATE, DbiFlags.MDB_DUPSORT)
|
||||
|
||||
// Duplicate support requires both keys and values to be <= max key size.
|
||||
val key = ByteBuffer.allocateDirect(env.maxKeySize)
|
||||
val `val` = ByteBuffer.allocateDirect(env.maxKeySize)
|
||||
|
||||
env.txnWrite().use { txn ->
|
||||
val c = db.openCursor(txn)
|
||||
|
||||
// Store one key, but many values, and in non-natural order.
|
||||
key.put("key".toByteArray(StandardCharsets.UTF_8)).flip()
|
||||
`val`.put("xxx".toByteArray(StandardCharsets.UTF_8)).flip()
|
||||
c.put(key, `val`)
|
||||
`val`.clear()
|
||||
`val`.put("kkk".toByteArray(StandardCharsets.UTF_8)).flip()
|
||||
c.put(key, `val`)
|
||||
`val`.clear()
|
||||
`val`.put("lll".toByteArray(StandardCharsets.UTF_8)).flip()
|
||||
c.put(key, `val`)
|
||||
|
||||
// Cursor can tell us how many values the current key has.
|
||||
val count = c.count()
|
||||
MatcherAssert.assertThat(count, CoreMatchers.`is`(3L))
|
||||
|
||||
// Let's position the Cursor. Note sorting still works.
|
||||
c.seek(SeekOp.MDB_FIRST)
|
||||
MatcherAssert.assertThat(StandardCharsets.UTF_8.decode(c.`val`()).toString(), CoreMatchers.`is`("kkk"))
|
||||
|
||||
c.seek(SeekOp.MDB_LAST)
|
||||
MatcherAssert.assertThat(StandardCharsets.UTF_8.decode(c.`val`()).toString(), CoreMatchers.`is`("xxx"))
|
||||
|
||||
c.seek(SeekOp.MDB_PREV)
|
||||
MatcherAssert.assertThat(StandardCharsets.UTF_8.decode(c.`val`()).toString(), CoreMatchers.`is`("lll"))
|
||||
|
||||
c.close()
|
||||
txn.commit()
|
||||
}
|
||||
|
||||
env.close()
|
||||
}
|
||||
|
||||
/**
|
||||
* Next up we'll show you how to easily check your platform (operating system
|
||||
* and Java version) is working properly with LmdbJava and the embedded LMDB
|
||||
* native library.
|
||||
*
|
||||
* @throws IOException if a path was unavailable for memory mapping
|
||||
*/
|
||||
@Test
|
||||
@Throws(IOException::class)
|
||||
fun tutorial6() {
|
||||
// Note we need to specify the Verifier's DBI_COUNT for the Env.
|
||||
val env = Env.create(ByteBufferProxy.PROXY_OPTIMAL).setMapSize(10485760).setMaxDbs(Verifier.DBI_COUNT).open(tmp().newFolder())
|
||||
|
||||
// Create a Verifier (it's a Callable<Long> for those needing full control).
|
||||
val v = Verifier(env)
|
||||
|
||||
// We now run the verifier for 3 seconds; it raises an exception on failure.
|
||||
// The method returns the number of entries it successfully verified.
|
||||
v.runFor(3, TimeUnit.SECONDS)
|
||||
env.close()
|
||||
}
|
||||
|
||||
/**
|
||||
* In this final tutorial we'll look at using Agrona's DirectBuffer.
|
||||
*
|
||||
* @throws IOException if a path was unavailable for memory mapping
|
||||
*/
|
||||
@Test
|
||||
@Throws(IOException::class)
|
||||
fun tutorial7() {
|
||||
// The critical difference is we pass the PROXY_DB field to Env.create().
|
||||
// There's also a PROXY_SAFE if you want to stop ByteBuffer's Unsafe use.
|
||||
// Aside from that and a different type argument, it's the same as usual...
|
||||
val env = Env.create(DirectBufferProxy.PROXY_DB).setMapSize(10485760).setMaxDbs(1).open(tmp().newFolder())
|
||||
|
||||
val db = env.openDbi(DB_NAME, DbiFlags.MDB_CREATE)
|
||||
|
||||
val keyBb = ByteBuffer.allocateDirect(env.maxKeySize)
|
||||
val key: MutableDirectBuffer = UnsafeBuffer(keyBb)
|
||||
val `val`: MutableDirectBuffer = UnsafeBuffer(ByteBuffer.allocateDirect(700))
|
||||
|
||||
env.txnWrite().use { txn ->
|
||||
db.openCursor(txn).use { c ->
|
||||
// Agrona is faster than ByteBuffer and its methods are nicer...
|
||||
`val`.putStringWithoutLengthUtf8(0, "The Value")
|
||||
key.putStringWithoutLengthUtf8(0, "yyy")
|
||||
c.put(key, `val`)
|
||||
|
||||
key.putStringWithoutLengthUtf8(0, "ggg")
|
||||
c.put(key, `val`)
|
||||
|
||||
c.seek(SeekOp.MDB_FIRST)
|
||||
MatcherAssert.assertThat(c.key().getStringWithoutLengthUtf8(0, env.maxKeySize), CoreMatchers.startsWith("ggg"))
|
||||
|
||||
c.seek(SeekOp.MDB_LAST)
|
||||
MatcherAssert.assertThat(c.key().getStringWithoutLengthUtf8(0, env.maxKeySize), CoreMatchers.startsWith("yyy"))
|
||||
|
||||
// DirectBuffer has no position concept. Often you don't want to store
|
||||
// the unnecessary bytes of a varying-size buffer. Let's have a look...
|
||||
val keyLen = key.putStringWithoutLengthUtf8(0, "12characters")
|
||||
MatcherAssert.assertThat(keyLen, CoreMatchers.`is`(12))
|
||||
MatcherAssert.assertThat(key.capacity(), CoreMatchers.`is`(env.maxKeySize))
|
||||
|
||||
// To only store the 12 characters, we simply call wrap:
|
||||
key.wrap(key, 0, keyLen)
|
||||
MatcherAssert.assertThat(key.capacity(), CoreMatchers.`is`(keyLen))
|
||||
c.put(key, `val`)
|
||||
c.seek(SeekOp.MDB_FIRST)
|
||||
MatcherAssert.assertThat(c.key().capacity(), CoreMatchers.`is`(keyLen))
|
||||
MatcherAssert.assertThat(c.key().getStringWithoutLengthUtf8(0, c.key().capacity()), CoreMatchers.`is`("12characters"))
|
||||
|
||||
// To store bigger values again, just wrap the original buffer.
|
||||
key.wrap(keyBb)
|
||||
MatcherAssert.assertThat(key.capacity(), CoreMatchers.`is`(env.maxKeySize))
|
||||
}
|
||||
txn.commit()
|
||||
}
|
||||
|
||||
env.close()
|
||||
}
|
||||
|
||||
// You've finished! There are lots of other neat things we could show you (eg
|
||||
// how to speed up inserts by appending them in key order, using integer
|
||||
// or reverse ordered keys, using Env.DISABLE_CHECKS_PROP etc), but you now
|
||||
// know enough to tackle the JavaDocs with confidence. Have fun!
|
||||
private fun createSimpleEnv(path: File): Env<ByteBuffer> {
|
||||
return Env.create().setMapSize(10485760).setMaxDbs(1).setMaxReaders(1).open(path)
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val DB_NAME = "my DB"
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@ import ch.qos.logback.classic.spi.ILoggingEvent
|
|||
import ch.qos.logback.core.ConsoleAppender
|
||||
import dorkbox.network.Client
|
||||
import dorkbox.network.connection.Connection
|
||||
import dorkbox.network.storage.types.MemoryStore
|
||||
import dorkboxTest.network.BaseTest
|
||||
import dorkboxTest.network.rmi.RmiCommonTest
|
||||
import dorkboxTest.network.rmi.cows.TestCow
|
||||
|
@ -66,6 +67,7 @@ object TestClient {
|
|||
setup()
|
||||
|
||||
val config = BaseTest.clientConfig()
|
||||
config.settingsStore = MemoryStore.type() // don't want to persist anything on disk!
|
||||
config.enableRemoteSignatureValidation = false
|
||||
config.enableIpc = false
|
||||
config.aeronDirectoryForceUnique = true
|
||||
|
|
|
@ -17,6 +17,7 @@ package dorkboxTest.network.rmi.multiJVM
|
|||
|
||||
import dorkbox.network.Server
|
||||
import dorkbox.network.connection.Connection
|
||||
import dorkbox.network.storage.types.MemoryStore
|
||||
import dorkboxTest.network.BaseTest
|
||||
import dorkboxTest.network.rmi.cows.MessageWithTestCow
|
||||
import dorkboxTest.network.rmi.cows.TestBabyCowImpl
|
||||
|
@ -34,6 +35,7 @@ object TestServer {
|
|||
setup()
|
||||
|
||||
val configuration = BaseTest.serverConfig()
|
||||
configuration.settingsStore = MemoryStore.type() // don't want to persist anything on disk!
|
||||
|
||||
configuration.serialization.registerRmi(TestCow::class.java, TestCowImpl::class.java)
|
||||
configuration.serialization.register(MessageWithTestCow::class.java)
|
||||
|
|
Loading…
Reference in New Issue
Block a user