RMI method override implemented. lots of misc fixes

This commit is contained in:
nathan 2015-07-28 02:23:01 +02:00
parent 9c18128b2d
commit 1d12f2dbcd
12 changed files with 138 additions and 113 deletions

View File

@ -15,8 +15,6 @@
*/ */
package dorkbox.util; package dorkbox.util;
import java.io.Serializable; public interface Message {
public interface Message extends Serializable {
} }

View File

@ -21,6 +21,9 @@ import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.io.Output;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import java.io.IOException;
public public
interface SerializationManager { interface SerializationManager {
@ -72,7 +75,7 @@ interface SerializationManager {
* <p/> * <p/>
* There is a small speed penalty if there were no kryo's available to use. * There is a small speed penalty if there were no kryo's available to use.
*/ */
void write(ByteBuf buffer, Object message); void write(ByteBuf buffer, Object message) throws IOException;
/** /**
* Reads an object from the buffer. * Reads an object from the buffer.
@ -81,17 +84,17 @@ interface SerializationManager {
* *
* @param length should ALWAYS be the length of the expected object! * @param length should ALWAYS be the length of the expected object!
*/ */
Object read(ByteBuf buffer, int length); Object read(ByteBuf buffer, int length) throws IOException;
/** /**
* Writes the class and object using an available kryo instance * Writes the class and object using an available kryo instance
*/ */
void writeFullClassAndObject(Output output, Object value); void writeFullClassAndObject(final Logger logger, Output output, Object value) throws IOException;
/** /**
* Returns a class read from the input * Returns a class read from the input
*/ */
Object readFullClassAndObject(Input input); Object readFullClassAndObject(final Logger logger, final Input input) throws IOException;
/** /**
* Borrows a kryo from the threadsafe pool. You must release it back to the pool when done. * Borrows a kryo from the threadsafe pool. You must release it back to the pool when done.
@ -102,4 +105,15 @@ interface SerializationManager {
* Releases the kryo back to the threadsafe pool * Releases the kryo back to the threadsafe pool
*/ */
void release(Kryo kryo); void release(Kryo kryo);
/**
* Called when initialization is complete. This is to prevent (and recognize) out-of-order class/serializer registration.
*/
void finishInit();
/**
* @return true if our initialization is complete. Some registrations (in the property store, for example) always register for client
* and server, even if in the same JVM. This only attempts to register once.
*/
boolean initialized();
} }

View File

@ -26,7 +26,7 @@ class DB_Server {
/** /**
* Address 0.0.0.0/32 may be used as a source address for this host on this network. * Address 0.0.0.0/32 may be used as a source address for this host on this network.
*/ */
public static final ByteArrayWrapper IP_0_0_0_0 = ByteArrayWrapper.wrap(new byte[] {0, 0, 0, 0}); public static final ByteArrayWrapper IP_LOCALHOST = ByteArrayWrapper.wrap(new byte[] {127, 0, 0, 1});
// salt + IP address is used for equals! // salt + IP address is used for equals!
private byte[] ipAddress; private byte[] ipAddress;

View File

@ -1,42 +0,0 @@
/*
* Copyright 2010 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dorkbox.util.exceptions;
public
class NetException extends RuntimeException {
private static final long serialVersionUID = -3499720814336253695L;
public
NetException() {
super();
}
public
NetException(String message, Throwable cause) {
super(message, cause);
}
public
NetException(String message) {
super(message);
}
public
NetException(Throwable cause) {
super(cause);
}
}

View File

@ -20,7 +20,6 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.io.Output;
import dorkbox.util.exceptions.NetException;
import dorkbox.util.SerializationManager; import dorkbox.util.SerializationManager;
import java.lang.reflect.Field; import java.lang.reflect.Field;
@ -48,7 +47,7 @@ public class UnmodifiableCollectionsSerializer extends Serializer<Object> {
.getDeclaredField("m"); .getDeclaredField("m");
SOURCE_MAP_FIELD.setAccessible( true ); SOURCE_MAP_FIELD.setAccessible( true );
} catch ( final Exception e ) { } catch ( final Exception e ) {
throw new NetException("Could not access source collection" + throw new RuntimeException("Could not access source collection" +
" field in java.util.Collections$UnmodifiableCollection.", e ); " field in java.util.Collections$UnmodifiableCollection.", e );
} }
} }

View File

@ -159,7 +159,7 @@ class DiskStorage implements Storage {
} }
/** /**
* Uses the DEFAULT key ("") to return saved data. * Uses the DEFAULT key ("") to return saved data. Also saves the data.
* <p/> * <p/>
* This will check to see if there is an associated key for that data, if not - it will use data as the default * This will check to see if there is an associated key for that data, if not - it will use data as the default
* *
@ -167,48 +167,48 @@ class DiskStorage implements Storage {
*/ */
@Override @Override
public public
<T> T load(T data) throws IOException { <T> T getAndPut(T data) throws IOException {
return load(this.defaultKey, data); return getAndPut(this.defaultKey, data);
} }
/** /**
* Returns the saved data for the specified key. * Returns the saved data for the specified key. Also saves the data.
* *
* @param data If there is no object in the DB with the specified key, this value will be the default (and will be saved to the db) * @param data If there is no object in the DB with the specified key, this value will be the default (and will be saved to the db)
*/ */
@Override @Override
public public
<T> T load(String key, T data) throws IOException { <T> T getAndPut(String key, T data) throws IOException {
ByteArrayWrapper wrap = ByteArrayWrapper.wrap(key); ByteArrayWrapper wrap = ByteArrayWrapper.wrap(key);
return load(wrap, data); return getAndPut(wrap, data);
} }
/** /**
* Returns the saved data for the specified key. * Returns the saved data for the specified key. Also saves the data.
* *
* @param data If there is no object in the DB with the specified key, this value will be the default (and will be saved to the db) * @param data If there is no object in the DB with the specified key, this value will be the default (and will be saved to the db)
*/ */
@Override @Override
public public
<T> T load(byte[] key, T data) throws IOException { <T> T getAndPut(byte[] key, T data) throws IOException {
return load(ByteArrayWrapper.wrap(key), data); return getAndPut(ByteArrayWrapper.wrap(key), data);
} }
/** /**
* Returns the saved data for the specified key. * Returns the saved data for the specified key. Also saves the data.
* *
* @param data If there is no object in the DB with the specified key, this value will be the default (and will be saved to the db) * @param data If there is no object in the DB with the specified key, this value will be the default (and will be saved to the db)
*/ */
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public public
<T> T load(ByteArrayWrapper key, T data) throws IOException { <T> T getAndPut(ByteArrayWrapper key, T data) throws IOException {
Object source = get0(key); Object source = get0(key);
if (source == null) { if (source == null) {
// returned was null, so we should take value as the default // returned was null, so we should take value as the default
put(key, data); putAndSave(key, data);
return data; return data;
} }
else { else {
@ -318,7 +318,7 @@ class DiskStorage implements Storage {
} }
/** /**
* Deletes an object from storage. To ALSO remove from the cache, use unRegister(key) * Deletes an object from storage.
* *
* @return true if the delete was successful. False if there were problems deleting the data. * @return true if the delete was successful. False if there were problems deleting the data.
*/ */
@ -341,6 +341,28 @@ class DiskStorage implements Storage {
} }
} }
/**
* Deletes an object from storage.
*
* @return true if the delete was successful. False if there were problems deleting the data.
*/
@Override
public final
boolean delete(ByteArrayWrapper key) {
if (!this.isOpen.get()) {
throw new RuntimeException("Unable to act on closed storage");
}
// timer action runs on THIS thread, not timer thread
if (timer != null) {
this.timer.delay(0L);
return this.storage.delete(key);
}
else {
return false;
}
}
/** /**
* Closes the database and file. * Closes the database and file.
*/ */
@ -352,10 +374,7 @@ class DiskStorage implements Storage {
// have to "close" it after we run the timer! // have to "close" it after we run the timer!
this.isOpen.set(false); this.isOpen.set(false);
this.storage.close();
if (timer != null) {
this.storage.close();
}
} }
/** /**
@ -446,8 +465,14 @@ class DiskStorage implements Storage {
try { try {
this.actionLock.lock(); this.actionLock.lock();
// push action to map if (object != null) {
this.actionMap.put(key, object); // push action to map
this.actionMap.put(key, object);
} else {
this.actionMap.remove(key);
}
} finally { } finally {
this.actionLock.unlock(); this.actionLock.unlock();
} }
@ -477,7 +502,7 @@ class DiskStorage implements Storage {
*/ */
@Override @Override
public final public final
void commit() { void save() {
if (!this.isOpen.get()) { if (!this.isOpen.get()) {
throw new RuntimeException("Unable to act on closed storage"); throw new RuntimeException("Unable to act on closed storage");
} }
@ -495,7 +520,7 @@ class DiskStorage implements Storage {
*/ */
@Override @Override
public public
void commit(final String key, final Object object) { void putAndSave(final String key, final Object object) {
if (!this.isOpen.get()) { if (!this.isOpen.get()) {
throw new RuntimeException("Unable to act on closed storage"); throw new RuntimeException("Unable to act on closed storage");
} }
@ -514,7 +539,7 @@ class DiskStorage implements Storage {
*/ */
@Override @Override
public public
void commit(final byte[] key, final Object object) { void putAndSave(final byte[] key, final Object object) {
if (!this.isOpen.get()) { if (!this.isOpen.get()) {
throw new RuntimeException("Unable to act on closed storage"); throw new RuntimeException("Unable to act on closed storage");
} }
@ -535,7 +560,7 @@ class DiskStorage implements Storage {
*/ */
@Override @Override
public public
void commit(final ByteArrayWrapper key, final Object object) { void putAndSave(final ByteArrayWrapper key, final Object object) {
if (!this.isOpen.get()) { if (!this.isOpen.get()) {
throw new RuntimeException("Unable to act on closed storage"); throw new RuntimeException("Unable to act on closed storage");
} }

View File

@ -103,8 +103,8 @@ class MemoryStorage implements Storage {
*/ */
@Override @Override
public public
<T> T load(T data) throws IOException { <T> T getAndPut(T data) throws IOException {
return load(this.defaultKey, data); return getAndPut(this.defaultKey, data);
} }
/** /**
@ -114,10 +114,10 @@ class MemoryStorage implements Storage {
*/ */
@Override @Override
public public
<T> T load(String key, T data) throws IOException { <T> T getAndPut(String key, T data) throws IOException {
ByteArrayWrapper wrap = ByteArrayWrapper.wrap(key); ByteArrayWrapper wrap = ByteArrayWrapper.wrap(key);
return load(wrap, data); return getAndPut(wrap, data);
} }
/** /**
@ -127,14 +127,14 @@ class MemoryStorage implements Storage {
*/ */
@Override @Override
public public
<T> T load(byte[] key, T data) throws IOException { <T> T getAndPut(byte[] key, T data) throws IOException {
return load(ByteArrayWrapper.wrap(key), data); return getAndPut(ByteArrayWrapper.wrap(key), data);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public public
<T> T load(final ByteArrayWrapper key, final T data) throws IOException { <T> T getAndPut(final ByteArrayWrapper key, final T data) throws IOException {
final Object o = storage.get(key); final Object o = storage.get(key);
if (o == null) { if (o == null) {
storage.put(key, data); storage.put(key, data);
@ -192,14 +192,25 @@ class MemoryStorage implements Storage {
} }
/** /**
* Deletes an object from storage. To ALSO remove from the cache, use unRegister(key) * Deletes an object from storage.
* *
* @return true if the delete was successful. False if there were problems deleting the data. * @return true if the delete was successful. False if there were problems deleting the data.
*/ */
@Override @Override
public public
boolean delete(final String key) { boolean delete(final String key) {
storage.remove(ByteArrayWrapper.wrap(key)); return delete(ByteArrayWrapper.wrap(key));
}
/**
* Deletes an object from storage.
*
* @return true if the delete was successful. False if there were problems deleting the data.
*/
@Override
public
boolean delete(final ByteArrayWrapper key) {
storage.remove(key);
return true; return true;
} }
@ -246,25 +257,25 @@ class MemoryStorage implements Storage {
@Override @Override
public public
void commit() { void save() {
// no-op // no-op
} }
@Override @Override
public public
void commit(final String key, final Object object) { void putAndSave(final String key, final Object object) {
// no-op // no-op
} }
@Override @Override
public public
void commit(final byte[] key, final Object object) { void putAndSave(final byte[] key, final Object object) {
// no-op // no-op
} }
@Override @Override
public public
void commit(final ByteArrayWrapper key, final Object object) { void putAndSave(final ByteArrayWrapper key, final Object object) {
// no-op // no-op
} }
} }

View File

@ -260,10 +260,10 @@ class Metadata {
*/ */
static static
<T> T readData(SerializationManager serializationManager, InflaterInputStream inputStream) { <T> T readData(SerializationManager serializationManager, InflaterInputStream inputStream) throws IOException {
Input input = new Input(inputStream, 1024); // read 1024 at a time Input input = new Input(inputStream, 1024); // read 1024 at a time
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
T readObject = (T) serializationManager.readFullClassAndObject(input); T readObject = (T) serializationManager.readFullClassAndObject(null, input);
return readObject; return readObject;
} }
@ -277,7 +277,7 @@ class Metadata {
final DeflaterOutputStream outputStream) throws IOException { final DeflaterOutputStream outputStream) throws IOException {
// HAVE TO LOCK BEFORE THIS IS CALLED! (AND FREE AFTERWARDS!) // HAVE TO LOCK BEFORE THIS IS CALLED! (AND FREE AFTERWARDS!)
Output output = new Output(outputStream, 1024); // write 1024 at a time Output output = new Output(outputStream, 1024); // write 1024 at a time
serializationManager.writeFullClassAndObject(output, data); serializationManager.writeFullClassAndObject(null, output, data);
output.flush(); output.flush();
outputStream.flush(); // sync-flush is enabled, so the output stream will finish compressing data. outputStream.flush(); // sync-flush is enabled, so the output stream will finish compressing data.

View File

@ -23,6 +23,7 @@ import java.io.IOException;
/** /**
* *
*/ */
@SuppressWarnings("unused")
public public
interface Storage { interface Storage {
/** /**
@ -36,7 +37,7 @@ interface Storage {
boolean contains(String key); boolean contains(String key);
/** /**
* Reads a object using the default (blank) key, and casts it to the expected class * Reads a object using the DEFAULT key ("") key, and casts it to the expected class
*/ */
<T> T get(); <T> T get();
@ -62,21 +63,21 @@ interface Storage {
* *
* @param data The data that will hold the copy of the data from disk * @param data The data that will hold the copy of the data from disk
*/ */
<T> T load(T data) throws IOException; <T> T getAndPut(T data) throws IOException;
/** /**
* Returns the saved data for the specified key. * Returns the saved data for the specified key.
* *
* @param data If there is no object in the DB with the specified key, this value will be the default (and will be saved to the db) * @param data If there is no object in the DB with the specified key, this value will be the default (and will be saved to the db)
*/ */
<T> T load(String key, T data) throws IOException; <T> T getAndPut(String key, T data) throws IOException;
/** /**
* Returns the saved data for the specified key. * Returns the saved data for the specified key.
* *
* @param data If there is no object in the DB with the specified key, this value will be the default (and will be saved to the db) * @param data If there is no object in the DB with the specified key, this value will be the default (and will be saved to the db)
*/ */
<T> T load(byte[] key, T data) throws IOException; <T> T getAndPut(byte[] key, T data) throws IOException;
/** /**
* Returns the saved data for the specified key. * Returns the saved data for the specified key.
@ -84,7 +85,7 @@ interface Storage {
* @param data If there is no object in the DB with the specified key, this value will be the default (and will be saved to the db) * @param data If there is no object in the DB with the specified key, this value will be the default (and will be saved to the db)
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
<T> T load(ByteArrayWrapper key, T data) throws IOException; <T> T getAndPut(ByteArrayWrapper key, T data) throws IOException;
/** /**
* Saves the given data to storage with the associated key. * Saves the given data to storage with the associated key.
@ -119,12 +120,19 @@ interface Storage {
void put(Object data); void put(Object data);
/** /**
* Deletes an object from storage. To ALSO remove from the cache, use unRegister(key) * Deletes an object from storage.
* *
* @return true if the delete was successful. False if there were problems deleting the data. * @return true if the delete was successful. False if there were problems deleting the data.
*/ */
boolean delete(String key); boolean delete(String key);
/**
* Deletes an object from storage.
*
* @return true if the delete was successful. False if there were problems deleting the data.
*/
boolean delete(ByteArrayWrapper key);
/** /**
* @return the file that backs this storage * @return the file that backs this storage
*/ */
@ -167,26 +175,26 @@ interface Storage {
* <p/> * <p/>
* This will save the ALL of the pending save actions to the file * This will save the ALL of the pending save actions to the file
*/ */
void commit(); void save();
/** /**
* Save the storage to disk, immediately. * Save the storage to disk, immediately.
* <p/> * <p/>
* This will save the ALL of the pending save actions to the file * This will save the ALL of the pending save actions to the file
*/ */
void commit(String key, Object object); void putAndSave(String key, Object object);
/** /**
* Save the storage to disk, immediately. * Save the storage to disk, immediately.
* <p/> * <p/>
* This will save the ALL of the pending save actions to the file * This will save the ALL of the pending save actions to the file
*/ */
void commit(byte[] key, Object object); void putAndSave(byte[] key, Object object);
/** /**
* Save the storage to disk, immediately. * Save the storage to disk, immediately.
* <p/> * <p/>
* This will save the ALL of the pending save actions to the file * This will save the ALL of the pending save actions to the file
*/ */
void commit(ByteArrayWrapper key, Object object); void putAndSave(ByteArrayWrapper key, Object object);
} }

View File

@ -379,7 +379,7 @@ class StorageBase {
} }
else { else {
// this is comparatively slow, since we serialize it first to get the size, then we put it in the file. // this is comparatively slow, since we serialize it first to get the size, then we put it in the file.
ByteArrayOutputStream dataStream = getDataAsByteArray(this.serializationManager, object, deflater); ByteArrayOutputStream dataStream = getDataAsByteArray(this.serializationManager, this.logger, object, deflater);
int size = dataStream.size(); int size = dataStream.size();
if (size > metaData.dataCapacity) { if (size > metaData.dataCapacity) {
@ -445,11 +445,11 @@ class StorageBase {
private static private static
ByteArrayOutputStream getDataAsByteArray(SerializationManager kryo, Object data, Deflater deflater) throws IOException { ByteArrayOutputStream getDataAsByteArray(SerializationManager serializationManager, Logger logger, Object data, Deflater deflater) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
OutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, deflater); OutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, deflater);
Output output = new Output(outputStream, 1024); // write 1024 at a time Output output = new Output(outputStream, 1024); // write 1024 at a time
kryo.writeFullClassAndObject(output, data); serializationManager.writeFullClassAndObject(logger, output, data);
output.flush(); output.flush();
outputStream.flush(); outputStream.flush();

View File

@ -151,7 +151,7 @@ class Store {
boolean waiting = storage.hasWriteWaiting(); boolean waiting = storage.hasWriteWaiting();
// we want this storage to be in a fresh state // we want this storage to be in a fresh state
if (waiting) { if (waiting) {
storage.commit(); storage.save();
} }
((DiskStorage) storage).increaseReference(); ((DiskStorage) storage).increaseReference();
} }

View File

@ -9,6 +9,7 @@ import dorkbox.util.storage.Store;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.junit.*; import org.junit.*;
import org.junit.runners.MethodSorters; import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -44,17 +45,17 @@ class StorageTest {
public public
void write(final ByteBuf buffer, final Object message) { void write(final ByteBuf buffer, final Object message) {
final Output output = new Output(); final Output output = new Output();
writeFullClassAndObject(output, message); writeFullClassAndObject(null, output, message);
buffer.writeBytes(output.getBuffer()); buffer.writeBytes(output.getBuffer());
} }
@Override @Override
public public
Object read(final ByteBuf buffer, final int length) { Object read(final ByteBuf buffer, final int length) throws IOException {
final Input input = new Input(); final Input input = new Input();
buffer.readBytes(input.getBuffer()); buffer.readBytes(input.getBuffer());
final Object o = readFullClassAndObject(input); final Object o = readFullClassAndObject(null, input);
buffer.skipBytes(input.position()); buffer.skipBytes(input.position());
return o; return o;
@ -62,13 +63,13 @@ class StorageTest {
@Override @Override
public public
void writeFullClassAndObject(final Output output, final Object value) { void writeFullClassAndObject(final Logger logger, final Output output, final Object value) {
kryo.writeClassAndObject(output, value); kryo.writeClassAndObject(output, value);
} }
@Override @Override
public public
Object readFullClassAndObject(final Input input) { Object readFullClassAndObject(final Logger logger, final Input input) throws IOException {
return kryo.readClassAndObject(input); return kryo.readClassAndObject(input);
} }
@ -82,6 +83,17 @@ class StorageTest {
public public
void release(final Kryo kryo) { void release(final Kryo kryo) {
} }
@Override
public
void finishInit() {
}
@Override
public
boolean initialized() {
return false;
}
}; };
static static
@ -335,7 +347,7 @@ class StorageTest {
storage.put(createKey, data); storage.put(createKey, data);
Data data2 = new Data(); Data data2 = new Data();
storage.load(createKey, data2); storage.getAndPut(createKey, data2);
Assert.assertEquals("Object is not the same", data, data2); Assert.assertEquals("Object is not the same", data, data2);
Store.close(storage); Store.close(storage);
@ -345,7 +357,7 @@ class StorageTest {
.make(); .make();
data2 = new Data(); data2 = new Data();
storage.load(createKey, data2); storage.getAndPut(createKey, data2);
Assert.assertEquals("Object is not the same", data, data2); Assert.assertEquals("Object is not the same", data, data2);
Store.close(storage); Store.close(storage);
@ -522,7 +534,7 @@ class StorageTest {
String createKey = createKey(i); String createKey = createKey(i);
Data data2 = new Data(); Data data2 = new Data();
storage.load(createKey, data2); storage.getAndPut(createKey, data2);
Assert.assertEquals("Object is not the same", data, data2); Assert.assertEquals("Object is not the same", data, data2);
} }
Store.close(storage); Store.close(storage);