Commit 45ebb911 authored by nanahira's avatar nanahira

finish

parent 4dc247fa
Pipeline #6526 canceled with stages
in 3 minutes and 7 seconds
<p align="center"> # onebot-lb
<a href="http://nestjs.com/" target="blank"><img src="https://nestjs.com/img/logo_text.svg" width="320" alt="Nest Logo" /></a>
</p>
[circleci-image]: https://img.shields.io/circleci/build/github/nestjs/nest/master?token=abc123def456 [OneBot](https://onebot.dev/) 负载均衡器。
[circleci-url]: https://circleci.com/gh/nestjs/nest
<p align="center">A progressive <a href="http://nodejs.org" target="_blank">Node.js</a> framework for building efficient and scalable server-side applications.</p> 目前支持 OneBot v11 的双向 WebSockets 作为应用后端,以及全部连接方式作为 Bot 后端。
<p align="center">
<a href="https://www.npmjs.com/~nestjscore" target="_blank"><img src="https://img.shields.io/npm/v/@nestjs/core.svg" alt="NPM Version" /></a>
<a href="https://www.npmjs.com/~nestjscore" target="_blank"><img src="https://img.shields.io/npm/l/@nestjs/core.svg" alt="Package License" /></a>
<a href="https://www.npmjs.com/~nestjscore" target="_blank"><img src="https://img.shields.io/npm/dm/@nestjs/common.svg" alt="NPM Downloads" /></a>
<a href="https://circleci.com/gh/nestjs/nest" target="_blank"><img src="https://img.shields.io/circleci/build/github/nestjs/nest/master" alt="CircleCI" /></a>
<a href="https://coveralls.io/github/nestjs/nest?branch=master" target="_blank"><img src="https://coveralls.io/repos/github/nestjs/nest/badge.svg?branch=master#9" alt="Coverage" /></a>
<a href="https://discord.gg/G7Qnnhy" target="_blank"><img src="https://img.shields.io/badge/discord-online-brightgreen.svg" alt="Discord"/></a>
<a href="https://opencollective.com/nest#backer" target="_blank"><img src="https://opencollective.com/nest/backers/badge.svg" alt="Backers on Open Collective" /></a>
<a href="https://opencollective.com/nest#sponsor" target="_blank"><img src="https://opencollective.com/nest/sponsors/badge.svg" alt="Sponsors on Open Collective" /></a>
<a href="https://paypal.me/kamilmysliwiec" target="_blank"><img src="https://img.shields.io/badge/Donate-PayPal-ff3f59.svg"/></a>
<a href="https://opencollective.com/nest#sponsor" target="_blank"><img src="https://img.shields.io/badge/Support%20us-Open%20Collective-41B883.svg" alt="Support us"></a>
<a href="https://twitter.com/nestframework" target="_blank"><img src="https://img.shields.io/twitter/follow/nestframework.svg?style=social&label=Follow"></a>
</p>
<!--[![Backers on Open Collective](https://opencollective.com/nest/backers/badge.svg)](https://opencollective.com/nest#backer)
[![Sponsors on Open Collective](https://opencollective.com/nest/sponsors/badge.svg)](https://opencollective.com/nest#sponsor)-->
## Description ## 配置
[Nest](https://github.com/nestjs/nest) framework TypeScript starter repository. 参考项目的 `config.example.yaml` ,并复制成为 `config.yaml` 以运行。
## Installation ## Docker
```bash Docker 容器镜像位于 `git-registry.mycard.moe/3rdeye/onebot-lb`。使用时把 `config.yaml` 挂载到 `/usr/src/app/config.yaml` 即可。
$ npm install \ No newline at end of file
```
## Running the app
```bash
# development
$ npm run start
# watch mode
$ npm run start:dev
# production mode
$ npm run start:prod
```
## Test
```bash
# unit tests
$ npm run test
# e2e tests
$ npm run test:e2e
# test coverage
$ npm run test:cov
```
## Support
Nest is an MIT-licensed open source project. It can grow thanks to the sponsors and support by the amazing backers. If you'd like to join them, please [read more here](https://docs.nestjs.com/support).
## Stay in touch
- Author - [Kamil Myśliwiec](https://kamilmysliwiec.com)
- Website - [https://nestjs.com](https://nestjs.com/)
- Twitter - [@nestframework](https://twitter.com/nestframework)
## License
Nest is [MIT licensed](LICENSE).
bots: onebot: # 配置模式请参照 [Koishi 文档](https://koishi.js.org/v4/plugins/adapter/onebot.html#%E6%9C%BA%E5%99%A8%E4%BA%BA%E9%80%89%E9%A1%B9)
- protocol: 'ws' bots:
endpoint: 'ws://localhost:6700' - protocol: 'ws'
selfId: '1111111111' endpoint: 'ws://localhost:6700'
token: 'token' selfId: '1111111111'
routes: token: 'token'
- name: default routes: # 路由配置。对于每个消息或事件,负载均衡器会发给所有路由的依照策略的某一个连接。
botId: '3221204940' - name: default # 必填。路由名称。机器人连接的 ws 路径为 ws://<地址>/routes/<name>
token: 'oONw7YpqUdYuc' selfId: '3221204940' # 必填。机器人 ID,和 OneBot 配置的 selfId 一致
token: 'token' # 连接 token
# 分流策略,有 'broadcast' | 'random' | 'round-robin' | 'hash' 四种。
## 分别为『广播给所有连接』『随机连接』『轮询』『按会话 hash』
## 默认为 hash。在有机器人交互的应用中建议使用 hash
balancePolicy: hash balancePolicy: hash
select: false # 作用域,详见 [Koishi 文档](https://koishi.js.org/v4/guide/plugin/context.html#%E5%9C%A8%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6%E4%B8%AD%E4%BD%BF%E7%94%A8%E9%80%89%E6%8B%A9%E5%99%A8)
heartbeat: 3000 # 心跳包的间隔。0 或不填为禁用心跳包。
wsReverse: # 该路由的反向 WebSocket 配置
- url: 'ws://localhost:8080'
token: 'token';
reconnectInterval: 60000 # 重连间隔
...@@ -7,6 +7,7 @@ import { RouteService } from './route/route.service'; ...@@ -7,6 +7,7 @@ import { RouteService } from './route/route.service';
import { OnebotGateway } from './onebot.gateway'; import { OnebotGateway } from './onebot.gateway';
import { MessageService } from './message/message.service'; import { MessageService } from './message/message.service';
import { ReverseWsService } from './reverse-ws/reverse-ws.service'; import { ReverseWsService } from './reverse-ws/reverse-ws.service';
import { WaitBotService } from './wait-bot/wait-bot.service';
@Module({ @Module({
imports: [ imports: [
...@@ -26,6 +27,7 @@ import { ReverseWsService } from './reverse-ws/reverse-ws.service'; ...@@ -26,6 +27,7 @@ import { ReverseWsService } from './reverse-ws/reverse-ws.service';
OnebotGateway, OnebotGateway,
MessageService, MessageService,
ReverseWsService, ReverseWsService,
WaitBotService,
], ],
}) })
export class AppModule {} export class AppModule {}
...@@ -18,6 +18,7 @@ import * as PluginOnebot from '@koishijs/plugin-adapter-onebot'; ...@@ -18,6 +18,7 @@ import * as PluginOnebot from '@koishijs/plugin-adapter-onebot';
import { ConfigService } from '@nestjs/config'; import { ConfigService } from '@nestjs/config';
import { InjectContext, PluginDef, UsePlugin } from 'koishi-nestjs'; import { InjectContext, PluginDef, UsePlugin } from 'koishi-nestjs';
import { BotConfig } from '@koishijs/plugin-adapter-onebot/lib/bot'; import { BotConfig } from '@koishijs/plugin-adapter-onebot/lib/bot';
import { AdapterConfig } from '@koishijs/plugin-adapter-onebot/lib/utils';
@Injectable() @Injectable()
export class BotLoaderService implements OnModuleInit { export class BotLoaderService implements OnModuleInit {
...@@ -28,11 +29,18 @@ export class BotLoaderService implements OnModuleInit { ...@@ -28,11 +29,18 @@ export class BotLoaderService implements OnModuleInit {
@UsePlugin() @UsePlugin()
loadBots() { loadBots() {
const bots = this.config.get<BotConfig[]>('bots'); const onebotConfig = this.config.get<
for (const bot of bots) { Adapter.PluginConfig<AdapterConfig, BotConfig>
bot.selfId = bot.selfId.toString(); >('onebot');
if (onebotConfig.selfId) {
onebotConfig.selfId = onebotConfig.selfId.toString();
} }
return PluginDef(PluginOnebot, { bots }); if (onebotConfig.bots) {
for (const bot of onebotConfig.bots) {
bot.selfId = bot.selfId.toString();
}
}
return PluginDef(PluginOnebot, onebotConfig);
} }
onModuleInit() { onModuleInit() {
......
...@@ -6,10 +6,14 @@ import { Route } from '../route/Route'; ...@@ -6,10 +6,14 @@ import { Route } from '../route/Route';
import { genMetaEvent } from '../utility/oicq'; import { genMetaEvent } from '../utility/oicq';
import { OnebotProtocol } from '../utility/onebot-protocol'; import { OnebotProtocol } from '../utility/onebot-protocol';
import { OneBotBot } from '@koishijs/plugin-adapter-onebot/lib/bot'; import { OneBotBot } from '@koishijs/plugin-adapter-onebot/lib/bot';
import { WaitBotService } from '../wait-bot/wait-bot.service';
@Injectable() @Injectable()
export class MessageService extends ConsoleLogger { export class MessageService extends ConsoleLogger {
constructor(@InjectContext() private ctx: Context) { constructor(
@InjectContext() private ctx: Context,
private waitBot: WaitBotService,
) {
super('message'); super('message');
} }
...@@ -49,27 +53,28 @@ export class MessageService extends ConsoleLogger { ...@@ -49,27 +53,28 @@ export class MessageService extends ConsoleLogger {
); );
} }
}); });
client.send(JSON.stringify(genMetaEvent(route.botId, 'connect'))); client.send(JSON.stringify(genMetaEvent(route.selfId, 'connect')));
client.send(JSON.stringify(genMetaEvent(route.botId, 'enable'))); client.send(JSON.stringify(genMetaEvent(route.selfId, 'enable')));
} }
private async onWsEvent(route: Route, data: OnebotProtocol) { private async onWsEvent(route: Route, data: OnebotProtocol) {
const bot = this.ctx.bots.find( const bot = this.ctx.bots.find(
(b) => b.selfId === route.botId && b.platform === 'onebot', (b) => b.selfId === route.selfId && b.platform === 'onebot',
) as OneBotBot; ) as OneBotBot;
if (!bot) { if (!bot) {
this.error(`Bot ${route.botId} from ${route.name} not found.`); this.error(`Bot ${route.selfId} from ${route.name} not found.`);
return { return {
retcode: 1404, retcode: 1404,
status: 'failed', status: 'failed',
data: null, data: null,
error: { error: {
code: 1404, code: 1404,
message: `Bot ${route.botId} from ${route.name} not found.`, message: `Bot ${route.selfId} from ${route.name} not found.`,
}, },
echo: data?.echo, echo: data?.echo,
}; };
} }
await this.waitBot.waitForBotOnline(bot);
try { try {
const result = await bot.internal._request(data.action, data.params); const result = await bot.internal._request(data.action, data.params);
// console.log(result); // console.log(result);
...@@ -78,14 +83,16 @@ export class MessageService extends ConsoleLogger { ...@@ -78,14 +83,16 @@ export class MessageService extends ConsoleLogger {
echo: data?.echo, echo: data?.echo,
}; };
} catch (e) { } catch (e) {
this.error(`Bot ${route.botId} from ${route.name} timed out.`); this.error(
`Bot ${route.selfId} from ${route.name} errored: ${e.toString()}`,
);
return { return {
retcode: 1404, retcode: 1404,
status: 'failed', status: 'failed',
data: null, data: null,
error: { error: {
code: 1404, code: 1404,
message: `Bot ${route.botId} from ${route.name} timed out.`, message: `Bot ${route.selfId} from ${route.name} errored.`,
}, },
echo: data?.echo, echo: data?.echo,
}; };
......
...@@ -11,7 +11,7 @@ export class ReverseWsService extends ConsoleLogger { ...@@ -11,7 +11,7 @@ export class ReverseWsService extends ConsoleLogger {
} }
initializeReverseWs(route: Route, revConfig: ReverseWsConfig) { initializeReverseWs(route: Route, revConfig: ReverseWsConfig) {
const headers: OutgoingHttpHeaders = { const headers: OutgoingHttpHeaders = {
'X-Self-ID': route.botId, 'X-Self-ID': route.selfId,
'X-Client-Role': 'Universal', 'X-Client-Role': 'Universal',
'User-Agent': 'OneBot', 'User-Agent': 'OneBot',
}; };
...@@ -19,10 +19,17 @@ export class ReverseWsService extends ConsoleLogger { ...@@ -19,10 +19,17 @@ export class ReverseWsService extends ConsoleLogger {
headers['Authorization'] = `Bearer ${revConfig.token}`; headers['Authorization'] = `Bearer ${revConfig.token}`;
} }
const ws = new WebSocket(revConfig.url, { headers }); const ws = new WebSocket(revConfig.url, { headers });
ws.on('error', (err) => const interval = revConfig.reconnectInterval || 5000;
this.warn(`Socket from ${route.name} error: ${err.toString()}`), let initialized = false;
); ws.on('error', (err) => {
this.warn(`Socket from ${route.name} error: ${err.toString()}`);
if (!initialized) {
this.warn(`Will retry after ${interval} ms.`);
setTimeout(() => this.initializeReverseWs(route, revConfig), interval);
}
});
ws.on('open', () => { ws.on('open', () => {
initialized = true;
this.log(`Route ${route.name} connected to ${revConfig.url}.`); this.log(`Route ${route.name} connected to ${revConfig.url}.`);
route.addConnection(ws); route.addConnection(ws);
this.meesageService.registerWsEvent(ws, route); this.meesageService.registerWsEvent(ws, route);
...@@ -30,7 +37,7 @@ export class ReverseWsService extends ConsoleLogger { ...@@ -30,7 +37,7 @@ export class ReverseWsService extends ConsoleLogger {
ws.on('close', (code, msg) => { ws.on('close', (code, msg) => {
route.removeConnection(ws); route.removeConnection(ws);
const interval = revConfig.reconnectInterval || 5000; const interval = revConfig.reconnectInterval || 5000;
this.log( this.warn(
`Route ${route.name} disconnected from ${revConfig.url}: ${code}: ${msg}. Will retry after ${interval} ms.`, `Route ${route.name} disconnected from ${revConfig.url}: ${code}: ${msg}. Will retry after ${interval} ms.`,
); );
setTimeout(() => this.initializeReverseWs(route, revConfig), interval); setTimeout(() => this.initializeReverseWs(route, revConfig), interval);
......
...@@ -3,7 +3,6 @@ import type WebSocket from 'ws'; ...@@ -3,7 +3,6 @@ import type WebSocket from 'ws';
import { Context, Session } from 'koishi'; import { Context, Session } from 'koishi';
import { Random, remove } from 'koishi'; import { Random, remove } from 'koishi';
import { createHash } from 'crypto'; import { createHash } from 'crypto';
import { OneBotBot } from '@koishijs/plugin-adapter-onebot/lib/bot';
export type BalancePolicy = 'broadcast' | 'random' | 'round-robin' | 'hash'; export type BalancePolicy = 'broadcast' | 'random' | 'round-robin' | 'hash';
...@@ -15,7 +14,7 @@ export interface ReverseWsConfig { ...@@ -15,7 +14,7 @@ export interface ReverseWsConfig {
export interface RouteConfig { export interface RouteConfig {
name: string; name: string;
botId: string; selfId: string;
token?: string; token?: string;
select?: Selection; select?: Selection;
balancePolicy?: BalancePolicy; balancePolicy?: BalancePolicy;
...@@ -27,20 +26,22 @@ export class Route implements RouteConfig { ...@@ -27,20 +26,22 @@ export class Route implements RouteConfig {
private roundCount = 0; private roundCount = 0;
ctx: Context; ctx: Context;
name: string; name: string;
botId: string; selfId: string;
token?: string; token?: string;
select?: Selection; select?: Selection;
balancePolicy?: BalancePolicy; balancePolicy?: BalancePolicy;
heartbeat?: number; heartbeat?: number;
reverseWs?: ReverseWsConfig[];
preMessages: { data: any; session: Session }[] = [];
constructor(routeConfig: RouteConfig, ctx: Context) { constructor(routeConfig: RouteConfig, ctx: Context) {
Object.assign(this, routeConfig); Object.assign(this, routeConfig);
this.balancePolicy ||= 'hash'; this.balancePolicy ||= 'hash';
this.botId = this.botId.toString(); this.selfId = this.selfId.toString();
this.ctx = this.getFilteredContext(ctx); this.ctx = this.getFilteredContext(ctx);
if (this.heartbeat) { if (this.heartbeat) {
setInterval(() => { setInterval(() => {
this.broadcast({ this.broadcast({
self_id: this.botId, self_id: this.selfId,
time: Math.floor(Date.now() / 1000), time: Math.floor(Date.now() / 1000),
post_type: 'meta_event', post_type: 'meta_event',
meta_event_type: 'heartbeat', meta_event_type: 'heartbeat',
...@@ -49,9 +50,13 @@ export class Route implements RouteConfig { ...@@ -49,9 +50,13 @@ export class Route implements RouteConfig {
}, this.heartbeat); }, this.heartbeat);
} }
} }
send(data: any, sess: Session, allConns = this.connections) { send(data: any, session: Session, allConns = this.connections) {
if (!allConns.length) {
this.preMessages.push({ data, session });
return;
}
const message = JSON.stringify(data); const message = JSON.stringify(data);
const conns = this.getRelatedConnections(sess, allConns); const conns = this.getRelatedConnections(session, allConns);
for (const conn of conns) { for (const conn of conns) {
conn.send(message, (err) => { conn.send(message, (err) => {
if (err) { if (err) {
...@@ -64,7 +69,7 @@ export class Route implements RouteConfig { ...@@ -64,7 +69,7 @@ export class Route implements RouteConfig {
.warn(`Retrying another connection.`); .warn(`Retrying another connection.`);
this.send( this.send(
data, data,
sess, session,
allConns.filter((c) => c !== conn), allConns.filter((c) => c !== conn),
); );
} }
...@@ -85,7 +90,7 @@ export class Route implements RouteConfig { ...@@ -85,7 +90,7 @@ export class Route implements RouteConfig {
} }
} }
getFilteredContext(ctx: Context) { getFilteredContext(ctx: Context) {
const idCtx = ctx.self(this.botId); const idCtx = ctx.self(this.selfId);
if (!this.select) { if (!this.select) {
return idCtx; return idCtx;
} }
...@@ -126,7 +131,11 @@ export class Route implements RouteConfig { ...@@ -126,7 +131,11 @@ export class Route implements RouteConfig {
} }
addConnection(conn: WebSocket) { addConnection(conn: WebSocket) {
this.connections.push(conn); this.connections.push(conn);
return conn; const preMessages = this.preMessages;
this.preMessages = [];
for (const message of preMessages) {
this.send(message.data, message.session);
}
} }
removeConnection(conn: WebSocket) { removeConnection(conn: WebSocket) {
remove(this.connections, conn); remove(this.connections, conn);
......
...@@ -7,6 +7,7 @@ import { ConfigService } from '@nestjs/config'; ...@@ -7,6 +7,7 @@ import { ConfigService } from '@nestjs/config';
import { Route, RouteConfig } from './Route'; import { Route, RouteConfig } from './Route';
import { InjectContextPlatform } from 'koishi-nestjs'; import { InjectContextPlatform } from 'koishi-nestjs';
import { Context, Session } from 'koishi'; import { Context, Session } from 'koishi';
import { ReverseWsService } from '../reverse-ws/reverse-ws.service';
@Injectable() @Injectable()
export class RouteService export class RouteService
...@@ -16,11 +17,12 @@ export class RouteService ...@@ -16,11 +17,12 @@ export class RouteService
constructor( constructor(
config: ConfigService, config: ConfigService,
@InjectContextPlatform('onebot') private ctx: Context, @InjectContextPlatform('onebot') private ctx: Context,
private reverseWsService: ReverseWsService,
) { ) {
super('route'); super('route');
const routeConfs = config.get<RouteConfig[]>('routes'); const routeConfs = config.get<RouteConfig[]>('routes');
for (const routeConf of routeConfs) { for (const routeConf of routeConfs) {
this.log(`Loaded route ${routeConf.name} for ${routeConf.botId}`); this.log(`Loaded route ${routeConf.name} for ${routeConf.selfId}`);
this.routes.set(routeConf.name, new Route(routeConf, ctx)); this.routes.set(routeConf.name, new Route(routeConf, ctx));
} }
} }
...@@ -32,6 +34,11 @@ export class RouteService ...@@ -32,6 +34,11 @@ export class RouteService
onApplicationBootstrap() { onApplicationBootstrap() {
for (const route of this.routes.values()) { for (const route of this.routes.values()) {
route.ctx.on('dispatch', (session) => this.onOnebotEvent(session, route)); route.ctx.on('dispatch', (session) => this.onOnebotEvent(session, route));
if (route.reverseWs) {
for (const revConfig of route.reverseWs) {
this.reverseWsService.initializeReverseWs(route, revConfig);
}
}
} }
} }
......
import { BotConfig } from '@koishijs/plugin-adapter-onebot/lib/bot';
import yaml from 'yaml'; import yaml from 'yaml';
import * as fs from 'fs'; import * as fs from 'fs';
import { RouteConfig } from '../route/Route'; import { RouteConfig } from '../route/Route';
import { Adapter } from 'koishi';
import { AdapterConfig } from '@koishijs/plugin-adapter-onebot/lib/utils';
import { BotConfig } from '@koishijs/plugin-adapter-onebot/lib/bot';
export interface LbConfig { export interface LbConfig {
bots: BotConfig[]; onebot: Adapter.PluginConfig<AdapterConfig, BotConfig>;
routes: RouteConfig[]; routes: RouteConfig[];
} }
......
import { Test, TestingModule } from '@nestjs/testing';
import { WaitBotService } from './wait-bot.service';
describe('WaitBotService', () => {
let service: WaitBotService;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [WaitBotService],
}).compile();
service = module.get<WaitBotService>(WaitBotService);
});
it('should be defined', () => {
expect(service).toBeDefined();
});
});
import { Injectable } from '@nestjs/common';
import { Bot } from 'koishi';
import { UseEvent } from 'koishi-nestjs';
@Injectable()
export class WaitBotService {
private botWaitMap = new Map<Bot, (() => void)[]>();
async waitForBotOnline(bot: Bot) {
if (bot.status === 'online') {
return;
}
const resolvers = this.botWaitMap.get(bot) || [];
return new Promise<void>((resolve) => {
resolvers.push(resolve);
this.botWaitMap.set(bot, resolvers);
});
}
@UseEvent('bot-updated')
onBotChanged(bot: Bot) {
if (bot.status !== 'online') {
return;
}
const resolvers = this.botWaitMap.get(bot);
if (!resolvers) {
return;
}
this.botWaitMap.delete(bot);
for (const resolve of resolvers) {
resolve();
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment