Commit cb150597 authored by nanahira's avatar nanahira

first

parents
import { Readable } from 'stream';
export function teex<T = any>(source: Readable, forks: number = 2): Readable[] {
const streams = new Array<Readable>(forks);
const status = new Array<boolean>(forks).fill(true);
let ended = false;
for (let i = 0; i < forks; i++) {
streams[i] = new Readable({
read(size) {
const shouldResume = !status[i];
status[i] = true;
if (shouldResume && allReadable()) source.resume();
},
});
}
source.on('end', () => {
ended = true;
for (const stream of streams) {
stream.push(null);
}
});
source.on('error', (err: Error) => {
for (const stream of streams) {
stream.destroy(err);
}
});
source.on('close', () => {
if (ended) return;
for (const stream of streams) {
stream.destroy();
}
});
source.on('data', (data: T) => {
let needsPause = false;
for (let i = 0; i < streams.length; i++) {
status[i] = streams[i].push(data);
if (!status[i]) needsPause = true;
}
if (needsPause) source.pause();
});
return streams;
function allReadable(): boolean {
return status.every(Boolean);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment