Commit 0b7ddcdd authored by nanahira's avatar nanahira

fix stream and purge

parent b8337626
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
"license": "UNLICENSED", "license": "UNLICENSED",
"dependencies": { "dependencies": {
"@aws-sdk/client-s3": "^3.26.0", "@aws-sdk/client-s3": "^3.26.0",
"@aws-sdk/lib-storage": "^3.26.0",
"@nestjs/common": "^8.0.0", "@nestjs/common": "^8.0.0",
"@nestjs/config": "^1.0.1", "@nestjs/config": "^1.0.1",
"@nestjs/core": "^8.0.0", "@nestjs/core": "^8.0.0",
...@@ -732,6 +733,23 @@ ...@@ -732,6 +733,23 @@
"node": ">= 10.0.0" "node": ">= 10.0.0"
} }
}, },
"node_modules/@aws-sdk/lib-storage": {
"version": "3.26.0",
"resolved": "https://registry.npmjs.org/@aws-sdk/lib-storage/-/lib-storage-3.26.0.tgz",
"integrity": "sha512-LI4XPSE2Pl5ea3OnYNj7WQBIKEFlYte74OcMfP1LvieoOi8FLVyza4JBxS76w9KuexjU/M9nSAOTQdGhm3wzlQ==",
"dependencies": {
"buffer": "^5.6.0",
"stream-browserify": "^3.0.0",
"tslib": "^2.3.0"
},
"engines": {
"node": ">= 10.0.0"
},
"peerDependencies": {
"@aws-sdk/abort-controller": "^3.0.0",
"@aws-sdk/client-s3": "^3.0.0"
}
},
"node_modules/@aws-sdk/md5-js": { "node_modules/@aws-sdk/md5-js": {
"version": "3.25.0", "version": "3.25.0",
"resolved": "https://registry.npmjs.org/@aws-sdk/md5-js/-/md5-js-3.25.0.tgz", "resolved": "https://registry.npmjs.org/@aws-sdk/md5-js/-/md5-js-3.25.0.tgz",
...@@ -4192,7 +4210,6 @@ ...@@ -4192,7 +4210,6 @@
"version": "5.7.1", "version": "5.7.1",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz",
"integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==",
"dev": true,
"funding": [ "funding": [
{ {
"type": "github", "type": "github",
...@@ -9760,6 +9777,55 @@ ...@@ -9760,6 +9777,55 @@
"node": ">= 0.6" "node": ">= 0.6"
} }
}, },
"node_modules/stream-browserify": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/stream-browserify/-/stream-browserify-3.0.0.tgz",
"integrity": "sha512-H73RAHsVBapbim0tU2JwwOiXUj+fikfiaoYAKHF3VJfA0pe2BCzkhAHBlLG6REzE+2WNZcxOXjK7lkso+9euLA==",
"dependencies": {
"inherits": "~2.0.4",
"readable-stream": "^3.5.0"
}
},
"node_modules/stream-browserify/node_modules/readable-stream": {
"version": "3.6.0",
"resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.0.tgz",
"integrity": "sha512-BViHy7LKeTz4oNnkcLJ+lVSL6vpiFeX6/d3oSH8zCW7UxP2onchk+vTGB143xuFjHS3deTgkKoXXymXqymiIdA==",
"dependencies": {
"inherits": "^2.0.3",
"string_decoder": "^1.1.1",
"util-deprecate": "^1.0.1"
},
"engines": {
"node": ">= 6"
}
},
"node_modules/stream-browserify/node_modules/safe-buffer": {
"version": "5.2.1",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz",
"integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
]
},
"node_modules/stream-browserify/node_modules/string_decoder": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz",
"integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==",
"dependencies": {
"safe-buffer": "~5.2.0"
}
},
"node_modules/streamsearch": { "node_modules/streamsearch": {
"version": "0.1.2", "version": "0.1.2",
"resolved": "https://registry.npmjs.org/streamsearch/-/streamsearch-0.1.2.tgz", "resolved": "https://registry.npmjs.org/streamsearch/-/streamsearch-0.1.2.tgz",
...@@ -12137,6 +12203,16 @@ ...@@ -12137,6 +12203,16 @@
"tslib": "^2.3.0" "tslib": "^2.3.0"
} }
}, },
"@aws-sdk/lib-storage": {
"version": "3.26.0",
"resolved": "https://registry.npmjs.org/@aws-sdk/lib-storage/-/lib-storage-3.26.0.tgz",
"integrity": "sha512-LI4XPSE2Pl5ea3OnYNj7WQBIKEFlYte74OcMfP1LvieoOi8FLVyza4JBxS76w9KuexjU/M9nSAOTQdGhm3wzlQ==",
"requires": {
"buffer": "^5.6.0",
"stream-browserify": "^3.0.0",
"tslib": "^2.3.0"
}
},
"@aws-sdk/md5-js": { "@aws-sdk/md5-js": {
"version": "3.25.0", "version": "3.25.0",
"resolved": "https://registry.npmjs.org/@aws-sdk/md5-js/-/md5-js-3.25.0.tgz", "resolved": "https://registry.npmjs.org/@aws-sdk/md5-js/-/md5-js-3.25.0.tgz",
...@@ -14804,7 +14880,6 @@ ...@@ -14804,7 +14880,6 @@
"version": "5.7.1", "version": "5.7.1",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz",
"integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==",
"dev": true,
"requires": { "requires": {
"base64-js": "^1.3.1", "base64-js": "^1.3.1",
"ieee754": "^1.1.13" "ieee754": "^1.1.13"
...@@ -19053,6 +19128,40 @@ ...@@ -19053,6 +19128,40 @@
"resolved": "https://registry.npmjs.org/statuses/-/statuses-1.5.0.tgz", "resolved": "https://registry.npmjs.org/statuses/-/statuses-1.5.0.tgz",
"integrity": "sha1-Fhx9rBd2Wf2YEfQ3cfqZOBR4Yow=" "integrity": "sha1-Fhx9rBd2Wf2YEfQ3cfqZOBR4Yow="
}, },
"stream-browserify": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/stream-browserify/-/stream-browserify-3.0.0.tgz",
"integrity": "sha512-H73RAHsVBapbim0tU2JwwOiXUj+fikfiaoYAKHF3VJfA0pe2BCzkhAHBlLG6REzE+2WNZcxOXjK7lkso+9euLA==",
"requires": {
"inherits": "~2.0.4",
"readable-stream": "^3.5.0"
},
"dependencies": {
"readable-stream": {
"version": "3.6.0",
"resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.0.tgz",
"integrity": "sha512-BViHy7LKeTz4oNnkcLJ+lVSL6vpiFeX6/d3oSH8zCW7UxP2onchk+vTGB143xuFjHS3deTgkKoXXymXqymiIdA==",
"requires": {
"inherits": "^2.0.3",
"string_decoder": "^1.1.1",
"util-deprecate": "^1.0.1"
}
},
"safe-buffer": {
"version": "5.2.1",
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz",
"integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ=="
},
"string_decoder": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz",
"integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==",
"requires": {
"safe-buffer": "~5.2.0"
}
}
}
},
"streamsearch": { "streamsearch": {
"version": "0.1.2", "version": "0.1.2",
"resolved": "https://registry.npmjs.org/streamsearch/-/streamsearch-0.1.2.tgz", "resolved": "https://registry.npmjs.org/streamsearch/-/streamsearch-0.1.2.tgz",
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
}, },
"dependencies": { "dependencies": {
"@aws-sdk/client-s3": "^3.26.0", "@aws-sdk/client-s3": "^3.26.0",
"@aws-sdk/lib-storage": "^3.26.0",
"@nestjs/common": "^8.0.0", "@nestjs/common": "^8.0.0",
"@nestjs/config": "^1.0.1", "@nestjs/config": "^1.0.1",
"@nestjs/core": "^8.0.0", "@nestjs/core": "^8.0.0",
......
...@@ -5,7 +5,7 @@ import { FileInterceptor } from '@nestjs/platform-express'; ...@@ -5,7 +5,7 @@ import { FileInterceptor } from '@nestjs/platform-express';
import { FileUploadDto } from '../dto/FileUpload.dto'; import { FileUploadDto } from '../dto/FileUpload.dto';
import { AppsJson } from '../utility/apps-json-type'; import { AppsJson } from '../utility/apps-json-type';
import { AppService } from '../app.service'; import { AppService } from '../app.service';
import { BlankReturnMessageDto } from '../dto/ReturnMessage.dto'; import { BlankReturnMessageDto, ReturnMessageDto } from '../dto/ReturnMessage.dto';
import { AssignAppDto } from '../dto/AssignApp.dto'; import { AssignAppDto } from '../dto/AssignApp.dto';
import { AppPrefixDto } from '../dto/AppPrefix.dto'; import { AppPrefixDto } from '../dto/AppPrefix.dto';
...@@ -59,4 +59,11 @@ export class AdminController { ...@@ -59,4 +59,11 @@ export class AdminController {
async setAppPrefix(@Param('id') id: string, @Body(new ValidationPipe({ transform: true })) appPrefix: AppPrefixDto) { async setAppPrefix(@Param('id') id: string, @Body(new ValidationPipe({ transform: true })) appPrefix: AppPrefixDto) {
return this.appService.setAppPrefix(id, appPrefix._prefix); return this.appService.setAppPrefix(id, appPrefix._prefix);
} }
@Post('purge')
@ApiOperation({ summary: '清理没有用的 app 包' })
@ApiOkResponse({ type: BlankReturnMessageDto })
async purgeOldArchives() {
return new ReturnMessageDto(201, 'success', await this.appService.purgeOldArchives());
}
} }
...@@ -137,17 +137,16 @@ export class AppController { ...@@ -137,17 +137,16 @@ export class AppController {
const packagePromise = new Promise<ReturnMessageDto<PackageResult>>((resolve, reject) => { const packagePromise = new Promise<ReturnMessageDto<PackageResult>>((resolve, reject) => {
let gotFile = false; let gotFile = false;
busboy.on('file', async (fieldname, fileStream, filename, encoding, mimetype) => { busboy.on('file', async (fieldname, fileStream, filename, encoding, mimetype) => {
fileStream.pause();
if (fieldname !== 'file') { if (fieldname !== 'file') {
reject(new BlankReturnMessageDto(400, 'invalid field').toException()); reject(new BlankReturnMessageDto(400, 'invalid field').toException());
return; return;
} }
gotFile = true; gotFile = true;
// console.log(`got file ${fieldname}`); // console.log(`got file ${fieldname}`);
const stream = new Stream.Readable().wrap(fileStream);
try { try {
resolve(await this.appService.makeBuild(user, stream, id, depot, version)); resolve(await this.appService.makeBuild(user, fileStream, id, depot, version));
} catch (e) { } catch (e) {
stream.destroy();
reject(e); reject(e);
} }
}); });
......
import { Connection, IsNull, Not } from 'typeorm'; import { Connection, In, IsNull, Not, SelectQueryBuilder } from 'typeorm';
import { InjectConnection } from '@nestjs/typeorm'; import { InjectConnection } from '@nestjs/typeorm';
import { ConsoleLogger, Injectable } from '@nestjs/common'; import { ConsoleLogger, Injectable } from '@nestjs/common';
import { AppsJson } from './utility/apps-json-type'; import { AppsJson } from './utility/apps-json-type';
...@@ -11,19 +11,25 @@ import { Depot } from './entities/Depot.entity'; ...@@ -11,19 +11,25 @@ import { Depot } from './entities/Depot.entity';
import { DepotDto } from './dto/Depot.dto'; import { DepotDto } from './dto/Depot.dto';
import { Build } from './entities/Build.entity'; import { Build } from './entities/Build.entity';
import { ConfigService } from '@nestjs/config'; import { ConfigService } from '@nestjs/config';
import { Archive } from './entities/Archive.entity'; import { Archive, ArchiveType } from './entities/Archive.entity';
import { PackageS3Service } from './package-s3/package-s3.service';
@Injectable() @Injectable()
export class AppService extends ConsoleLogger { export class AppService extends ConsoleLogger {
private readonly packageVersionTraceCount: number; private readonly packageVersionTraceCount: number;
private readonly packageVersionPreserveCount: number;
constructor( constructor(
@InjectConnection('app') @InjectConnection('app')
private db: Connection, private db: Connection,
private packager: PackagerService, private packager: PackagerService,
private packageS3: PackageS3Service,
config: ConfigService config: ConfigService
) { ) {
super('app'); super('app');
// 打包追溯几个版本的更新包
this.packageVersionTraceCount = parseInt(config.get('PACKAGE_VERSION_TRACE_COUNT')) || 5; this.packageVersionTraceCount = parseInt(config.get('PACKAGE_VERSION_TRACE_COUNT')) || 5;
// 清理掉几个版本之前的部分包
this.packageVersionPreserveCount = parseInt(config.get('PACKAGE_VERSION_PRESERVE_COUNT')) || 5;
} }
async getAppsJson() { async getAppsJson() {
...@@ -140,7 +146,7 @@ export class AppService extends ConsoleLogger { ...@@ -140,7 +146,7 @@ export class AppService extends ConsoleLogger {
}, 201); }, 201);
} }
async getOrCreateDepot(app: App, depotDto: DepotDto) { private async getOrCreateDepot(app: App, depotDto: DepotDto) {
const depotOption = depotDto.toActual; const depotOption = depotDto.toActual;
let depot = await this.db.getRepository(Depot).findOne({ where: { app, ...depotOption } }); let depot = await this.db.getRepository(Depot).findOne({ where: { app, ...depotOption } });
if (!depot) { if (!depot) {
...@@ -155,11 +161,11 @@ export class AppService extends ConsoleLogger { ...@@ -155,11 +161,11 @@ export class AppService extends ConsoleLogger {
return depot; return depot;
} }
async checkExistingBuild(depot: Depot, version: string) { private async checkExistingBuild(depot: Depot, version: string) {
return this.db.getRepository(Build).findOne({ where: { depot, version }, select: ['id'] }); return this.db.getRepository(Build).findOne({ where: { depot, version }, select: ['id'] });
} }
async makeBuild(user: MyCardUser, stream: internal.Readable, id: string, depotDto: DepotDto, version: string) { async makeBuild(user: MyCardUser, stream: NodeJS.ReadableStream, id: string, depotDto: DepotDto, version: string) {
if (!user) { if (!user) {
throw new BlankReturnMessageDto(401, 'Needs login').toException(); throw new BlankReturnMessageDto(401, 'Needs login').toException();
} }
...@@ -198,6 +204,67 @@ export class AppService extends ConsoleLogger { ...@@ -198,6 +204,67 @@ export class AppService extends ConsoleLogger {
} }
} }
private packageReferenceSubQuery(query: SelectQueryBuilder<any>) {
const subQuery = query
.subQuery()
.select('referencingArchive.id')
.from(Archive, 'referencingArchive')
.where('referencingArchive.path = archive.path')
.andWhere('referencingArchive.buildId != archive.buildId');
query.andWhere(`not exists ${subQuery.getQuery()}`);
}
async getPurgeOldArchivePaths() {
const query = this.db
.getRepository(Build)
.createQueryBuilder('unusedBuild')
.select('distinct(archive.path)', 'pathToPurge')
.innerJoin('unusedBuild.archives', 'archive')
.where('archive.role != :latestRole', { latestRole: ArchiveType.Full });
const subQuery = query
.subQuery()
.select('latestBuild.id')
.from(Build, 'latestBuild')
.where('latestBuild.depotId = unusedBuild.depotId')
.orderBy('latestBuild.id', 'DESC')
.take(this.packageVersionPreserveCount);
query.andWhere(`unusedBuild.id not in ${subQuery.getQuery()}`);
this.packageReferenceSubQuery(query);
return (await query.getRawMany()).map((s) => s.pathToPurge as string);
}
private async getArchivePathsToPurge(buildId: number) {
const query = this.db
.getRepository(Archive)
.createQueryBuilder('archive')
.select('distinct(archive.path)', 'pathToPurge')
.where('archive.buildId = :buildId', { buildId });
this.packageReferenceSubQuery(query);
// this.log(`SQL: ${query.getQueryAndParameters()}`);
return (await query.getRawMany()).map((s) => s.pathToPurge as string);
}
async purgeOldArchives() {
const paths = await this.getPurgeOldArchivePaths();
if (!paths.length) {
return;
}
this.log(`Will purge file ${paths.join(',')} for old archives.`);
return this.packageS3.removeObjects(paths);
}
private async purgeRelatedArchives(build: Build) {
const paths = await this.getArchivePathsToPurge(build.id);
if (!paths.length) {
return;
}
this.log(`Will purge file ${paths.join(',')} for build ${build.id} removal.`);
return this.packageS3.removeObjects(paths);
}
async removeBuild(user: MyCardUser, id: string, depotDto: DepotDto, version: string) { async removeBuild(user: MyCardUser, id: string, depotDto: DepotDto, version: string) {
if (!user) { if (!user) {
throw new BlankReturnMessageDto(401, 'Needs login').toException(); throw new BlankReturnMessageDto(401, 'Needs login').toException();
...@@ -217,6 +284,7 @@ export class AppService extends ConsoleLogger { ...@@ -217,6 +284,7 @@ export class AppService extends ConsoleLogger {
if (!build) { if (!build) {
throw new BlankReturnMessageDto(404, 'Build not found').toException(); throw new BlankReturnMessageDto(404, 'Build not found').toException();
} }
await this.purgeRelatedArchives(build);
await this.db.transaction(async (edb) => { await this.db.transaction(async (edb) => {
await edb.getRepository(Archive).delete({ build }); await edb.getRepository(Archive).delete({ build });
await edb.getRepository(Build).delete(build); await edb.getRepository(Build).delete(build);
......
...@@ -6,7 +6,7 @@ import util from 'util'; ...@@ -6,7 +6,7 @@ import util from 'util';
import _ from 'lodash'; import _ from 'lodash';
import { ConfigService } from '@nestjs/config'; import { ConfigService } from '@nestjs/config';
import internal from 'stream'; import internal, { Stream } from 'stream';
import { PackageResult } from '../dto/PackageResult.dto'; import { PackageResult } from '../dto/PackageResult.dto';
import { PackageS3Service } from '../package-s3/package-s3.service'; import { PackageS3Service } from '../package-s3/package-s3.service';
import readdirp from 'readdirp'; import readdirp from 'readdirp';
...@@ -14,6 +14,7 @@ import readdirp from 'readdirp'; ...@@ -14,6 +14,7 @@ import readdirp from 'readdirp';
import { ConsoleLogger, Injectable } from '@nestjs/common'; import { ConsoleLogger, Injectable } from '@nestjs/common';
import { createHash } from 'crypto'; import { createHash } from 'crypto';
import { Archive, ArchiveType } from '../entities/Archive.entity'; import { Archive, ArchiveType } from '../entities/Archive.entity';
import { Build } from '../entities/Build.entity';
export interface FileWithHash { export interface FileWithHash {
file: readdirp.EntryInfo; file: readdirp.EntryInfo;
...@@ -57,87 +58,101 @@ export class ArchiveTask { ...@@ -57,87 +58,101 @@ export class ArchiveTask {
export class PackagerService extends ConsoleLogger { export class PackagerService extends ConsoleLogger {
bucket_max = 10 * 1024 ** 2; bucket_max = 10 * 1024 ** 2;
bucket_enter = 1 * 1024 ** 2; bucket_enter = 1 * 1024 ** 2;
packagerWorkingDirectory: string;
constructor(private s3: PackageS3Service, config: ConfigService) { constructor(private s3: PackageS3Service, config: ConfigService) {
super('packager'); super('packager');
this.bucket_max = (parseInt(config.get('PACKAGE_BUCKET_MAX')) || 10) * 1024 ** 2; this.bucket_max = (parseInt(config.get('PACKAGE_BUCKET_MAX')) || 10) * 1024 ** 2;
this.bucket_enter = (parseInt(config.get('PACKAGE_BUCKET_ENTER')) || 1) * 1024 ** 2; this.bucket_enter = (parseInt(config.get('PACKAGE_BUCKET_ENTER')) || 1) * 1024 ** 2;
this.packagerWorkingDirectory = config.get('PACKAGE_WORKING_DIRECTORY') || os.tmpdir();
} }
async build(stream: internal.Readable, pathPrefix?: string, lastBuildChecksums: Record<string, string>[] = []): Promise<PackageResult> { async build(
stream: NodeJS.ReadableStream,
pathPrefix?: string,
lastBuildChecksums: Record<string, string>[] = []
): Promise<PackageResult> {
this.log(`Start packaging.`); this.log(`Start packaging.`);
const root = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'mycard-console-')); const root = await fs.promises.mkdtemp(path.join(this.packagerWorkingDirectory, 'mycard-console-'));
const tarballRoot = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'mycard-console-tarball-')); // const tarballRoot = await fs.promises.mkdtemp(path.join(this.packagerWorkingDirectory, 'mycard-console-tarball-'));
let extractRoot = root; let extractRoot = root;
if (pathPrefix) { if (pathPrefix) {
extractRoot = path.join(root, pathPrefix); extractRoot = path.join(root, pathPrefix);
await fs.promises.mkdir(extractRoot, { recursive: true }); await fs.promises.mkdir(extractRoot, { recursive: true });
} }
this.log(`Extracting package to ${extractRoot}.`); try {
await this.spawnAsync('tar', ['-zxf', '-'], { cwd: extractRoot }, stream); this.log(`Extracting package to ${extractRoot}.`);
stream.resume();
this.log(`Package extracted to ${extractRoot}.`); //stream.on('data', (data) => this.log(`data => ${data}`));
await this.spawnAsync('tar', ['-zxf', '-'], { cwd: extractRoot }, stream);
//const packagesSequence: string[][] = [];
const entries = await readdirp.promise(root, { alwaysStat: true, type: 'files_directories' }); this.log(`Package extracted to ${extractRoot}.`);
const [directories, files] = _.partition(entries, (item) => item.stats.isDirectory());
//const packagesSequence: string[][] = [];
// checksum const entries = await readdirp.promise(root, { alwaysStat: true, type: 'files_directories' });
const checksum = await this.checksum( const [directories, files] = _.partition(entries, (item) => item.stats.isDirectory());
root,
directories.map((d) => d.path), // checksum
files.map((f) => f.path) const checksum = await this.checksum(
); root,
const archiveTasks: ArchiveTask[] = []; directories.map((d) => d.path),
files.map((f) => f.path)
const filesWithHash: FileWithHash[] = files.map((f) => ({ file: f, hash: checksum[f.path] })); );
const archiveTasks: ArchiveTask[] = [];
// 整包
new ArchiveTask(ArchiveType.Full, filesWithHash, await fs.promises.readdir(root)).addToTask(archiveTasks); const filesWithHash: FileWithHash[] = files.map((f) => ({ file: f, hash: checksum[f.path] }));
// 更新包 // 整包
for (const lastChecksum of lastBuildChecksums) { new ArchiveTask(ArchiveType.Full, filesWithHash, await fs.promises.readdir(root)).addToTask(archiveTasks);
const changedFiles = filesWithHash.filter((f) => !lastChecksum[f.file.path] || lastChecksum[f.file.path] !== f.hash);
if (changedFiles.length) { // 更新包
new ArchiveTask(ArchiveType.Update, changedFiles).addToTask(archiveTasks); for (const lastChecksum of lastBuildChecksums) {
const changedFiles = filesWithHash.filter((f) => !lastChecksum[f.file.path] || lastChecksum[f.file.path] !== f.hash);
if (changedFiles.length) {
new ArchiveTask(ArchiveType.Update, changedFiles).addToTask(archiveTasks);
}
} }
}
// 散包 // 散包
const buckets: Record<string, [FileWithHash[], number]> = {}; const buckets: Record<string, [FileWithHash[], number]> = {};
for (const file of filesWithHash) { for (const file of filesWithHash) {
if (file.file.stats.size < this.bucket_enter) { if (file.file.stats.size < this.bucket_enter) {
const extname = path.extname(file.file.basename); const extname = path.extname(file.file.basename);
buckets[extname] ??= [[], 0]; buckets[extname] ??= [[], 0];
const bucket = buckets[extname]; const bucket = buckets[extname];
if (bucket[1] + file.file.stats.size >= this.bucket_max) { if (bucket[1] + file.file.stats.size >= this.bucket_max) {
new ArchiveTask(ArchiveType.Part, bucket[0]).addToTask(archiveTasks); new ArchiveTask(ArchiveType.Part, bucket[0]).addToTask(archiveTasks);
bucket[0] = []; bucket[0] = [];
bucket[1] = 0; bucket[1] = 0;
} else {
bucket[0].push(file);
bucket[1] += file.file.stats.size;
}
} else { } else {
bucket[0].push(file); new ArchiveTask(ArchiveType.Part, [file]).addToTask(archiveTasks);
bucket[1] += file.file.stats.size;
} }
} else {
new ArchiveTask(ArchiveType.Part, [file]).addToTask(archiveTasks);
} }
} for (const bucket of Object.values(buckets)) {
for (const bucket of Object.values(buckets)) { if (bucket[0].length) {
if (bucket[0].length) { new ArchiveTask(ArchiveType.Part, bucket[0]).addToTask(archiveTasks);
new ArchiveTask(ArchiveType.Part, bucket[0]).addToTask(archiveTasks); }
} }
}
const packages = await Promise.all(archiveTasks.map((t) => this.archive(root, tarballRoot, t))); // 这个 await 过后,checksum 和 打包上传都已经跑完了 const packages = await Promise.all(archiveTasks.map((t) => this.archive(root, t))); // 这个 await 过后,checksum 和 打包上传都已经跑完了
//for (let i = 0; i < packagesSequence.length; ++i) {
// packages[gotPackages[i]] = packagesSequence[i];
//}
//for (let i = 0; i < packagesSequence.length; ++i) { // this.log({ checksum, packages });
// packages[gotPackages[i]] = packagesSequence[i];
//}
// this.log({ checksum, packages }); return new PackageResult(checksum, packages);
await fs.promises.rm(root, { recursive: true }); } catch (e) {
await fs.promises.rm(tarballRoot, { recursive: true }); throw e;
return new PackageResult(checksum, packages); } finally {
await fs.promises.rm(root, { recursive: true });
// await fs.promises.rm(tarballRoot, { recursive: true });
}
} }
async checksum(root: string, directories: string[], files: string[]): Promise<Record<string, string>> { async checksum(root: string, directories: string[], files: string[]): Promise<Record<string, string>> {
...@@ -149,7 +164,7 @@ export class PackagerService extends ConsoleLogger { ...@@ -149,7 +164,7 @@ export class PackagerService extends ConsoleLogger {
]); ]);
} }
async archive(root: string, tarballRoot: string, archiveTask: ArchiveTask): Promise<Archive> { async archive(root: string, archiveTask: ArchiveTask): Promise<Archive> {
const archive = archiveTask.archive; const archive = archiveTask.archive;
const archiveName = archiveTask.path; const archiveName = archiveTask.path;
const existing = await this.s3.fileExists(archiveName); const existing = await this.s3.fileExists(archiveName);
...@@ -158,39 +173,37 @@ export class PackagerService extends ConsoleLogger { ...@@ -158,39 +173,37 @@ export class PackagerService extends ConsoleLogger {
return archive; return archive;
} }
const files = archiveTask.filePaths; const files = archiveTask.filePaths;
const archivePath = path.join(tarballRoot, archiveName); this.log(`Packaging archive ${archiveName} with ${files.length} files.`);
this.log(`Packaging archive ${archivePath} with ${files.length} files.`); const stream = new Stream.Writable();
await this.spawnAsync('tar', ['-zcvf', archivePath].concat(files), { const child = child_process.spawn('tar', ['-zcvf', '-'].concat(files), {
cwd: root, cwd: root,
}); });
const fileSize = (await fs.promises.stat(archivePath)).size; const childPromise = new Promise<void>((resolve, reject) => {
await this.s3.uploadFile(archiveName, fs.createReadStream(archivePath), { child.on('exit', (code) => {
if (code == 0) {
resolve();
} else {
reject(code);
}
});
child.on('error', (error) => {
reject(error);
});
});
const uploadPromise = this.s3.uploadStream(archiveName, child.stdout, {
ContentType: 'application/tar+gzip', ContentType: 'application/tar+gzip',
ContentLength: fileSize,
}); });
archive.size = fileSize; const [, { object }] = await Promise.all([childPromise, uploadPromise]);
archive.size = object.Size;
return archive; return archive;
} }
private spawnAsync( private spawnAsync(command: string, args: string[], options: child_process.SpawnOptions, stdinStream?: NodeJS.ReadableStream) {
command: string,
args: string[],
options: child_process.SpawnOptions,
stdinStream?: internal.Readable,
stdoutStream?: internal.Writable,
stderrStream?: internal.Writable
) {
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
const child = child_process.spawn(command, args, options); const child = child_process.spawn(command, args, options);
if (stdinStream) { if (stdinStream) {
stdinStream.pipe(child.stdin); stdinStream.pipe(child.stdin);
} }
if (stdoutStream) {
child.stdout.pipe(stdoutStream);
}
if (stderrStream) {
child.stderr.pipe(stderrStream);
}
child.on('exit', (code) => { child.on('exit', (code) => {
if (code == 0) { if (code == 0) {
resolve(); resolve();
......
import { ConsoleLogger } from '@nestjs/common'; import { ConsoleLogger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config'; import { ConfigService } from '@nestjs/config';
import { ListObjectsCommand, PutObjectCommand, PutObjectCommandInput, S3Client } from '@aws-sdk/client-s3'; import { _Object, DeleteObjectsCommand, ListObjectsCommand, PutObjectCommand, PutObjectCommandInput, S3Client } from '@aws-sdk/client-s3';
import { createHash } from 'crypto'; import { createHash } from 'crypto';
import internal from 'stream'; import internal from 'stream';
import Path from 'path'; import Path from 'path';
import { Upload } from '@aws-sdk/lib-storage';
export interface S3StreamUploadResult {
url: string;
object: _Object;
}
export class S3Service extends ConsoleLogger { export class S3Service extends ConsoleLogger {
private readonly bucket: string; private readonly bucket: string;
private readonly prefix: string; private readonly prefix: string;
private readonly cdnUrl: string; public readonly cdnUrl: string;
private readonly s3: S3Client; private readonly s3: S3Client;
private getConfig(field: string) { private getConfig(field: string) {
...@@ -39,6 +45,18 @@ export class S3Service extends ConsoleLogger { ...@@ -39,6 +45,18 @@ export class S3Service extends ConsoleLogger {
return this.s3.send(command); return this.s3.send(command);
} }
async removeObjects(paths: string[]) {
const command = new DeleteObjectsCommand({
Bucket: this.bucket,
Delete: {
Objects: paths.map((path) => ({
Key: this.getPathWithPrefix(path),
})),
},
});
return this.s3.send(command);
}
async fileExists(path: string) { async fileExists(path: string) {
const objects = await this.listObjects(path); const objects = await this.listObjects(path);
// this.log(objects); // this.log(objects);
...@@ -67,7 +85,7 @@ export class S3Service extends ConsoleLogger { ...@@ -67,7 +85,7 @@ export class S3Service extends ConsoleLogger {
} }
} }
async uploadFile(path: string, content: Buffer | internal.Readable, extras: Partial<PutObjectCommandInput> = {}) { async uploadFile(path: string, content: Buffer, extras: Partial<PutObjectCommandInput> = {}) {
await this.s3.send( await this.s3.send(
new PutObjectCommand({ new PutObjectCommand({
Bucket: this.bucket, Bucket: this.bucket,
...@@ -78,4 +96,22 @@ export class S3Service extends ConsoleLogger { ...@@ -78,4 +96,22 @@ export class S3Service extends ConsoleLogger {
); );
return `${this.cdnUrl}/${path}`; return `${this.cdnUrl}/${path}`;
} }
async uploadStream(path: string, stream: internal.Readable, extras: Partial<PutObjectCommandInput> = {}) {
const key = this.getPathWithPrefix(path);
const upload = new Upload({
client: this.s3,
params: {
Bucket: this.bucket,
Key: key,
Body: stream,
...extras,
},
});
await upload.done();
const {
Contents: [object],
} = await this.listObjects(path);
return { object, url: `${this.cdnUrl}/${path}` };
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment