Commit 4011c4b8 authored by 神楽坂玲奈's avatar 神楽坂玲奈 Committed by 铃兰

fix

parent c7fba0e4
Pipeline #16078 passed with stages
in 2 minutes and 8 seconds
...@@ -50,8 +50,8 @@ interface Change { ...@@ -50,8 +50,8 @@ interface Change {
} }
``` ```
如果报文的 seq 为 0,重置状态和路由表 如果报文的 seq 为 0,重置状态和路由表
如果报文的 seq 跟自己的 report seq 不同,忽略。 如果报文的 seq 跟自己的 ack 不同,忽略。
seq自增,按照要求修改路由表,改完之后立即额外发送一次 Report 报文 (可以不带 peers) 按照要求修改路由表,seq 自增, 额外发送一次 Report 报文 (可以不带 peers)
## 服务器行为 ## 服务器行为
...@@ -62,20 +62,18 @@ seq自增,按照要求修改路由表,改完之后立即额外发送一次 R ...@@ -62,20 +62,18 @@ seq自增,按照要求修改路由表,改完之后立即额外发送一次 R
- 非 正在变更 状态: - 非 正在变更 状态:
为每个路由器检查,目标到每个路由器,下一跳哪个路由器最优。 为每个路由器检查,目标到每个路由器,下一跳哪个路由器最优。
只假设这台路由器的下一跳会变化,其他路由表用旧的不考虑改变。 只假设这台路由器的下一跳会变化,其他路由表用旧的不考虑改变。
如果发现有比现在更优的,就为这个路由器 下发 Change 报文。之后进入正在变更状态,跳过其他路由器的检查 如果发现有比现在更优的,就为这个路由器 seq++ 并 下发 Change 报文。之后进入正在变更状态,跳过其他路由器的检查
- 正在变更 状态: - 正在变更 状态:
向路由器发送 Change 报文 向路由器发送 Change 报文
若正在变更的客户端被标记为下线,退出正在变更状态 若正在变更的客户端被标记为下线,退出正在变更状态
### 收到客户端 Report 报文时 ### 收到客户端 Report 报文时
如果报文顺序号为 0,重置状态并立即发送 Change(seq=0) 如果报文 ack 为 0,重置状态并立即发送 Change
如果报文顺序号不为0而服务器顺序号为0,立即发送 Change 如果报文 ack 不为 0 而服务器 seq 为 0,立即发送 Change
如果报文顺序号不为 change seq +1,忽略。 如果报文 ack 不为 change seq +1,忽略。
如果携带了 peers 信息,存下来。 如果携带了 peers 信息,存下来。
- 如果这个路由器正在变更 如果这个路由器正在变更,退出正在变更状态
seq 自增
退出正在变更状态
## 关于顺序号,断线、重启的一些解释: ## 关于顺序号,断线、重启的一些解释:
### 客户端跟客户端之间 ### 客户端跟客户端之间
......
...@@ -7,22 +7,22 @@ export interface PeerQuality { ...@@ -7,22 +7,22 @@ export interface PeerQuality {
} }
// 路由器向中心服务器发送的消息 // 路由器向中心服务器发送的消息
export interface UploadMessage { export interface Report {
id: number; // router id id: number;
ack: number; ack: number;
peers?: Record<number, PeerQuality>; peers?: Record<number, PeerQuality>;
} }
// 中心服务器向路由器发送的消息 // 中心服务器向路由器发送的消息
export interface DownloadMessage { export interface Change {
seq: number, seq: number,
via: Record<number, number>, via: Record<number, number>,
plan: Record<number, number> // plan: Record<number, number>
} }
// 路由器向路由器发送的消息 // 路由器向路由器发送的消息
export interface PeerMessage { export interface Hello {
id: number; id: number; // 自己的id
seq: number; seq: number; // 这个报文的顺序号,每次发送自增
time: number; time: number; // 自己的系统时间,EPOCH 以来的毫秒数
} }
import { RemoteInfo, Socket } from 'dgram'; import { RemoteInfo, Socket } from 'dgram';
import { DownloadMessage, PeerQuality, UploadMessage } from '../protocol'; import { Change, PeerQuality, Report } from '../protocol';
import routers from '../import/data/Router.json'; import routers from '../import/data/Router.json';
// import plans from '../config/plans.json'; // import plans from '../config/plans.json';
import assert from 'assert'; import assert from 'assert';
...@@ -12,24 +12,19 @@ import _connections from '../import/connections.json'; ...@@ -12,24 +12,19 @@ import _connections from '../import/connections.json';
const connections: Record<number, Record<number, { metric: number, protocol: string }>> = _connections; const connections: Record<number, Record<number, { metric: number, protocol: string }>> = _connections;
export class Router { export class Router {
static updating?: Router;
static updating_timer: any;
static timeout_timer: any;
static all: Router[] = routers.map(s => new Router(s.id)); static all: Router[] = routers.map(s => new Router(s.id));
static updating?: Router;
// config
// id!: number;
seq = 0; seq = 0;
peers: Record<number, PeerQuality> = {}; peers: Record<number, PeerQuality> = {};
via: Map<Router, Router> = new Map(); via: Map<Router, Router> = new Map();
// plan: Record<number, number> = {}; // plan: Record<number, number> = {};
time: number = 0;
rinfo?: RemoteInfo; rinfo?: RemoteInfo;
constructor(public id: number) { constructor(public id: number) {
// Object.assign(this, config); this.reset();
} }
reset() { reset() {
...@@ -38,59 +33,39 @@ export class Router { ...@@ -38,59 +33,39 @@ export class Router {
for (const router of Router.all.filter(r => r.id !== this.id)) { for (const router of Router.all.filter(r => r.id !== this.id)) {
this.via.set(router, router); this.via.set(router, router);
} }
if (Router.updating == this) Router.updating = undefined;
// for (const plan of plans.filter(plan => !plan.routers.includes(this.id))) { // for (const plan of plans.filter(plan => !plan.routers.includes(this.id))) {
// this.plan[plan.id] = this.id; // this.plan[plan.id] = this.id;
// } // }
} }
lost() { onMessage(socket: Socket, data: Report) {
this.seq = 0; // 客户端重启
this.rinfo = undefined; if (data.ack === 0) {
return console.log(`router ${this.id} lost connection.`); this.reset();
} this.time = Date.now();
this.send(socket, {}, {});
route_quality(to: Router, via: Router = this.via.get(to)!) { // 服务器重启或客户端下线
assert(via !== undefined); } else if (this.seq == 0) {
assert(this !== via); this.time = Date.now();
assert(this !== to); this.send(socket, {}, {});
} else if (data.ack == this.seq + 1) {
const result = new Quality(); this.time = Date.now();
if (data.peers) this.peers = data.peers;
let current: Router = this; if (Router.updating === this) Router.updating = undefined;
let next = via;
const route = [current, next];
while (true) {
const quality = next.peers[current.id];
if (!quality || quality.reliability <= 0) return Quality.unreachable; // 不通
result.concat(quality.delay, quality.jitter, quality.reliability, connections[current.id][next.id].metric);
if (next === to) return result; // 到达
// 寻找下一跳
current = next;
next = current.via.get(to)!;
assert(next); //to server_id 型路由,由于 server 两两相连,下一跳一定是存在的,至少能直达
if (route.includes(next)) {
// 环路
return Quality.unreachable;
} else {
route.push(next);
}
} }
} }
onMessage(hello: UploadMessage) { update(socket: Socket) {
if (hello.ack === 0) this.reset(); if (!this.rinfo) return;
if (hello.peers) this.peers = hello.peers; if (Date.now() - this.time > config.timeout) {
if (Router.updating === this && hello.ack === this.seq + 1) { console.log(`router ${this.id} lost connection.`);
clearInterval(Router.updating_timer); this.rinfo = undefined;
clearTimeout(Router.timeout_timer); this.reset();
Router.updating = undefined; return;
this.seq++;
} }
} if (Router.updating) return;
update(socket: Socket) {
const changedVia: Record<number, number> = {}; const changedVia: Record<number, number> = {};
const metric: Record<number, number> = {}; const metric: Record<number, number> = {};
for (const to of Router.all.filter(r => r.id !== this.id)) { for (const to of Router.all.filter(r => r.id !== this.id)) {
...@@ -130,25 +105,45 @@ export class Router { ...@@ -130,25 +105,45 @@ export class Router {
// } // }
if (!_.isEmpty(changedVia) || !_.isEmpty(changedPlan)) { if (!_.isEmpty(changedVia) || !_.isEmpty(changedPlan)) {
this.seq++;
Router.updating = this; Router.updating = this;
Router.updating_timer = setInterval(() => this.send(socket, changedVia, changedPlan), config.interval);
this.send(socket, changedVia, changedPlan); this.send(socket, changedVia, changedPlan);
Router.timeout_timer = setTimeout(() => {
this.lost();
clearInterval(Router.updating_timer);
Router.updating = undefined;
}, config.timeout * config.interval);
} }
} }
send(socket: Socket, via: Record<number, number>, plan: Record<number, number>) { send(socket: Socket, via: Record<number, number>, plan: Record<number, number>) {
if (!this.rinfo) return; const message: Change = { seq: this.seq, via };
const message: DownloadMessage = { seq: this.seq, via, plan };
console.log(this.id, message); console.log(this.id, message);
return socket.send(JSON.stringify(message), this.rinfo.port, this.rinfo.address); return socket.send(JSON.stringify(message), this.rinfo!.port, this.rinfo!.address);
} }
}
for (const router of Router.all) { route_quality(to: Router, via: Router = this.via.get(to)!) {
router.reset(); assert(via !== undefined);
assert(this !== via);
assert(this !== to);
const result = new Quality();
let current: Router = this;
let next = via;
const route = [current, next];
while (true) {
const quality = next.peers[current.id];
if (!quality || quality.reliability <= 0) return Quality.unreachable; // 不通
result.concat(quality.delay, quality.jitter, quality.reliability, connections[current.id][next.id].metric);
if (next === to) return result; // 到达
// 寻找下一跳
current = next;
next = current.via.get(to)!;
assert(next); //to server_id 型路由,由于 server 两两相连,下一跳一定是存在的,至少能直达
if (route.includes(next)) {
// 环路
return Quality.unreachable;
} else {
route.push(next);
}
}
}
} }
import dgram from 'dgram'; import dgram from 'dgram';
import { UploadMessage } from '../protocol'; import { Report } from '../protocol';
import assert from 'assert'; import assert from 'assert';
import { Router } from './Router'; import { Router } from './Router';
import config from '../config/config.json'; import config from '../config/config.json';
...@@ -20,16 +20,16 @@ const socket = dgram ...@@ -20,16 +20,16 @@ const socket = dgram
const address = socket.address(); const address = socket.address();
console.log(`listening ${address.address}:${address.port}`); console.log(`listening ${address.address}:${address.port}`);
}) })
.on('message', function (message, rinfo) { .on('message', function(message, rinfo) {
try { try {
const hello: UploadMessage = JSON.parse(message.toString()); const hello: Report = JSON.parse(message.toString());
assert(hello.id); assert(hello.id);
const router: Router = Router.all.find((r) => r.id === hello.id)!; const router: Router = Router.all.find((r) => r.id === hello.id)!;
assert(router); assert(router);
router.rinfo = rinfo; router.rinfo = rinfo;
router.onMessage(hello); router.onMessage(socket, hello);
} catch (e) { } catch (e) {
console.warn(e); console.warn(e);
} }
...@@ -37,8 +37,5 @@ const socket = dgram ...@@ -37,8 +37,5 @@ const socket = dgram
socket.bind(config.port); socket.bind(config.port);
setInterval(() => { setInterval(() => {
for (const from of Router.all) { for (const from of Router.all) from.update(socket);
if (Router.updating) return;
from.update(socket);
}
}, config.interval); }, config.interval);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment