Commit 22175eaa authored by Him188's avatar Him188

Rewrite events

parent e56ef7ba
......@@ -3,9 +3,6 @@
package net.mamoe.mirai.event
import net.mamoe.mirai.event.internal.broadcastInternal
import net.mamoe.mirai.utils.DefaultLogger
import net.mamoe.mirai.utils.MiraiLogger
import net.mamoe.mirai.utils.withSwitch
/**
* 可被监听的.
......@@ -13,9 +10,7 @@ import net.mamoe.mirai.utils.withSwitch
* 可以是任何 class 或 object.
*
* @see subscribeAlways
* @see subscribeOnce
* @see subscribeWhile
* @see subscribeAll
*
* @see subscribeMessages
*/
......@@ -47,19 +42,6 @@ abstract class Event : Subscribable {
fun cancel() {
cancelled = true
}
init {
if (EventDebuggingFlag) {
EventDebugLogger.debug(this::class.simpleName + " created")
}
}
}
internal object EventDebugLogger : MiraiLogger by DefaultLogger("Event").withSwitch(EventDebuggingFlag)
private val EventDebuggingFlag: Boolean by lazy {
// avoid 'Condition is always true'
false
}
/**
......@@ -74,19 +56,10 @@ interface Cancellable : Subscribable {
/**
* 广播一个事件的唯一途径.
*/
@Suppress("UNCHECKED_CAST")
suspend fun <E : Subscribable> E.broadcast(): E {
if (EventDebuggingFlag) {
EventDebugLogger.debug(this::class.simpleName + " pre broadcast")
}
try {
@Suppress("EXPERIMENTAL_API_USAGE")
return this@broadcast.broadcastInternal()
} finally {
if (EventDebuggingFlag) {
EventDebugLogger.debug(this::class.simpleName + " after broadcast")
}
}
@Suppress("EXPERIMENTAL_API_USAGE")
this@broadcast.broadcastInternal() // inline, no extra cost
return this
}
/**
......
@file:Suppress("unused")
package net.mamoe.mirai.event
import net.mamoe.mirai.Bot
import net.mamoe.mirai.event.internal.HandlerWithSession
import net.mamoe.mirai.event.internal.Listener
import net.mamoe.mirai.event.internal.subscribeInternal
import kotlin.reflect.KClass
/**
* 该文件为所有的含 Bot 的事件的订阅方法
*
* 与不含 bot 的相比, 在监听时将会有 `this: Bot`
* 在 demo 中找到实例可很快了解区别.
*/
// region 顶层方法
suspend inline fun <reified E : Subscribable> Bot.subscribe(noinline handler: suspend Bot.(E) -> ListeningStatus): Listener<E> =
E::class.subscribe(this, handler)
suspend inline fun <reified E : Subscribable> Bot.subscribeAlways(noinline listener: suspend Bot.(E) -> Unit): Listener<E> =
E::class.subscribeAlways(this, listener)
suspend inline fun <reified E : Subscribable> Bot.subscribeOnce(noinline listener: suspend Bot.(E) -> Unit): Listener<E> =
E::class.subscribeOnce(this, listener)
suspend inline fun <reified E : Subscribable, T> Bot.subscribeUntil(valueIfStop: T, noinline listener: suspend Bot.(E) -> T): Listener<E> =
E::class.subscribeUntil(this, valueIfStop, listener)
suspend inline fun <reified E : Subscribable> Bot.subscribeUntilFalse(noinline listener: suspend Bot.(E) -> Boolean): Listener<E> =
E::class.subscribeUntilFalse(this, listener)
suspend inline fun <reified E : Subscribable> Bot.subscribeUntilTrue(noinline listener: suspend Bot.(E) -> Boolean): Listener<E> =
E::class.subscribeUntilTrue(this, listener)
suspend inline fun <reified E : Subscribable> Bot.subscribeUntilNull(noinline listener: suspend Bot.(E) -> Any?): Listener<E> =
E::class.subscribeUntilNull(this, listener)
suspend inline fun <reified E : Subscribable, T> Bot.subscribeWhile(valueIfContinue: T, noinline listener: suspend Bot.(E) -> T): Listener<E> =
E::class.subscribeWhile(this, valueIfContinue, listener)
suspend inline fun <reified E : Subscribable> Bot.subscribeWhileFalse(noinline listener: suspend Bot.(E) -> Boolean): Listener<E> =
E::class.subscribeWhileFalse(this, listener)
suspend inline fun <reified E : Subscribable> Bot.subscribeWhileTrue(noinline listener: suspend Bot.(E) -> Boolean): Listener<E> =
E::class.subscribeWhileTrue(this, listener)
suspend inline fun <reified E : Subscribable> Bot.subscribeWhileNull(noinline listener: suspend Bot.(E) -> Any?): Listener<E> =
E::class.subscribeWhileNull(this, listener)
// endregion
// region KClass 的扩展方法 (仅内部使用)
@PublishedApi
internal suspend fun <E : Subscribable> KClass<E>.subscribe(bot: Bot, handler: suspend Bot.(E) -> ListeningStatus) =
this.subscribeInternal(HandlerWithSession(bot, handler))
@PublishedApi
internal suspend fun <E : Subscribable> KClass<E>.subscribeAlways(bot: Bot, listener: suspend Bot.(E) -> Unit) =
this.subscribeInternal(HandlerWithSession(bot) { listener(it); ListeningStatus.LISTENING })
@PublishedApi
internal suspend fun <E : Subscribable> KClass<E>.subscribeOnce(bot: Bot, listener: suspend Bot.(E) -> Unit) =
this.subscribeInternal(HandlerWithSession(bot) { listener(it); ListeningStatus.STOPPED })
@PublishedApi
internal suspend fun <E : Subscribable, T> KClass<E>.subscribeUntil(bot: Bot, valueIfStop: T, listener: suspend Bot.(E) -> T) =
subscribeInternal(HandlerWithSession(bot) { if (listener(it) === valueIfStop) ListeningStatus.STOPPED else ListeningStatus.LISTENING })
@PublishedApi
internal suspend inline fun <E : Subscribable> KClass<E>.subscribeUntilFalse(bot: Bot, noinline listener: suspend Bot.(E) -> Boolean) =
subscribeUntil(bot, false, listener)
@PublishedApi
internal suspend inline fun <E : Subscribable> KClass<E>.subscribeUntilTrue(bot: Bot, noinline listener: suspend Bot.(E) -> Boolean) =
subscribeUntil(bot, true, listener)
@PublishedApi
internal suspend inline fun <E : Subscribable> KClass<E>.subscribeUntilNull(bot: Bot, noinline listener: suspend Bot.(E) -> Any?) =
subscribeUntil(bot, null, listener)
@PublishedApi
internal suspend fun <E : Subscribable, T> KClass<E>.subscribeWhile(bot: Bot, valueIfContinue: T, listener: suspend Bot.(E) -> T) =
subscribeInternal(HandlerWithSession(bot) { if (listener(it) !== valueIfContinue) ListeningStatus.STOPPED else ListeningStatus.LISTENING })
@PublishedApi
internal suspend inline fun <E : Subscribable> KClass<E>.subscribeWhileFalse(bot: Bot, noinline listener: suspend Bot.(E) -> Boolean) =
subscribeWhile(bot, false, listener)
@PublishedApi
internal suspend inline fun <E : Subscribable> KClass<E>.subscribeWhileTrue(bot: Bot, noinline listener: suspend Bot.(E) -> Boolean) =
subscribeWhile(bot, true, listener)
@PublishedApi
internal suspend inline fun <E : Subscribable> KClass<E>.subscribeWhileNull(bot: Bot, noinline listener: suspend Bot.(E) -> Any?) =
subscribeWhile(bot, null, listener)
// endregion
\ No newline at end of file
package net.mamoe.mirai.event.internal
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import net.mamoe.mirai.Bot
import net.mamoe.mirai.event.Listener
import net.mamoe.mirai.event.ListeningStatus
import net.mamoe.mirai.event.Subscribable
import net.mamoe.mirai.event.events.BotEvent
import net.mamoe.mirai.utils.LockFreeLinkedList
import net.mamoe.mirai.utils.io.logStacktrace
import net.mamoe.mirai.utils.unsafeWeakRef
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.jvm.JvmField
import kotlin.reflect.KClass
import kotlin.reflect.KFunction
/**
* 设置为 `true` 以关闭事件.
......@@ -24,15 +19,8 @@ import kotlin.reflect.KFunction
*/
var EventDisabled = false
/**
* 监听和广播实现.
* 它会首先检查这个事件是否正在被广播
* - 如果是, 则将新的监听者放入缓存中. 在当前广播结束后转移到主列表 (通过一个协程完成)
* - 如果不是, 则直接将新的监听者放入主列表
*
* @author Him188moe
*/ // inline to avoid a Continuation creation
internal suspend inline fun <L : Listener<E>, E : Subscribable> KClass<E>.subscribeInternal(listener: L): L {
@PublishedApi
internal fun <L : Listener<E>, E : Subscribable> KClass<out E>.subscribeInternal(listener: L): L {
this.listeners().addLast(listener)
return listener
}
......@@ -40,18 +28,15 @@ internal suspend inline fun <L : Listener<E>, E : Subscribable> KClass<E>.subscr
/**
* 事件监听器.
*
* 它实现 [CompletableJob] 接口,
* 可通过 [CompletableJob.complete] 来正常结束监听, 或 [CompletableJob.completeExceptionally] 来异常地结束监听.
*
* @author Him188moe
*/
sealed class Listener<in E : Subscribable> : CompletableJob {
abstract suspend fun onEvent(event: E): ListeningStatus
internal sealed class ListenerImpl<in E : Subscribable> : Listener<E> {
abstract override suspend fun onEvent(event: E): ListeningStatus
}
@PublishedApi
@Suppress("FunctionName")
internal suspend inline fun <E : Subscribable> Handler(noinline handler: suspend (E) -> ListeningStatus): Handler<E> {
internal fun <E : Subscribable> CoroutineScope.Handler(handler: suspend (E) -> ListeningStatus): Handler<E> {
return Handler(coroutineContext[Job], coroutineContext, handler)
}
......@@ -60,14 +45,15 @@ internal suspend inline fun <E : Subscribable> Handler(noinline handler: suspend
*/
@PublishedApi
internal class Handler<in E : Subscribable>
@PublishedApi internal constructor(parentJob: Job?, private val context: CoroutineContext, @JvmField val handler: suspend (E) -> ListeningStatus) :
Listener<E>(), CompletableJob by Job(parentJob) {
@PublishedApi internal constructor(parentJob: Job?, private val subscriberContext: CoroutineContext, @JvmField val handler: suspend (E) -> ListeningStatus) :
ListenerImpl<E>(), CompletableJob by Job(parentJob) {
override suspend fun onEvent(event: E): ListeningStatus {
if (isCompleted || isCancelled) return ListeningStatus.STOPPED
if (!isActive) return ListeningStatus.LISTENING
return try {
withContext(context) { handler.invoke(event) }.also { if (it == ListeningStatus.STOPPED) this.complete() }
// Inherit context.
withContext(subscriberContext) { handler.invoke(event) }.also { if (it == ListeningStatus.STOPPED) this.complete() }
} catch (e: Throwable) {
e.logStacktrace()
//this.completeExceptionally(e)
......@@ -76,96 +62,51 @@ internal class Handler<in E : Subscribable>
}
}
@PublishedApi
@Suppress("FunctionName")
internal suspend inline fun <E : Subscribable> HandlerWithSession(
bot: Bot,
noinline handler: suspend Bot.(E) -> ListeningStatus
): HandlerWithSession<E> {
return HandlerWithSession(bot, coroutineContext[Job], coroutineContext, handler)
}
/**
* 带有 bot 筛选的监听器.
* 所有的非 [BotEvent] 的事件都不会被处理
* 所有的 [BotEvent.bot] `!==` `bot` 的事件都不会被处理
*/
@PublishedApi
internal class HandlerWithSession<E : Subscribable> @PublishedApi internal constructor(
bot: Bot,
parentJob: Job?, private val context: CoroutineContext, @JvmField val handler: suspend Bot.(E) -> ListeningStatus
) : Listener<E>(), CompletableJob by Job(parentJob) {
val bot: Bot by bot.unsafeWeakRef()
override suspend fun onEvent(event: E): ListeningStatus {
if (isCompleted || isCancelled) return ListeningStatus.STOPPED
if (!isActive) return ListeningStatus.LISTENING
if (event !is BotEvent || event.bot !== bot) return ListeningStatus.LISTENING
return try {
withContext(context) { bot.handler(event) }.also { if (it == ListeningStatus.STOPPED) complete() }
} catch (e: Throwable) {
e.logStacktrace()
//completeExceptionally(e)
complete()
ListeningStatus.STOPPED
}
}
}
/**
* 这个事件类的监听器 list
*/
internal suspend fun <E : Subscribable> KClass<out E>.listeners(): EventListeners<E> = EventListenerManger.get(this)
internal class EventListeners<E : Subscribable> : LockFreeLinkedList<Listener<E>>() {
init {
this::class.members.filterIsInstance<KFunction<*>>().forEach {
if (it.name == "add") {
it.isExternal
}
}
}
}
internal fun <E : Subscribable> KClass<out E>.listeners(): EventListeners<E> = EventListenerManger.get(this)
internal class EventListeners<E : Subscribable> : LockFreeLinkedList<Listener<E>>()
/**
* 管理每个事件 class 的 [EventListeners].
* [EventListeners] 是 lazy 的: 它们只会在被需要的时候才创建和存储.
*/
internal object EventListenerManger {
private val registries: MutableMap<KClass<out Subscribable>, EventListeners<*>> = mutableMapOf()
private val registriesMutex = Mutex()
private data class Registry<E : Subscribable>(val clazz: KClass<E>, val listeners: EventListeners<E>)
@Suppress("UNCHECKED_CAST")
internal suspend fun <E : Subscribable> get(clazz: KClass<out E>): EventListeners<E> =
if (registries.containsKey(clazz)) registries[clazz] as EventListeners<E>
else registriesMutex.withLock {
EventListeners<E>().let {
registries[clazz] = it
return it
}
}
private val registries = LockFreeLinkedList<Registry<*>>()
@Suppress("UNCHECKED_CAST")
internal fun <E : Subscribable> get(clazz: KClass<out E>): EventListeners<E> {
return registries.filteringGetOrAdd({ it.clazz == clazz }) {
Registry(
clazz,
EventListeners()
)
}.listeners as EventListeners<E>
}
}
// inline: NO extra Continuation
internal suspend inline fun <E : Subscribable> E.broadcastInternal(): E {
if (EventDisabled) return this
internal suspend inline fun Subscribable.broadcastInternal() {
if (EventDisabled) return
callAndRemoveIfRequired(this::class.listeners())
this::class.supertypes.forEach { superType ->
if (Subscribable::class.isInstance(superType)) {
// the super type is a child of Subscribable, then we can cast.
val superListeners =
@Suppress("UNCHECKED_CAST")
callAndRemoveIfRequired((superType.classifier as KClass<out Subscribable>).listeners())
}
(superType.classifier as? KClass<out Subscribable>)?.listeners() ?: return // return if super type is not Subscribable
callAndRemoveIfRequired(superListeners)
}
return this
return
}
private suspend inline fun <E : Subscribable> E.callAndRemoveIfRequired(listeners: EventListeners<E>) {
// atomic foreach
listeners.forEach {
if (it.onEvent(this) == ListeningStatus.STOPPED) {
listeners.remove(it) // atomic remove
......
package net.mamoe.mirai.event
import kotlinx.coroutines.runBlocking
import net.mamoe.mirai.test.shouldBeEqualTo
import kotlin.system.exitProcess
import kotlin.test.Test
class TestEvent : Subscribable {
var triggered = false
}
class EventTests {
@Test
fun testSubscribeInplace() {
runBlocking {
val subscriber = subscribeAlways<TestEvent> {
triggered = true
println("Triggered")
}
TestEvent().broadcast().triggered shouldBeEqualTo true
subscriber.complete()
println("finished")
}
}
@Test
fun testSubscribeGlobalScope() {
runBlocking {
TestEvent().broadcast().triggered shouldBeEqualTo true
println("finished")
}
}
companion object {
@JvmStatic
fun main(args: Array<String>) {
EventTests().testSubscribeGlobalScope()
exitProcess(0)
}
}
}
\ No newline at end of file
......@@ -51,17 +51,6 @@ suspend fun main() {
bot.messageDSL()
directlySubscribe(bot)
//DSL 监听
subscribeAll<FriendMessage> {
always {
//获取第一个纯文本消息
val firstText = it.message.firstOrNull<PlainText>()
}
}
demo2()
bot.network.awaitDisconnection()//等到直到断开连接
}
......@@ -199,9 +188,22 @@ suspend fun Bot.messageDSL() {
*/
@Suppress("UNUSED_VARIABLE")
suspend fun directlySubscribe(bot: Bot) {
// 在当前协程作用域 (CoroutineScope) 下创建一个子 Job, 监听一个事件.
//
// 手动处理消息
// 使用 Bot 的扩展方法监听, 将在处理事件时得到一个 this: Bot.
// 这样可以调用 Bot 内的一些扩展方法如 UInt.qq():QQ
//
// 这个函数返回 Listener, Listener 是一个 CompletableJob. 如果不手动 close 它, 它会一直阻止当前 CoroutineScope 结束.
// 例如:
// ```kotlin
// runBlocking {// this: CoroutineScope
// bot.subscribeAlways<FriendMessage> {
// }
// }
// ```
// 则这个 `runBlocking` 永远不会结束, 因为 `subscribeAlways` 在 `runBlocking` 的 `CoroutineScope` 下创建了一个 Job.
// 正确的用法为:
bot.subscribeAlways<FriendMessage> {
// this: Bot
// it: FriendMessageEvent
......@@ -245,21 +247,4 @@ suspend fun directlySubscribe(bot: Bot) {
it.message eq "发图片2" -> it.reply(PlainText("test") + Image(ImageId("{7AA4B3AA-8C3C-0F45-2D9B-7F302A0ACEAA}.jpg")))
}
}
}
/**
* 实现功能:
* 对机器人说 "记笔记", 机器人记录之后的所有消息.
* 对机器人说 "停止", 机器人停止
*/
suspend fun demo2() {
subscribeAlways<FriendMessage> { event ->
if (event.message eq "记笔记") {
subscribeUntilFalse<FriendMessage> {
it.reply("你发送了 ${it.message}")
it.message eq "停止"
}
}
}
}
\ No newline at end of file
......@@ -37,7 +37,7 @@ private fun readTestAccount(): BotAccount? {
val lines = file.readLines()
return try {
BotAccount(lines[0].toLong(), lines[1])
} catch (e: Exception) {
} catch (e: Throwable) {
null
}
}
......@@ -54,16 +54,15 @@ suspend fun main() {
/**
* 监听所有事件
*/
subscribeAlways<Subscribable> {
//bot.logger.verbose("收到了一个事件: ${it::class.simpleName}")
GlobalScope.subscribeAlways<Subscribable> {
bot.logger.verbose("收到了一个事件: $this")
}
subscribeAlways<ReceiveFriendAddRequestEvent> {
GlobalScope.subscribeAlways<ReceiveFriendAddRequestEvent> {
it.approve()
}
bot.subscribeGroupMessages {
GlobalScope.subscribeGroupMessages {
"群资料" reply {
group.updateGroupInfo().toString().reply()
}
......@@ -85,7 +84,11 @@ suspend fun main() {
}
bot.subscribeMessages {
always {
}
case("at me") { At(sender).reply() }
// 等同于 "at me" reply { At(sender) }
"你好" reply "你好!"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment