Commit b1a5e871 authored by nanahira's avatar nanahira

abortable

parent 111c5625
......@@ -2,3 +2,4 @@ export * from './src/workflow';
export * from './src/dual-object';
export * from './src/workflow-dispatcher';
export * from './src/round-robin';
export * from './src/abortable';
export class AbortedError extends Error {
name = 'AbortError';
constructor(
msg = 'Operation aborted',
public cause?: unknown,
) {
super(msg);
}
}
type AnyFunc = (...args: any[]) => any;
const isObjectLike = (v: unknown): v is object | AnyFunc =>
(typeof v === 'object' && v !== null) || typeof v === 'function';
function boxPrimitive(v: unknown): object {
return Object(v as any);
}
export interface AbortableOpts {
/** 是否把原始值装箱后再代理(会改变 typeof/=== 语义,默认 false) */
boxPrimitives?: boolean;
/** 不递归代理子对象(仅代理顶层对象/函数,默认 false) */
noRecursive?: boolean;
}
export function abortable<T>(
obj: T,
signal: AbortSignal,
opts?: AbortableOpts,
): T {
const { boxPrimitives = false, noRecursive = false } = opts ?? {};
const throwIfAborted = () => {
if (signal.aborted) {
const r = (signal as any).reason;
if (r instanceof Error) throw r;
throw new AbortedError('Operation aborted', r);
}
};
throwIfAborted();
const targetToProxy = new WeakMap<object, any>();
const proxyToTarget = new WeakMap<object, any>();
const wrapNativePromise = <V>(pLike: PromiseLike<V>): Promise<V> => {
const base = pLike instanceof Promise ? pLike : Promise.resolve(pLike);
return base.then(
async (v) => {
throwIfAborted();
return proxify(v);
},
async (e) => {
throwIfAborted();
throw e;
},
);
};
let shortCircuit = false;
const proxify = <V>(value: V): V => {
if (noRecursive) {
if (shortCircuit) {
return value;
} else {
shortCircuit = true;
}
}
// Promise:按“真 Promise + 代理”的方案处理
if (typeof (value as any).then === 'function') {
return wrapNativePromise(value as any) as unknown as V;
}
// 原始值
if (!isObjectLike(value)) {
if (!boxPrimitives) return value;
return proxify(boxPrimitive(value) as unknown as V);
}
// 复用
const cached = targetToProxy.get(value as object);
if (cached) return cached;
const handler: ProxyHandler<any> = {
get(target, p, receiver) {
throwIfAborted();
const out = Reflect.get(target, p, receiver);
return proxify(out);
},
set(target, p, val, receiver) {
throwIfAborted();
return Reflect.set(target, p, val, receiver);
},
apply(target, thisArg, argArray) {
throwIfAborted();
const unwrappedThis =
(isObjectLike(thisArg) && proxyToTarget.get(thisArg as any)) ||
thisArg;
const ret = Reflect.apply(target as AnyFunc, unwrappedThis, argArray);
return proxify(ret);
},
construct(target, argArray, newTarget) {
throwIfAborted();
const instance = Reflect.construct(
target as AnyFunc,
argArray,
newTarget,
);
return proxify(instance);
},
defineProperty(t, p, attrs) {
throwIfAborted();
return Reflect.defineProperty(t, p, attrs);
},
deleteProperty(t, p) {
throwIfAborted();
return Reflect.deleteProperty(t, p);
},
getOwnPropertyDescriptor(t, p) {
throwIfAborted();
return Reflect.getOwnPropertyDescriptor(t, p);
},
getPrototypeOf(t) {
throwIfAborted();
return Reflect.getPrototypeOf(t);
},
setPrototypeOf(t, proto) {
throwIfAborted();
return Reflect.setPrototypeOf(t, proto);
},
has(t, p) {
throwIfAborted();
return Reflect.has(t, p);
},
isExtensible(t) {
throwIfAborted();
return Reflect.isExtensible(t);
},
ownKeys(t) {
throwIfAborted();
return Reflect.ownKeys(t);
},
preventExtensions(t) {
throwIfAborted();
return Reflect.preventExtensions(t);
},
};
const proxy = new Proxy(value as any, handler);
targetToProxy.set(value as object, proxy);
proxyToTarget.set(proxy, value);
return proxy;
};
return proxify(obj);
}
// aborted.promise.spec.ts
import { abortable, AbortedError } from '../src/abortable';
const delay = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));
describe('abortable<T>() - 属性/方法同步 throw + Promise 用 IIFE .rejects 断言', () => {
test('属性读取:未 abort 正常;abort 后访问同步 throw', () => {
const ac = new AbortController();
const o = abortable({ a: 1, b: { c: 2 } }, ac.signal);
// 未 abort:OK
expect(o.a).toBe(1);
expect(o.b.c).toBe(2);
// abort 后:访问即抛
ac.abort('stop');
expect(() => {
void o.a;
}).toThrow(AbortedError);
expect(() => {
const _ = 'a' in o;
}).toThrow(AbortedError);
expect(() => {
Object.keys(o);
}).toThrow(AbortedError);
});
test('方法调用:未 abort 正常;abort 后读取/调用同步 throw(含 this)', () => {
const ac = new AbortController();
class Counter {
n = 0;
inc() {
this.n++;
return this.n;
}
}
const c = abortable(new Counter(), ac.signal);
// 未 abort:方法可用且 this 正确
expect(c.inc()).toBe(1);
expect(c.n).toBe(1);
// abort:连“取到方法引用/调用”都应同步抛
ac.abort('cut');
expect(() => {
const _ = (c as any).inc;
}).toThrow(AbortedError);
expect(() => {
(c as any).inc();
}).toThrow(AbortedError);
});
test('noRecursive 选项:仅顶层属性/方法受 abort 影响,子对象不受影响', () => {
const ac = new AbortController();
const base = {
x: 10,
child: {
y: 20,
getY() {
return this.y;
},
},
getX() {
return this.x;
},
};
const o = abortable(base, ac.signal, { noRecursive: true });
// 未 abort:正常
expect(o.x).toBe(10);
expect(o.getX()).toBe(10);
expect(o.child.y).toBe(20);
expect(o.child.getY()).toBe(20);
const childRef = o.child;
// abort 后:顶层受影响,子对象不受影响
ac.abort('now');
expect(() => {
void o.x;
}).toThrow(AbortedError);
expect(() => {
o.getX();
}).toThrow(AbortedError);
expect(() => {
void o.child.y;
}).toThrow(AbortedError);
expect(() => {
o.child.getY();
}).toThrow(AbortedError);
// 子对象仍然可用
expect(childRef.y).toBe(20);
expect(childRef.getY()).toBe(20);
});
describe('Promise 场景(统一用 IIFE 包装后的 .rejects)', () => {
test('未 abort:await o.p 与 await o.fn() 都 resolve', async () => {
const ac = new AbortController();
const base = {
// 属性 Promise
p: delay(10).then(() => 42),
// 方法返回 Promise
fn: async () => {
await delay(10);
return 7;
},
};
const o = abortable(base, ac.signal);
await expect((async () => o.p)()).resolves.toBe(42);
await expect((async () => o.fn())()).resolves.toBe(7);
});
test('已 abort 再访问:o.p / o.fn() 都是 rejected(访问到就立即得到一个拒绝的真 Promise)', async () => {
const ac = new AbortController();
const base = {
p: delay(10).then(() => 1),
fn: async () => {
await delay(10);
return 2;
},
};
const o = abortable(base, ac.signal);
ac.abort('already');
await expect((async () => o.p)()).rejects.toBeInstanceOf(AbortedError);
await expect((async () => o.fn())()).rejects.toBeInstanceOf(AbortedError);
});
test('先拿到 Promise,再 abort:随后 await/then/catch/finally 全部 rejected', async () => {
const ac = new AbortController();
const base = {
p: delay(50).then(() => 10),
fn: async () => {
await delay(50);
return 20;
},
};
const o = abortable(base, ac.signal);
// 先“成功取得” Promise(此时未 abort)
const p = o.p;
const q = o.fn();
// 中途再 abort
ac.abort('cut-chain');
await expect((async () => await p)()).rejects.toBeInstanceOf(
AbortedError,
);
await expect((async () => await q)()).rejects.toBeInstanceOf(
AbortedError,
);
await expect(
(async () => (p as any).then((v) => v))(),
).rejects.toBeInstanceOf(AbortedError);
await expect(
(async () => (q as any).then((v) => v * 2))(),
).rejects.toBeInstanceOf(AbortedError);
});
test('属性 Promise 与 方法 Promise 的混合链:中途 abort,下一次挂接就 rejected', async () => {
const ac = new AbortController();
const o = abortable(
{
p: delay(30).then(() => 3),
fn: async () => {
await delay(30);
return 5;
},
},
ac.signal,
);
const p = o.p;
const q = o.fn();
// 稍等 1ms,模拟“promise 已创建但未 settle”
await delay(1);
ac.abort('now');
await expect(
(async () => (p as any).then((x) => x + 1))(),
).rejects.toBeInstanceOf(AbortedError);
await expect(
(async () => (q as any).then((x) => x + 1))(),
).rejects.toBeInstanceOf(AbortedError);
});
});
});
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment