Commit 54f74bbf authored by nanahira's avatar nanahira

first

parent d2c334b4
# compiled output
/dist
/node_modules
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*
# OS
.DS_Store
# Tests
/coverage
/.nyc_output
# IDEs and editors
/.idea
.project
.classpath
.c9/
*.launch
.settings/
*.sublime-workspace
# IDE - VSCode
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
/data
/output
/config.yaml
.git*
Dockerfile
.dockerignore
/tests
/ssl
webpack.config.js
dist/*
build/*
*.js
module.exports = {
parser: '@typescript-eslint/parser',
parserOptions: {
project: 'tsconfig.json',
tsconfigRootDir : __dirname,
sourceType: 'module',
},
plugins: ['@typescript-eslint/eslint-plugin'],
extends: [
'plugin:@typescript-eslint/recommended',
'plugin:prettier/recommended',
],
root: true,
env: {
node: true,
jest: true,
},
ignorePatterns: ['.eslintrc.js'],
rules: {
'@typescript-eslint/interface-name-prefix': 'off',
'@typescript-eslint/explicit-function-return-type': 'off',
'@typescript-eslint/explicit-module-boundary-types': 'off',
'@typescript-eslint/no-explicit-any': 'off',
'@typescript-eslint/no-unused-vars': [
'error',
{ argsIgnorePattern: '^_' },
],
},
};
# compiled output
/dist
/node_modules
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*
# OS
.DS_Store
# Tests
/coverage
/.nyc_output
# IDEs and editors
/.idea
.project
.classpath
.c9/
*.launch
.settings/
*.sublime-workspace
# IDE - VSCode
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
/data
/output
/config.yaml
/ssl
stages:
- build
- deploy
variables:
GIT_DEPTH: "1"
before_script:
- docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
.build-image:
stage: build
script:
- docker build --pull -t $TARGET_IMAGE .
- docker push $TARGET_IMAGE
build-x86:
extends: .build-image
tags:
- docker
variables:
TARGET_IMAGE: $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-x86
build-arm:
extends: .build-image
tags:
- docker-arm
variables:
TARGET_IMAGE: $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-arm
.deploy:
stage: deploy
tags:
- docker
script:
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-x86
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-arm
- docker manifest create $TARGET_IMAGE --amend $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-x86 --amend
$CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-arm
- docker manifest push $TARGET_IMAGE
deploy_latest:
extends: .deploy
variables:
TARGET_IMAGE: $CI_REGISTRY_IMAGE:latest
only:
- master
deploy_branch:
extends: .deploy
variables:
TARGET_IMAGE: $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG
/install-npm.sh
.git*
/data
/output
/config.yaml
.idea
.dockerignore
Dockerfile
/src
/coverage
/tests
/dist/tests
/build.js
{
"singleQuote": true,
"trailingComma": "all"
}
\ No newline at end of file
FROM node:lts-trixie-slim as base
LABEL Author="Nanahira <nanahira@momobako.com>"
RUN apt update && apt -y install python3 build-essential && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* /var/log/*
WORKDIR /usr/src/app
COPY ./package*.json ./
FROM base as builder
RUN npm ci && npm cache clean --force
COPY . ./
RUN npm run build
FROM base
ENV NODE_ENV production
RUN npm ci && npm cache clean --force
COPY --from=builder /usr/src/app/dist ./dist
CMD [ "npm", "start" ]
The MIT License (MIT)
Copyright (c) 2021 Nanahira
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
HOST: "::"
PORT: "7911"
LOG_LEVEL: info
WS_PORT: "0"
SSL_PATH: ""
SSL_CERT: ""
SSL_KEY: ""
TRUSTED_PROXIES: 127.0.0.0/8,::1/128
import { app } from './src/app';
app.start().then();
This diff is collapsed.
{
"name": "myproject",
"description": "myproject-desc",
"version": "1.0.0",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"lint": "eslint --fix .",
"build": "tsc",
"gen:config-example": "npm run build && node dist/src/scripts/gen-config-example.js",
"test": "jest --passWithNoTests",
"start": "node dist/index.js"
},
"repository": {
"type": "git",
"url": "https://code.moenext.com/nanahira/myproject.git"
},
"author": "Nanahira <nanahira@momobako.com>",
"license": "MIT",
"keywords": [],
"bugs": {
"url": "https://code.moenext.com/nanahira/myproject/issues"
},
"homepage": "https://code.moenext.com/nanahira/myproject",
"jest": {
"moduleFileExtensions": [
"js",
"json",
"ts"
],
"rootDir": "tests",
"testRegex": ".*\\.spec\\.ts$",
"transform": {
"^.+\\.(t|j)s$": "ts-jest"
},
"collectCoverageFrom": [
"**/*.(t|j)s"
],
"coverageDirectory": "../coverage",
"testEnvironment": "node"
},
"devDependencies": {
"@types/ip6addr": "^0.2.6",
"@types/jest": "^30.0.0",
"@types/node": "^25.2.3",
"@types/ws": "^8.18.1",
"@typescript-eslint/eslint-plugin": "^8.55.0",
"@typescript-eslint/parser": "^8.55.0",
"eslint": "^8.57.1",
"eslint-config-prettier": "^10.1.8",
"eslint-plugin-prettier": "^5.5.5",
"jest": "^30.2.0",
"prettier": "^3.8.1",
"ts-jest": "^29.4.6",
"typescript": "^5.9.3"
},
"dependencies": {
"ip6addr": "^0.2.5",
"koishipro-core.js": "^1.3.0",
"nfkit": "^1.0.21",
"pino": "^10.3.1",
"pino-pretty": "^13.1.3",
"rxjs": "^7.8.2",
"ws": "^8.19.0",
"yaml": "^2.8.2",
"ygopro-cdb-encode": "^1.0.1",
"ygopro-deck-encode": "^1.0.15",
"ygopro-lflist-encode": "^1.0.3",
"ygopro-msg-encode": "^1.1.8",
"ygopro-yrp-encode": "^1.0.1"
}
}
import { createAppContext } from 'nfkit';
import { ConfigService } from './services/config';
import { Logger } from './services/logger';
import { Emitter } from './services/emitter';
import { SSLFinder } from './services/ssl-finder';
import { ClientHandler } from './services/client-handler';
import { IpResolver } from './services/ip-resolver';
const core = createAppContext()
.provide(ConfigService, {
merge: ['getConfig'],
})
.provide(Logger, { merge: ['createLogger'] })
.provide(Emitter, { merge: ['dispatch', 'middleware', 'removeMiddleware'] })
.define();
export type Context = typeof core;
export const app = core
.provide(SSLFinder)
.provide(IpResolver)
.provide(ClientHandler)
.define();
import {
filter,
firstValueFrom,
Observable,
Subject,
timeout,
TimeoutError,
} from 'rxjs';
import { Context } from './app';
import {
YGOProCtos,
YGOProStocBase,
YGOProCtosBase,
YGOProStocChat,
ChatColor,
YGOProStocErrorMsg,
} from 'ygopro-msg-encode';
import { YGOProProtoPipe } from './utility/ygopro-proto-pipe';
import { ClassType } from 'nfkit';
import { I18nService } from './services/i18n';
export abstract class Client {
protected abstract _send(data: Buffer): Promise<void>;
protected abstract _receive(): Observable<Buffer<ArrayBufferLike>>;
abstract disconnect(): Promise<void>;
abstract onDisconnect(): Observable<void>;
abstract physicalIp(): string;
ip = '';
isLocal = false;
private logger = this.ctx.createLogger(`Client ${this.physicalIp()}`);
private receiveSubject?: Subject<YGOProCtosBase>;
constructor(protected ctx: Context) {
// Subscribe to disconnect event to clean up subject
this.onDisconnect().subscribe(() => {
if (this.receiveSubject) {
this.receiveSubject.complete();
this.receiveSubject = undefined;
}
});
}
async send(data: YGOProStocBase) {
try {
await this._send(Buffer.from(data.toFullPayload()));
} catch (e) {
this.logger.warn(
{ ip: this.loggingIp(), error: (e as Error).message },
`Failed to send message: ${(e as Error).message}`,
);
}
}
async sendChat(msg: string, type: number) {
return this.send(
new YGOProStocChat().fromPartial({
msg: await this.ctx.get(I18nService).translate('en-US', msg),
player_type: type,
}),
);
}
async die(msg?: string, type?: number) {
if (msg) {
await this.sendChat(msg, type || ChatColor.BABYBLUE);
}
await this.send(
new YGOProStocErrorMsg().fromPartial({
msg: 1,
code: 9,
}),
);
this.disconnect().then();
}
loggingIp() {
return this.ip || this.physicalIp() || 'unknown';
}
receive(): Observable<YGOProCtosBase> {
// Create subject on first call and reuse it
if (!this.receiveSubject) {
this.receiveSubject = new Subject<YGOProCtosBase>();
this._receive()
.pipe(
YGOProProtoPipe(YGOProCtos, {
onError: (error) => {
this.logger.warn(
{ ip: this.loggingIp() },
`Protocol decode error: ${error.message}`,
);
},
}),
filter((msg) => {
if (!msg) {
this.logger.warn(
{ ip: this.loggingIp() },
`Received invalid message, skipping`,
);
return false;
}
return true;
}),
)
.subscribe({
next: (data) => this.receiveSubject?.next(data!),
error: (err) => this.receiveSubject?.error(err),
complete: () => this.receiveSubject?.complete(),
});
}
return this.receiveSubject.asObservable();
}
/**
* Wait for a message of any of the specified types
* @param types Array of message classes to wait for
* @param timeoutMs Timeout in milliseconds (default: 5000)
* @returns Promise that resolves with the matching message
* @throws Error if timeout is reached
*/
async waitForMessage<const C extends ClassType<YGOProCtosBase>[]>(
types: C,
timeoutMs = 5000,
): Promise<InstanceType<C[number]>> {
try {
return (await firstValueFrom(
this.receive().pipe(
filter((msg) => types.some((type) => msg instanceof type)) as any,
timeout(timeoutMs),
),
)) as InstanceType<C[number]>;
} catch (err) {
if (err instanceof TimeoutError) {
throw new Error(
`Timeout waiting for message after ${timeoutMs}ms (IP: ${this.loggingIp()})`,
);
}
throw err;
}
}
name = '';
vpass = '';
name_vpass = '';
}
import yaml from 'yaml';
import * as fs from 'node:fs';
export const defaultConfig = {
HOST: '::',
PORT: '7911',
LOG_LEVEL: 'info',
WS_PORT: '0',
SSL_PATH: '',
SSL_CERT: '',
SSL_KEY: '',
TRUSTED_PROXIES: '127.0.0.0/8,::1/128',
NO_CONNECT_COUNT_LIMIT: '',
ALT_VERSIONS: '',
};
export type Config = typeof defaultConfig;
export function loadConfig(): Config {
let readConfig: Partial<Config> = {};
try {
const configText = fs.readFileSync('./config.yaml', 'utf-8');
readConfig = yaml.parse(configText);
} catch (e) {
console.error(`Failed to read config: ${e.toString()}`);
}
return {
...defaultConfig,
...readConfig,
...process.env,
};
}
export const TRANSLATIONS = {
'en-US': {},
'zh-CN': {},
}
import * as fs from 'fs';
import yaml from 'yaml';
import { defaultConfig } from '../config';
async function main(): Promise<void> {
const output = yaml.stringify(defaultConfig);
await fs.promises.writeFile('./config.example.yaml', output, 'utf-8');
console.log('Generated config.example.yaml');
}
main().catch((error) => {
console.error(`Failed to generate config.example.yaml: ${error}`);
process.exitCode = 1;
});
import {
YGOProCtosExternalAddress,
YGOProCtosPlayerInfo,
} from 'ygopro-msg-encode';
import { Context } from '../app';
import { Client } from '../client';
import { IpResolver } from './ip-resolver';
import { WsClient } from '../transport/ws/client';
export class ClientHandler {
constructor(private ctx: Context) {}
private logger = this.ctx.createLogger('ClientHandler');
async handleClient(client: Client): Promise<void> {
try {
const first = await client.waitForMessage([
YGOProCtosPlayerInfo,
YGOProCtosExternalAddress,
]);
let playerInfo: YGOProCtosPlayerInfo;
if (first instanceof YGOProCtosExternalAddress) {
if (!(client instanceof WsClient)) {
this.ctx.get(IpResolver).setClientIp(client, first.real_ip);
}
playerInfo = await client.waitForMessage([YGOProCtosPlayerInfo]);
} else {
if (!(client instanceof WsClient)) {
this.ctx.get(IpResolver).setClientIp(client);
}
playerInfo = first;
}
client.name_vpass = playerInfo.name;
const [name, vpass] = playerInfo.name.split('$');
client.name = name;
client.vpass = vpass || '';
client.receive().subscribe(async (msg) => {
try {
await this.ctx.dispatch(msg, client);
} catch (e) {
this.logger.warn(
`Error dispatching message ${msg.constructor.name} from ${client.loggingIp()}: ${(e as Error).message}`,
);
}
});
} catch {
client.disconnect().then();
}
}
}
import { YGOProCtosJoinGame } from 'ygopro-msg-encode';
import { Context } from '../app';
const YGOPRO_VERSION = 0x1362;
export class ClientVersionCheck {
private altVersions = this.ctx
.getConfig('ALT_VERSIONS', '')
.split(',')
.map((v) => parseInt(v.trim()))
.filter((v) => v);
constructor(private ctx: Context) {
this.ctx.middleware(YGOProCtosJoinGame, async (msg, client, next) => {
if (msg.version === YGOPRO_VERSION) {
return next();
}
});
}
}
import { AppContext } from 'nfkit';
import { Config, loadConfig } from '../config';
export class ConfigService {
constructor(private app: AppContext) {}
config = loadConfig();
getConfig<K extends keyof Config, D extends Config[K]>(
key: K,
defaultValue?: D,
): D extends string ? Config[K] | D : Config[K] | undefined {
return (this.config[key] || defaultValue || undefined) as any;
}
}
import { AppContext, ProtoMiddlewareDispatcher } from 'nfkit';
import { Client } from '../client';
export class Emitter extends ProtoMiddlewareDispatcher<[Client]> {
constructor(private ctx: AppContext) {
super({
acceptResult: () => true,
errorHandler: (e) => {
throw e;
},
});
}
}
import { I18n, I18nLookupMiddleware } from 'nfkit';
import { Context } from '../app';
import { TRANSLATIONS } from '../constants/trans';
export class I18nService extends I18n {
constructor(private ctx: Context) {
super({
locales: Object.keys(TRANSLATIONS),
defaultLocale: 'en-US',
});
this.middleware(I18nLookupMiddleware(TRANSLATIONS));
}
}
import { Context } from '../app';
import { Client } from '../client';
import * as ip6addr from 'ip6addr';
export class IpResolver {
private logger = this.ctx.createLogger('IpResolver');
private connectedIpCount = new Map<string, number>();
private badIpCount = new Map<string, number>();
private trustedProxies: Array<
ReturnType<typeof ip6addr.createCIDR | typeof ip6addr.createAddrRange>
> = [];
constructor(private ctx: Context) {
// Parse trusted proxies configuration
const trustedProxiesConfig = this.ctx.getConfig(
'TRUSTED_PROXIES',
'127.0.0.0/8,::1/128',
);
const proxies = trustedProxiesConfig
.split(',')
.map((s) => s.trim())
.filter(Boolean);
for (const trusted of proxies) {
try {
if (trusted.includes('/')) {
this.trustedProxies.push(ip6addr.createCIDR(trusted));
} else {
this.trustedProxies.push(ip6addr.createAddrRange(trusted, trusted));
}
} catch (e: any) {
this.logger.warn(
{ trusted, err: e.message },
'Failed to parse trusted proxy',
);
}
}
this.logger.info(
{ count: this.trustedProxies.length },
'Trusted proxies initialized',
);
}
toIpv4(ip: string): string {
if (ip.startsWith('::ffff:')) {
return ip.slice(7);
}
return ip;
}
toIpv6(ip: string): string {
if (/^(\d{1,3}\.){3}\d{1,3}$/.test(ip)) {
return '::ffff:' + ip;
}
return ip;
}
isTrustedProxy(ip: string): boolean {
return this.trustedProxies.some((trusted) => {
try {
return trusted.contains(ip);
} catch {
return false;
}
});
}
getRealIp(physicalIp: string, xffIp?: string): string {
if (!xffIp || xffIp === physicalIp) {
return this.toIpv6(physicalIp);
}
if (this.isTrustedProxy(physicalIp)) {
return this.toIpv6(xffIp.split(',')[0].trim());
}
this.logger.warn({ physicalIp, xffIp }, 'Untrusted proxy detected');
return this.toIpv6(physicalIp);
}
/**
* Set client IP and check if client should be rejected
* @param client The client instance
* @param xffIp Optional X-Forwarded-For IP
* @returns true if client should be rejected (bad IP or too many connections)
*/
setClientIp(client: Client, xffIp?: string): boolean {
const prevIp = client.ip;
// Priority: passed xffIp > client.xffIp() > client.physicalIp()
const xff = xffIp;
const newIp = this.getRealIp(client.physicalIp(), xff);
client.ip = newIp;
// If IP hasn't changed, no need to update counts
if (prevIp === newIp) {
return false;
}
// Decrement count for previous IP
if (prevIp) {
const prevCount = this.connectedIpCount.get(prevIp) || 0;
if (prevCount > 0) {
if (prevCount === 1) {
this.connectedIpCount.delete(prevIp);
} else {
this.connectedIpCount.set(prevIp, prevCount - 1);
}
}
}
// Check if this is a local IP (127.0.0.1/::1) or in trusted proxies
const isLocal =
newIp.includes('127.0.0.1') ||
newIp.includes('::1') ||
this.isTrustedProxy(newIp);
client.isLocal = isLocal;
// Increment count for new IP
const noConnectCountLimit = this.ctx.getConfig(
'NO_CONNECT_COUNT_LIMIT',
'',
);
let connectCount = this.connectedIpCount.get(newIp) || 0;
if (!noConnectCountLimit && !isLocal && !this.isTrustedProxy(newIp)) {
connectCount++;
this.connectedIpCount.set(newIp, connectCount);
} else {
this.connectedIpCount.set(newIp, connectCount);
}
// Check if IP should be rejected
const badCount = this.badIpCount.get(newIp) || 0;
if (badCount > 5 || connectCount > 10) {
this.logger.info(
{ ip: newIp, badCount, connectCount },
'Rejecting bad IP',
);
client.disconnect().catch((err) => {
this.logger.error({ err }, 'Error disconnecting client');
});
return true;
}
return false;
}
/**
* Mark an IP as bad (increment bad count)
* @param ip The IP address to mark as bad
*/
addBadIp(ip: string): void {
const currentCount = this.badIpCount.get(ip) || 0;
this.badIpCount.set(ip, currentCount + 1);
this.logger.warn(
{ ip, count: currentCount + 1 },
'Bad IP count incremented',
);
}
/**
* Get the current connection count for an IP
*/
getConnectedIpCount(ip: string): number {
return this.connectedIpCount.get(ip) || 0;
}
/**
* Get the bad count for an IP
*/
getBadIpCount(ip: string): number {
return this.badIpCount.get(ip) || 0;
}
/**
* Clear all connection counts (useful for testing or maintenance)
*/
clearConnectionCounts(): void {
this.connectedIpCount.clear();
this.logger.info('Connection counts cleared');
}
/**
* Clear all bad IP counts (useful for testing or maintenance)
*/
clearBadIpCounts(): void {
this.badIpCount.clear();
this.logger.info('Bad IP counts cleared');
}
}
import { AppContext } from 'nfkit';
import pino from 'pino';
import { ConfigService } from './config';
export class Logger {
constructor(private ctx: AppContext) {}
private readonly logger = pino({
level: this.ctx.get(ConfigService).getConfig('LOG_LEVEL') || 'info',
transport: {
target: 'pino-pretty',
options: {
colorize: true,
translateTime: 'SYS:standard',
ignore: 'pid,hostname',
},
},
});
createLogger(name: string) {
return this.logger.child({ module: name });
}
}
import { Context } from '../app';
import { TlsOptions } from 'node:tls';
import fs from 'node:fs';
import path from 'node:path';
import {
X509Certificate,
createPrivateKey,
createPublicKey,
timingSafeEqual,
KeyObject,
} from 'node:crypto';
type LoadedCandidate = {
certPath: string;
keyPath: string;
certBuf: Buffer;
keyBuf: Buffer;
validToMs: number;
};
export class SSLFinder {
constructor(private ctx: Context) {}
private sslPath = this.ctx.getConfig('SSL_PATH', './ssl');
private sslKey = this.ctx.getConfig('SSL_KEY', '');
private sslCert = this.ctx.getConfig('SSL_CERT', '');
private logger = this.ctx.createLogger('SSLFinder');
private noSSL() {
if (this.sslPath || this.sslKey || this.sslCert) {
throw new Error(
`SSL configuration provided but no valid cert/key found. SSL_PATH=${this.sslPath}, SSL_KEY=${this.sslKey}, SSL_CERT=${this.sslCert}`,
);
}
return undefined;
}
findSSL(): TlsOptions | undefined {
// 1) 优先 SSL_CERT + SSL_KEY
const explicit = this.tryExplicit(this.sslCert, this.sslKey);
if (explicit) return { cert: explicit.certBuf, key: explicit.keyBuf };
// 2) 其次 sslPath:递归找 fullchain.pem + 同目录 privkey.pem,排除过期/不匹配;选有效期最长
const best = this.findBestFromPath(this.sslPath);
if (!best) return this.noSSL();
return { cert: best.certBuf, key: best.keyBuf };
}
private tryExplicit(
certValue: string,
keyValue: string,
): LoadedCandidate | undefined {
if (!certValue || !keyValue) return undefined;
const certPath = path.resolve(certValue);
const keyPath = path.resolve(keyValue);
const certBuf = this.readFileBuffer(certPath);
if (!certBuf) {
this.logger.warn(
{ certPath },
'SSL_CERT file not found or unreadable; falling back to SSL_PATH search',
);
return undefined;
}
const parsed = this.parseLeafCertFromBuffer(certBuf);
if (!parsed) {
this.logger.warn(
{ certPath },
'SSL_CERT does not contain a readable leaf certificate; falling back',
);
return undefined;
}
const now = Date.now();
if (now >= parsed.validToMs) {
this.logger.warn(
{ certPath, validTo: new Date(parsed.validToMs).toISOString() },
'SSL_CERT is expired; falling back',
);
return undefined;
}
const keyBuf = this.readFileBuffer(keyPath);
if (!keyBuf) {
this.logger.warn(
{ keyPath },
'SSL_KEY file not found or unreadable; falling back',
);
return undefined;
}
if (!this.isKeyMatching(parsed.x509, keyBuf, keyPath)) {
this.logger.warn(
{ certPath, keyPath },
'SSL_CERT and SSL_KEY do not match; falling back',
);
return undefined;
}
return { certPath, keyPath, certBuf, keyBuf, validToMs: parsed.validToMs };
}
private findBestFromPath(dirValue: string): LoadedCandidate | undefined {
const baseDir = path.resolve(dirValue);
if (!this.isDir(baseDir)) return undefined;
let best: LoadedCandidate | undefined;
const now = Date.now();
for (const fullchainPath of this.walkFindByName(baseDir, 'fullchain.pem')) {
// 先读 cert(一次),不合格就别读 key
const certBuf = this.readFileBuffer(fullchainPath);
if (!certBuf) continue;
const parsed = this.parseLeafCertFromBuffer(certBuf);
if (!parsed) continue;
if (now >= parsed.validToMs) {
this.logger.warn(
{
certPath: fullchainPath,
validTo: new Date(parsed.validToMs).toISOString(),
},
'Found fullchain.pem but it is expired; skipping',
);
continue;
}
const keyPath = path.join(path.dirname(fullchainPath), 'privkey.pem');
const keyBuf = this.readFileBuffer(keyPath);
if (!keyBuf) continue;
if (!this.isKeyMatching(parsed.x509, keyBuf, keyPath)) {
this.logger.warn(
{ certPath: fullchainPath, keyPath },
'Found cert/key pair but they do not match; skipping',
);
continue;
}
const cand: LoadedCandidate = {
certPath: fullchainPath,
keyPath,
certBuf,
keyBuf,
validToMs: parsed.validToMs,
};
if (!best || cand.validToMs > best.validToMs) best = cand;
}
return best;
}
private readFileBuffer(p: string): Buffer | undefined {
try {
return fs.readFileSync(p);
} catch {
return undefined;
}
}
private parseLeafCertFromBuffer(
certBuf: Buffer,
): { x509: X509Certificate; validToMs: number } | undefined {
// fullchain.pem / cert.pem 里通常第一个 CERT block 是 leaf
const pem = certBuf.toString('utf8');
const firstCertPem = this.extractFirstPemCertificate(pem);
if (!firstCertPem) return undefined;
try {
const x509 = new X509Certificate(firstCertPem);
const validToMs = Date.parse(x509.validTo);
if (!Number.isFinite(validToMs)) return undefined;
return { x509, validToMs };
} catch {
return undefined;
}
}
private extractFirstPemCertificate(pem: string): string | undefined {
const m = pem.match(
/-----BEGIN CERTIFICATE-----[\s\S]*?-----END CERTIFICATE-----/m,
);
return m?.[0];
}
private isKeyMatching(
x509: X509Certificate,
keyBuf: Buffer,
keyPathForLog: string,
): boolean {
try {
// cert 公钥
const certPub = x509.publicKey;
// private key -> derive public key
const priv = createPrivateKey(keyBuf);
const derivedPub = createPublicKey(priv);
return this.publicKeysEqual(certPub, derivedPub);
} catch (err: any) {
// 这里常见是:私钥被 passphrase 加密 / 格式不对
this.logger.warn(
{ keyPath: keyPathForLog, err: err?.message ?? String(err) },
'Failed to parse private key for match check; treating as mismatch',
);
return false;
}
}
private publicKeysEqual(a: KeyObject, b: KeyObject): boolean {
// 统一导出成 spki der 来对比
const aDer = a.export({ type: 'spki', format: 'der' }) as Buffer;
const bDer = b.export({ type: 'spki', format: 'der' }) as Buffer;
if (aDer.length !== bDer.length) return false;
return timingSafeEqual(aDer, bDer);
}
private *walkFindByName(root: string, filename: string): Generator<string> {
const stack: string[] = [root];
while (stack.length) {
const cur = stack.pop()!;
let entries: fs.Dirent[];
try {
entries = fs.readdirSync(cur, { withFileTypes: true });
} catch {
continue;
}
for (const ent of entries) {
const p = path.join(cur, ent.name);
if (ent.isDirectory()) {
stack.push(p);
} else if (ent.isFile() && ent.name === filename) {
yield p;
}
}
}
}
private isDir(p: string): boolean {
try {
return fs.statSync(p).isDirectory();
} catch {
return false;
}
}
}
import { Socket } from 'node:net';
import { Observable, fromEvent } from 'rxjs';
import { Context } from '../../app';
import { Client } from '../../client';
export class TcpClient extends Client {
constructor(
ctx: Context,
private sock: Socket,
) {
super(ctx);
}
_send(data: Buffer): Promise<void> {
return new Promise((resolve, reject) => {
this.sock.write(data, (error) => {
if (error) {
reject(error);
return;
}
resolve();
});
});
}
_receive(): Observable<Buffer> {
return fromEvent<Buffer>(this.sock, 'data');
}
disconnect(): Promise<void> {
if (this.sock.destroyed) {
return Promise.resolve();
}
return new Promise((resolve) => {
this.sock.once('close', () => resolve());
this.sock.end();
});
}
onDisconnect(): Observable<void> {
return fromEvent<void>(this.sock, 'close');
}
physicalIp(): string {
return this.sock.remoteAddress ?? '';
}
xffIp(): string | undefined {
return undefined;
}
}
import { Server as NetServer, Socket, createServer } from 'node:net';
import { Context } from '../../app';
import { ClientHandler } from '../../services/client-handler';
import { TcpClient } from './client';
export class TcpServer {
private server?: NetServer;
private logger = this.ctx.createLogger('TcpServer');
constructor(private ctx: Context) {}
async init(): Promise<void> {
const port = this.ctx.getConfig('PORT', '0');
if (!port || port === '0') {
this.logger.info('PORT not configured, TCP server will not start');
return;
}
const host = this.ctx.getConfig('HOST', '::');
const portNum = parseInt(port, 10);
this.server = createServer((socket) => {
this.handleConnection(socket);
});
await new Promise<void>((resolve, reject) => {
this.server!.listen(portNum, host, () => {
this.logger.info({ host, port: portNum }, 'TCP server listening');
resolve();
});
this.server!.on('error', (err) => {
this.logger.error({ err }, 'TCP server error');
reject(err);
});
});
}
private handleConnection(socket: Socket): void {
const client = new TcpClient(this.ctx, socket);
const handler = this.ctx.get(ClientHandler);
handler.handleClient(client).catch((err) => {
this.logger.error({ err }, 'Error handling client');
});
}
async stop(): Promise<void> {
if (!this.server) {
return;
}
await new Promise<void>((resolve) => {
this.server!.close(() => {
this.logger.info('TCP server closed');
resolve();
});
});
}
}
import { IncomingMessage } from 'node:http';
import { Socket } from 'node:net';
import { Observable, filter, fromEvent, map } from 'rxjs';
import WebSocket, { RawData } from 'ws';
import { Context } from '../../app';
import { Client } from '../../client';
export class WsClient extends Client {
constructor(
ctx: Context,
private sock: WebSocket,
private req?: IncomingMessage,
) {
super(ctx);
}
_send(data: Buffer): Promise<void> {
return new Promise((resolve, reject) => {
this.sock.send(data, (error) => {
if (error) {
reject(error);
return;
}
resolve();
});
});
}
_receive(): Observable<Buffer> {
return fromEvent<[RawData, boolean]>(this.sock, 'message').pipe(
filter(([, isBinary]) => isBinary),
map(([data]) => {
if (Buffer.isBuffer(data)) {
return data;
}
if (Array.isArray(data)) {
return Buffer.concat(data);
}
return Buffer.from(data);
}),
);
}
disconnect(): Promise<void> {
if (this.sock.readyState === WebSocket.CLOSED) {
return Promise.resolve();
}
return new Promise((resolve) => {
this.sock.once('close', () => resolve());
this.sock.close();
});
}
onDisconnect(): Observable<void> {
return fromEvent<void>(this.sock, 'close');
}
physicalIp(): string {
return (
this.req?.socket.remoteAddress ??
(this.sock as WebSocket & { _socket?: Socket })._socket?.remoteAddress ??
''
);
}
xffIp(): string | undefined {
const xff = this.req?.headers['x-forwarded-for'];
if (!xff) {
return undefined;
}
if (Array.isArray(xff)) {
return xff[0];
}
return xff;
}
}
import { IncomingMessage, createServer as createHttpServer } from 'node:http';
import { createServer as createHttpsServer } from 'node:https';
import { Server as WebSocketServer } from 'ws';
import { Context } from '../../app';
import { ClientHandler } from '../../services/client-handler';
import { SSLFinder } from '../../services/ssl-finder';
import { WsClient } from './client';
import { WebSocket } from 'ws';
import { IpResolver } from '../../services/ip-resolver';
export class WsServer {
private wss?: WebSocketServer;
private httpServer?: ReturnType<
typeof createHttpServer | typeof createHttpsServer
>;
private logger = this.ctx.createLogger('WsServer');
constructor(private ctx: Context) {}
async init(): Promise<void> {
const wsPort = this.ctx.getConfig('WS_PORT', '0');
if (!wsPort || wsPort === '0') {
this.logger.info(
'WS_PORT not configured, WebSocket server will not start',
);
return;
}
const host = this.ctx.getConfig('HOST', '::');
const portNum = parseInt(wsPort, 10);
// Try to get SSL configuration
const sslFinder = this.ctx.get(SSLFinder);
const sslOptions = sslFinder.findSSL();
if (sslOptions) {
this.logger.info(
'SSL configuration found, starting HTTPS WebSocket server',
);
this.httpServer = createHttpsServer(sslOptions);
} else {
this.logger.info('No SSL configuration, starting HTTP WebSocket server');
this.httpServer = createHttpServer();
}
this.wss = new WebSocketServer({ server: this.httpServer });
this.wss.on('connection', (ws, req) => {
this.handleConnection(ws, req);
});
await new Promise<void>((resolve, reject) => {
this.httpServer!.listen(portNum, host, () => {
this.logger.info(
{ host, port: portNum, secure: !!sslOptions },
'WebSocket server listening',
);
resolve();
});
this.httpServer!.on('error', (err) => {
this.logger.error({ err }, 'WebSocket server error');
reject(err);
});
});
}
private handleConnection(ws: WebSocket, req: IncomingMessage): void {
const client = new WsClient(this.ctx, ws, req);
if (this.ctx.get(IpResolver).setClientIp(client, client.xffIp())) return;
const handler = this.ctx.get(ClientHandler);
handler.handleClient(client).catch((err) => {
this.logger.error({ err }, 'Error handling client');
});
}
async stop(): Promise<void> {
if (this.wss) {
this.wss.close();
}
if (this.httpServer) {
await new Promise<void>((resolve) => {
this.httpServer!.close(() => {
this.logger.info('WebSocket server closed');
resolve();
});
});
}
}
}
import { Observable, OperatorFunction, EMPTY, from } from 'rxjs';
import { scan, mergeMap } from 'rxjs/operators';
import { YGOProCtos, YGOProStoc } from 'ygopro-msg-encode';
export type YGOProPipeOptions = {
maxFrameBytes?: number;
maxBufferBytes?: number;
onError?: (err: Error) => any;
};
type ScanState<T> = {
acc: Buffer;
out: T[]; // 本轮解析出来的消息
skipBytes?: number;
};
export const YGOProProtoPipe = <
R extends typeof YGOProCtos | typeof YGOProStoc,
>(
registry: R,
opts: YGOProPipeOptions = {},
): OperatorFunction<
Buffer<ArrayBufferLike>,
ReturnType<R['getInstanceFromPayload']>
> => {
const maxFrameBytes = opts.maxFrameBytes ?? 64 * 1024;
const maxBufferBytes = opts.maxBufferBytes ?? 4 * 1024 * 1024;
return (source: Observable<Buffer<ArrayBufferLike>>) =>
source.pipe(
scan<
Buffer<ArrayBufferLike>,
ScanState<ReturnType<R['getInstanceFromPayload']>>
>(
(state, chunk) => {
let acc =
!chunk || chunk.length === 0
? state.acc
: state.acc.length === 0
? Buffer.from(chunk)
: Buffer.concat([state.acc, chunk]);
// 输出先清空,本轮重新填
const out: ReturnType<R['getInstanceFromPayload']>[] = [];
if (acc.length > maxBufferBytes) {
opts.onError?.(
new Error(
'Buffer overflow: accumulated bytes exceed maxBufferBytes',
),
);
// reset
return { acc: Buffer.alloc(0), out };
}
while (acc.length >= 3) {
if (state.skipBytes && state.skipBytes > 0) {
if (acc.length < state.skipBytes) {
return {
acc: Buffer.alloc(0),
out,
skipBytes: state.skipBytes - acc.length,
};
} else {
acc = acc.subarray(state.skipBytes);
state.skipBytes = 0;
continue;
}
}
const len = acc.readUInt16LE(0);
if (len < 1) {
acc = acc.subarray(2);
continue;
}
const total = 2 + len;
if (total > maxFrameBytes) {
opts.onError?.(new Error('Frame size exceeds maxFrameBytes'));
state.skipBytes += total;
continue;
}
if (acc.length < total) break;
const frame = acc.subarray(0, total);
acc = acc.subarray(total);
try {
const inst = registry.getInstanceFromPayload(frame);
if (inst)
out.push(inst as ReturnType<R['getInstanceFromPayload']>);
} catch (err) {
opts.onError?.(err as Error);
// skip invalid frame
}
}
return { acc, out };
},
{ acc: Buffer.alloc(0), out: [] },
),
// 把每轮的 out 数组摊平成逐条 emit
mergeMap((s) => (s.out.length ? from(s.out) : EMPTY)),
);
};
describe('Sample test.', () => {
it('should pass', () => {
expect(true).toBe(true);
});
});
{
"compilerOptions": {
"outDir": "dist",
"module": "commonjs",
"target": "es2021",
"esModuleInterop": true,
"emitDecoratorMetadata": true,
"experimentalDecorators": true,
"declaration": true,
"sourceMap": true,
"types": [
"node",
"jest"
]
},
"compileOnSave": true,
"allowJs": true,
"include": [
"*.ts",
"src/**/*.ts",
"test/**/*.ts",
"tests/**/*.ts"
]
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment