Commit c4303824 authored by chechunchi's avatar chechunchi

add src/infra/stream.ts

parent 2869c08b
// Some implementation of infrastructure
export * from "./sleep";
export * from "./stream";
export const sleep = (delay: number) =>
new Promise((resolve) => setTimeout(resolve, delay));
import handleSocketMessage from "@/service/onSocketMessage";
import { sleep } from "./sleep";
const SLEEP_INTERVAL = 200;
export class WebSocketStream {
public ws: WebSocket;
stream: ReadableStream;
constructor(ip: string, onWsOpen?: (ws: WebSocket, ev: Event) => any) {
this.ws = new WebSocket("wss://" + ip);
if (onWsOpen) {
this.ws.onopen = (e) => onWsOpen(this.ws, e);
}
const ws = this.ws;
this.stream = new ReadableStream({
start(controller) {
// 当Websocket有数据到达时,加入队列
ws.onmessage = (event) => {
controller.enqueue(event);
};
ws.onclose = () => {
console.info("Websocket closed.");
controller.close();
};
},
pull(_) {
// currently not really need
},
cancel() {
// currently not
},
});
}
// 异步地从Websocket中获取数据并处理
async execute(onMessage: (event: MessageEvent) => Promise<void>) {
const reader: ReadableStreamDefaultReader<MessageEvent> =
this.stream.getReader();
const ws = this.ws;
reader.read().then(async function process({ done, value }): Promise<void> {
if (done) {
if (ws.readyState == WebSocket.CLOSED) {
// websocket connection has been closed
console.info("WebSocket closed, stream complete.");
return;
} else {
// websocket not closed, sleep sometime, wait for next message from server
await sleep(SLEEP_INTERVAL);
return reader.read().then(process);
}
}
if (value) {
await onMessage(value);
} else {
console.warn("value from ReadableStream is undefined!");
}
// read some more, and call process function again
return reader.read().then(process);
});
}
// 关闭流
close() {
this.ws.close();
}
}
......@@ -4,6 +4,8 @@
* 所有长连接/Websocket相关的逻辑都应该收敛在这里。
*
* */
import { WebSocketStream } from "@/infra";
import handleSocketMessage from "../service/onSocketMessage";
import handleSocketOpen from "../service/onSocketOpen";
......@@ -28,24 +30,19 @@ export interface socketAction {
payload?: Uint8Array;
}
let ws: WebSocket | null = null;
let ws: WebSocketStream | null = null;
// FIXME: 应该有个返回值,告诉业务方本次请求的结果。比如建立长连接失败。
export default function (action: socketAction) {
export default async function (action: socketAction) {
switch (action.cmd) {
case socketCmd.CONNECT: {
const info = action.initInfo;
if (info) {
ws = new WebSocket("wss://" + info.ip);
ws = new WebSocketStream(info.ip, (conn, _event) =>
handleSocketOpen(conn, info.ip, info.player, info.passWd)
);
ws.onopen = () => {
handleSocketOpen(ws, info.ip, info.player, info.passWd);
};
ws.onclose = () => {
console.log("WebSocket closed.");
ws = null;
};
ws.onmessage = handleSocketMessage;
await ws.execute(handleSocketMessage);
}
break;
......@@ -60,7 +57,7 @@ export default function (action: socketAction) {
case socketCmd.SEND: {
const payload = action.payload;
if (ws && payload) {
ws.send(payload);
ws.ws.send(payload);
}
break;
......
......@@ -28,7 +28,7 @@ const NeosConfig = useConfig();
* 然后再分发到各个处理函数中去处理。
*
* */
export default function handleSocketMessage(e: MessageEvent) {
export default async function handleSocketMessage(e: MessageEvent) {
const packet = YgoProPacket.deserialize(e.data);
const pb = adaptStoc(packet);
const delay = handleDelay(pb);
......
......@@ -12,7 +12,7 @@ const NeosConfig = useConfig();
*
* */
export default function handleSocketOpen(
ws: WebSocket | null,
ws: WebSocket | undefined,
_ip: string,
player: string,
passWd: string
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment