Commit 9924d37e authored by Him188's avatar Him188

Misc improvements

parent 20af1fc3
......@@ -15,6 +15,7 @@ import kotlin.test.Test
import kotlin.test.assertFalse
import kotlin.test.assertTrue
/*
internal class AtomicResizeCacheListTest {
@Test
......@@ -38,4 +39,4 @@ internal class AtomicResizeCacheListTest {
assertTrue { list.ensureNoDuplication(1) }
}
}
}
\ No newline at end of file
}*/
\ No newline at end of file
......@@ -13,10 +13,12 @@ package net.mamoe.mirai.event
import kotlinx.atomicfu.atomic
import net.mamoe.mirai.event.internal.broadcastInternal
import net.mamoe.mirai.utils.MiraiExperimentalAPI
import net.mamoe.mirai.utils.MiraiInternalAPI
import net.mamoe.mirai.utils.PlannedRemoval
import net.mamoe.mirai.utils.SinceMirai
import kotlin.jvm.JvmSynthetic
import kotlin.jvm.Volatile
/**
* 可被监听的类, 可以是任何 class 或 object.
......@@ -34,6 +36,20 @@ import kotlin.jvm.JvmSynthetic
* @see [subscribe] 监听事件
*/
interface Event {
/**
* 事件是否已被拦截.
*
* 所有事件都可以被拦截, 拦截后低优先级的监听器将不会处理到这个事件.
*/
@SinceMirai("1.0.0")
val isIntercepted: Boolean
/**
* 拦截这个事件
*/
@SinceMirai("1.0.0")
fun intercept()
@Deprecated(
"""
......@@ -47,6 +63,8 @@ interface Event {
/**
* 所有实现了 [Event] 接口的类都应该继承的父类.
*
* 在使用事件时应使用类型 [Event]. 在实现自定义事件时应继承 [AbstractEvent].
*/
@SinceMirai("1.0.0")
abstract class AbstractEvent : Event {
......@@ -56,40 +74,20 @@ abstract class AbstractEvent : Event {
final override val DoNotImplementThisClassButExtendAbstractEvent: Nothing
get() = throw Error("Shouldn't be reached")
@Volatile
private var _intercepted = false
private val _cancelled = atomic(false)
/**
* 事件是否已被拦截.
*
* 所有事件都可以被拦截, 拦截后低优先级的监听器将不会处理到这个事件.
*/
@SinceMirai("1.0.0")
val isIntercepted: Boolean
get() = _intercepted
// 实现 Event
override val isIntercepted: Boolean get() = _intercepted
/**
* 拦截这个事件.
* 重复拦截时不会抛出异常.
*/
@SinceMirai("1.0.0")
fun intercept() {
override fun intercept() {
_intercepted = true
}
/**
* 事件是否已取消.
*
* 事件需实现 [CancellableEvent] 接口才可以被取消,
* 否则此属性固定返回 false.
*/
// 实现 CancellableEvent
val isCancelled: Boolean get() = _cancelled.value
/**
* 取消这个事件.
* 重复取消时不会抛出异常.
*/
fun cancel() {
check(this is CancellableEvent) {
"Event $this is not cancellable"
......@@ -103,12 +101,18 @@ abstract class AbstractEvent : Event {
*/
interface CancellableEvent : Event {
/**
* 事件是否已取消.
* 事件是否已被取消.
*
* 事件需实现 [CancellableEvent] 接口才可以被取消,
* 否则此属性固定返回 false.
*/
val isCancelled: Boolean
/**
* 取消这个事件.
* 事件需实现 [CancellableEvent] 接口才可以被取消
*
* @throws IllegalStateException 当事件未实现接口 [CancellableEvent] 时抛出
*/
fun cancel()
}
......@@ -128,6 +132,7 @@ suspend fun <E : Event> E.broadcast(): E = apply {
* 设置为 `true` 以关闭事件.
* 所有的 `subscribe` 都能正常添加到监听器列表, 但所有的广播都会直接返回.
*/
@MiraiExperimentalAPI
var EventDisabled = false
/**
......
......@@ -14,14 +14,17 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.mamoe.mirai.event.*
import net.mamoe.mirai.utils.*
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.utils.LockFreeLinkedList
import net.mamoe.mirai.utils.MiraiExperimentalAPI
import net.mamoe.mirai.utils.MiraiInternalAPI
import net.mamoe.mirai.utils.MiraiLogger
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.jvm.JvmField
import kotlin.reflect.KClass
@PublishedApi
internal fun <L : Listener<E>, E : Event> KClass<out E>.subscribeInternal(listener: L): L {
with(GlobalEventListeners[listener.priority]) {
@Suppress("UNCHECKED_CAST")
......@@ -37,7 +40,6 @@ internal fun <L : Listener<E>, E : Event> KClass<out E>.subscribeInternal(listen
}
@PublishedApi
@Suppress("FunctionName")
internal fun <E : Event> CoroutineScope.Handler(
coroutineContext: CoroutineContext,
......@@ -53,9 +55,7 @@ internal fun <E : Event> CoroutineScope.Handler(
/**
* 事件处理器.
*/
@PublishedApi
internal class Handler<in E : Event>
@PublishedApi internal constructor(
internal class Handler<in E : Event> internal constructor(
parentJob: Job?,
subscriberContext: CoroutineContext,
@JvmField val handler: suspend (E) -> ListeningStatus,
......@@ -65,8 +65,6 @@ internal class Handler<in E : Event>
private val subscriberContext: CoroutineContext = subscriberContext + this // override Job.
@MiraiInternalAPI
val lock: Mutex? = when (concurrencyKind) {
Listener.ConcurrencyKind.LOCKED -> Mutex()
else -> null
......@@ -84,10 +82,11 @@ internal class Handler<in E : Event>
?: coroutineContext[CoroutineExceptionHandler]?.handleException(subscriberContext, e)
?: kotlin.run {
@Suppress("DEPRECATION")
MiraiLogger.warning(
"""Event processing: An exception occurred but no CoroutineExceptionHandler found,
(if (event is BotEvent) event.bot.logger else MiraiLogger)
.warning(
"""Event processing: An exception occurred but no CoroutineExceptionHandler found,
either in coroutineContext from Handler job, or in subscriberContext""".trimIndent(), e
)
)
}
// this.complete() // do not `completeExceptionally`, otherwise parentJob will fai`l.
// ListeningStatus.STOPPED
......@@ -102,6 +101,7 @@ internal class ListenerNode(
val listener: Listener<Event>,
val owner: KClass<out Event>
)
internal expect object GlobalEventListeners {
operator fun get(priority: Listener.EventPriority): LockFreeLinkedList<ListenerNode>
}
......@@ -116,51 +116,61 @@ internal expect class MiraiAtomicBoolean(initial: Boolean) {
// inline: NO extra Continuation
@Suppress("UNCHECKED_CAST")
internal suspend inline fun Event.broadcastInternal() = coroutineScope {
if (EventDisabled) return@coroutineScope
internal suspend inline fun Event.broadcastInternal() {
@OptIn(MiraiExperimentalAPI::class)
if (EventDisabled) return
callAndRemoveIfRequired(this@broadcastInternal as? AbstractEvent ?: error("Events must extends AbstractEvent"))
}
@Suppress("DuplicatedCode")
@OptIn(MiraiInternalAPI::class)
private suspend fun <E : AbstractEvent> callAndRemoveIfRequired(
internal suspend fun <E : AbstractEvent> callAndRemoveIfRequired(
event: E
) {
coroutineScope {
for (p in Listener.EventPriority.values()) {
GlobalEventListeners[p].forEachNode { eventNode ->
if (event.isIntercepted) {
return@coroutineScope
for (p in Listener.EventPriority.valuesExceptMonitor) {
GlobalEventListeners[p].forEachNode { eventNode ->
if (event.isIntercepted) {
return
}
val node = eventNode.nodeValue
if (!node.owner.isInstance(event)) return@forEachNode
val listener = node.listener
when (listener.concurrencyKind) {
Listener.ConcurrencyKind.LOCKED -> {
(listener as Handler).lock!!.withLock {
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
removeNode(eventNode)
}
}
}
Listener.ConcurrencyKind.CONCURRENT -> {
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
removeNode(eventNode)
}
}
val node = eventNode.nodeValue
if (!node.owner.isInstance(event)) return@forEachNode
val listener = node.listener
}
}
}
coroutineScope {
GlobalEventListeners[Listener.EventPriority.MONITOR].forEachNode { eventNode ->
if (event.isIntercepted) {
return@coroutineScope
}
val node = eventNode.nodeValue
if (!node.owner.isInstance(event)) return@forEachNode
val listener = node.listener
launch {
when (listener.concurrencyKind) {
Listener.ConcurrencyKind.LOCKED -> {
(listener as Handler).lock!!.withLock {
kotlin.runCatching {
when (listener.onEvent(event)) {
ListeningStatus.STOPPED -> {
removeNode(eventNode)
}
else -> {
}
}
}.onFailure {
// TODO("Exception catching")
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
removeNode(eventNode)
}
}
}
Listener.ConcurrencyKind.CONCURRENT -> {
kotlin.runCatching {
when (listener.onEvent(event)) {
ListeningStatus.STOPPED -> {
removeNode(eventNode)
}
else -> {
}
}
}.onFailure {
// TODO("Exception catching")
if (listener.onEvent(event) == ListeningStatus.STOPPED) {
removeNode(eventNode)
}
}
}
......
......@@ -9,8 +9,6 @@
package net.mamoe.mirai.event
import kotlinx.atomicfu.AtomicInt
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import net.mamoe.mirai.event.internal.GlobalEventListeners
import net.mamoe.mirai.utils.MiraiInternalAPI
......@@ -18,7 +16,6 @@ import net.mamoe.mirai.utils.StepUtil
import net.mamoe.mirai.utils.internal.runBlocking
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.Test
import kotlin.test.assertTrue
......@@ -67,7 +64,6 @@ class EventTests {
}
kotlinx.coroutines.runBlocking {
ParentEvent().broadcast()
delay(5000L) // ?
}
val called = counter.get()
println("Registered $listeners listeners and $called called")
......@@ -109,37 +105,39 @@ class EventTests {
}
@Test
fun `test concurrent listening 2`() {
fun `test concurrent listening 2`() = runBlocking {
resetEventListeners()
val registered = AtomicInteger()
val called = AtomicInteger()
val threads = mutableListOf<Thread>()
repeat(50) {
threads.add(thread {
repeat(444) {
registered.getAndIncrement()
GlobalScope.launch {
subscribeAlways<ParentEvent> {
val supervisor = CoroutineScope(SupervisorJob())
coroutineScope {
repeat(50) {
launch {
repeat(444) {
registered.getAndIncrement()
supervisor.subscribeAlways<ParentEvent> {
called.getAndIncrement()
}
}
}
})
}
Thread.sleep(5000L)// Wait all thread started.
threads.forEach {
it.join() // Wait all finished
}
}
println("All listeners registered")
val postCount = 3
kotlinx.coroutines.runBlocking {
coroutineScope {
repeat(postCount) {
ParentEvent().broadcast()
launch { ParentEvent().broadcast() }
}
delay(5000L)
}
val calledCount = called.get()
val shouldCalled = registered.get() * postCount
println("Should call $shouldCalled times and $called called")
if (shouldCalled != calledCount) {
throw IllegalStateException("?")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment