Commit f06c222c authored by nanahira's avatar nanahira

add pqueue for data handle

parent 3b205665
Pipeline #38329 passed with stages
in 13 minutes and 29 seconds
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
"moment": "^2.29.1", "moment": "^2.29.1",
"mysql": "^2.18.1", "mysql": "^2.18.1",
"node-os-utils": "^1.3.2", "node-os-utils": "^1.3.2",
"p-queue": "6.6.2", "p-queue": "^6.6.2",
"pg": "^6.4.2", "pg": "^6.4.2",
"q": "^1.5.1", "q": "^1.5.1",
"querystring": "^0.2.0", "querystring": "^0.2.0",
...@@ -2318,6 +2318,7 @@ ...@@ -2318,6 +2318,7 @@
"version": "6.6.2", "version": "6.6.2",
"resolved": "https://registry.npmjs.org/p-queue/-/p-queue-6.6.2.tgz", "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-6.6.2.tgz",
"integrity": "sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==", "integrity": "sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==",
"license": "MIT",
"dependencies": { "dependencies": {
"eventemitter3": "^4.0.4", "eventemitter3": "^4.0.4",
"p-timeout": "^3.2.0" "p-timeout": "^3.2.0"
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
"moment": "^2.29.1", "moment": "^2.29.1",
"mysql": "^2.18.1", "mysql": "^2.18.1",
"node-os-utils": "^1.3.2", "node-os-utils": "^1.3.2",
"p-queue": "6.6.2", "p-queue": "^6.6.2",
"pg": "^6.4.2", "pg": "^6.4.2",
"q": "^1.5.1", "q": "^1.5.1",
"querystring": "^0.2.0", "querystring": "^0.2.0",
......
...@@ -2010,6 +2010,12 @@ netRequestHandler = (client) -> ...@@ -2010,6 +2010,12 @@ netRequestHandler = (client) ->
client.pre_establish_buffers = new Array() client.pre_establish_buffers = new Array()
client_data_queue = new PQueue
concurrency: 1
server_data_queue = new PQueue
concurrency: 1
dataHandler = (ctos_buffer) -> dataHandler = (ctos_buffer) ->
if client.is_post_watcher if client.is_post_watcher
room=ROOM_all[client.rid] room=ROOM_all[client.rid]
...@@ -2060,13 +2066,18 @@ netRequestHandler = (client) -> ...@@ -2060,13 +2066,18 @@ netRequestHandler = (client) ->
return return
queuedDataHandler = (ctos_buffer) ->
if client.isClosed or client.system_kicked
return
return await client_data_queue.add(() -> dataHandler(ctos_buffer))
if client.isWs if client.isWs
client.on 'message', dataHandler client.on 'message', queuedDataHandler
else else
client.on 'data', dataHandler client.on 'data', queuedDataHandler
# 服务端到客户端(stoc) # 服务端到客户端(stoc)
server.on 'data', (stoc_buffer)-> serverDataHandler = (stoc_buffer)->
handle_data = await ygopro.helper.handleBuffer(stoc_buffer, "STOC", null, { handle_data = await ygopro.helper.handleBuffer(stoc_buffer, "STOC", null, {
client: server.client, client: server.client,
server: server server: server
...@@ -2080,6 +2091,13 @@ netRequestHandler = (client) -> ...@@ -2080,6 +2091,13 @@ netRequestHandler = (client) ->
await ygopro.helper.send(server.client, buffer) for buffer in handle_data.datas await ygopro.helper.send(server.client, buffer) for buffer in handle_data.datas
return return
queuedServerDataHandler = (stoc_buffer) ->
if server.isClosed
return
return await server_data_queue.add(() -> serverDataHandler(stoc_buffer))
server.on 'data', queuedServerDataHandler
return return
deck_name_match = global.deck_name_match = (deck_name, player_name) -> deck_name_match = global.deck_name_match = (deck_name, player_name) ->
......
...@@ -2556,7 +2556,7 @@ ...@@ -2556,7 +2556,7 @@
// 网络连接 // 网络连接
netRequestHandler = function(client) { netRequestHandler = function(client) {
var closeHandler, dataHandler, server; var client_data_queue, closeHandler, dataHandler, queuedDataHandler, queuedServerDataHandler, server, serverDataHandler, server_data_queue;
if (!client.isWs) { if (!client.isWs) {
client.physical_ip = client.remoteAddress || ""; client.physical_ip = client.remoteAddress || "";
if (CLIENT_set_ip(client)) { if (CLIENT_set_ip(client)) {
...@@ -2683,6 +2683,12 @@ ...@@ -2683,6 +2683,12 @@
// 需要重构 // 需要重构
// 客户端到服务端(ctos)协议分析 // 客户端到服务端(ctos)协议分析
client.pre_establish_buffers = new Array(); client.pre_establish_buffers = new Array();
client_data_queue = new PQueue({
concurrency: 1
});
server_data_queue = new PQueue({
concurrency: 1
});
dataHandler = async function(ctos_buffer) { dataHandler = async function(ctos_buffer) {
var bad_ip_count, buffer, ctos_filter, handle_data, j, l, len, len1, preconnect, ref, ref1, room; var bad_ip_count, buffer, ctos_filter, handle_data, j, l, len, len1, preconnect, ref, ref1, room;
if (client.is_post_watcher) { if (client.is_post_watcher) {
...@@ -2751,13 +2757,21 @@ ...@@ -2751,13 +2757,21 @@
} }
} }
}; };
queuedDataHandler = async function(ctos_buffer) {
if (client.isClosed || client.system_kicked) {
return;
}
return (await client_data_queue.add(function() {
return dataHandler(ctos_buffer);
}));
};
if (client.isWs) { if (client.isWs) {
client.on('message', dataHandler); client.on('message', queuedDataHandler);
} else { } else {
client.on('data', dataHandler); client.on('data', queuedDataHandler);
} }
// 服务端到客户端(stoc) // 服务端到客户端(stoc)
server.on('data', async function(stoc_buffer) { serverDataHandler = async function(stoc_buffer) {
var buffer, handle_data, j, len, ref; var buffer, handle_data, j, len, ref;
handle_data = (await ygopro.helper.handleBuffer(stoc_buffer, "STOC", null, { handle_data = (await ygopro.helper.handleBuffer(stoc_buffer, "STOC", null, {
client: server.client, client: server.client,
...@@ -2777,7 +2791,16 @@ ...@@ -2777,7 +2791,16 @@
await ygopro.helper.send(server.client, buffer); await ygopro.helper.send(server.client, buffer);
} }
} }
}); };
queuedServerDataHandler = async function(stoc_buffer) {
if (server.isClosed || !server.client || server.client.isClosed) {
return;
}
return (await server_data_queue.add(function() {
return serverDataHandler(stoc_buffer);
}));
};
server.on('data', queuedServerDataHandler);
}; };
deck_name_match = global.deck_name_match = function(deck_name, player_name) { deck_name_match = global.deck_name_match = function(deck_name, player_name) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment