Commit f990309b authored by nanahira's avatar nanahira

fix queues

parent 2037f749
...@@ -140,8 +140,7 @@ export class RedisDriver extends BaseDriver { ...@@ -140,8 +140,7 @@ export class RedisDriver extends BaseDriver {
async queueItems(key: string): Promise<Buffer[]> { async queueItems(key: string): Promise<Buffer[]> {
const _key = this.getQueueKey(key); const _key = this.getQueueKey(key);
const items = await this.pool.use((r) => r.redis.lrangeBuffer(_key, 0, -1)); return this.pool.use((r) => r.redis.lrangeBuffer(_key, 0, -1));
return items.reverse();
} }
override async queueAdd( override async queueAdd(
...@@ -152,16 +151,16 @@ export class RedisDriver extends BaseDriver { ...@@ -152,16 +151,16 @@ export class RedisDriver extends BaseDriver {
const _key = this.getQueueKey(key); const _key = this.getQueueKey(key);
await this.pool.use(async (r) => { await this.pool.use(async (r) => {
if (prior) { if (prior) {
await r.redis.lpush(_key, value);
} else {
await r.redis.rpush(_key, value); await r.redis.rpush(_key, value);
} else {
await r.redis.lpush(_key, value);
} }
}); });
} }
override async queueGather(key: string): Promise<Buffer> { override async queueGather(key: string): Promise<Buffer> {
const _key = this.getQueueKey(key); const _key = this.getQueueKey(key);
const value = await this.pool.use((r) => r.redis.rpopBuffer(_key)); const value = await this.pool.use((r) => r.redis.lpopBuffer(_key));
return value || undefined; return value || undefined;
} }
...@@ -173,22 +172,18 @@ export class RedisDriver extends BaseDriver { ...@@ -173,22 +172,18 @@ export class RedisDriver extends BaseDriver {
override async queueGatherBlocking(key: string): Promise<Buffer> { override async queueGatherBlocking(key: string): Promise<Buffer> {
if (this.quitted) return; if (this.quitted) return;
const _key = this.getQueueKey(key); const _key = this.getQueueKey(key);
return this.useTempRedisClient(async (redisClient) => { const res = await this.useTempRedisClient(async (redisClient) => {
try { try {
const valueProm = redisClient.brpopBuffer(_key, 0); const valueProm = redisClient.blpopBuffer(_key, 0);
const exitProm = new Promise<void>((resolve) => { const exitProm = new Promise<void>((resolve) => {
this.waitingBlockingProms.set(valueProm, resolve); this.waitingBlockingProms.set(valueProm, resolve);
}); });
const value = await Promise.race([valueProm, exitProm]); const value = await Promise.race([valueProm, exitProm]);
this.waitingBlockingProms.delete(valueProm); this.waitingBlockingProms.delete(valueProm);
if (value) return value?.[1]; if (value) return value?.[1];
//console.log('wait2'); } catch (e) {}
return await this.queueGatherBlocking(key);
} catch (e) {
//console.log(e);
return await this.queueGatherBlocking(key);
}
}); });
return res || this.queueGatherBlocking(key);
} }
async queueClear(key: string): Promise<void> { async queueClear(key: string): Promise<void> {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment