Commit 87941503 authored by nanahira's avatar nanahira

add WorkflowDispatcher

parent 5d66c637
export * from './src/workflow';
export * from './src/dual-object';
\ No newline at end of file
export * from './src/dual-object';
export * from './src/workflow-dispatcher';
export interface DispatcherOptions {
/** Max attempts per task. Default 3. */
maxAttempts?: number;
/** Backoff base in ms. backoff = base * 2^failCount. Default 1000. */
backoffBaseMs?: number;
}
// Internal task record
type Task<F extends (...args: any[]) => Promise<any>> = {
args: Parameters<F>;
resolve: (v: Awaited<ReturnType<F>>) => void;
reject: (e: any) => void;
attempts: number;
lastError?: any;
triedWorkers: Set<number>; // for global dispatch
specificWorkerIndex?: number; // for dispatchSpecific only
};
type WorkerState = {
running: boolean;
failCount: number;
nextAvailableAt: number;
totalRuns: number;
};
type RemovalWaiter = () => void;
type ActiveSlot<F extends (...args: any[]) => Promise<any>> = {
kind: 'active';
fn: F;
state: WorkerState;
queue: Task<F>[];
removalWaiters: RemovalWaiter[];
removed?: boolean; // marker when removed while running; retained for completion callback
};
type PendingSlot<F extends (...args: any[]) => Promise<any>> = {
kind: 'pending';
promise: Promise<F>;
queue: Task<F>[];
removalWaiters: RemovalWaiter[];
error?: any;
removed?: boolean; // if removed before resolved
};
type RejectedSlot<F extends (...args: any[]) => Promise<any>> = {
kind: 'rejected';
error: any;
queue: Task<F>[];
removalWaiters: RemovalWaiter[];
removed?: boolean;
};
type Slot<F extends (...args: any[]) => Promise<any>> =
| ActiveSlot<F>
| PendingSlot<F>
| RejectedSlot<F>;
export type WorkerSnapshot<F extends (...args: any[]) => Promise<any>> =
| {
index: number;
status: 'active';
fn: F;
running: boolean;
failCount: number;
totalRuns: number;
blockedMs: number;
specificQueue: number;
}
| {
index: number;
status: 'pending';
promise: Promise<F>;
specificQueue: number;
}
| {
index: number;
status: 'rejected';
error: string;
specificQueue: number;
};
export class WorkflowDispatcher<F extends (...args: any[]) => Promise<any>> {
private readonly maxAttempts: number;
private readonly backoffBaseMs: number;
private readonly slots: Slot<F>[] = [];
private readonly globalQueue: Task<F>[] = [];
private pendingInits = 0;
private everActivated = false;
private drainScheduled = false;
constructor(
workersOrPromises: Array<F | Promise<F>>,
options: DispatcherOptions = {},
) {
if (!workersOrPromises?.length) throw new Error('workers cannot be empty');
this.maxAttempts = options.maxAttempts ?? 3;
this.backoffBaseMs = options.backoffBaseMs ?? 1000;
for (let i = 0; i < workersOrPromises.length; i++) {
const w = workersOrPromises[i];
if (typeof w === 'function') {
const slot: ActiveSlot<F> = {
kind: 'active',
fn: w,
state: {
running: false,
failCount: 0,
nextAvailableAt: 0,
totalRuns: 0,
},
queue: [],
removalWaiters: [],
};
this.slots.push(slot);
this.everActivated = true;
} else {
// Create a stable slot object and mutate it in-place on resolve/reject.
this.pendingInits++;
const slot: PendingSlot<F> = {
kind: 'pending',
promise: Promise.resolve(w),
queue: [],
removalWaiters: [],
};
this.slots.push(slot);
slot.promise
.then((fn) => {
if (slot.removed) return; // was removed; ignore resolution
// mutate in-place to active
(slot as any).kind = 'active';
(slot as any).fn = fn;
(slot as any).state = {
running: false,
failCount: 0,
nextAvailableAt: 0,
totalRuns: 0,
};
this.everActivated = true;
// keep queue & removalWaiters arrays as-is
})
.catch((err) => {
if (slot.removed) return; // was removed; ignore
// mutate in-place to rejected (keep queue/waiters)
(slot as any).kind = 'rejected';
(slot as any).error = err;
})
.finally(() => {
this.pendingInits--;
this.drain();
if (this.pendingInits === 0 && !this.hasAnyActive()) {
const err = new Error(
'No workers available (all failed to initialize).',
);
setTimeout(() => this.rejectAllQueued(err), 0);
}
});
}
}
if (this.everActivated) this.drain();
}
/** Dispatch: choose eligible active with least totalRuns; retry across workers on failure. */
dispatch(...args: Parameters<F>): Promise<Awaited<ReturnType<F>>> {
return new Promise((resolve, reject) => {
const task: Task<F> = {
args,
resolve,
reject,
attempts: 0,
triedWorkers: new Set(),
};
this.globalQueue.push(task);
this.drain();
});
}
/** Dispatch to a specific worker (ignore backoff), wait until it is free; retry on the same worker. */
dispatchSpecific(
index: number,
...args: Parameters<F>
): Promise<Awaited<ReturnType<F>>> {
if (index < 0 || index >= this.slots.length) {
return Promise.reject(new Error(`worker index out of range: ${index}`));
}
return new Promise((resolve, reject) => {
const task: Task<F> = {
args,
resolve,
reject,
attempts: 0,
triedWorkers: new Set(),
specificWorkerIndex: index,
};
const slot = this.slots[index];
slot.queue.push(task);
this.drain();
});
}
/** Replace a worker at index with a new active worker function. */
public replaceWorker(index: number, fn: F): void {
if (index < 0 || index >= this.slots.length) {
throw new Error(`worker index out of range: ${index}`);
}
const prevHadActive = this.hasAnyActive();
const slot = this.slots[index];
// Preserve queue & removal waiters; reset failure/backoff; keep totalRuns to avoid skew.
const preservedQueue = slot.queue;
const preservedWaiters = slot.removalWaiters ?? [];
const next: ActiveSlot<F> = {
kind: 'active',
fn,
state: {
running: false,
failCount: 0,
nextAvailableAt: 0,
totalRuns: (slot as any).state?.totalRuns ?? 0,
},
queue: preservedQueue,
removalWaiters: preservedWaiters,
};
// Mutate in-place if possible (keeps references stable), else replace array entry.
Object.assign(slot as any, next);
(slot as any).kind = 'active';
(slot as any).fn = fn;
(slot as any).state.failCount = 0;
(slot as any).state.nextAvailableAt = 0;
this.everActivated = true;
if (!prevHadActive && this.hasAnyActive()) {
this.drain();
} else {
this.drain();
}
}
/** Add a new active worker at the tail; return its index. */
public addWorker(fn: F): number {
const slot: ActiveSlot<F> = {
kind: 'active',
fn,
state: { running: false, failCount: 0, nextAvailableAt: 0, totalRuns: 0 },
queue: [],
removalWaiters: [],
};
const index = this.slots.length;
this.slots.push(slot);
this.everActivated = true;
this.drain();
return index;
}
/**
* Remove a worker completely (splice). It becomes unavailable immediately.
* Returns a Promise that resolves when its last running task (if any) finishes.
*/
public removeWorker(index: number): Promise<void> {
if (index < 0 || index >= this.slots.length) {
return Promise.reject(new Error(`worker index out of range: ${index}`));
}
const slot = this.slots[index];
// Reject all queued specific tasks on this worker (macro-task to avoid unhandled)
const queued = slot.queue.splice(0);
const removalErr = new Error(`Worker[${index}] removed`);
setTimeout(() => {
for (const t of queued) t.reject(removalErr);
}, 0);
// Decide completion promise:
let completion: Promise<void>;
const isRunning = slot.kind === 'active' && slot.state.running;
if (!isRunning) {
completion = Promise.resolve();
} else {
completion = new Promise<void>((resolve) => {
slot.removalWaiters.push(resolve);
});
}
// Mark as removed (so any pending init resolution is ignored)
(slot as any).removed = true;
// Physically remove the slot
this.slots.splice(index, 1);
// Re-map indices in all remaining tasks:
// 1) Fix specificWorkerIndex in every remaining slot.queue
for (let i = 0; i < this.slots.length; i++) {
const s = this.slots[i];
for (const t of s.queue) {
if (typeof t.specificWorkerIndex === 'number') {
if (t.specificWorkerIndex === index) {
// This should not happen because we just removed and flushed its queue,
// but guard anyway.
t.reject(new Error(`Worker[${index}] no longer exists`));
} else if (t.specificWorkerIndex > index) {
t.specificWorkerIndex -= 1;
}
}
}
}
// 2) Fix triedWorkers sets in globalQueue
for (const t of this.globalQueue) {
if (t.triedWorkers.has(index)) t.triedWorkers.delete(index);
const next = new Set<number>();
for (const w of t.triedWorkers) {
next.add(w > index ? w - 1 : w);
}
t.triedWorkers = next;
}
// Trigger scheduling for the remaining system
this.drain();
return completion;
}
snapshot(): WorkerSnapshot<F>[] {
const now = Date.now();
return this.slots.map((slot, i) => {
switch (slot.kind) {
case 'active': {
const s = slot.state;
return {
index: i,
status: 'active' as const,
fn: slot.fn,
running: s.running,
failCount: s.failCount,
totalRuns: s.totalRuns,
blockedMs: Math.max(0, s.nextAvailableAt - now),
specificQueue: slot.queue.length,
};
}
case 'pending':
return {
index: i,
status: 'pending' as const,
promise: slot.promise,
specificQueue: slot.queue.length,
};
case 'rejected':
return {
index: i,
status: 'rejected' as const,
error: String(slot.error ?? 'unknown error'),
specificQueue: slot.queue.length,
};
}
});
}
get pending(): number {
return this.globalQueue.length;
}
// ---------------- scheduling ----------------
private drain() {
if (this.drainScheduled) return;
this.drainScheduled = true;
queueMicrotask(() => {
this.drainScheduled = false;
this._drainLoop();
});
}
private _drainLoop() {
// If no active workers and still initializing, wait; if all inited and none active, constructor already rejects all.
if (!this.hasAnyActive()) {
if (this.pendingInits > 0) return;
return;
}
// First: flush rejected workers' specific queues (macro-task rejection)
for (let i = 0; i < this.slots.length; i++) {
const slot = this.slots[i];
if (slot.kind === 'rejected' && slot.queue.length > 0) {
const q = slot.queue.splice(0);
const err = new Error(
`Worker[${i}] failed to initialize: ${String(slot.error ?? 'unknown error')}`,
);
setTimeout(() => {
for (const t of q) t.reject(err);
}, 0);
}
}
let progressed = true;
while (progressed) {
progressed = false;
// 1) Run specific queues for active workers (ignore backoff)
for (let i = 0; i < this.slots.length; i++) {
const slot = this.slots[i];
if (slot.kind !== 'active') continue;
const st = slot.state;
if (!st.running && slot.queue.length > 0) {
const task = slot.queue.shift()!;
this.startTaskOnActiveSlot(i, slot, task, /*fromSpecific*/ true);
progressed = true;
}
}
// 2) Run global queue (choose eligible active with least totalRuns)
if (this.globalQueue.length > 0) {
const idx = this.pickBestActiveForGlobal();
if (idx !== -1) {
const slot = this.slots[idx] as ActiveSlot<F>;
const task = this.globalQueue.shift()!;
this.startTaskOnActiveSlot(idx, slot, task, /*fromSpecific*/ false);
progressed = true;
}
}
}
}
private hasAnyActive() {
return this.slots.some((s) => s.kind === 'active');
}
private rejectAllQueued(err: any) {
while (this.globalQueue.length) this.globalQueue.shift()!.reject(err);
for (const slot of this.slots) {
while (slot.queue.length) slot.queue.shift()!.reject(err);
}
}
private isEligibleActive(i: number, now: number) {
const slot = this.slots[i];
if (!slot || slot.kind !== 'active') return false;
const s = slot.state;
return !s.running && now >= s.nextAvailableAt;
// note: specific queues ignore backoff; this is only for global picks.
}
private pickBestActiveForGlobal(): number {
const now = Date.now();
const task = this.globalQueue[0];
// Prefer actives not tried yet
let best = -1,
bestRuns = Infinity;
for (let i = 0; i < this.slots.length; i++) {
if (!this.isEligibleActive(i, now)) continue;
if (task?.triedWorkers.has(i)) continue;
const slot = this.slots[i] as ActiveSlot<F>;
if (slot.state.totalRuns < bestRuns) {
bestRuns = slot.state.totalRuns;
best = i;
}
}
if (best !== -1) return best;
// Allow already-tried actives
best = -1;
bestRuns = Infinity;
for (let i = 0; i < this.slots.length; i++) {
if (!this.isEligibleActive(i, now)) continue;
const slot = this.slots[i] as ActiveSlot<F>;
if (slot.state.totalRuns < bestRuns) {
bestRuns = slot.state.totalRuns;
best = i;
}
}
return best;
}
private startTaskOnActiveSlot(
index: number,
slot: ActiveSlot<F>,
task: Task<F>,
fromSpecific: boolean,
) {
const st = slot.state;
st.running = true;
st.totalRuns += 1;
const finalize = () => {
st.running = false;
// If someone is waiting for this worker to finish (removeWorker), resolve them when idle.
if (
slot.removalWaiters.length > 0 &&
!st.running &&
slot.queue.length === 0
) {
const list = slot.removalWaiters.splice(0);
for (const w of list) w();
}
this.drain();
};
(async () => {
try {
const result = await slot.fn(...task.args);
st.failCount = Math.max(0, st.failCount - 1);
task.resolve(result as Awaited<ReturnType<F>>);
} catch (err) {
task.lastError = err;
st.failCount += 1;
st.nextAvailableAt =
Date.now() + this.backoffBaseMs * Math.pow(2, st.failCount);
if (fromSpecific) {
task.attempts += 1;
if (task.attempts >= this.maxAttempts) {
task.reject(task.lastError);
} else {
// retry on the same worker (ignore backoff)
slot.queue.push(task);
}
} else {
task.attempts += 1;
task.triedWorkers.add(index);
const activeCount = this.slots.filter(
(s) => s.kind === 'active',
).length;
const allActiveTriedOnce = task.triedWorkers.size >= activeCount;
const attemptsLimitReached = task.attempts >= this.maxAttempts;
if (allActiveTriedOnce || attemptsLimitReached) {
task.reject(task.lastError);
} else {
this.globalQueue.push(task);
}
}
} finally {
finalize();
}
})();
}
}
// __tests__/workflow-dispatcher.spec.ts
import { WorkflowDispatcher } from '../src/workflow-dispatcher';
type F = (x: number) => Promise<string>;
function makeSuccess(label: string): F {
const fn = jest.fn(async (x: number) => `${label}:${x}`);
return fn as F;
}
function makeAlwaysFail(label: string): F {
const fn = jest.fn(async () => {
throw new Error(`fail:${label}`);
});
return fn as F;
}
function makeFlaky(label: string, fails: number): F {
let c = 0;
const fn = jest.fn(async (x: number) => {
if (c < fails) {
c++;
throw new Error(`flaky-${label}-${c}`);
}
return `${label}:${x}`;
});
return fn as F;
}
function deferred<T>() {
let resolve!: (v: T) => void;
let reject!: (e: any) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
async function flush(n = 2) {
for (let i = 0; i < n; i++) await Promise.resolve();
}
describe('WorkflowDispatcher (10ms granularity, no fake timers)', () => {
test('waits for the first worker to resolve before scheduling', async () => {
const Adef = deferred<F>(); // pending
const B = makeSuccess('B'); // active now
const d = new WorkflowDispatcher<F>([Adef.promise, B], {
backoffBaseMs: 10,
});
const p = d.dispatch(1);
await flush();
await expect(p).resolves.toBe('B:1');
// later A becomes active, then it should be chosen (least-used)
Adef.resolve(makeSuccess('A'));
await flush();
const p2 = d.dispatch(2);
await expect(p2).resolves.toBe('A:2');
});
test('rejects all when all init promises reject', async () => {
const Adef = deferred<F>(),
Bdef = deferred<F>();
const d = new WorkflowDispatcher<F>([Adef.promise, Bdef.promise], {
backoffBaseMs: 10,
});
const p1 = d.dispatch(1);
const p2 = d.dispatch(2);
Adef.reject(new Error('A-init-fail'));
Bdef.reject(new Error('B-init-fail'));
await flush();
await expect(p1).rejects.toThrow(/No workers available/);
await expect(p2).rejects.toThrow(/No workers available/);
});
test('dispatch picks the least-used active worker', async () => {
const A = makeSuccess('A'),
B = makeSuccess('B');
const d = new WorkflowDispatcher<F>([A, B], { backoffBaseMs: 10 });
const r1 = await d.dispatch(1);
const r2 = await d.dispatch(2);
const r3 = await d.dispatch(3);
expect([r1, r2, r3].some((s) => s.startsWith('A'))).toBe(true);
expect([r1, r2, r3].some((s) => s.startsWith('B'))).toBe(true);
const actives = d.snapshot().filter((s) => s.status === 'active') as any[];
expect(actives.reduce((sum, s) => sum + s.totalRuns, 0)).toBe(3);
});
test('on failure, it switches workers and throws after all active failed once', async () => {
const A = makeAlwaysFail('A'),
B = makeSuccess('B');
const d = new WorkflowDispatcher<F>([A, B], { backoffBaseMs: 10 });
await expect(d.dispatch(10)).resolves.toBe('B:10');
const A2 = makeAlwaysFail('A2'),
B2 = makeAlwaysFail('B2');
const d2 = new WorkflowDispatcher<F>([A2, B2], { backoffBaseMs: 10 });
await expect(d2.dispatch(99)).rejects.toThrow(/fail:(A2|B2)/);
});
test('sets backoff and avoids the blocked worker while another is eligible', async () => {
const A = makeAlwaysFail('A'),
B = makeSuccess('B');
const d = new WorkflowDispatcher<F>([A, B], { backoffBaseMs: 10 });
// First dispatch may fail on A and then succeed elsewhere; we only need a fail to set backoff
try {
await d.dispatch(1);
} catch {
/* ignore */
}
const active = d.snapshot().filter((s) => s.status === 'active') as any[];
const blocked = active.find((s) => s.failCount > 0);
if (blocked) {
expect(blocked.blockedMs).toBeGreaterThanOrEqual(10 - 1); // ~10ms right after failure
const res = await d.dispatch(2);
expect(res === 'B:2' || res === 'A:2').toBe(true); // typically B since A is blocked
}
});
test('dispatchSpecific ignores backoff and retries on the same worker (FIFO)', async () => {
const flaky = makeFlaky('A', 2); // fail, fail, then success
const B = makeSuccess('B');
const d = new WorkflowDispatcher<F>([flaky, B], {
maxAttempts: 3,
backoffBaseMs: 10,
});
const p1 = d.dispatchSpecific(0, 100);
const p2 = d.dispatchSpecific(0, 200);
await expect(p1).resolves.toBe('A:100');
await expect(p2).resolves.toBe('A:200');
const snap0 = d.snapshot()[0] as any;
expect(snap0.totalRuns).toBeGreaterThanOrEqual(2);
});
test('dispatchSpecific waits for pending worker and fails if its init rejects', async () => {
const Adef = deferred<F>();
const B = makeSuccess('B');
const d = new WorkflowDispatcher<F>([Adef.promise, B], {
backoffBaseMs: 10,
});
// enqueue a specific task to worker 0 (still pending)
const p = d.dispatchSpecific(0, 1);
await flush(); // let dispatcher enqueue paths settle
// Trigger the reject *inside* the expect's promise via an async IIFE.
await expect(
(async () => {
// now reject the init; this happens after expect has attached handlers
Adef.reject(new Error('A-init-fail'));
// give the dispatcher a macrotask tick if your impl uses setTimeout(0) to reject
await new Promise((r) => setTimeout(r, 0));
// the awaited value for expect(...).rejects is p
return p;
})(),
).rejects.toThrow(/failed to initialize/i);
// Calling dispatchSpecific again on the same rejected worker should also reject
await expect(
(async () => {
const p2 = d.dispatchSpecific(0, 2);
await new Promise((r) => setTimeout(r, 0));
return p2;
})(),
).rejects.toThrow(/failed to initialize/i);
});
test('stops after reaching maxAttempts even if not all active were tried', async () => {
const A = makeAlwaysFail('A'),
B = makeAlwaysFail('B');
const d = new WorkflowDispatcher<F>([A, B], {
maxAttempts: 2,
backoffBaseMs: 10,
});
await expect(d.dispatch(3)).rejects.toThrow(/fail:(A|B)/);
});
test('failCount is decreased after a success (not below zero)', async () => {
const flaky = makeFlaky('A', 1); // one fail then success
const B = makeSuccess('B');
const d = new WorkflowDispatcher<F>([flaky, B], { backoffBaseMs: 10 });
try {
await d.dispatch(1);
} catch {}
await d.dispatchSpecific(0, 2); // succeed on A; failCount should step down
const snap0 = d.snapshot()[0] as any;
expect(snap0.failCount).toBeGreaterThanOrEqual(0);
});
});
// __tests__/workflow-dispatcher-extend.spec.ts
import { WorkflowDispatcher } from '../src/workflow-dispatcher';
type F = (x: number) => Promise<string>;
function makeSuccess(label: string): F {
const fn = jest.fn(async (x: number) => `${label}:${x}`);
return fn as F;
}
function makeAlwaysFail(label: string): F {
const fn = jest.fn(async () => {
throw new Error(`fail:${label}`);
});
return fn as F;
}
function deferred<T>() {
let resolve!: (v: T) => void;
let reject!: (e: any) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
async function flushMicro(n = 2) {
for (let i = 0; i < n; i++) await Promise.resolve();
}
async function nextMacrotask() {
await new Promise((r) => setTimeout(r, 0));
}
describe('replaceWorker()', () => {
test('replaces a pending worker to active and drains immediately if it becomes the first active', async () => {
const Adef = deferred<F>(); // slot[0] pending initially
const d = new WorkflowDispatcher<F>([Adef.promise], { backoffBaseMs: 10 });
// queue a specific task to slot 0 while pending
const p = d.dispatchSpecific(0, 1);
await flushMicro();
// replace pending with active fn; should start running and resolve
d.replaceWorker(0, makeSuccess('A'));
await flushMicro();
await expect(p).resolves.toBe('A:1');
// then a global dispatch should also use A
await expect(d.dispatch(2)).resolves.toBe('A:2');
});
test('replaces an active worker with a new fn, resets backoff but keeps totalRuns', async () => {
const bad = makeAlwaysFail('Old');
const good = makeSuccess('New');
const d = new WorkflowDispatcher<F>([bad], { backoffBaseMs: 10 });
// first dispatch will fail and be thrown (only one worker, hits maxAttempts=3 eventually)
await expect(d.dispatch(1)).rejects.toThrow(/fail:Old/);
const before = (d.snapshot()[0] as any).totalRuns;
d.replaceWorker(0, good);
// should succeed with new fn
await expect(d.dispatch(2)).resolves.toBe('New:2');
const after = (d.snapshot()[0] as any).totalRuns;
expect(after).toBeGreaterThanOrEqual(before + 1); // totalRuns not reset to 0
});
});
describe('addWorker()', () => {
test('adds a new active worker and immediately helps drain queued tasks', async () => {
// slot[0] will be a long-running worker to block
const gate = deferred<void>();
const longRunner: F = jest.fn(async (x: number) => {
await gate.promise;
return `L:${x}`;
});
const d = new WorkflowDispatcher<F>([longRunner], { backoffBaseMs: 10 });
// occupy slot[0]
const p1 = d.dispatch(1);
await flushMicro();
// second task will queue (no free worker)
const p2 = d.dispatch(2);
await flushMicro();
// add a new fast worker at tail
const idx = d.addWorker(makeSuccess('N'));
expect(idx).toBe(1);
// p2 should finish via the new worker immediately
await expect(p2).resolves.toBe('N:2');
// release p1 and it should finish too
gate.resolve();
await expect(p1).resolves.toBe('L:1');
});
});
describe('removeWorker()', () => {
test('removing a pending worker splices it and re-maps indices (specific queues + global triedWorkers)', async () => {
// slots: [pending A, active B, active C]
const Adef = deferred<F>();
const B = makeSuccess('B');
const C = makeSuccess('C');
const d = new WorkflowDispatcher<F>([Adef.promise, B, C], {
backoffBaseMs: 10,
});
// queue specific to index 0 (pending)
const pA1 = d.dispatchSpecific(0, 100);
await flushMicro();
// remove index 0 (pending A)
const removed = d.removeWorker(0);
await nextMacrotask(); // allow macro rejection for its queue
await expect(removed).resolves.toBeUndefined();
await expect(pA1).rejects.toThrow(/removed|failed to initialize/);
// now original [1,2] -> become [0,1]
// specific to "original 1" should now be index 0 and succeed
await expect(d.dispatchSpecific(0, 1)).resolves.toBe('B:1');
await expect(d.dispatchSpecific(1, 2)).resolves.toBe('C:2');
// check globalQueue triedWorkers re-map:
// trigger a fail on B to add it to triedWorkers of a global task
const badB: F = jest.fn(async () => {
throw new Error('fail:B');
});
d.replaceWorker(0, badB);
const g = d.dispatch(7); // will try slot[0] then retry others
await flushMicro();
// now remove the failing worker (index 0)
const done = d.removeWorker(0);
await nextMacrotask();
await expect(done).resolves.toBeUndefined();
// global task should still complete using C (now at index 0 after splice)
await expect(g).resolves.toBe('C:7');
});
test('removing an active running worker resolves when that last task finishes', async () => {
// slot[0] long running, slot[1] fast
const gate = deferred<void>();
const long: F = jest.fn(async (x: number) => {
await gate.promise;
return `L:${x}`;
});
const fast = makeSuccess('F');
const d = new WorkflowDispatcher<F>([long, fast], { backoffBaseMs: 10 });
// occupy slot[0] with a specific task so we know exactly which worker
const p1 = d.dispatchSpecific(0, 1);
await flushMicro();
// remove slot[0] while it is running -> removal promise should resolve only after p1 settles
const removing = d.removeWorker(0);
// the slot is no longer pickable; new specific(0) should now refer to old index 1 (fast)
await expect(d.dispatchSpecific(0, 2)).resolves.toBe('F:2');
// still running p1 should finish, then `removing` resolves
const settleOrder: string[] = [];
p1.then(() => settleOrder.push('p1'));
removing.then(() => settleOrder.push('removing'));
// release long running
gate.resolve();
await flushMicro();
await nextMacrotask();
expect(settleOrder).toEqual(['p1', 'removing']);
});
test('removing an idle active worker resolves immediately and re-maps indices correctly', async () => {
const A = makeSuccess('A');
const B = makeSuccess('B');
const C = makeSuccess('C');
const d = new WorkflowDispatcher<F>([A, B, C], { backoffBaseMs: 10 });
// Nothing running yet, remove middle index 1 (B)
const pr = d.removeWorker(1);
await expect(pr).resolves.toBeUndefined();
// Now original C becomes index 1
await expect(d.dispatchSpecific(0, 10)).resolves.toBe('A:10');
await expect(d.dispatchSpecific(1, 20)).resolves.toBe('C:20');
// Global dispatch still balances across remaining two
const r1 = await d.dispatch(1);
const r2 = await d.dispatch(2);
expect([r1, r2].some((s) => s.startsWith('A'))).toBe(true);
expect([r1, r2].some((s) => s.startsWith('C'))).toBe(true);
});
test('removing a rejected worker resolves immediately and flushes its specific queue', async () => {
// Build: [rejected X, active Y]
const Xdef = deferred<F>();
const Y = makeSuccess('Y');
const d = new WorkflowDispatcher<F>([Xdef.promise, Y], {
backoffBaseMs: 10,
});
// specific to 0 queues into X (pending)
const p = d.dispatchSpecific(0, 1);
await flushMicro();
// reject X init
Xdef.reject(new Error('X-init-fail'));
await nextMacrotask(); // allow rejected pending flush in drain()
// remove the rejected worker
const pr = d.removeWorker(0);
await expect(pr).resolves.toBeUndefined();
// the queued task should have already been rejected
await expect(p).rejects.toThrow(/failed to initialize|removed/);
// now only Y remains as index 0
await expect(d.dispatchSpecific(0, 2)).resolves.toBe('Y:2');
});
});
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment