Commit 921f6a3d authored by nanahira's avatar nanahira

update message buffer

parent 3002f53d
...@@ -38,7 +38,8 @@ routes: ...@@ -38,7 +38,8 @@ routes:
heartbeat: 3000 # 心跳包的间隔。0 或不填为禁用心跳包。 heartbeat: 3000 # 心跳包的间隔。0 或不填为禁用心跳包。
readonly: false # 该路由是否为只读。只读路由的连接无法对机器人进行写操作,只会得到模拟响应,但是可以进行 get 操作以及接收事件。 readonly: false # 该路由是否为只读。只读路由的连接无法对机器人进行写操作,只会得到模拟响应,但是可以进行 get 操作以及接收事件。
rateLimitInterval: 500 # 限速调用间隔,默认 500ms。 rateLimitInterval: 500 # 限速调用间隔,默认 500ms。
bufferMessage: false # 是否在 app 断线期间缓存消息,并在 app 恢复连接时发送。 bufferAppMessage: false # 是否在 app 断线期间缓存消息,并在 app 恢复连接时发送。
bufferBotMessage: false # 是否在机器人断线期间缓存消息,并在机器人恢复连接时发送。
wsReverse: # 该路由的反向 WebSocket 配置。可以配置多个。 wsReverse: # 该路由的反向 WebSocket 配置。可以配置多个。
- endpoint: 'ws://localhost:8080' - endpoint: 'ws://localhost:8080'
token: 'token' token: 'token'
......
...@@ -25,7 +25,8 @@ routes: ...@@ -25,7 +25,8 @@ routes:
heartbeat: 3000 # 心跳包的间隔。0 或不填为禁用心跳包。 heartbeat: 3000 # 心跳包的间隔。0 或不填为禁用心跳包。
readonly: false # 该路由是否为只读。只读路由的连接无法对机器人进行写操作,只会得到模拟响应,但是可以进行 get 操作以及接收事件。 readonly: false # 该路由是否为只读。只读路由的连接无法对机器人进行写操作,只会得到模拟响应,但是可以进行 get 操作以及接收事件。
rateLimitInterval: 500 # 限速调用间隔,默认 500ms。 rateLimitInterval: 500 # 限速调用间隔,默认 500ms。
bufferMessage: false # 是否在 app 断线期间缓存消息,并在 app 恢复连接时发送。 bufferAppMessage: false # 是否在 app 断线期间缓存消息,并在 app 恢复连接时发送。
bufferBotMessage: false # 是否在机器人断线期间缓存消息,并在机器人恢复连接时发送。
wsReverse: # 该路由的反向 WebSocket 配置。可以配置多个。 wsReverse: # 该路由的反向 WebSocket 配置。可以配置多个。
- endpoint: 'ws://localhost:8080' - endpoint: 'ws://localhost:8080'
token: 'token' token: 'token'
......
...@@ -10,6 +10,7 @@ import { ReverseWsService } from './reverse-ws/reverse-ws.service'; ...@@ -10,6 +10,7 @@ import { ReverseWsService } from './reverse-ws/reverse-ws.service';
import { WaitBotService } from './wait-bot/wait-bot.service'; import { WaitBotService } from './wait-bot/wait-bot.service';
import { HealthService } from './health/health.service'; import { HealthService } from './health/health.service';
import { HealthController } from './health/health.controller'; import { HealthController } from './health/health.controller';
import { BotRegistryService } from './bot-registry/bot-registry.service';
@Module({ @Module({
imports: [ imports: [
...@@ -31,6 +32,7 @@ import { HealthController } from './health/health.controller'; ...@@ -31,6 +32,7 @@ import { HealthController } from './health/health.controller';
ReverseWsService, ReverseWsService,
WaitBotService, WaitBotService,
HealthService, HealthService,
BotRegistryService,
], ],
controllers: [HealthController], controllers: [HealthController],
}) })
......
import { Test, TestingModule } from '@nestjs/testing';
import { BotRegistryService } from './bot-registry.service';
describe('BotRegistryService', () => {
let service: BotRegistryService;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [BotRegistryService],
}).compile();
service = module.get<BotRegistryService>(BotRegistryService);
});
it('should be defined', () => {
expect(service).toBeDefined();
});
});
import { Injectable } from '@nestjs/common';
import { WireContextService } from 'koishi-nestjs';
import { OneBotBot } from '@koishijs/plugin-adapter-onebot/lib/bot';
@Injectable()
export class BotRegistryService {
@WireContextService('bots')
private bots: OneBotBot[];
private botMap = new Map<string, OneBotBot>();
getBotWithId(selfId: string) {
if (!this.botMap.has(selfId)) {
const bot = this.bots.find((bot) => bot.selfId === selfId);
if (bot) {
this.botMap.set(selfId, bot);
}
}
return this.botMap.get(selfId);
}
getAllBots() {
return this.bots;
}
}
...@@ -3,12 +3,13 @@ import { RouteService } from '../route/route.service'; ...@@ -3,12 +3,13 @@ import { RouteService } from '../route/route.service';
import { InjectContext } from 'koishi-nestjs'; import { InjectContext } from 'koishi-nestjs';
import { Context } from 'koishi'; import { Context } from 'koishi';
import { HealthInfoDto } from '../dto/HealthInfo.dto'; import { HealthInfoDto } from '../dto/HealthInfo.dto';
import { BotRegistryService } from '../bot-registry/bot-registry.service';
@Injectable() @Injectable()
export class HealthService { export class HealthService {
constructor( constructor(
private readonly routeService: RouteService, private readonly routeService: RouteService,
@InjectContext() private readonly ctx: Context, private readonly botRegistry: BotRegistryService,
) {} ) {}
healthOfAllRoutes() { healthOfAllRoutes() {
...@@ -20,13 +21,13 @@ export class HealthService { ...@@ -20,13 +21,13 @@ export class HealthService {
} }
healthOfAllBots() { healthOfAllBots() {
return this.ctx.bots.map( return this.botRegistry
(b) => new HealthInfoDto(b.selfId, b.status === 'online'), .getAllBots()
); .map((b) => new HealthInfoDto(b.selfId, b.status === 'online'));
} }
healthOfBot(selfId: string) { healthOfBot(selfId: string) {
const bot = this.ctx.bots.find((b) => b.selfId === selfId); const bot = this.botRegistry.getBotWithId(selfId);
if (!bot) { if (!bot) {
return; return;
} }
......
...@@ -10,6 +10,7 @@ import { ...@@ -10,6 +10,7 @@ import {
} from '../utility/onebot-protocol'; } from '../utility/onebot-protocol';
import { OneBotBot } from '@koishijs/plugin-adapter-onebot/lib/bot'; import { OneBotBot } from '@koishijs/plugin-adapter-onebot/lib/bot';
import { WaitBotService } from '../wait-bot/wait-bot.service'; import { WaitBotService } from '../wait-bot/wait-bot.service';
import { BotRegistryService } from '../bot-registry/bot-registry.service';
export interface SendTask { export interface SendTask {
bot: OneBotBot; bot: OneBotBot;
...@@ -20,8 +21,8 @@ export interface SendTask { ...@@ -20,8 +21,8 @@ export interface SendTask {
@Injectable() @Injectable()
export class MessageService extends ConsoleLogger { export class MessageService extends ConsoleLogger {
constructor( constructor(
@InjectContext() private ctx: Context, private readonly botRegistry: BotRegistryService,
private waitBot: WaitBotService, private readonly waitBot: WaitBotService,
) { ) {
super('message'); super('message');
} }
...@@ -66,8 +67,28 @@ export class MessageService extends ConsoleLogger { ...@@ -66,8 +67,28 @@ export class MessageService extends ConsoleLogger {
client.send(JSON.stringify(genMetaEvent(route.selfId, 'enable'))); client.send(JSON.stringify(genMetaEvent(route.selfId, 'enable')));
} }
private isRouteBotHealthy(route: Route): boolean {
const bot = this.botRegistry.getBotWithId(route.selfId);
return bot && bot.status === 'online';
}
private async sendToBot(task: SendTask) { private async sendToBot(task: SendTask) {
await this.waitBot.waitForBotOnline(task.bot); if (!this.isRouteBotHealthy(task.route)) {
if (task.route.bufferBotMessage) {
await this.waitBot.waitForBotOnline(task.bot);
} else {
return {
retcode: 1404,
status: 'failed',
data: null,
error: {
code: 1404,
message: `Bot ${task.route.selfId} from ${task.route.name} not online.`,
},
echo: task.data?.echo,
};
}
}
try { try {
const result = await task.bot.internal._request( const result = await task.bot.internal._request(
task.data.action, task.data.action,
...@@ -98,9 +119,7 @@ export class MessageService extends ConsoleLogger { ...@@ -98,9 +119,7 @@ export class MessageService extends ConsoleLogger {
} }
private async onWsEvent(route: Route, data: OnebotProtocol) { private async onWsEvent(route: Route, data: OnebotProtocol) {
const bot = this.ctx.bots.find( const bot = this.botRegistry.getBotWithId(route.selfId);
(b) => b.selfId === route.selfId && b.platform === 'onebot',
) as OneBotBot;
if (!bot) { if (!bot) {
this.error(`Bot ${route.selfId} from ${route.name} not found.`); this.error(`Bot ${route.selfId} from ${route.name} not found.`);
return { return {
...@@ -153,6 +172,10 @@ export class MessageService extends ConsoleLogger { ...@@ -153,6 +172,10 @@ export class MessageService extends ConsoleLogger {
} }
private async resolveSendTaskOfRoute(route: Route) { private async resolveSendTaskOfRoute(route: Route) {
const bot = this.botRegistry.getBotWithId(route.selfId);
if (!bot || bot.status !== 'online') {
return;
}
const task = route.fetchSendTask(); const task = route.fetchSendTask();
if (!task) { if (!task) {
return; return;
......
...@@ -24,7 +24,8 @@ export interface RouteConfig { ...@@ -24,7 +24,8 @@ export interface RouteConfig {
readonly?: boolean; readonly?: boolean;
rateLimitInterval?: number; rateLimitInterval?: number;
reverseWs?: ReverseWsConfig[]; reverseWs?: ReverseWsConfig[];
bufferMessage?: boolean; bufferAppMessage?: boolean;
bufferBotMessage?: boolean;
} }
export class Route implements RouteConfig { export class Route implements RouteConfig {
private connections: WebSocket[] = []; private connections: WebSocket[] = [];
...@@ -40,7 +41,8 @@ export class Route implements RouteConfig { ...@@ -40,7 +41,8 @@ export class Route implements RouteConfig {
reverseWs?: ReverseWsConfig[]; reverseWs?: ReverseWsConfig[];
readonly?: boolean; readonly?: boolean;
rateLimitInterval: number; rateLimitInterval: number;
bufferMessage?: boolean; bufferAppMessage?: boolean;
bufferBotMessage?: boolean;
preMessages: { data: any; session: Session }[] = []; preMessages: { data: any; session: Session }[] = [];
constructor(routeConfig: RouteConfig, ctx: Context) { constructor(routeConfig: RouteConfig, ctx: Context) {
Object.assign(this, routeConfig); Object.assign(this, routeConfig);
...@@ -68,7 +70,7 @@ export class Route implements RouteConfig { ...@@ -68,7 +70,7 @@ export class Route implements RouteConfig {
} }
send(data: any, session: Session, allConns = this.connections) { send(data: any, session: Session, allConns = this.connections) {
if (!allConns.length) { if (!allConns.length) {
if (this.bufferMessage) { if (this.bufferAppMessage) {
this.preMessages.push({ data, session }); this.preMessages.push({ data, session });
} }
return; return;
...@@ -160,7 +162,7 @@ export class Route implements RouteConfig { ...@@ -160,7 +162,7 @@ export class Route implements RouteConfig {
} }
addConnection(conn: WebSocket) { addConnection(conn: WebSocket) {
this.connections.push(conn); this.connections.push(conn);
if (!this.bufferMessage) { if (!this.bufferAppMessage) {
return; return;
} }
const preMessages = this.preMessages; const preMessages = this.preMessages;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment