Commit e4234270 authored by nanahira's avatar nanahira

update queue

parent 4f02a00c
...@@ -23,9 +23,9 @@ ...@@ -23,9 +23,9 @@
"typescript": "^4.8.3" "typescript": "^4.8.3"
}, },
"peerDependencies": { "peerDependencies": {
"@nestjs/common": "^9.1.2", "@nestjs/common": "^9.1.2 || ^10.0.0",
"@nestjs/core": "^9.4.2", "@nestjs/core": "^9.4.2 || ^10.0.0",
"aragami": "^1.1.8", "aragami": "^1.2.0",
"rxjs": "^7.8.1" "rxjs": "^7.8.1"
} }
}, },
...@@ -1092,9 +1092,9 @@ ...@@ -1092,9 +1092,9 @@
} }
}, },
"node_modules/@nanahira/redlock": { "node_modules/@nanahira/redlock": {
"version": "1.0.0", "version": "1.0.1",
"resolved": "https://registry.npmjs.org/@nanahira/redlock/-/redlock-1.0.0.tgz", "resolved": "https://registry.npmjs.org/@nanahira/redlock/-/redlock-1.0.1.tgz",
"integrity": "sha512-kf48X8tSbZm1DQT3Aj3B/Fza+L9toi5+j47BX9WJ9hIqyuCDRWMC2wQOJB1XAcOFZ+FGrfAjTa21bdTEuVlYeQ==", "integrity": "sha512-gpHYiz0P57SJDNsgr/sh/1Qn12fbdfVx1AlWWbfd56M0PWPzewqwJhY3U6wInvxC0yX6ynTFioqnvJ8vNGJIDw==",
"peer": true, "peer": true,
"dependencies": { "dependencies": {
"node-abort-controller": "^3.0.1" "node-abort-controller": "^3.0.1"
...@@ -1653,12 +1653,12 @@ ...@@ -1653,12 +1653,12 @@
} }
}, },
"node_modules/aragami": { "node_modules/aragami": {
"version": "1.1.8", "version": "1.2.0",
"resolved": "https://registry.npmjs.org/aragami/-/aragami-1.1.8.tgz", "resolved": "https://registry.npmjs.org/aragami/-/aragami-1.2.0.tgz",
"integrity": "sha512-kHwOawwEFfEyxAt6/dRm5YORgIfFtZl2b/TFoZx0LxQzP3uqNgf1tkbfjLEVi5DI+nRAe6iDd+43bn1c5ZNXCg==", "integrity": "sha512-uTRn34cF8jpmxsjjbDCIlbpERJuZmAcGZHfasFEfV1mt/zDL+pEqheZ9j18VCt0yOlJTf9VUHBY7xES6Zyv3ag==",
"peer": true, "peer": true,
"dependencies": { "dependencies": {
"@nanahira/redlock": "^1.0.0", "@nanahira/redlock": "^1.0.1",
"better-lock": "^2.0.3", "better-lock": "^2.0.3",
"class-transformer": "^0.5.1", "class-transformer": "^0.5.1",
"encoded-buffer": "^0.2.6", "encoded-buffer": "^0.2.6",
...@@ -3915,9 +3915,9 @@ ...@@ -3915,9 +3915,9 @@
"dev": true "dev": true
}, },
"node_modules/node-abort-controller": { "node_modules/node-abort-controller": {
"version": "3.0.1", "version": "3.1.1",
"resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.0.1.tgz", "resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.1.1.tgz",
"integrity": "sha512-/ujIVxthRs+7q6hsdjHMaj8hRG9NuWmwrz+JdRwZ14jdFoKSkm+vDsCbF9PLpnSqjaWQJuTmVtcWHNLr+vrOFw==", "integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==",
"peer": true "peer": true
}, },
"node_modules/node-fetch": { "node_modules/node-fetch": {
...@@ -5990,9 +5990,9 @@ ...@@ -5990,9 +5990,9 @@
"peer": true "peer": true
}, },
"@nanahira/redlock": { "@nanahira/redlock": {
"version": "1.0.0", "version": "1.0.1",
"resolved": "https://registry.npmjs.org/@nanahira/redlock/-/redlock-1.0.0.tgz", "resolved": "https://registry.npmjs.org/@nanahira/redlock/-/redlock-1.0.1.tgz",
"integrity": "sha512-kf48X8tSbZm1DQT3Aj3B/Fza+L9toi5+j47BX9WJ9hIqyuCDRWMC2wQOJB1XAcOFZ+FGrfAjTa21bdTEuVlYeQ==", "integrity": "sha512-gpHYiz0P57SJDNsgr/sh/1Qn12fbdfVx1AlWWbfd56M0PWPzewqwJhY3U6wInvxC0yX6ynTFioqnvJ8vNGJIDw==",
"peer": true, "peer": true,
"requires": { "requires": {
"node-abort-controller": "^3.0.1" "node-abort-controller": "^3.0.1"
...@@ -6379,12 +6379,12 @@ ...@@ -6379,12 +6379,12 @@
} }
}, },
"aragami": { "aragami": {
"version": "1.1.8", "version": "1.2.0",
"resolved": "https://registry.npmjs.org/aragami/-/aragami-1.1.8.tgz", "resolved": "https://registry.npmjs.org/aragami/-/aragami-1.2.0.tgz",
"integrity": "sha512-kHwOawwEFfEyxAt6/dRm5YORgIfFtZl2b/TFoZx0LxQzP3uqNgf1tkbfjLEVi5DI+nRAe6iDd+43bn1c5ZNXCg==", "integrity": "sha512-uTRn34cF8jpmxsjjbDCIlbpERJuZmAcGZHfasFEfV1mt/zDL+pEqheZ9j18VCt0yOlJTf9VUHBY7xES6Zyv3ag==",
"peer": true, "peer": true,
"requires": { "requires": {
"@nanahira/redlock": "^1.0.0", "@nanahira/redlock": "^1.0.1",
"better-lock": "^2.0.3", "better-lock": "^2.0.3",
"class-transformer": "^0.5.1", "class-transformer": "^0.5.1",
"encoded-buffer": "^0.2.6", "encoded-buffer": "^0.2.6",
...@@ -8103,9 +8103,9 @@ ...@@ -8103,9 +8103,9 @@
"dev": true "dev": true
}, },
"node-abort-controller": { "node-abort-controller": {
"version": "3.0.1", "version": "3.1.1",
"resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.0.1.tgz", "resolved": "https://registry.npmjs.org/node-abort-controller/-/node-abort-controller-3.1.1.tgz",
"integrity": "sha512-/ujIVxthRs+7q6hsdjHMaj8hRG9NuWmwrz+JdRwZ14jdFoKSkm+vDsCbF9PLpnSqjaWQJuTmVtcWHNLr+vrOFw==", "integrity": "sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==",
"peer": true "peer": true
}, },
"node-fetch": { "node-fetch": {
......
...@@ -55,7 +55,7 @@ ...@@ -55,7 +55,7 @@
"peerDependencies": { "peerDependencies": {
"@nestjs/common": "^9.1.2 || ^10.0.0", "@nestjs/common": "^9.1.2 || ^10.0.0",
"@nestjs/core": "^9.4.2 || ^10.0.0", "@nestjs/core": "^9.4.2 || ^10.0.0",
"aragami": "^1.1.8", "aragami": "^1.2.0",
"rxjs": "^7.8.1" "rxjs": "^7.8.1"
} }
} }
...@@ -24,6 +24,11 @@ export class _QueueRunner<T> { ...@@ -24,6 +24,11 @@ export class _QueueRunner<T> {
aragami: Aragami; aragami: Aragami;
async onApplicationBootstrap() { async onApplicationBootstrap() {
await this.aragami.queueResumeAll(
this._queueClass,
this._queueRunnerOptions.key,
true,
);
if (this._queueRunnerOptions.logTask) { if (this._queueRunnerOptions.logTask) {
this.logger.log(`Starting ${this._maxConcurrency} workers`); this.logger.log(`Starting ${this._maxConcurrency} workers`);
} }
...@@ -52,33 +57,35 @@ export class _QueueRunner<T> { ...@@ -52,33 +57,35 @@ export class _QueueRunner<T> {
while (!this._quit) { while (!this._quit) {
logger.log('Looping'); logger.log('Looping');
try { try {
const task: T = await this.aragami.queueGatherBlocking( await this.aragami.runQueueOnce(
this._queueClass, this._queueClass,
async (task) => {
if (this._queueRunnerOptions.logTask) {
logger.log(`Got task: ${JSON.stringify(task)}`);
}
try {
await this.runTask(task, i);
} catch (e) {
logger.error(`Task failed: ${e}`);
if (
this._queueRunnerOptions.retry &&
(!task['__retry'] ||
task['__retry'] < this._queueRunnerOptions.retry)
) {
logger.log(`Retrying task: ${JSON.stringify(task)}`);
if (this._queueRunnerOptions.retry > 0) {
task['__retry'] = (task['__retry'] || 0) + 1;
}
await this.aragami.queueAdd(task, {
key: this._queueKey,
prior: true,
});
}
}
},
this._queueKey, this._queueKey,
); );
if (!task) continue; //
if (this._queueRunnerOptions.logTask) {
logger.log(`Got task: ${JSON.stringify(task)}`);
}
try {
await this.runTask(task, i);
} catch (e) {
logger.error(`Task failed: ${e}`);
if (
this._queueRunnerOptions.retry &&
(!task['__retry'] ||
task['__retry'] < this._queueRunnerOptions.retry)
) {
logger.log(`Retrying task: ${JSON.stringify(task)}`);
if (this._queueRunnerOptions.retry > 0) {
task['__retry'] = (task['__retry'] || 0) + 1;
}
await this.aragami.queueAdd(task, {
key: this._queueKey,
prior: true,
});
}
}
} catch (e) { } catch (e) {
logger.error(`Loop failed: ${e}`); logger.error(`Loop failed: ${e}`);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment