Commit 16dfabaa authored by nanahira's avatar nanahira

buggy

parent 54f74bbf
......@@ -6,3 +6,6 @@ SSL_PATH: ""
SSL_CERT: ""
SSL_KEY: ""
TRUSTED_PROXIES: 127.0.0.0/8,::1/128
NO_CONNECT_COUNT_LIMIT: ""
ALT_VERSIONS: ""
USE_PROXY: ""
This diff is collapsed.
......@@ -9,6 +9,7 @@
"build": "tsc",
"gen:config-example": "npm run build && node dist/src/scripts/gen-config-example.js",
"test": "jest --passWithNoTests",
"dev": "ts-node index.ts",
"start": "node dist/index.js"
},
"repository": {
......@@ -40,7 +41,6 @@
"testEnvironment": "node"
},
"devDependencies": {
"@types/ip6addr": "^0.2.6",
"@types/jest": "^30.0.0",
"@types/node": "^25.2.3",
"@types/ws": "^8.18.1",
......@@ -52,12 +52,16 @@
"jest": "^30.2.0",
"prettier": "^3.8.1",
"ts-jest": "^29.4.6",
"ts-node": "^10.9.2",
"typescript": "^5.9.3"
},
"dependencies": {
"ip6addr": "^0.2.5",
"axios": "^1.13.5",
"http-proxy-agent": "^7.0.2",
"https-proxy-agent": "^7.0.6",
"ipaddr.js": "^2.3.0",
"koishipro-core.js": "^1.3.0",
"nfkit": "^1.0.21",
"nfkit": "^1.0.22",
"pino": "^10.3.1",
"pino-pretty": "^13.1.3",
"rxjs": "^7.8.2",
......
......@@ -5,6 +5,13 @@ import { Emitter } from './services/emitter';
import { SSLFinder } from './services/ssl-finder';
import { ClientHandler } from './services/client-handler';
import { IpResolver } from './services/ip-resolver';
import { HttpClient } from './services/http-client';
import { Chnroute } from './services/chnroute';
import { I18nService } from './services/i18n';
import { YGOProCtosJoinGame } from 'ygopro-msg-encode';
import { TcpServer } from './transport/tcp/server';
import { WsServer } from './transport/ws/server';
import { ClientVersionCheck } from './services/client-version-check';
const core = createAppContext()
.provide(ConfigService, {
......@@ -12,6 +19,7 @@ const core = createAppContext()
})
.provide(Logger, { merge: ['createLogger'] })
.provide(Emitter, { merge: ['dispatch', 'middleware', 'removeMiddleware'] })
.provide(HttpClient, { merge: ['http'] })
.define();
export type Context = typeof core;
......@@ -19,5 +27,21 @@ export type Context = typeof core;
export const app = core
.provide(SSLFinder)
.provide(IpResolver)
.provide(Chnroute)
.provide(I18nService)
.provide(ClientHandler)
.provide(TcpServer)
.provide(WsServer)
.provide(ClientVersionCheck)
.define();
app.middleware(YGOProCtosJoinGame, async (msg, client, next) => {
await client.sendChat(`Welcome ${client.name_vpass || client.name}!`);
await client.sendChat(`Your IP: ${client.ip}`);
await client.sendChat(`Your physical IP: ${client.physicalIp()}`);
await client.sendChat(`Your pass: ${msg.pass}`);
await client.die(
'This server is for testing purposes only. Please use an official server to play the game.',
);
return undefined;
});
import {
filter,
firstValueFrom,
merge,
Observable,
Subject,
timeout,
TimeoutError,
} from 'rxjs';
import { take } from 'rxjs/operators';
import { Context } from './app';
import {
YGOProCtos,
......@@ -18,22 +20,26 @@ import {
import { YGOProProtoPipe } from './utility/ygopro-proto-pipe';
import { ClassType } from 'nfkit';
import { I18nService } from './services/i18n';
import { Chnroute } from './services/chnroute';
export abstract class Client {
protected abstract _send(data: Buffer): Promise<void>;
protected abstract _receive(): Observable<Buffer<ArrayBufferLike>>;
abstract disconnect(): Promise<void>;
abstract onDisconnect(): Observable<void>;
protected abstract _disconnect(): Promise<void>;
protected abstract _onDisconnect(): Observable<void>;
abstract physicalIp(): string;
ip = '';
isLocal = false;
private logger = this.ctx.createLogger(`Client ${this.physicalIp()}`);
private logger = this.ctx.createLogger(this.constructor.name);
private receiveSubject?: Subject<YGOProCtosBase>;
private disconnectSubject = new Subject<void>();
private manuallyDisconnected = false;
constructor(protected ctx: Context) {
// Subscribe to disconnect event to clean up subject
constructor(protected ctx: Context) {}
init() {
this.onDisconnect().subscribe(() => {
if (this.receiveSubject) {
this.receiveSubject.complete();
......@@ -42,6 +48,23 @@ export abstract class Client {
});
}
async disconnect(): Promise<void> {
this.manuallyDisconnected = true;
this.disconnectSubject.next();
this.disconnectSubject.complete();
await this._disconnect();
}
onDisconnect(): Observable<void> {
if (this.manuallyDisconnected) {
return this.disconnectSubject.asObservable();
}
return merge(
this.disconnectSubject.asObservable(),
this._onDisconnect(),
).pipe(take(1));
}
async send(data: YGOProStocBase) {
try {
await this._send(Buffer.from(data.toFullPayload()));
......@@ -53,16 +76,18 @@ export abstract class Client {
}
}
async sendChat(msg: string, type: number) {
async sendChat(msg: string, type = ChatColor.BABYBLUE) {
return this.send(
new YGOProStocChat().fromPartial({
msg: await this.ctx.get(I18nService).translate('en-US', msg),
msg: await this.ctx
.get(I18nService)
.translate(this.ctx.get(Chnroute).getLocale(this.ip), msg),
player_type: type,
}),
);
}
async die(msg?: string, type?: number) {
async die(msg?: string, type = ChatColor.BABYBLUE) {
if (msg) {
await this.sendChat(msg, type || ChatColor.BABYBLUE);
}
......@@ -115,34 +140,6 @@ export abstract class Client {
return this.receiveSubject.asObservable();
}
/**
* Wait for a message of any of the specified types
* @param types Array of message classes to wait for
* @param timeoutMs Timeout in milliseconds (default: 5000)
* @returns Promise that resolves with the matching message
* @throws Error if timeout is reached
*/
async waitForMessage<const C extends ClassType<YGOProCtosBase>[]>(
types: C,
timeoutMs = 5000,
): Promise<InstanceType<C[number]>> {
try {
return (await firstValueFrom(
this.receive().pipe(
filter((msg) => types.some((type) => msg instanceof type)) as any,
timeout(timeoutMs),
),
)) as InstanceType<C[number]>;
} catch (err) {
if (err instanceof TimeoutError) {
throw new Error(
`Timeout waiting for message after ${timeoutMs}ms (IP: ${this.loggingIp()})`,
);
}
throw err;
}
}
name = '';
vpass = '';
name_vpass = '';
......
......@@ -12,6 +12,7 @@ export const defaultConfig = {
TRUSTED_PROXIES: '127.0.0.0/8,::1/128',
NO_CONNECT_COUNT_LIMIT: '',
ALT_VERSIONS: '',
USE_PROXY: '',
};
export type Config = typeof defaultConfig;
......
export const TRANSLATIONS = {
'en-US': {},
'zh-CN': {},
}
'en-US': {
update_required: 'Please update your client version',
wait_update:
'Your client version is higher than the server version, please wait for the server to update',
version_to_polyfill:
'Your client version is not fully supported. Please rejoin to enable temporary compatibility mode. For the best experience, we recommend updating your game to the latest version.',
version_polyfilled:
'Temporary compatibility mode has been enabled for your version. We recommend updating your game to avoid potential compatibility issues in the future.',
},
'zh-CN': {
update_required: '请更新你的客户端版本',
wait_update: '你的客户端版本高于服务器版本,请等待服务器更新',
version_to_polyfill:
'当前客户端版本暂未完全支持。请重新加入以启用临时兼容模式。为获得更佳体验,建议尽快更新游戏版本。',
version_polyfilled:
'已为当前版本启用临时兼容模式。建议尽快更新游戏,以避免后续兼容性问题。',
},
};
......@@ -8,35 +8,37 @@ import { IpResolver } from './ip-resolver';
import { WsClient } from '../transport/ws/client';
export class ClientHandler {
constructor(private ctx: Context) {}
private logger = this.ctx.createLogger('ClientHandler');
async handleClient(client: Client): Promise<void> {
try {
const first = await client.waitForMessage([
YGOProCtosPlayerInfo,
constructor(private ctx: Context) {
this.ctx.middleware(
YGOProCtosExternalAddress,
]);
let playerInfo: YGOProCtosPlayerInfo;
if (first instanceof YGOProCtosExternalAddress) {
if (!(client instanceof WsClient)) {
this.ctx.get(IpResolver).setClientIp(client, first.real_ip);
}
playerInfo = await client.waitForMessage([YGOProCtosPlayerInfo]);
} else {
if (!(client instanceof WsClient)) {
this.ctx.get(IpResolver).setClientIp(client);
}
playerInfo = first;
async (msg, client, next) => {
if (client instanceof WsClient) {
return next();
}
this.ctx
.get(IpResolver)
.setClientIp(
client,
msg.real_ip === '0.0.0.0' ? undefined : msg.real_ip,
);
return next();
},
);
client.name_vpass = playerInfo.name;
const [name, vpass] = playerInfo.name.split('$');
this.ctx.middleware(YGOProCtosPlayerInfo, async (msg, client, next) => {
const [name, vpass] = msg.name.split('$');
client.name = name;
client.vpass = vpass || '';
return next();
});
}
private logger = this.ctx.createLogger('ClientHandler');
async handleClient(client: Client): Promise<void> {
client.init();
try {
client.init();
client.receive().subscribe(async (msg) => {
try {
await this.ctx.dispatch(msg, client);
......
import { YGOProCtosJoinGame } from 'ygopro-msg-encode';
import {
ChatColor,
YGOProCtosJoinGame,
YGOProStocErrorMsg,
} from 'ygopro-msg-encode';
import { Context } from '../app';
const YGOPRO_VERSION = 0x1362;
......@@ -15,6 +19,18 @@ export class ClientVersionCheck {
if (msg.version === YGOPRO_VERSION) {
return next();
}
if (this.altVersions.includes(msg.version)) {
await client.sendChat('#{version_polyfilled}', ChatColor.BABYBLUE);
return next();
}
await client.sendChat('#{update_required}', ChatColor.RED);
await client.send(
new YGOProStocErrorMsg().fromPartial({
msg: 4,
code: YGOPRO_VERSION,
}),
);
await client.disconnect();
});
}
}
import { Context } from '../app';
import { Client } from '../client';
import * as ip6addr from 'ip6addr';
import * as ipaddr from 'ipaddr.js';
export class IpResolver {
private logger = this.ctx.createLogger('IpResolver');
private connectedIpCount = new Map<string, number>();
private badIpCount = new Map<string, number>();
private trustedProxies: Array<
ReturnType<typeof ip6addr.createCIDR | typeof ip6addr.createAddrRange>
> = [];
private trustedProxies: Array<[ipaddr.IPv4 | ipaddr.IPv6, number]> = [];
constructor(private ctx: Context) {
// Parse trusted proxies configuration
......@@ -23,11 +21,7 @@ export class IpResolver {
for (const trusted of proxies) {
try {
if (trusted.includes('/')) {
this.trustedProxies.push(ip6addr.createCIDR(trusted));
} else {
this.trustedProxies.push(ip6addr.createAddrRange(trusted, trusted));
}
this.trustedProxies.push(ipaddr.parseCIDR(trusted));
} catch (e: any) {
this.logger.warn(
{ trusted, err: e.message },
......@@ -57,13 +51,17 @@ export class IpResolver {
}
isTrustedProxy(ip: string): boolean {
return this.trustedProxies.some((trusted) => {
if (ip.startsWith('::ffff:')) {
ip = this.toIpv4(ip);
}
try {
return trusted.contains(ip);
const addr = ipaddr.parse(ip);
return this.trustedProxies.some(([range, mask]) => {
return addr.match(range, mask);
});
} catch {
return false;
}
});
}
getRealIp(physicalIp: string, xffIp?: string): string {
......
import { Socket } from 'node:net';
import { Observable, fromEvent } from 'rxjs';
import { Observable, fromEvent, merge } from 'rxjs';
import { map, take } from 'rxjs/operators';
import { Context } from '../../app';
import { Client } from '../../client';
......@@ -11,7 +12,7 @@ export class TcpClient extends Client {
super(ctx);
}
_send(data: Buffer): Promise<void> {
protected _send(data: Buffer): Promise<void> {
return new Promise((resolve, reject) => {
this.sock.write(data, (error) => {
if (error) {
......@@ -23,13 +24,13 @@ export class TcpClient extends Client {
});
}
_receive(): Observable<Buffer> {
protected _receive(): Observable<Buffer> {
return fromEvent<Buffer>(this.sock, 'data');
}
disconnect(): Promise<void> {
protected async _disconnect(): Promise<void> {
if (this.sock.destroyed) {
return Promise.resolve();
return;
}
return new Promise((resolve) => {
this.sock.once('close', () => resolve());
......@@ -37,8 +38,11 @@ export class TcpClient extends Client {
});
}
onDisconnect(): Observable<void> {
return fromEvent<void>(this.sock, 'close');
protected _onDisconnect(): Observable<void> {
return merge(
fromEvent<void>(this.sock, 'close'),
fromEvent<Error>(this.sock, 'error').pipe(map(() => undefined)),
).pipe(take(1));
}
physicalIp(): string {
......
import { IncomingMessage } from 'node:http';
import { Socket } from 'node:net';
import { Observable, filter, fromEvent, map } from 'rxjs';
import { Observable, filter, fromEvent, map, merge } from 'rxjs';
import { take } from 'rxjs/operators';
import WebSocket, { RawData } from 'ws';
import { Context } from '../../app';
import { Client } from '../../client';
......@@ -14,7 +15,7 @@ export class WsClient extends Client {
super(ctx);
}
_send(data: Buffer): Promise<void> {
protected _send(data: Buffer): Promise<void> {
return new Promise((resolve, reject) => {
this.sock.send(data, (error) => {
if (error) {
......@@ -26,7 +27,7 @@ export class WsClient extends Client {
});
}
_receive(): Observable<Buffer> {
protected _receive(): Observable<Buffer> {
return fromEvent<[RawData, boolean]>(this.sock, 'message').pipe(
filter(([, isBinary]) => isBinary),
map(([data]) => {
......@@ -41,9 +42,9 @@ export class WsClient extends Client {
);
}
disconnect(): Promise<void> {
protected async _disconnect(): Promise<void> {
if (this.sock.readyState === WebSocket.CLOSED) {
return Promise.resolve();
return;
}
return new Promise((resolve) => {
this.sock.once('close', () => resolve());
......@@ -51,8 +52,11 @@ export class WsClient extends Client {
});
}
onDisconnect(): Observable<void> {
return fromEvent<void>(this.sock, 'close');
protected _onDisconnect(): Observable<void> {
return merge(
fromEvent<void>(this.sock, 'close'),
fromEvent<Error>(this.sock, 'error').pipe(map(() => undefined)),
).pipe(take(1));
}
physicalIp(): string {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment