Commit 04c50739 authored by nanamicat's avatar nanamicat

fix

parent 66048b4c
/import/
/target/
\ No newline at end of file
/target/
/.idea/
\ No newline at end of file
This diff is collapsed.
[package]
name = "railgun-routing-client"
name = "railgun-routing-server"
version = "0.1.0"
edition = "2024"
......@@ -7,25 +7,9 @@ edition = "2024"
[dependencies]
config = "0.15.19"
average = "0.16.0"
serde_json = "1.0.145"
serde = { version = "1.0.228", features = ["derive", "rc"] }
# serde_derive = "1.0" # Included in serde features
tokio = { version = "1.48", features = ["full"] }
rtnetlink = "0.19.0"
anyhow = "1.0.100"
# bincode = { version = "2.0.1", features = ["derive"] } # Removed as we use JSON
axum = "0.7"
tower-http = { version = "0.5", features = ["fs", "cors", "trace"] }
tracing = "0.1"
once_cell = "1.19"
crossbeam-queue = "0.3"
dashmap = "5.5"
bincode = "2.0.1"
tokio-util = { version = "0.7.17", features = ["codec"] }
bytes = "1.11.0"
http = "1.4.0"
tokio-stream = "0.1.17"
futures = "0.3.31"
httparse = "1.10.1" # For concurrent map access
// use crate::protocol::PeerQuality;
// use crate::router::GlobalState;
// use axum::{Router as AxumRouter, extract::State, response::Json, routing::get};
// use serde::Serialize;
// use std::sync::Arc;
//
//
// struct Status {
// tick: u64,
// last_update_ms: u64,
// msg: String,
// }
//
// // Response structs
// #[derive(Serialize)]
// struct RouterStateResponse {
// id: u8,
// peers: std::collections::HashMap<u8, PeerQuality>,
// via: std::collections::HashMap<u8, u8>,
// }
//
// #[derive(Serialize)]
// struct RouterConfigResponse {
// id: u8,
// name: String,
// location: String,
// host: String,
// #[serde(rename = "sshPort")]
// ssh_port: u16,
// user: String,
// }
//
// =
// // serde_json::from_str(&std::fs::read_to_string("../import/connections.json")?)?;
// // let mut routers: BTreeMap<u8, Router>
//
// pub fn app(state: Arc<(&BTreeMap<u8, BTreeMap<u8, ConnectionData>>>)) -> AxumRouter {
// AxumRouter::new()
// .route("/api/state", get(get_state))
// .route("/api/routers", get(get_routers))
// .route("/api/connections", get(get_connections))
// .with_state(state)
// }
//
// async fn get_state(State(state): State<Arc<GlobalState>>) -> Json<Vec<RouterStateResponse>> {
// let mut result = Vec::new();
// // iterate routers
// for router_lock in &state.routers {
// let router = router_lock.read().await;
// result.push(RouterStateResponse {
// id: router.id,
// peers: router.peers.clone(),
// via: router.via.clone(),
// });
// }
// Json(result)
// }
//
// async fn get_routers(State(state): State<Arc<GlobalState>>) -> Json<Vec<RouterConfigResponse>> {
// let mut result = Vec::new();
// for router_lock in &state.routers {
// let router = router_lock.read().await;
// result.push(RouterConfigResponse {
// id: router.config.id,
// name: router.config.name.clone(),
// location: router.config.location.clone(),
// host: router.config.host.clone(),
// ssh_port: router.config.ssh_port,
// user: router.config.user.clone(),
// });
// }
// Json(result)
// }
//
// async fn get_connections(
// State(state): State<Arc<GlobalState>>,
// ) -> Json<std::collections::HashMap<u8, std::collections::HashMap<u8, crate::router::ConnectionData>>>
// {
// Json(state.connections.clone())
// }
use bytes::{Buf, BufMut, BytesMut};
use http::{Response, Version};
use std::io;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
use tokio_stream::{StreamExt, StreamMap};
use tokio_util::codec::{Decoder, Encoder, Framed};
pub struct HttpMonitor {
listener: TcpListener,
connections: StreamMap<SocketAddr, Framed<TcpStream, SimpleHttpCodec>>,
}
impl HttpMonitor {
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
Ok(Self {
listener: TcpListener::bind(addr).await?,
connections: StreamMap::new(),
})
}
/// 在 select! 中调用的核心方法
pub async fn next_request(&mut self) -> io::Result<(http::Request<()>, Framed<TcpStream, SimpleHttpCodec>)> {
loop {
tokio::select! {
// 1. 接受新连接
Ok((stream, addr)) = self.listener.accept() => {
// 优化:开启 TCP NoDelay,对 HTTP 这种请求-响应模式很重要,减少延迟
let _ = stream.set_nodelay(true);
self.connections.insert(addr, Framed::new(stream, SimpleHttpCodec));
}
// 2. 处理已有连接的数据
// next() 会轮询所有连接,一旦有任何一个解析出完整的 HTTP 包,就返回
res = self.connections.next(), if !self.connections.is_empty() => {
match res {
Some((addr, Ok(req))) => {
// 暂时移除连接,将其所有权移交给 Request 对象
return Ok((req, self.connections.remove(&addr).unwrap()));
}
Some((addr, Err(_))) => {
// 出错或断开,清理
self.connections.remove(&addr);
}
None => {}
}
}
}
}
}
}
// --- 底层 Codec (不需要关心细节) ---
const MAX_HEADER_SIZE: usize = 8192; // 8KB header limit
pub struct SimpleHttpCodec;
impl Decoder for SimpleHttpCodec {
type Item = http::Request<()>;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
// 安全检查:防止 Payload 过大导致的 OOM 攻击
if src.len() > MAX_HEADER_SIZE {
return Err(io::Error::new(io::ErrorKind::InvalidData, "Request too large"));
}
// httparse 需要预分配一些 slot 来存放 headers
// 对于简单的监控接口,16 个 header 足够了,甚至不需要分配堆内存
let mut headers = [httparse::EMPTY_HEADER; 16];
let mut req = httparse::Request::new(&mut headers);
// httparse::Request::parse 尝试从字节切片中解析
match req.parse(src) {
// 1. 解析成功 (Complete)
Ok(httparse::Status::Complete(bytes_parsed)) => {
// 获取我们需要的数据
let method = req.method.unwrap_or("GET");
let path = req.path.unwrap_or("/");
let mut builder = http::Request::builder().method(method).uri(path).version(Version::HTTP_11);
for header in req.headers {
builder = builder.header(header.name, header.value);
}
// !!! 关键步骤 !!!
// 告诉 BytesMut 我们消耗了多少字节,把这些字节从缓存中移除
src.advance(bytes_parsed);
Ok(Some(builder.body(()).unwrap()))
}
// 2. 数据还没发完 (Partial)
// 比如只发了 "GET /sta",还没有发 "\r\n"
// 返回 Ok(None) 告诉 Framed:继续读 TCP,读到了再叫我
Ok(httparse::Status::Partial) => Ok(None),
// 3. 解析错误 (比如发了乱码)
Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e.to_string())),
}
}
}
impl Encoder<Response<String>> for SimpleHttpCodec {
type Error = io::Error;
fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> Result<(), Self::Error> {
// 1. 写入 Status Line (例如: HTTP/1.1 200 OK)
// 注意:http crate 的 Version debug 输出就是 "HTTP/1.1" 这种格式
let version = match item.version() {
Version::HTTP_11 => "HTTP/1.1",
Version::HTTP_10 => "HTTP/1.0",
_ => "HTTP/1.1",
};
// 预估一个容量以减少内存重新分配
let body_bytes = item.body().as_bytes();
dst.reserve(256 + body_bytes.len());
// 使用 writer 避免中间字符串分配
use std::io::Write;
let mut writer = dst.writer();
// 1. 写入 Status Line
// write! 宏可以直接写入 BytesMut,比 format! 更高效
let _ = write!(writer, "{} {}\r\n", version, item.status());
// 2. 写入 Headers
for (key, value) in item.headers() {
let _ = writer.write(key.as_str().as_bytes());
let _ = writer.write(b": ");
let _ = writer.write(value.as_bytes());
let _ = writer.write(b"\r\n");
}
// 3. 自动补充 Headers
if !item.headers().contains_key("content-length") {
let _ = write!(writer, "content-length: {}\r\n", body_bytes.len());
}
if !item.headers().contains_key("connection") {
let _ = writer.write(b"connection: close\r\n");
}
// 4. Header 结束
let _ = writer.write(b"\r\n");
// 5. Body
let _ = writer.write(body_bytes);
Ok(())
}
}
use crate::data::{ConnectionData, RouterData};
use crate::http::HttpMonitor;
use crate::protocol::{Change, Report};
use crate::router::Router;
use crate::settings::{Settings, TIMEOUT};
use ::config::Config;
use ::http::Response;
use anyhow::Result;
use ::config::Config;
use config::{Environment, File};
use futures::SinkExt;
use std::collections::BTreeMap;
use tokio::net::UdpSocket;
use tokio::select;
use tokio::time::{Duration, interval};
use tokio::time::{interval, Duration};
mod api;
mod data;
mod gateway_group;
mod http;
mod protocol;
mod quality;
mod router;
......@@ -68,10 +63,7 @@ async fn main() -> Result<()> {
let socket = UdpSocket::bind(config.udp_bind).await?;
println!("UDP listening on {}", config.udp_bind);
let mut buf = [0u8; 65535]; // Max UDP size
let mut http = HttpMonitor::bind(config.http_bind).await?;
println!("HTTP listening on {}", config.http_bind);
let mut buf = [0u8; u16::MAX as usize]; // Max UDP size
let mut interval = interval(Duration::from_secs(1));
......@@ -85,7 +77,10 @@ async fn main() -> Result<()> {
bincode::decode_from_slice::<Report, _>(&buf[..len], bincode::config::standard())
&& let Some(router) = routers.get_mut(&report.id)
{
router.on_message(report,&socket, addr, &mut updating);
if let Some(change) = router.on_message(report, addr, &mut updating) {
let len = bincode::encode_into_slice(change, &mut buf, bincode::config::standard())?;
let _ = socket.send_to(&buf[..len], addr).await;
}
}
}
......@@ -93,7 +88,8 @@ async fn main() -> Result<()> {
if updating.router_id != 0 {
let router = routers.get_mut(&updating.router_id).expect("updating router_id should exist");
if now.duration_since(router.time) < TIMEOUT {
let _ = router.send(&updating.change, &socket, router.addr.unwrap()); // updating 期间 addr 一定存在
let len = bincode::encode_into_slice(&updating.change, &mut buf, bincode::config::standard())?;
let _ = socket.send_to(&buf[..len], router.addr.unwrap()).await;
} else {
router.offline();
router.finish(&mut updating);
......@@ -105,25 +101,11 @@ async fn main() -> Result<()> {
router.offline();
}
}
if let Some(result) = routers.values().find_map(|r|r.update(&routers, &connections)) {
updating = result;
let router = routers.get_mut(&updating.router_id).expect("updating router_id should exist");
let _ = router.send(&updating.change, &socket, router.addr.unwrap());
}
}
}
result = http.next_request() => {
if let Ok((req, mut connection)) = result {
if req.uri().path() == "/status" {
let json = serde_json::to_string(&connections)?;
let response = Response::builder()
.header("Content-Type", "application/json")
.body(json)?;
tokio::spawn(async move {
connection.send(response).await
});
if let Some((router, change)) = routers.values().find_map(|r|r.update(&routers, &connections).map(|change|(r,change))) {
updating.router_id = router.id;
updating.change = change;
let len = bincode::encode_into_slice(&updating.change, &mut buf, bincode::config::standard())?;
let _ = socket.send_to(&buf[..len], router.addr.unwrap()).await;
}
}
}
......
use std::collections::BTreeMap;
use bincode::{Decode, Encode};
#[derive(Encode, Decode)]
pub struct Hello {
pub time: u16,
}
use std::collections::BTreeMap;
#[derive(Encode, Decode)]
pub struct Report {
......
use crate::protocol::PeerQuality;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct Quality {
pub delay: i32,
pub jitter: u32,
......@@ -35,7 +35,7 @@ impl Quality {
}
}
fn default() -> Self {
pub(crate) fn default() -> Self {
Self {
delay: 0,
jitter: 0,
......
......@@ -5,7 +5,6 @@ use crate::settings::THROTTLE;
use crate::UpdatingState;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use tokio::net::UdpSocket;
use tokio::time::Instant;
pub struct Router {
......@@ -68,7 +67,7 @@ impl Router {
self.time = Instant::now();
}
pub fn on_message(&mut self, data: Report, socket: &UdpSocket, addr: SocketAddr, updating: &mut UpdatingState) {
pub fn on_message(&mut self, data: Report, addr: SocketAddr, updating: &mut UpdatingState) -> Option<Change> {
// self.addr None (+不满足上线条件) => 客户端不认为自己新上线,但是服务端不认识,立即发送 rst 当前状态,暂不承认上线
// syn + peer => 新上线的客户端,立即发送 rst 当前状态,暂不承认上线,有 updating 解除 updating, addr=none
// syn + no peer + 正确的 ack 号 => 确认了关于 rst 的指令,上线
......@@ -78,50 +77,41 @@ impl Router {
match (self.addr != None, data.syn, data.peers.len() != 0, data.ack == self.seq.wrapping_add(1)) {
(false, true, false, true) => {
self.online(addr);
None
}
(false, _, _, _) | (_, true, true, _) => {
self.offline();
self.finish(updating);
let change = Change {
Some(Change {
seq: self.seq,
via: self.via.clone(),
plan: self.plan.clone(),
rst: true,
};
self.send(&change, socket, addr);
})
}
(true, false, true, true) => {
self.online(addr);
for (current, new) in self.peers.values_mut().zip(data.peers) {
*current = new
}
None
}
(true, false, false, true) => {
self.online(addr);
self.finish(updating);
None
}
_ => {}
_ => None,
}
}
pub async fn change(&self, message: Change, updating: &mut UpdatingState, socket: &UdpSocket) {
updating.router_id = self.id;
updating.change = message;
self.send(&updating.change, socket, self.addr.unwrap()).await;
}
pub fn finish(&self, updating: &mut UpdatingState) {
if updating.router_id == self.id {
updating.router_id = 0;
}
}
pub async fn send(&self, message: &Change, socket: &UdpSocket, addr: SocketAddr) {
let buf = bincode::encode_to_vec(message, bincode::config::standard()).unwrap();
socket.send_to(&buf, addr).await;
}
pub fn update(&self, routers: &BTreeMap<u8, Router>, connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>) -> Option<UpdatingState> {
pub fn update(&self, routers: &BTreeMap<u8, Router>, connections: &BTreeMap<u8, BTreeMap<u8, ConnectionData>>) -> Option<Change> {
if self.addr.is_none() {
return None;
}
......@@ -145,14 +135,11 @@ impl Router {
}
if changed_via.len() > 0 {
Some(UpdatingState {
router_id: self.id,
change: Change {
seq: self.seq,
rst: false,
via: changed_via,
plan: BTreeMap::new(),
},
Some(Change {
seq: self.seq,
rst: false,
via: changed_via,
plan: BTreeMap::new(),
})
} else {
None
......@@ -163,7 +150,7 @@ impl Router {
assert!(self != to);
assert!(self != via);
let mut result = Quality::default();
let mut result: Quality = Quality::default();
let mut route = vec![self, via];
let mut current = self;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment