Commit 25dc957a authored by nano's avatar nano

Queue

parent a141a695
......@@ -114,3 +114,4 @@ export function caculateSHA256(file: string): Promise<string> {
input.pipe(hash);
});
}
import {Context} from 'koa';
import {OSS} from 'aliyun-sdk';
import { Context } from 'koa';
import { OSS } from 'aliyun-sdk';
import * as busboy from 'async-busboy';
import * as mime from 'mime';
import * as uuid from 'uuid';
......@@ -7,12 +7,17 @@ import * as Client from 'aliyun-oss-upload-stream';
import * as fs from 'fs-extra-promise';
import * as path from 'path';
import * as Aria2 from 'aria2';
import {bundle} from '../../package/main';
import {mongodb} from '../models/Iridium';
import {toObjectID} from 'iridium';
import { bundle } from '../../package/main';
import { mongodb } from '../models/Iridium';
import { toObjectID } from 'iridium';
import config from '../../config';
import {UploadOSS} from '../utils';
import { UploadOSS } from '../utils';
import Router = require('koa-router');
import { Queue } from '../utils';
const queue = new Queue({
concurrency: 1
});
const checkFilePath = async (file) => {
if (['.gz', '.rar', '.zip', '.7z'].indexOf(path.extname(file.path)) === -1) {
......@@ -48,7 +53,7 @@ const router = new Router();
const UploadImage = async (ctx: Context) => {
try {
const {files} = await busboy(ctx.req);
const { files } = await busboy(ctx.req);
ctx.body = await Promise.all(files.map(async file => {
await checkImage(file);
......@@ -75,7 +80,7 @@ const UploadImage = async (ctx: Context) => {
export const UploadPackage = async (ctx: Context) => {
try {
const {files} = await busboy(ctx.req);
const { files } = await busboy(ctx.req);
ctx.body = await Promise.all(files.map(async file => {
await checkPackage(file);
......@@ -88,7 +93,7 @@ export const UploadPackage = async (ctx: Context) => {
const archive = fs.createWriteStream(path.join(archive_path, filename));
let pack = await mongodb.Packages.findOne({_id: toObjectID(ctx.params.id)});
let pack = await mongodb.Packages.findOne({ _id: toObjectID(ctx.params.id) });
if (!pack) {
return ctx.throw(400, 'pack not exists');
}
......@@ -104,8 +109,13 @@ export const UploadPackage = async (ctx: Context) => {
await pack!.save();
resolve(pack!);
// 上传完, 打包
const bundled = await bundle(filename);
let bundled;
queue.run(async (ctx, next) => {
bundled = await bundle(filename);
this.next();
});
// 打包完,上传阿里云
await UploadOSS(bundled.distPath);
......@@ -113,7 +123,7 @@ export const UploadPackage = async (ctx: Context) => {
Object.assign(pack, bundled);
pack!.status = 'uploaded';
await mongodb.Packages.update({id: pack!.id}, {$set: {status: 'deprecated'}}, {multi: true});
await mongodb.Packages.update({ id: pack!.id }, { $set: { status: 'deprecated' } }, { multi: true });
await pack!.save();
// 上传完,干掉本地目录
......@@ -151,13 +161,13 @@ const uploadPackageUrl = async (ctx: Context) => {
const downloader = new Aria2;
await downloader.open();
let pack = await mongodb.Packages.findOne({_id: toObjectID(ctx.request.body._id)});
let pack = await mongodb.Packages.findOne({ _id: toObjectID(ctx.request.body._id) });
if (!pack) {
return ctx.throw(400, 'pack not exists');
}
downloader.onDownloadStart = async ({gid}) => {
const {files} = await downloader.send('tellStatus', gid);
downloader.onDownloadStart = async ({ gid }) => {
const { files } = await downloader.send('tellStatus', gid);
const [file] = files;
const [url] = file.uris;
if (ctx.request.body.url == url.uri) {
......@@ -166,16 +176,22 @@ const uploadPackageUrl = async (ctx: Context) => {
}
};
downloader.onDownloadComplete = async ({gid}) => {
const {files} = await downloader.send('tellStatus', gid);
downloader.onDownloadComplete = async ({ gid }) => {
const { files } = await downloader.send('tellStatus', gid);
const [file] = files;
const [url] = file.uris;
if (ctx.request.body.url == url.uri) {
try {
await checkFilePath(file);
// 打包
const bundled = await bundle(path.basename(file.path));
let bundled;
queue.run(async (ctx, next) => {
bundled = await bundle(path.basename(file.path));
this.next();
});
// 打包完, 上传阿里云
await UploadOSS(bundled.distPath);
......@@ -183,7 +199,7 @@ const uploadPackageUrl = async (ctx: Context) => {
Object.assign(pack, bundled);
pack!.status = 'uploaded';
await mongodb.Packages.update({id: pack!.id}, {$set: {status: 'deprecated'}}, {multi: true});
await mongodb.Packages.update({ id: pack!.id }, { $set: { status: 'deprecated' } }, { multi: true });
await pack!.save();
// 上传完,干掉本地目录
......@@ -198,7 +214,7 @@ const uploadPackageUrl = async (ctx: Context) => {
};
downloader.onDownloadError = async (err) => {
const {files} = await downloader.send('tellStatus', err.gid);
const { files } = await downloader.send('tellStatus', err.gid);
const [file] = files;
const [url] = file.uris;
......@@ -220,7 +236,7 @@ const uploadPackageUrl = async (ctx: Context) => {
}
};
downloader.send('addUri', [ctx.request.body.url], {dir: config.upload_path});
downloader.send('addUri', [ctx.request.body.url], { dir: config.upload_path });
});
};
......
import {URL} from 'url';
import { URL } from 'url';
import * as child_process from 'child_process';
export const dot = '__<DOT>__';
......@@ -18,12 +18,12 @@ export const handleImg = (img) => {
};
export function renderChecksum(files: { path: string, hash?: string }[]) {
return files.map(({path, hash}) => `${hash || ''} ${path}`).join('\n');
return files.map(({ path, hash }) => `${hash || ''} ${path}`).join('\n');
}
export function UploadOSS(dist: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
let child = child_process.spawn('ossutil', ['cp', '--recursive', dist, 'oss://mycard/test-release'], {stdio: 'inherit'});
let child = child_process.spawn('ossutil', ['cp', '--recursive', dist, 'oss://mycard/test-release'], { stdio: 'inherit' });
child.on('exit', (code) => {
if (code == 0) {
resolve();
......@@ -36,3 +36,49 @@ export function UploadOSS(dist: string): Promise<void> {
});
});
}
type QueueParams = {
concurrency: number;
};
export class Queue {
concurrency: number;
running: number;
queue: Array<Function>;
constructor(params: QueueParams) {
Object.assign(this, params);
this.running = 0;
this.queue = [];
}
set(args: QueueParams): Queue {
Object.assign(this, args);
return this;
}
push(task: Function): Queue {
this.queue.push(task);
return this;
}
run(task: Function): Queue {
this.queue.push(task);
this.next();
return this;
}
next() {
while (this.running < this.concurrency && this.queue.length) {
let task: Function | undefined = this.queue.shift();
if (!task) {
return;
}
task(this, () => {
this.running--;
this.next();
});
this.running++;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment