Commit 7fff235a authored by Him188's avatar Him188

Ensure sequential listener invoking, ensure contextual equals

parent df1fbbe2
......@@ -57,19 +57,33 @@ suspend inline fun <reified T : MessagePacket<*, *>> T.whileSelectMessages(
) = withTimeoutOrCoroutineScope(timeoutMillis) {
var deferred: CompletableDeferred<Boolean>? = CompletableDeferred()
// ensure sequential invoking
val listeners: MutableList<Pair<T.(String) -> Boolean, MessageListener<T, Any?>>> = mutableListOf()
MessageSelectBuilder<T, Boolean>(SELECT_MESSAGE_STUB) { filter: T.(String) -> Boolean, listener: MessageListener<T, Any?> ->
subscribeAlways<T> {
if (deferred?.isCompleted == false && isActive) {
val toString = it.message.toString()
if (filter.invoke(it, toString)) {
val value = listener.invoke(it, toString)
listeners += filter to listener
}.apply(selectBuilder)
// ensure atomic completing
subscribeAlways<T>(concurrency = Listener.ConcurrencyKind.LOCKED) { event ->
if (!this.isContextIdenticalWith(this@whileSelectMessages))
return@subscribeAlways
listeners.forEach { (filter, listener) ->
if (deferred?.isCompleted != false || !isActive)
return@subscribeAlways
val toString = event.message.toString()
if (filter.invoke(event, toString)) {
listener.invoke(event, toString).let { value ->
if (value !== SELECT_MESSAGE_STUB) {
deferred?.complete(value as Boolean)
return@subscribeAlways // accept the first value only
}
}
}
}
}.apply(selectBuilder)
}
while (deferred?.await() == true) {
deferred = CompletableDeferred()
......@@ -120,22 +134,32 @@ suspend inline fun <reified T : MessagePacket<*, *>, R> T.selectMessages(
): R = withTimeoutOrCoroutineScope(timeoutMillis) {
val deferred = CompletableDeferred<R>()
// ensure sequential invoking
val listeners: MutableList<Pair<T.(String) -> Boolean, MessageListener<T, Any?>>> = mutableListOf()
MessageSelectBuilder<T, R>(SELECT_MESSAGE_STUB) { filter: T.(String) -> Boolean, listener: MessageListener<T, Any?> ->
subscribeAlways<T> {
if (!this.isContextIdenticalWith(this@selectMessages))
return@subscribeAlways
listeners += filter to listener
}.apply(selectBuilder)
subscribeAlways<T> { event ->
if (!this.isContextIdenticalWith(this@selectMessages))
return@subscribeAlways
val toString = it.message.toString()
if (!filter.invoke(it, toString))
listeners.forEach { (filter, listener) ->
if (deferred.isCompleted || !isActive)
return@subscribeAlways
val value = listener.invoke(it, toString)
if (value !== SELECT_MESSAGE_STUB) {
@Suppress("UNCHECKED_CAST")
deferred.complete(value as R)
val toString = event.message.toString()
if (filter.invoke(event, toString)) {
val value = listener.invoke(event, toString)
if (value !== SELECT_MESSAGE_STUB) {
@Suppress("UNCHECKED_CAST")
deferred.complete(value as R)
return@subscribeAlways
}
}
}
}.apply(selectBuilder)
}
deferred.await().also { coroutineContext[Job]!!.cancelChildren() }
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment