Converted to executors.

This commit is contained in:
Robinson 2023-09-07 01:01:22 +02:00
parent 94ae22716d
commit e11287b31e
No known key found for this signature in database
GPG Key ID: 8E7DB78588BD6F5C
1 changed files with 12 additions and 25 deletions

View File

@ -16,11 +16,9 @@
package dorkbox.network.connection
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue
import dorkbox.network.Configuration
import dorkbox.util.NamedThreadFactory
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import mu.KotlinLogging
import java.util.concurrent.*
@ -54,19 +52,7 @@ enum class EventDispatcher {
)
}.toTypedArray()
private val queues = executors.map { executor ->
val disruptor = DisruptorBlockingQueue<java.lang.Runnable>(16_000)
executor.submit {
while (true) {
val event = disruptor.take()
event.run()
}
}
disruptor
}.toTypedArray()
private val typedEntries: Array<EventDispatcher>
init {
executors.forEachIndexed { _, executor ->
@ -74,6 +60,8 @@ enum class EventDispatcher {
// this is to create a new thread only, so that the thread ID can be assigned
}
}
typedEntries = entries.toTypedArray()
}
/**
@ -82,7 +70,7 @@ enum class EventDispatcher {
* No values specified means we check ALL events
*/
fun isDispatch(): Boolean {
return isCurrentEvent(*entries.toTypedArray())
return isCurrentEvent(*typedEntries)
}
/**
@ -90,7 +78,7 @@ enum class EventDispatcher {
*
* No values specified means we check ALL events
*/
fun isCurrentEvent(vararg events: EventDispatcher = entries.toTypedArray()): Boolean {
fun isCurrentEvent(vararg events: EventDispatcher = typedEntries): Boolean {
val threadId = Thread.currentThread().id
events.forEach { event ->
@ -107,7 +95,7 @@ enum class EventDispatcher {
*
* No values specified means we check ALL events
*/
fun isNotCurrentEvent(vararg events: EventDispatcher = values()): Boolean {
fun isNotCurrentEvent(vararg events: EventDispatcher = typedEntries): Boolean {
val currentDispatch = getCurrentEvent() ?: return false
return events.contains(currentDispatch)
@ -119,7 +107,7 @@ enum class EventDispatcher {
fun getCurrentEvent(): EventDispatcher? {
val threadId = Thread.currentThread().id
values().forEach { event ->
typedEntries.forEach { event ->
if (threadIds[event.ordinal].value == threadId) {
return event
}
@ -141,18 +129,17 @@ enum class EventDispatcher {
if (DEBUG_EVENTS) {
val id = traceId.getAndIncrement()
queues[eventId].add(Runnable {
executors[eventId].submit {
logger.debug { "Starting $event : $id" }
function()
logger.debug { "Finished $event : $id" }
})
}
} else {
queues[eventId].add(Runnable {
function()
})
executors[eventId].submit(function)
}
}
fun launchSequentially(endEvent: EventDispatcher, function: () -> Unit) {
// If one of our callbacks requested a shutdown, we wait until all callbacks have run ... THEN shutdown
val event = getCurrentEvent()
@ -165,7 +152,7 @@ enum class EventDispatcher {
// this problem is solved by running AGAIN after we have finished running whatever event dispatcher we are currently on
// MORE SPECIFICALLY, we must run at the end of our current one, but repeatedly until CLOSE
EventDispatcher.launch(values()[index+1]) {
EventDispatcher.launch(typedEntries[index+1]) {
launchSequentially(endEvent, function)
}
} else {