Commit 250e20c0 authored by nanahira's avatar nanahira

Merge branch 'master' of git.mycard.moe:mycard/console

parents a96136b3 bfcd8af1
Pipeline #38995 canceled with stages
in 3 minutes and 47 seconds
......@@ -5,7 +5,7 @@
#COPY console-web ./
#RUN npm run build
FROM node:16-bullseye-slim as api-base
FROM node:22-bookworm-slim as api-base
RUN apt update && \
apt -y install build-essential python3 libpq-dev tar zstd gzip && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
......
......@@ -210,12 +210,24 @@ export class PackagerService extends ConsoleLogger {
}
/**
 * Produce (or reuse) the archive described by `archiveTask`.
 *
 * Fast path: if the object is already in S3 AND a hash for this path is
 * still known, fill in hash/size from the existing object and skip packing.
 * Slow path: take a distributed lock on the path (30 s TTL) so only one
 * worker packs a given path at a time, then delegate to archiveProcess.
 */
async archive(root: string, archiveTask: ArchiveTask): Promise<Archive> {
  const { archive } = archiveTask;
  const archiveName = archiveTask.archiveFullPath;
  // HEAD the target object first — repacking is expensive, uploading doubly so.
  const existing = await this.s3.fileExists(archiveName);
  if (existing) {
    const knownHash = await this.appService.lookForExistingArchiveHash(archiveTask.path);
    if (knownHash) {
      archive.hash = knownHash;
      archive.size = existing.ContentLength;
      this.log(`Archive ${archiveName} exists, skipping.`);
      return archive;
    }
  }
  // Serialize concurrent packaging of the same path across processes.
  return this.redlock.using([`archive:${archiveTask.path}`], 30000, async () => this.archiveProcess(root, archiveTask));
}
private archiveQueue = new PQueue({ concurrency: parseInt(process.env.PACKAGE_COCURRENCY) || os.cpus().length });
private async archiveProcess(root: string, archiveTask: ArchiveTask): Promise<Archive> {
private async archiveProcess(root: string, archiveTask: ArchiveTask, retry = 0): Promise<Archive> {
const archive = archiveTask.archive;
const archiveName = archiveTask.archiveFullPath;
const existing = await this.s3.fileExists(archiveName);
......@@ -223,11 +235,14 @@ export class PackagerService extends ConsoleLogger {
const hash = await this.appService.lookForExistingArchiveHash(archiveTask.path);
if (hash) {
archive.hash = hash;
archive.size = existing.Size;
archive.size = existing.ContentLength;
this.log(`Archive ${archiveName} exists, skipping.`);
return archive;
}
}
this.log(`Will archive ${archiveName}.`);
return this.archiveQueue.add(async () => {
const files = archiveTask.filePaths;
this.log(`Packaging archive ${archiveName} with ${archiveTask.exactFilePaths.length} files.`);
......@@ -248,23 +263,56 @@ export class PackagerService extends ConsoleLogger {
});
});
const hashObject = createHash('sha256');
// let length = 0;
child.stdout.on('data', (chunk) => {
// length += chunk.length;
// this.log(`Received ${length} bytes of archive ${archiveName}.`);
hashObject.update(chunk);
});
/* if (existing) {
await childPromise;
archive.hash = hashObject.digest('hex');
return archive;
let uploadStream = child.stdout;
let tmpFilename: string;
try {
/* if (files.length > 1000) {
this.warn(`Too many files in archive ${archiveName}, using tmp file.`);
// minio would skew the stream if it's too slow
// use a tmp file to put the stream
tmpFilename = path.join(this.packagerWorkingDirectory, `tmp-${archiveName}.tar.zst`);
// put the stream to the tmp file
const tmpStream = fs.createWriteStream(tmpFilename);
uploadStream.pipe(tmpStream);
// wait for the stream to finish
await new Promise((resolve, reject) => {
tmpStream.on('finish', resolve);
tmpStream.on('error', reject);
});
// open the tmp file as a new stream
uploadStream = fs.createReadStream(tmpFilename);
}*/
const uploadPromise = this.s3.uploadStream(archiveName, child.stdout, {
ContentType: 'application/tar+zstd',
});
const [, { object }] = await Promise.all([childPromise, uploadPromise]);
archive.hash = hashObject.digest('hex');
await this.redis.set(`hash:${archive.path}`, archive.hash, 'EX', 60 * 60 * 24);
archive.size = object.Size;
return archive;
const uploadPromise = this.s3.uploadStream(archiveName, uploadStream, {
ContentType: 'application/tar+zstd',
});
const [, { object }] = await Promise.all([childPromise, uploadPromise]);
archive.hash = hashObject.digest('hex');
await this.redis.set(`hash:${archive.path}`, archive.hash, 'EX', 60 * 60 * 24);
archive.size = object.Size;
this.log(`Finished archiving ${archiveName}.`);
return archive;
} catch (e) {
this.error(`Failed to archive ${archiveName}: ${e.toString()}`);
if (retry < 3) {
this.warn(`Retrying archive ${archiveName}.`);
return this.archiveProcess(root, archiveTask, retry + 1);
}
throw e;
} finally {
if (tmpFilename) {
await fs.promises.rm(tmpFilename);
}
}
});
}
......
import { ConsoleLogger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { _Object, DeleteObjectsCommand, ListObjectsCommand, PutObjectCommand, PutObjectCommandInput, S3Client } from '@aws-sdk/client-s3';
import {
_Object,
DeleteObjectsCommand,
HeadObjectCommand,
ListObjectsCommand,
PutObjectCommand,
PutObjectCommandInput,
S3Client,
S3ClientConfig,
} from '@aws-sdk/client-s3';
import { createHash } from 'crypto';
import internal from 'stream';
import { Upload } from '@aws-sdk/lib-storage';
......@@ -11,11 +20,6 @@ export interface S3StreamUploadResult {
}
export class S3Service extends ConsoleLogger {
private readonly bucket: string;
private readonly prefix: string;
public readonly cdnUrl: string;
private readonly s3: S3Client;
// Resolve a configuration key, preferring the service-scoped variant
// (`<servicePrefix>_<field>`) and falling back to the global key.
private getConfig(field: string) {
  const scoped = this.config.get(`${this.servicePrefix}_${field}`);
  return scoped || this.config.get(field);
}
......@@ -24,20 +28,31 @@ export class S3Service extends ConsoleLogger {
return `${this.cdnUrl}/${path}`;
}
private readonly bucket = this.getConfig('S3_BUCKET');
private readonly prefix = this.getConfig('S3_PREFIX');
public readonly cdnUrl =
this.getConfig('S3_CDN_URL') || `${this.getConfig('S3_ENDPOINT')}/${this.bucket}${this.prefix ? `/${this.prefix}` : ''}`;
private readonly s3Config: S3ClientConfig = {
credentials: {
accessKeyId: this.getConfig('S3_KEY'),
secretAccessKey: this.getConfig('S3_SECRET'),
},
region: this.getConfig('S3_REGION') || 'us-west-1',
endpoint: this.getConfig('S3_ENDPOINT'),
forcePathStyle: true,
};
private readonly s3ConfigSkewed: S3ClientConfig = {
...this.s3Config,
// skew 14 mins ahead for long upload
systemClockOffset: 14 * 60 * 1000,
};
private readonly s3 = new S3Client(this.s3Config);
private readonly skewedS3 = new S3Client(this.s3ConfigSkewed);
/**
 * All configuration-derived fields (bucket, prefix, cdnUrl, S3 clients) are
 * initialized at their declarations; TypeScript assigns parameter properties
 * (`servicePrefix`, `config`) right after super(), before field initializers
 * run, so those initializers can safely call getConfig(). The constructor
 * therefore only needs to set the logger context.
 */
constructor(private servicePrefix: string, private config: ConfigService) {
  super(`${servicePrefix} s3`);
}
async listObjects(path: string) {
......@@ -61,9 +76,20 @@ export class S3Service extends ConsoleLogger {
}
/**
 * Check whether an object exists in the bucket using a HEAD request
 * (cheaper and more precise than the previous ListObjects scan).
 *
 * @param path key relative to the configured prefix
 * @returns the HeadObject response (callers read `ContentLength`) when the
 *          object exists, or `undefined` when it does not.
 */
async fileExists(path: string) {
  try {
    return await this.s3.send(
      new HeadObjectCommand({
        Bucket: this.bucket,
        Key: this.getPathWithPrefix(path),
      })
    );
  } catch (e) {
    // HeadObject rejects (NotFound / 404) for missing keys; any failure is
    // treated as "does not exist" so callers fall through to (re)packaging.
    return;
  }
}
private getPathWithPrefix(filename: string) {
......@@ -115,7 +141,7 @@ export class S3Service extends ConsoleLogger {
async uploadStream(path: string, stream: internal.Readable, extras: Partial<PutObjectCommandInput> = {}) {
const key = this.getPathWithPrefix(path);
const upload = new Upload({
client: this.s3,
client: this.skewedS3,
params: {
Bucket: this.bucket,
Key: key,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.