Commit 871772f8 authored by nanahira's avatar nanahira

fix zombie again

parent 5e0de755
Pipeline #43360 passed with stages
in 4 minutes and 51 seconds
......@@ -21,6 +21,7 @@ import {
timer,
} from 'rxjs';
import { YGOProCtosDisconnect } from '../utility/ygopro-ctos-disconnect';
import PQueue from 'p-queue';
export class ClientHandler {
private static readonly CLIENT_IDLE_TIMEOUT_MS = 5 * 60 * 1000;
......@@ -69,6 +70,9 @@ export class ClientHandler {
.middleware(
YGOProCtosBase,
async (msg, client, next) => {
if (msg instanceof YGOProCtosDisconnect) {
return next();
}
const bypassEstablished =
msg instanceof YGOProCtosJoinGame && msg.bypassEstablished;
if (bypassEstablished) {
......@@ -76,7 +80,7 @@ export class ClientHandler {
return next();
}
if (client.established === this.isPreHandshakeMsg(msg)) {
if (!client.established && !this.isPreHandshakeMsg(msg)) {
// disallow any messages before handshake is complete, except for the ones needed for handshake
return undefined;
}
......@@ -102,23 +106,10 @@ export class ClientHandler {
// 合并 receive$ 和 disconnect$
const receive$ = merge(client.receive$, disconnect$);
const dispatchQueue = new PQueue({ concurrency: 1 });
receive$.subscribe(async (msg) => {
this.logger.debug(
{
msgName: msg.constructor.name,
client: client.name || client.loggingIp(),
payload: JSON.stringify(msg),
},
'Received client message',
);
try {
await this.ctx.dispatch(msg, client);
} catch (e) {
this.logger.warn(
`Error dispatching message ${msg.constructor.name} from ${client.loggingIp()}: ${(e as Error).stack}`,
);
}
receive$.subscribe((msg) => {
dispatchQueue.add(async () => this.dispatchClientMessage(client, msg));
});
const handshake$ = forkJoin([
......@@ -159,6 +150,24 @@ export class ClientHandler {
});
}
private async dispatchClientMessage(client: Client, msg: YGOProCtosBase) {
this.logger.debug(
{
msgName: msg.constructor.name,
client: client.name || client.loggingIp(),
payload: JSON.stringify(msg),
},
'Received client message',
);
try {
await this.ctx.dispatch(msg, client);
} catch (e) {
this.logger.warn(
`Error dispatching message ${msg.constructor.name} from ${client.loggingIp()}: ${(e as Error).stack}`,
);
}
}
private installIdleDisconnectGuard(client: Client) {
client.receive$
.pipe(
......
......@@ -30,6 +30,7 @@ import {
resolveColoredMessages,
splitColoredMessagesByLine,
} from '../utility';
import { RoomManager } from '../room';
export class Client {
protected async _send(data: Buffer): Promise<void> {
......@@ -68,7 +69,7 @@ export class Client {
).pipe(
take(1),
tap(() => {
this.disconnected = new Date();
this.disconnected ??= new Date();
}),
share(),
);
......@@ -101,6 +102,13 @@ export class Client {
disconnected?: Date;
disconnect(): undefined {
this.disconnected ??= new Date();
if (this.roomName) {
const room = this.ctx.get(() => RoomManager).findByName(this.roomName);
if (room) {
room.removePlayer(this, true);
}
}
this.disconnectSubject.next();
this.disconnectSubject.complete();
this._disconnect().then();
......
......@@ -51,6 +51,21 @@ export class RoomEventRegister {
if (ctosParamIndex === -1 || !ctosParamType) {
continue;
}
if (clientParamIndex === -1) {
const fallbackClientIndex = paramTypes.findIndex(
(_paramType, index) => index !== ctosParamIndex,
);
if (fallbackClientIndex === -1) {
this.logger.warn(
`Method ${method} has no resolvable client parameter index, skipping`,
);
continue;
}
clientParamIndex = fallbackClientIndex;
this.logger.warn(
`Method ${method} has no explicit Client param metadata, fallback to arg[${clientParamIndex}] for client`,
);
}
// 获取方法选项
const options: RoomMethodOptions = metadata;
......
......@@ -582,11 +582,28 @@ export class Room {
}
@RoomMethod()
private async onDisconnect(client: Client, _msg: YGOProCtosDisconnect) {
private async onDisconnect(client: Client, msg: YGOProCtosDisconnect) {
if (!client) {
return;
}
return this.removePlayer(client, msg.bySystem);
}
async removePlayer(client: Client | undefined, bySystem = false) {
if (!client) {
return;
}
if (this.finalizing) {
return;
}
const wasObserver = client.pos === NetPlayerType.OBSERVER;
if (wasObserver) {
if (!this.watchers.has(client)) {
return;
}
} else if (this.players[client.pos] !== client) {
return;
}
const oldPos = client.pos;
if (wasObserver) {
......@@ -631,7 +648,7 @@ export class Room {
new OnRoomLeaveObserver(
this,
RoomLeaveObserverReason.Disconnect,
_msg.bySystem,
bySystem,
),
client,
);
......@@ -641,7 +658,7 @@ export class Room {
this,
oldPos,
RoomLeavePlayerReason.Disconnect,
_msg.bySystem,
bySystem,
),
client,
);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment