Commit a0cb2939 authored by nanahira's avatar nanahira

support queue ack

parent 90c2f2f2
...@@ -311,4 +311,31 @@ export class Aragami { ...@@ -311,4 +311,31 @@ export class Aragami {
const baseKey = this.getBaseKey(prototype); const baseKey = this.getBaseKey(prototype);
await this.driver.queueClear(baseKey + ':' + key); await this.driver.queueClear(baseKey + ':' + key);
} }
async queueResumeAll<T>(
prototype: ClassType<T>,
key = 'default',
prior?: boolean,
) {
const baseKey = this.getBaseKey(prototype);
await this.driver.queueResumeAll(baseKey + ':' + key, prior);
}
async runQueueOnce<T, R>(
prototype: ClassType<T>,
cb: (item: T) => Awaitable<R>,
key = 'default',
) {
const baseKey = this.getBaseKey(prototype);
const buffer = await this.driver.queueGatherBlocking(baseKey + ':' + key);
const object = this.decode(prototype, buffer);
try {
const res = await cb(object);
await this.driver.queueAck(baseKey + ':' + key, buffer);
return res;
} catch (e) {
await this.driver.queueResume(baseKey + ':' + key, buffer, true);
throw e;
}
}
} }
...@@ -214,9 +214,9 @@ export class RedisDriver extends BaseDriver { ...@@ -214,9 +214,9 @@ export class RedisDriver extends BaseDriver {
await this.pool.use(async (r) => { await this.pool.use(async (r) => {
if (!(await r.redis.lrem(backupKey, 0, value))) return; if (!(await r.redis.lrem(backupKey, 0, value))) return;
if (prior) { if (prior) {
await r.redis.rpush(_key, value);
} else {
await r.redis.lpush(_key, value); await r.redis.lpush(_key, value);
} else {
await r.redis.rpush(_key, value);
} }
}); });
} }
...@@ -227,6 +227,14 @@ export class RedisDriver extends BaseDriver { ...@@ -227,6 +227,14 @@ export class RedisDriver extends BaseDriver {
const backupKey = this.getQueueBackupKey(key); const backupKey = this.getQueueBackupKey(key);
await this.pool.use(async (r) => { await this.pool.use(async (r) => {
const values = await r.redis.lrangeBuffer(backupKey, 0, -1); const values = await r.redis.lrangeBuffer(backupKey, 0, -1);
const commands = r.redis.multi().del(backupKey);
if (prior) {
values.reverse();
values.forEach((value) => commands.lpush(_key, value));
} else {
values.forEach((value) => commands.rpush(_key, value));
}
await commands.exec();
}); });
} }
......
...@@ -7,7 +7,7 @@ describe('Aragami.', () => { ...@@ -7,7 +7,7 @@ describe('Aragami.', () => {
beforeEach(() => { beforeEach(() => {
aragami = new Aragami({ aragami = new Aragami({
redis: process.env.REDIS ? { uri: 'redis://localhost:6379' } : undefined, // redis: { uri: 'redis://127.0.0.1:6379' },
}); });
}); });
...@@ -162,13 +162,41 @@ describe('Aragami.', () => { ...@@ -162,13 +162,41 @@ describe('Aragami.', () => {
const _task = await aragami.queueGather(Task); const _task = await aragami.queueGather(Task);
expect(_task.id).toBe(1); expect(_task.id).toBe(1);
await expect(aragami.queueLength(Task)).resolves.toBe(0); await expect(aragami.queueLength(Task)).resolves.toBe(0);
const taskProm = aragami.queueGatherBlocking(Task); });
console.log('before wait');
await new Promise((resolve) => setTimeout(resolve, 100)); it('should run queue with ack', async () => {
class Task {
id: number;
}
const task = new Task();
task.id = 1;
await aragami.queueClear(Task);
await expect(aragami.queueLength(Task)).resolves.toBe(0);
await aragami.queueAdd(task); await aragami.queueAdd(task);
await new Promise((resolve) => setTimeout(resolve, 100)); await expect(aragami.queueLength(Task)).resolves.toBe(1);
console.log('wait'); const _task = await aragami.runQueueOnce(Task, async (task) => {
const _task2 = await taskProm; expect(task.id).toBe(1);
return task;
});
expect(_task.id).toBe(1);
await expect(aragami.queueLength(Task)).resolves.toBe(0);
// test with throw
await aragami.queueAdd(task);
await expect(aragami.queueLength(Task)).resolves.toBe(1);
await expect(
aragami.runQueueOnce(Task, async (task) => {
throw new Error('test');
}),
).rejects.toThrow('test');
await expect(aragami.queueLength(Task)).resolves.toBe(1);
const _task2 = await aragami.runQueueOnce(Task, async (task) => {
expect(task.id).toBe(1);
return task;
});
expect(_task2.id).toBe(1); expect(_task2.id).toBe(1);
await expect(aragami.queueLength(Task)).resolves.toBe(0);
}); });
}); });
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment