Commit 2400c417 authored by nanahira's avatar nanahira

fix

parent 1201d51e
...@@ -40,8 +40,7 @@ app.middleware(YGOProCtosJoinGame, async (msg, client, next) => { ...@@ -40,8 +40,7 @@ app.middleware(YGOProCtosJoinGame, async (msg, client, next) => {
await client.sendChat(`Your IP: ${client.ip}`); await client.sendChat(`Your IP: ${client.ip}`);
await client.sendChat(`Your physical IP: ${client.physicalIp()}`); await client.sendChat(`Your physical IP: ${client.physicalIp()}`);
await client.sendChat(`Your pass: ${msg.pass}`); await client.sendChat(`Your pass: ${msg.pass}`);
await client.die( return client.die(
'This server is for testing purposes only. Please use an official server to play the game.', 'This server is for testing purposes only. Please use an official server to play the game.',
); );
return undefined;
}); });
import { import { filter, merge, Observable, Subject } from 'rxjs';
filter, import { map, share, take, takeUntil } from 'rxjs/operators';
firstValueFrom,
merge,
Observable,
Subject,
timeout,
TimeoutError,
} from 'rxjs';
import { take } from 'rxjs/operators';
import { Context } from './app'; import { Context } from './app';
import { import {
YGOProCtos, YGOProCtos,
...@@ -18,7 +10,6 @@ import { ...@@ -18,7 +10,6 @@ import {
YGOProStocErrorMsg, YGOProStocErrorMsg,
} from 'ygopro-msg-encode'; } from 'ygopro-msg-encode';
import { YGOProProtoPipe } from './utility/ygopro-proto-pipe'; import { YGOProProtoPipe } from './utility/ygopro-proto-pipe';
import { ClassType } from 'nfkit';
import { I18nService } from './services/i18n'; import { I18nService } from './services/i18n';
import { Chnroute } from './services/chnroute'; import { Chnroute } from './services/chnroute';
...@@ -33,36 +24,48 @@ export abstract class Client { ...@@ -33,36 +24,48 @@ export abstract class Client {
isLocal = false; isLocal = false;
private logger = this.ctx.createLogger(this.constructor.name); private logger = this.ctx.createLogger(this.constructor.name);
private receiveSubject?: Subject<YGOProCtosBase>;
private disconnectSubject = new Subject<void>(); private disconnectSubject = new Subject<void>();
private manuallyDisconnected = false;
constructor(protected ctx: Context) {} constructor(protected ctx: Context) {}
receive$!: Observable<YGOProCtosBase>;
disconnect$!: Observable<void>;
init() { init() {
this.onDisconnect().subscribe(() => { this.disconnect$ = merge(
if (this.receiveSubject) { this.disconnectSubject.asObservable(),
this.receiveSubject.complete(); this._onDisconnect(),
this.receiveSubject = undefined; ).pipe(take(1));
this.receive$ = this._receive().pipe(
YGOProProtoPipe(YGOProCtos, {
onError: (error) => {
this.logger.warn(
{ ip: this.loggingIp() },
`Protocol decode error: ${error.message}`,
);
},
}),
filter((msg) => {
if (!msg) {
this.logger.warn(
{ ip: this.loggingIp() },
`Received invalid message, skipping`,
);
return false;
} }
}); return true;
}),
map((s) => s!),
takeUntil(this.disconnect$),
share(),
);
} }
async disconnect(): Promise<void> { disconnect() {
this.manuallyDisconnected = true;
this.disconnectSubject.next(); this.disconnectSubject.next();
this.disconnectSubject.complete(); this.disconnectSubject.complete();
await this._disconnect(); this._disconnect().then();
} return undefined;
onDisconnect(): Observable<void> {
if (this.manuallyDisconnected) {
return this.disconnectSubject.asObservable();
}
return merge(
this.disconnectSubject.asObservable(),
this._onDisconnect(),
).pipe(take(1));
} }
async send(data: YGOProStocBase) { async send(data: YGOProStocBase) {
...@@ -97,49 +100,13 @@ export abstract class Client { ...@@ -97,49 +100,13 @@ export abstract class Client {
code: 9, code: 9,
}), }),
); );
this.disconnect().then(); return this.disconnect();
} }
loggingIp() { loggingIp() {
return this.ip || this.physicalIp() || 'unknown'; return this.ip || this.physicalIp() || 'unknown';
} }
receive(): Observable<YGOProCtosBase> {
// Create subject on first call and reuse it
if (!this.receiveSubject) {
this.receiveSubject = new Subject<YGOProCtosBase>();
this._receive()
.pipe(
YGOProProtoPipe(YGOProCtos, {
onError: (error) => {
this.logger.warn(
{ ip: this.loggingIp() },
`Protocol decode error: ${error.message}`,
);
},
}),
filter((msg) => {
if (!msg) {
this.logger.warn(
{ ip: this.loggingIp() },
`Received invalid message, skipping`,
);
return false;
}
return true;
}),
)
.subscribe({
next: (data) => this.receiveSubject?.next(data!),
error: (err) => this.receiveSubject?.error(err),
complete: () => this.receiveSubject?.complete(),
});
}
return this.receiveSubject.asObservable();
}
name = ''; name = '';
vpass = ''; vpass = '';
name_vpass = ''; name_vpass = '';
......
...@@ -39,7 +39,7 @@ export class ClientHandler { ...@@ -39,7 +39,7 @@ export class ClientHandler {
client.init(); client.init();
try { try {
client.init(); client.init();
client.receive().subscribe(async (msg) => { client.receive$.subscribe(async (msg) => {
try { try {
await this.ctx.dispatch(msg, client); await this.ctx.dispatch(msg, client);
} catch (e) { } catch (e) {
...@@ -49,7 +49,7 @@ export class ClientHandler { ...@@ -49,7 +49,7 @@ export class ClientHandler {
} }
}); });
} catch { } catch {
client.disconnect().then(); client.disconnect();
} }
} }
} }
...@@ -30,7 +30,7 @@ export class ClientVersionCheck { ...@@ -30,7 +30,7 @@ export class ClientVersionCheck {
code: YGOPRO_VERSION, code: YGOPRO_VERSION,
}), }),
); );
await client.disconnect(); return client.disconnect();
}); });
} }
} }
...@@ -135,9 +135,7 @@ export class IpResolver { ...@@ -135,9 +135,7 @@ export class IpResolver {
{ ip: newIp, badCount, connectCount }, { ip: newIp, badCount, connectCount },
'Rejecting bad IP', 'Rejecting bad IP',
); );
client.disconnect().catch((err) => { client.disconnect();
this.logger.error({ err }, 'Error disconnecting client');
});
return true; return true;
} }
......
...@@ -77,7 +77,7 @@ export const YGOProProtoPipe = < ...@@ -77,7 +77,7 @@ export const YGOProProtoPipe = <
const total = 2 + len; const total = 2 + len;
if (total > maxFrameBytes) { if (total > maxFrameBytes) {
opts.onError?.(new Error('Frame size exceeds maxFrameBytes')); opts.onError?.(new Error('Frame size exceeds maxFrameBytes'));
state.skipBytes += total; state.skipBytes = (state.skipBytes ?? 0) + total;
continue; continue;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment