Commit 7cf5ec91 authored by nanamicat's avatar nanamicat

http

parent aeade81e
[package]
name = "railgun-routing-client"
version = "0.1.0"
edition = "2024"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
config = "0.15.19"
average = "0.16.0"
serde_json = "1.0.145"
serde = { version = "1.0.228", features = ["derive", "rc"] }
# serde_derive = "1.0" # Included in serde features
tokio = { version = "1.48", features = ["full"] }
rtnetlink = "0.19.0"
anyhow = "1.0.100"
# bincode = { version = "2.0.1", features = ["derive"] } # Removed as we use JSON
axum = "0.7"
tower-http = { version = "0.5", features = ["fs", "cors", "trace"] }
tracing = "0.1"
once_cell = "1.19"
crossbeam-queue = "0.3"
dashmap = "5.5"
bincode = "2.0.1"
tokio-util = { version = "0.7.17", features = ["codec"] }
bytes = "1.11.0"
http = "1.4.0"
tokio-stream = "0.1.17"
futures = "0.3.31"
httparse = "1.10.1" # For concurrent map access
This diff is collapsed.
{
"name": "railgun-routing-server",
"version": "0.0.1",
"author": "zh99998 <zh99998@gmail.com>",
"scripts": {
"start": "ts-node src/main.ts"
},
"dependencies": {
"express": "^4.18.1",
"lodash": "^4.17.21"
},
"devDependencies": {
"@types/express": "^4.17.13",
"@types/lodash": "^4.14.172",
"@types/node": "^16.6.1",
"ts-node": "^10.2.0",
"typescript": "^4.3.5"
}
}
// 这个文件两端共用
export interface PeerQuality {
delay: number;
jitter: number;
reliability: number;
}
// 路由器向中心服务器发送的消息
export interface Report {
id: number;
ack: number;
peers?: Record<number, PeerQuality>;
}
// 中心服务器向路由器发送的消息
export interface Change {
seq: number,
via: Record<number, number>,
plan: Record<number, number>
}
// 路由器向路由器发送的消息
export interface Hello {
id: number; // 自己的id
seq: number; // 这个报文的顺序号,每次发送自增
time: number; // 自己的系统时间,EPOCH 以来的毫秒数
}
max_width = 180
\ No newline at end of file
import { RemoteInfo, Socket } from 'dgram'; import { RemoteInfo, Socket } from 'dgram';
import { Change, PeerQuality, Report } from '../protocol'; import { Change, PeerQuality, Report } from './protocol.rs';
import routers from '../import/data/Router.json'; import routers from '../import/data/Router.json';
import assert from 'assert'; import assert from 'assert';
import { Quality } from './Quality'; import { Quality } from './Quality';
......
use serde::{Deserialize, Serialize};
#[derive(Deserialize)]
pub struct RouterData {
pub id: u8,
pub name: String,
pub location: String,
pub host: String,
#[serde(default)]
pub ssh_port: u16,
pub user: String,
}
#[derive(Serialize,Deserialize)]
pub struct ConnectionData {
pub metric: u32,
}
use bytes::{Buf, BufMut, BytesMut};
use http::{Response, Version};
use std::io;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
use tokio_stream::{StreamExt, StreamMap};
use tokio_util::codec::{Decoder, Encoder, Framed};
pub struct HttpMonitor {
listener: TcpListener,
connections: StreamMap<SocketAddr, Framed<TcpStream, SimpleHttpCodec>>,
}
impl HttpMonitor {
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
Ok(Self {
listener: TcpListener::bind(addr).await?,
connections: StreamMap::new(),
})
}
/// 在 select! 中调用的核心方法
pub async fn next_request(&mut self) -> io::Result<(http::Request<()>, Framed<TcpStream, SimpleHttpCodec>)> {
loop {
tokio::select! {
// 1. 接受新连接
Ok((stream, addr)) = self.listener.accept() => {
// 优化:开启 TCP NoDelay,对 HTTP 这种请求-响应模式很重要,减少延迟
let _ = stream.set_nodelay(true);
self.connections.insert(addr, Framed::new(stream, SimpleHttpCodec));
}
// 2. 处理已有连接的数据
// next() 会轮询所有连接,一旦有任何一个解析出完整的 HTTP 包,就返回
res = self.connections.next(), if !self.connections.is_empty() => {
match res {
Some((addr, Ok(req))) => {
// 暂时移除连接,将其所有权移交给 Request 对象
return Ok((req, self.connections.remove(&addr).unwrap()));
}
Some((addr, Err(_))) => {
// 出错或断开,清理
self.connections.remove(&addr);
}
None => {}
}
}
}
}
}
}
// --- 底层 Codec (不需要关心细节) ---
const MAX_HEADER_SIZE: usize = 8192; // 8KB header limit
pub struct SimpleHttpCodec;
impl Decoder for SimpleHttpCodec {
type Item = http::Request<()>;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
// 安全检查:防止 Payload 过大导致的 OOM 攻击
if src.len() > MAX_HEADER_SIZE {
return Err(io::Error::new(io::ErrorKind::InvalidData, "Request too large"));
}
// httparse 需要预分配一些 slot 来存放 headers
// 对于简单的监控接口,16 个 header 足够了,甚至不需要分配堆内存
let mut headers = [httparse::EMPTY_HEADER; 16];
let mut req = httparse::Request::new(&mut headers);
// httparse::Request::parse 尝试从字节切片中解析
match req.parse(src) {
// 1. 解析成功 (Complete)
Ok(httparse::Status::Complete(bytes_parsed)) => {
// 获取我们需要的数据
let method = req.method.unwrap_or("GET");
let path = req.path.unwrap_or("/");
let mut builder = http::Request::builder().method(method).uri(path).version(Version::HTTP_11);
for header in req.headers {
builder = builder.header(header.name, header.value);
}
// !!! 关键步骤 !!!
// 告诉 BytesMut 我们消耗了多少字节,把这些字节从缓存中移除
src.advance(bytes_parsed);
Ok(Some(builder.body(()).unwrap()))
}
// 2. 数据还没发完 (Partial)
// 比如只发了 "GET /sta",还没有发 "\r\n"
// 返回 Ok(None) 告诉 Framed:继续读 TCP,读到了再叫我
Ok(httparse::Status::Partial) => Ok(None),
// 3. 解析错误 (比如发了乱码)
Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e.to_string())),
}
}
}
impl Encoder<Response<String>> for SimpleHttpCodec {
type Error = io::Error;
fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> Result<(), Self::Error> {
// 1. 写入 Status Line (例如: HTTP/1.1 200 OK)
// 注意:http crate 的 Version debug 输出就是 "HTTP/1.1" 这种格式
let version = match item.version() {
Version::HTTP_11 => "HTTP/1.1",
Version::HTTP_10 => "HTTP/1.0",
_ => "HTTP/1.1",
};
// 预估一个容量以减少内存重新分配
let body_bytes = item.body().as_bytes();
dst.reserve(256 + body_bytes.len());
// 使用 writer 避免中间字符串分配
use std::io::Write;
let mut writer = dst.writer();
// 1. 写入 Status Line
// write! 宏可以直接写入 BytesMut,比 format! 更高效
let _ = write!(writer, "{} {}\r\n", version, item.status());
// 2. 写入 Headers
for (key, value) in item.headers() {
let _ = writer.write(key.as_str().as_bytes());
let _ = writer.write(b": ");
let _ = writer.write(value.as_bytes());
let _ = writer.write(b"\r\n");
}
// 3. 自动补充 Headers
if !item.headers().contains_key("content-length") {
let _ = write!(writer, "content-length: {}\r\n", body_bytes.len());
}
if !item.headers().contains_key("connection") {
let _ = writer.write(b"connection: close\r\n");
}
// 4. Header 结束
let _ = writer.write(b"\r\n");
// 5. Body
let _ = writer.write(body_bytes);
Ok(())
}
}
import dgram from 'dgram';
import { Report } from '../protocol';
import assert from 'assert';
import { Router } from './Router';
import config from '../config/config.json';
import { api } from './Api';
import express from 'express';
import path from 'path';
// http
const app = express();
app.use(express.static(path.join(__dirname, '..', 'public')));
app.use('/api', api);
app.listen(config.port);
// udp routing protocol
const socket = dgram
.createSocket('udp4')
.on('listening', () => {
const address = socket.address();
console.log(`listening ${address.address}:${address.port}`);
})
.on('message', function (message, rinfo) {
try {
const hello: Report = JSON.parse(message.toString());
assert(hello.id);
const router: Router = Router.all.find((r) => r.id === hello.id)!;
assert(router);
router.rinfo = rinfo;
router.onMessage(socket, hello);
} catch (e) {
console.warn(e);
}
});
socket.bind(config.port);
setInterval(() => Router.update(socket), config.interval);
use std::collections::BTreeMap;
use bincode::{Decode, Encode};
#[derive(Encode, Decode)]
pub struct Hello {
pub time: u16,
}
#[derive(Encode, Decode)]
pub struct Report {
pub id: u8,
pub ack: u8,
pub syn: bool,
pub peers: Vec<PeerQuality>,
}
#[derive(Encode, Decode, Copy, Clone)]
pub struct PeerQuality {
pub delay: i16,
pub reliability: u8,
pub jitter: u8,
}
#[derive(Encode, Decode, Default)]
pub struct Change {
pub seq: u8,
pub rst: bool,
pub via: BTreeMap<u8, u8>,
pub plan: BTreeMap<u8, BTreeMap<u8, u8>>,
}
use serde::Deserialize;
use std::net::SocketAddr;
use std::time::Duration;
#[derive(Deserialize)]
pub struct Settings {
pub udp_bind: SocketAddr,
pub http_bind: SocketAddr,
// pub interval: u64,
// pub timeout: u64,
// pub timeout2: u64,
// pub throttle: f64,
}
pub const TIMEOUT: Duration = Duration::from_secs(10);
pub const THROTTLE: i32 = 10;
\ No newline at end of file
{
"compilerOptions": {
"module": "commonjs",
"target": "esnext",
"sourceMap": true,
"esModuleInterop": true,
"resolveJsonModule": true,
"strict": true
},
"exclude": ["node_modules"]
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment