Commit 9182abd9 authored by nanahira's avatar nanahira

fix

parent d103fef3
Pipeline #43121 passed with stages
in 2 minutes and 11 seconds
...@@ -35,7 +35,7 @@ export abstract class Client { ...@@ -35,7 +35,7 @@ export abstract class Client {
this.disconnect$ = merge( this.disconnect$ = merge(
this.disconnectSubject.asObservable(), this.disconnectSubject.asObservable(),
this._onDisconnect(), this._onDisconnect(),
).pipe(take(1)); ).pipe(take(1), share());
this.receive$ = this._receive().pipe( this.receive$ = this._receive().pipe(
YGOProProtoPipe(YGOProCtos, { YGOProProtoPipe(YGOProCtos, {
onError: (error) => { onError: (error) => {
...@@ -108,7 +108,10 @@ export abstract class Client { ...@@ -108,7 +108,10 @@ export abstract class Client {
return this.ip || this.physicalIp() || 'unknown'; return this.ip || this.physicalIp() || 'unknown';
} }
hostname = '';
name = ''; name = '';
vpass = ''; vpass = '';
name_vpass = ''; name_vpass = '';
established = false;
} }
import { import {
YGOProCtosBase,
YGOProCtosExternalAddress, YGOProCtosExternalAddress,
YGOProCtosJoinGame,
YGOProCtosPlayerInfo, YGOProCtosPlayerInfo,
} from 'ygopro-msg-encode'; } from 'ygopro-msg-encode';
import { Context } from '../app'; import { Context } from '../app';
import { Client } from '../client'; import { Client } from '../client';
import { IpResolver } from './ip-resolver'; import { IpResolver } from './ip-resolver';
import { WsClient } from '../transport/ws/client'; import { WsClient } from '../transport/ws/client';
import { forkJoin, filter, takeUntil, timeout, firstValueFrom } from 'rxjs';
export class ClientHandler { export class ClientHandler {
constructor(private ctx: Context) { constructor(private ctx: Context) {
this.ctx.middleware( this.ctx.middleware(
YGOProCtosExternalAddress, YGOProCtosExternalAddress,
async (msg, client, next) => { async (msg, client, next) => {
if (client instanceof WsClient) { if (client instanceof WsClient || client.ip) {
// ws should tell real IP and hostname in http headers, so we skip this step for ws clients
return next(); return next();
} }
this.ctx this.ctx
...@@ -21,33 +25,67 @@ export class ClientHandler { ...@@ -21,33 +25,67 @@ export class ClientHandler {
client, client,
msg.real_ip === '0.0.0.0' ? undefined : msg.real_ip, msg.real_ip === '0.0.0.0' ? undefined : msg.real_ip,
); );
client.hostname = msg.hostname?.split(':')[0] || '';
return next(); return next();
}, },
); );
this.ctx.middleware(YGOProCtosPlayerInfo, async (msg, client, next) => { this.ctx.middleware(YGOProCtosPlayerInfo, async (msg, client, next) => {
if (!client.ip) {
this.ctx.get(IpResolver).setClientIp(client);
}
const [name, vpass] = msg.name.split('$'); const [name, vpass] = msg.name.split('$');
client.name = name; client.name = name;
client.vpass = vpass || ''; client.vpass = vpass || '';
return next(); return next();
}); });
this.ctx.middleware(YGOProCtosBase, async (msg, client, next) => {
const isPreHandshakeMsg = [
YGOProCtosExternalAddress,
YGOProCtosPlayerInfo,
YGOProCtosJoinGame,
].some((allowed) => msg instanceof allowed);
if (client.established !== isPreHandshakeMsg) {
// disallow any messages before handshake is complete, except for the ones needed for handshake
return undefined;
}
return next();
});
} }
private logger = this.ctx.createLogger('ClientHandler'); private logger = this.ctx.createLogger('ClientHandler');
async handleClient(client: Client): Promise<void> { async handleClient(client: Client): Promise<void> {
try { client.init();
client.init().receive$.subscribe(async (msg) => { const receive$ = client.receive$;
try {
await this.ctx.dispatch(msg, client); receive$.subscribe(async (msg) => {
} catch (e) { try {
this.logger.warn( await this.ctx.dispatch(msg, client);
`Error dispatching message ${msg.constructor.name} from ${client.loggingIp()}: ${(e as Error).message}`, } catch (e) {
); this.logger.warn(
} `Error dispatching message ${msg.constructor.name} from ${client.loggingIp()}: ${(e as Error).message}`,
);
}
});
const handshake$ = forkJoin([
receive$.pipe(
filter((msg) => msg instanceof YGOProCtosPlayerInfo),
takeUntil(client.disconnect$),
),
receive$.pipe(
filter((msg) => msg instanceof YGOProCtosJoinGame),
takeUntil(client.disconnect$),
),
]).pipe(timeout(5000), takeUntil(client.disconnect$));
firstValueFrom(handshake$)
.then(() => {
client.established = true;
})
.catch(() => {
client.disconnect();
}); });
} catch {
client.disconnect();
}
} }
} }
...@@ -68,6 +68,7 @@ export class WsServer { ...@@ -68,6 +68,7 @@ export class WsServer {
private handleConnection(ws: WebSocket, req: IncomingMessage): void { private handleConnection(ws: WebSocket, req: IncomingMessage): void {
const client = new WsClient(this.ctx, ws, req); const client = new WsClient(this.ctx, ws, req);
if (this.ctx.get(IpResolver).setClientIp(client, client.xffIp())) return; if (this.ctx.get(IpResolver).setClientIp(client, client.xffIp())) return;
client.hostname = req.headers.host?.split(':')[0] || '';
const handler = this.ctx.get(ClientHandler); const handler = this.ctx.get(ClientHandler);
handler.handleClient(client).catch((err) => { handler.handleClient(client).catch((err) => {
this.logger.error({ err }, 'Error handling client'); this.logger.error({ err }, 'Error handling client');
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment