Commit 87991e29 authored by nanamicat's avatar nanamicat

tcp

parent 2fdc99d8
mod router; mod router;
use crate::router::{Meta, Router, META_SIZE, SECRET_LENGTH};
use crate::Schema::{TCP, UDP}; use crate::Schema::{TCP, UDP};
use crate::router::{META_SIZE, Meta, Router, SECRET_LENGTH};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use crossbeam::epoch::{Owned, pin}; use crossbeam::epoch::{pin, Owned};
use crossbeam_utils::thread; use crossbeam_utils::thread;
use itertools::Itertools; use itertools::Itertools;
use serde::{Deserialize, Deserializer}; use serde::{Deserialize, Deserializer};
use socket2::Domain; use socket2::Domain;
use std::net::Shutdown; use std::net::Shutdown;
use std::sync::Arc;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::{collections::HashMap, env, mem::MaybeUninit}; use std::{collections::HashMap, env, mem::MaybeUninit};
...@@ -64,7 +64,7 @@ fn main() -> Result<()> { ...@@ -64,7 +64,7 @@ fn main() -> Result<()> {
let config = serde_json::from_str::<Config>(env::args().nth(1).context("need param")?.as_str())?; let config = serde_json::from_str::<Config>(env::args().nth(1).context("need param")?.as_str())?;
let local_secret: [u8; SECRET_LENGTH] = Router::create_secret(config.local_secret.as_str())?; let local_secret: [u8; SECRET_LENGTH] = Router::create_secret(config.local_secret.as_str())?;
let routers = config let routers = &config
.routers .routers
.iter() .iter()
.cloned() .cloned()
...@@ -118,44 +118,41 @@ fn main() -> Result<()> { ...@@ -118,44 +118,41 @@ fn main() -> Result<()> {
.filter(|&r| r.config.schema == TCP && r.config.dst_port == 0) .filter(|&r| r.config.schema == TCP && r.config.dst_port == 0)
.unique_by(|r| r.config.src_port) .unique_by(|r| r.config.src_port)
{ {
s.spawn(|_| { s.spawn(|s| {
// accept 出错直接 panic // accept 出错直接 panic
loop { loop {
let (connection, _) = router.socket.accept().unwrap(); let (connection, _) = router.socket.accept().unwrap();
thread::scope(|s| { s.spawn(move |_| {
s.spawn(|_| { connection.set_tcp_nodelay(true).unwrap();
connection.set_tcp_nodelay(true).unwrap();
let mut meta_bytes = [MaybeUninit::uninit(); META_SIZE]; let mut meta_bytes = [MaybeUninit::uninit(); META_SIZE];
Router::recv_exact_tcp(&connection, &mut meta_bytes).unwrap(); Router::recv_exact_tcp(&connection, &mut meta_bytes).unwrap();
let meta: &Meta = Meta::from_bytes(&meta_bytes); let meta: &Meta = Meta::from_bytes(&meta_bytes);
if meta.reversed == 0 if meta.reversed == 0
&& meta.dst_id == config.local_id && meta.dst_id == config.local_id
&& let Some(router) = routers.get(&meta.src_id) && let Some(router) = routers.get(&meta.src_id)
{ {
let connection = Arc::new(connection); let connection = Arc::new(connection);
// tcp listener 只许一个连接,过来新连接就把前一个关掉。 // tcp listener 只许一个连接,过来新连接就把前一个关掉。
{ {
let guard = pin(); let guard = pin();
let new_shared = Owned::new(connection.clone()).into_shared(&guard); let new_shared = Owned::new(connection.clone()).into_shared(&guard);
let old_shared = router.tcp_listener_connection.swap(new_shared, Ordering::Release, &guard); let old_shared = router.tcp_listener_connection.swap(new_shared, Ordering::Release, &guard);
unsafe { unsafe {
if let Some(old) = old_shared.as_ref() { if let Some(old) = old_shared.as_ref() {
let _ = old.shutdown(Shutdown::Both); let _ = old.shutdown(Shutdown::Both);
}
guard.defer_destroy(old_shared)
} }
guard.defer_destroy(old_shared)
} }
let _ = thread::scope(|s| {
s.spawn(|_| router.handle_outbound_tcp(&connection));
s.spawn(|_| router.handle_inbound_tcp(&connection, &local_secret));
});
} }
});
}) let _ = thread::scope(|s| {
.unwrap(); s.spawn(|_| router.handle_outbound_tcp(&connection));
s.spawn(|_| router.handle_inbound_tcp(&connection, &local_secret));
});
}
});
} }
}); });
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment