Commit 99382700 authored by nanahira's avatar nanahira

fix lock connection stack

parent 4b8ca8c3
import { BaseDriver } from '../base-driver'; import { BaseDriver } from '../base-driver';
import Redis from 'ioredis'; import Redis from 'ioredis';
import Redlock from '@nanahira/redlock'; import Redlock from '@nanahira/redlock';
import { RedisDriverOptions } from '../def'; import { Awaitable, RedisDriverOptions } from '../def';
import { createPool } from 'generic-pool'; import { createPool } from 'generic-pool';
export class RedisDriver extends BaseDriver { export class RedisDriver extends BaseDriver {
...@@ -16,6 +16,15 @@ export class RedisDriver extends BaseDriver { ...@@ -16,6 +16,15 @@ export class RedisDriver extends BaseDriver {
return redis; return redis;
} }
async useTempRedisClient<T>(cb: (redis: Redis) => Awaitable<T>) {
const redis = await this.createRedisClient();
try {
return await cb(redis);
} finally {
await redis.quit();
}
}
private pool = createPool({ private pool = createPool({
create: async () => { create: async () => {
const redis = await this.createRedisClient(); const redis = await this.createRedisClient();
...@@ -86,13 +95,14 @@ export class RedisDriver extends BaseDriver { ...@@ -86,13 +95,14 @@ export class RedisDriver extends BaseDriver {
} }
override async lock<R>(keys: string[], cb: () => Promise<R>): Promise<R> { override async lock<R>(keys: string[], cb: () => Promise<R>): Promise<R> {
const redis = await this.createRedisClient(); return this.useTempRedisClient(async (redis) => {
const redlock = new Redlock([redis], this.options.lock); const redlock = new Redlock([redis], this.options.lock);
return redlock.using( return redlock.using(
keys.map((key) => `${this.options.lock?.prefix || '_lock'}:${key}`), keys.map((key) => `${this.options.lock?.prefix || '_lock'}:${key}`),
this.options.lock?.duration || 5000, this.options.lock?.duration || 5000,
cb, cb,
); );
});
} }
override async isFree(keys: string[]): Promise<boolean> { override async isFree(keys: string[]): Promise<boolean> {
...@@ -154,7 +164,7 @@ export class RedisDriver extends BaseDriver { ...@@ -154,7 +164,7 @@ export class RedisDriver extends BaseDriver {
override async queueGatherBlocking(key: string): Promise<Buffer> { override async queueGatherBlocking(key: string): Promise<Buffer> {
if (this.quitted) return; if (this.quitted) return;
const _key = this.getQueueKey(key); const _key = this.getQueueKey(key);
const redisClient = await this.createRedisClient(); return this.useTempRedisClient(async (redisClient) => {
try { try {
const valueProm = redisClient.brpopBuffer(_key, 0); const valueProm = redisClient.brpopBuffer(_key, 0);
const exitProm = new Promise<void>((resolve) => { const exitProm = new Promise<void>((resolve) => {
...@@ -168,9 +178,8 @@ export class RedisDriver extends BaseDriver { ...@@ -168,9 +178,8 @@ export class RedisDriver extends BaseDriver {
} catch (e) { } catch (e) {
//console.log(e); //console.log(e);
return await this.queueGatherBlocking(key); return await this.queueGatherBlocking(key);
} finally {
await redisClient.quit();
} }
});
} }
async queueClear(key: string): Promise<void> { async queueClear(key: string): Promise<void> {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment