Commit d9679841 authored by nanahira's avatar nanahira

add lock for archive

parent 6718fcf5
Pipeline #5366 passed with stages
in 4 minutes and 48 seconds
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
"dependencies": { "dependencies": {
"@aws-sdk/client-s3": "^3.26.0", "@aws-sdk/client-s3": "^3.26.0",
"@aws-sdk/lib-storage": "^3.26.0", "@aws-sdk/lib-storage": "^3.26.0",
"@nestjs/cli": "^8.0.0",
"@nestjs/common": "^8.0.0", "@nestjs/common": "^8.0.0",
"@nestjs/config": "^1.0.1", "@nestjs/config": "^1.0.1",
"@nestjs/core": "^8.0.0", "@nestjs/core": "^8.0.0",
...@@ -34,6 +35,7 @@ ...@@ -34,6 +35,7 @@
"busboy": "^0.2.14", "busboy": "^0.2.14",
"class-transformer": "^0.4.0", "class-transformer": "^0.4.0",
"class-validator": "^0.13.1", "class-validator": "^0.13.1",
"delay": "^5.0.0",
"lodash": "^4.17.21", "lodash": "^4.17.21",
"moment": "^2.29.1", "moment": "^2.29.1",
"mustache": "^4.2.0", "mustache": "^4.2.0",
...@@ -44,8 +46,7 @@ ...@@ -44,8 +46,7 @@
"rimraf": "^3.0.2", "rimraf": "^3.0.2",
"rxjs": "^7.2.0", "rxjs": "^7.2.0",
"swagger-ui-express": "^4.1.6", "swagger-ui-express": "^4.1.6",
"typeorm": "^0.2.37", "typeorm": "^0.2.37"
"@nestjs/cli": "^8.0.0"
}, },
"optionalDependencies": { "optionalDependencies": {
"pg-native": "^3.0.0" "pg-native": "^3.0.0"
......
...@@ -14,6 +14,7 @@ import { ConsoleLogger, forwardRef, Inject, Injectable } from '@nestjs/common'; ...@@ -14,6 +14,7 @@ import { ConsoleLogger, forwardRef, Inject, Injectable } from '@nestjs/common';
import { Archive, ArchiveType } from '../entities/Archive.entity'; import { Archive, ArchiveType } from '../entities/Archive.entity';
import { AppService } from '../app.service'; import { AppService } from '../app.service';
import { createHash } from 'crypto'; import { createHash } from 'crypto';
import delay from 'delay';
export interface FileWithHash { export interface FileWithHash {
file: readdirp.EntryInfo; file: readdirp.EntryInfo;
...@@ -63,6 +64,8 @@ export class PackagerService extends ConsoleLogger { ...@@ -63,6 +64,8 @@ export class PackagerService extends ConsoleLogger {
bucket_enter = 1 * 1024 ** 2; bucket_enter = 1 * 1024 ** 2;
packagerWorkingDirectory: string; packagerWorkingDirectory: string;
uploadLock = new Set<string>();
constructor( constructor(
@Inject(forwardRef(() => AppService)) private readonly appService: AppService, @Inject(forwardRef(() => AppService)) private readonly appService: AppService,
private s3: PackageS3Service, private s3: PackageS3Service,
...@@ -74,6 +77,19 @@ export class PackagerService extends ConsoleLogger { ...@@ -74,6 +77,19 @@ export class PackagerService extends ConsoleLogger {
this.packagerWorkingDirectory = config.get('PACKAGE_WORKING_DIRECTORY') || os.tmpdir(); this.packagerWorkingDirectory = config.get('PACKAGE_WORKING_DIRECTORY') || os.tmpdir();
} }
private async waitForLock(key: string) {
while (this.uploadLock.has(key)) {
await delay(10);
}
this.uploadLock.add(key);
}
private releaseLock(key: string) {
if (this.uploadLock.has(key)) {
this.uploadLock.delete(key);
}
}
async build( async build(
stream: NodeJS.ReadableStream, stream: NodeJS.ReadableStream,
pathPrefix?: string, pathPrefix?: string,
...@@ -186,38 +202,45 @@ export class PackagerService extends ConsoleLogger { ...@@ -186,38 +202,45 @@ export class PackagerService extends ConsoleLogger {
return archive; return archive;
} }
} }
await this.waitForLock(archiveTask.path);
const files = archiveTask.filePaths; const files = archiveTask.filePaths;
this.log(`Packaging archive ${archiveName} with ${archiveTask.exactFilePaths.length} files.`); this.log(`Packaging archive ${archiveName} with ${archiveTask.exactFilePaths.length} files.`);
const child = child_process.spawn('tar', ['--zstd', '-cf', '-'].concat(files), { try {
cwd: root, const child = child_process.spawn('tar', ['--zstd', '-cf', '-'].concat(files), {
}); cwd: root,
const childPromise = new Promise<void>((resolve, reject) => {
child.on('exit', (code) => {
if (code == 0) {
resolve();
} else {
reject(code);
}
}); });
child.on('error', (error) => { const childPromise = new Promise<void>((resolve, reject) => {
reject(error); child.on('exit', (code) => {
if (code == 0) {
resolve();
} else {
reject(code);
}
});
child.on('error', (error) => {
reject(error);
});
}); });
}); const hashObject = createHash('sha256');
const hashObject = createHash('sha256'); child.stdout.on('data', (chunk) => {
child.stdout.on('data', (chunk) => { hashObject.update(chunk);
hashObject.update(chunk); });
}); /* if (existing) {
/* if (existing) { await childPromise;
await childPromise; archive.hash = hashObject.digest('hex');
return archive;
}*/
const uploadPromise = this.s3.uploadStream(archiveName, child.stdout, {
ContentType: 'application/tar+zstd',
});
const [, { object }] = await Promise.all([childPromise, uploadPromise]);
archive.hash = hashObject.digest('hex'); archive.hash = hashObject.digest('hex');
return archive; archive.size = object.Size;
}*/ } catch (e) {
const uploadPromise = this.s3.uploadStream(archiveName, child.stdout, { throw e;
ContentType: 'application/tar+zstd', } finally {
}); this.releaseLock(archiveTask.path);
const [, { object }] = await Promise.all([childPromise, uploadPromise]); }
archive.hash = hashObject.digest('hex');
archive.size = object.Size;
return archive; return archive;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment