/*
 * Copyright 2016 dorkbox, llc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
package dorkbox.messageBus.subscription;
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
|
|
|
import dorkbox.collections.IdentityMap;
|
|
import dorkbox.messageBus.common.MessageHandler;
|
|
import dorkbox.messageBus.error.ErrorHandler;
|
|
import dorkbox.messageBus.publication.Publisher;
|
|
|
|
/**
|
|
* A subscription is a container that manages exactly one message handler of all registered
|
|
* message listeners of the same class, i.e. all subscribed instances (excluding subclasses) of a message
|
|
* will be referenced in the subscription created for a message.
|
|
* <p/>
|
|
* There will be as many unique subscription objects per message listener class as there are message handlers
|
|
* defined in the message listeners class hierarchy.
|
|
* <p/>
|
|
* This class uses the "single writer principle", so that the subscription are only MODIFIED by a single thread,
|
|
* but are READ by X number of threads (in a safe way). This uses object thread visibility/publication to work.
|
|
*
|
|
* @author dorkbox, llc
|
|
* Date: 2/3/16
|
|
*/
|
|
public abstract
|
|
class Subscription<T> {
|
|
private static final AtomicInteger ID_COUNTER = new AtomicInteger();
|
|
private final int ID = ID_COUNTER.getAndIncrement();
|
|
|
|
// this is the listener class that created this subscription
|
|
private final Class<?> listenerClass;
|
|
|
|
// the handler's metadata -> for each handler in a listener, a unique subscription context is created
|
|
private final MessageHandler handler;
|
|
|
|
// This is only touched by a single thread!
|
|
private final IdentityMap<Object, Entry> entries; // maintain a map of entries for FAST lookup during unsubscribe.
|
|
|
|
// this is still inside the single-writer, and can use the same techniques as subscription manager (for thread safe publication)
|
|
protected volatile Entry<T> head = null; // reference to the first element
|
|
|
|
// Recommended for best performance while adhering to the "single writer principle". Must be static-final
|
|
protected static final AtomicReferenceFieldUpdater<Subscription, Entry> headREF =
|
|
AtomicReferenceFieldUpdater.newUpdater(Subscription.class,
|
|
Entry.class,
|
|
"head");
|
|
|
|
protected
|
|
Subscription(final Class<?> listenerClass, final MessageHandler handler) {
|
|
this.listenerClass = listenerClass;
|
|
this.handler = handler;
|
|
this.entries = new IdentityMap<Object, Entry>(32, SubscriptionManager.LOAD_FACTOR);
|
|
}
|
|
|
|
/**
|
|
* called on shutdown for GC purposes
|
|
* called within SYNCHRONIZE
|
|
*/
|
|
public final
|
|
void clear() {
|
|
this.entries.clear();
|
|
this.head = null;
|
|
}
|
|
|
|
// only used in unit tests to verify that the subscription manager is working correctly
|
|
public final
|
|
Class<?> getListenerClass() {
|
|
return listenerClass;
|
|
}
|
|
|
|
public final
|
|
MessageHandler getHandler() {
|
|
return handler;
|
|
}
|
|
|
|
public abstract
|
|
Entry<T> createEntry(final Object listener, final Entry head);
|
|
|
|
/**
|
|
* single writer principle!
|
|
* called from within SYNCHRONIZE
|
|
*
|
|
* @param listener the object that will receive messages during publication
|
|
*/
|
|
public
|
|
void subscribe(final Object listener) {
|
|
Entry head = headREF.get(this);
|
|
|
|
if (!entries.containsKey(listener)) {
|
|
head = createEntry(listener, head);
|
|
|
|
entries.put(listener, head);
|
|
headREF.lazySet(this, head);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* single writer principle!
|
|
* called from within SYNCHRONIZE
|
|
*
|
|
* @param listener the object that will NO LONGER receive messages during publication
|
|
*/
|
|
public
|
|
void unsubscribe(final Object listener) {
|
|
Entry entry = entries.get(listener);
|
|
|
|
if (entry != null) {
|
|
removeNode(entry);
|
|
|
|
this.entries.remove(listener);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* single writer principle!
|
|
* called from within SYNCHRONIZE
|
|
*
|
|
* @param entry the entry that will be removed from the linked list
|
|
*/
|
|
protected
|
|
void removeNode(final Entry entry) {
|
|
Entry head = headREF.get(this);
|
|
|
|
if (entry == head) {
|
|
// if it was second, now it's first
|
|
head = head.next();
|
|
//oldHead.clear(); // optimize for GC not possible because of potentially running iterators
|
|
}
|
|
else {
|
|
entry.remove();
|
|
}
|
|
headREF.lazySet(this, head);
|
|
}
|
|
|
|
/**
|
|
* only used in unit tests
|
|
*/
|
|
public final
|
|
int size() {
|
|
return this.entries.size();
|
|
}
|
|
|
|
public abstract boolean publish(final Publisher publisher, final ErrorHandler errorHandler, final Object message);
|
|
public abstract boolean publish(final Publisher publisher, final ErrorHandler errorHandler, final Object message1, final Object message2);
|
|
public abstract boolean publish(final Publisher publisher, final ErrorHandler errorHandler, final Object message1, final Object message2, final Object message3);
|
|
|
|
|
|
@Override
|
|
public final
|
|
int hashCode() {
|
|
return this.ID;
|
|
}
|
|
|
|
@Override
|
|
public final
|
|
boolean equals(final Object obj) {
|
|
if (this == obj) {
|
|
return true;
|
|
}
|
|
if (obj == null) {
|
|
return false;
|
|
}
|
|
if (getClass() != obj.getClass()) {
|
|
return false;
|
|
}
|
|
Subscription other = (Subscription) obj;
|
|
return this.ID == other.ID;
|
|
}
|
|
}
|