Commit 7d4d292d authored by nanamicat's avatar nanamicat

tcp

parent 2117fd01
......@@ -2,6 +2,12 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "anyhow"
version = "1.0.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
[[package]]
name = "async-channel"
version = "2.3.1"
......@@ -154,6 +160,12 @@ version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]]
name = "either"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
[[package]]
name = "event-listener"
version = "5.3.1"
......@@ -286,6 +298,15 @@ version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
[[package]]
name = "itertools"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.14"
......@@ -506,9 +527,11 @@ dependencies = [
name = "tun1"
version = "0.1.0"
dependencies = [
"anyhow",
"base64",
"crossbeam",
"crossbeam-utils",
"itertools",
"libc",
"serde",
"serde_json",
......
......@@ -12,3 +12,5 @@ base64 = "0.22.1"
crossbeam = "0.8.4"
crossbeam-utils = "0.8.21"
libc = "0.2.178"
itertools = "0.14.0"
anyhow = "1.0.100"
max_width = 160
\ No newline at end of file
mod router;
use crate::router::{Router, SECRET_LENGTH};
use crate::router::{Router, META_SIZE, SECRET_LENGTH};
use crate::Schema::{IP, TCP, UDP};
use anyhow::{bail, ensure, Context, Result};
use crossbeam_utils::thread;
use std::collections::HashMap;
use std::env;
use std::error::Error;
use std::io::{Read, Write};
use std::mem::MaybeUninit;
use std::mem::{size_of, transmute};
use std::sync::Arc;
use std::sync::atomic::Ordering;
use itertools::Itertools;
use serde::{Deserialize, Deserializer};
use socket2::Domain;
use std::net::Shutdown;
use std::time::Duration;
use std::{
collections::HashMap,
env,
mem::MaybeUninit,
sync::{atomic::Ordering, Arc},
};
#[repr(C)]
pub struct Meta {
......@@ -18,16 +23,13 @@ pub struct Meta {
pub reversed: u16,
}
use serde::{Deserialize, Deserializer};
use socket2::{Domain, Socket};
#[derive(Deserialize)]
pub struct Config {
pub local_id: u8,
pub local_secret: String,
pub routers: Vec<ConfigRouter>,
}
#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct ConfigRouter {
pub remote_id: u8,
#[serde(default)]
......@@ -47,13 +49,12 @@ pub struct ConfigRouter {
pub up: String,
}
#[derive(Deserialize, Default)]
#[derive(Deserialize, Default, PartialEq, Clone, Copy)]
pub enum Schema {
#[default]
IP,
UDP,
TCP,
FakeTCP,
}
fn deserialize_domain<'de, D>(d: D) -> Result<Domain, D::Error>
......@@ -67,94 +68,177 @@ where
}
}
fn main() -> Result<(), Box<dyn Error>> {
fn main() -> Result<()> {
println!("Starting");
let config: Config = serde_json::from_str(env::args().nth(1).ok_or("need param")?.as_str())?;
let config = Arc::new(serde_json::from_str::<Config>(env::args().nth(1).context("need param")?.as_str())?);
let local_secret: [u8; SECRET_LENGTH] = Router::create_secret(config.local_secret.as_str())?;
let mut udp_groups: HashMap<u16, (Arc<Socket>, Vec<u8>)> = HashMap::new();
let routers: Vec<Router> = config
let routers = Arc::new(
config
.routers
.into_iter()
.map(|c| Router::new(c, config.local_id, &mut udp_groups))
.collect::<Result<Vec<_>, _>>()?;
for (socket, group) in udp_groups.values() {
Router::attach_filter_udp(socket, group, config.local_id)?;
.iter()
.cloned()
.sorted_by_key(|r| r.remote_id)
.map(|c| {
let remote_id = c.remote_id;
Router::new(c, config.local_id).map(|r| (remote_id, Arc::new(r)))
})
.collect::<Result<HashMap<u8, Arc<Router>>, _>>()?,
);
for (_, group) in &routers
.values()
.filter(|r| r.config.schema == UDP && r.config.src_port != 0)
.chunk_by(|r| r.config.src_port)
{
Router::attach_filter_udp(group.sorted_by_key(|r| r.config.remote_id).collect(), config.local_id)?;
}
println!("created tuns");
const META_SIZE: usize = size_of::<Meta>();
const TCP_RECONNECT: u64 = 10;
thread::scope(|s| {
for router in routers {
let (mut reader, mut writer) = router.split();
s.spawn(move |_| {
// IP, UDP
for router in routers.values().filter(|&r| !(r.config.schema != TCP)) {
s.spawn(|_| {
let mut buffer = [0u8; 1500];
// Pre-initialize with our Meta header (local -> remote)
let meta = Meta {
src_id: config.local_id,
dst_id: reader.config.remote_id,
dst_id: router.config.remote_id,
reversed: 0,
};
// Turn the Meta struct into bytes
let meta_bytes: &[u8; META_SIZE] =
unsafe { &*(&meta as *const Meta as *const [u8; META_SIZE]) };
buffer[..META_SIZE].copy_from_slice(meta_bytes);
buffer[..META_SIZE].copy_from_slice(unsafe { &*(&meta as *const Meta as *const [u8; META_SIZE]) });
loop {
let n = reader.tun_reader.read(&mut buffer[META_SIZE..]).unwrap();
let n = router.tun.recv(&mut buffer[META_SIZE..]).unwrap(); // recv 失败直接 panic
let guard = crossbeam::epoch::pin();
let shared = reader.endpoint.load(Ordering::Acquire, &guard);
if let Some(addr) = unsafe { shared.as_ref() } {
reader.encrypt(&mut buffer[META_SIZE..META_SIZE + n]);
let _ = reader.socket.send_to(&buffer[..META_SIZE + n], addr);
let endpoint_ref = router.endpoint.load(Ordering::Relaxed, &guard);
if let Some(endpoint) = unsafe { endpoint_ref.as_ref() } {
router.encrypt(&mut buffer[META_SIZE..META_SIZE + n]);
let _ = router.socket.send_to(&buffer[..META_SIZE + n], endpoint);
}
}
});
s.spawn(move |_| {
s.spawn(|_| {
let mut recv_buf = [MaybeUninit::uninit(); 1500];
loop {
let _ = (|| {
let (len, addr) = writer.socket.recv_from(&mut recv_buf).unwrap();
let packet: &mut [u8] = unsafe { transmute(&mut recv_buf[..len]) };
let _ = (|| -> Result<()> {
// 收到一个非法报文只丢弃一个报文
let (len, addr) = { router.socket.recv_from(&mut recv_buf).unwrap() }; // recv 出错直接 panic
let packet: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(recv_buf.as_mut_ptr() as *mut u8, len) };
// if addr.is_ipv6() { println!("{:X?}", packet) }
// 只有 ipv4 raw 会给 IP报头
let offset =
if addr.is_ipv4() && addr.as_socket_ipv4().ok_or("?")?.port() == 0 {
let offset = if router.config.family == Domain::IPV4 && router.config.schema == IP {
(packet[0] & 0x0f) as usize * 4
} else {
0
} + META_SIZE;
{
let guard = crossbeam::epoch::pin();
let current_shared = writer.endpoint.load(Ordering::SeqCst, &guard);
let is_same = unsafe { current_shared.as_ref() }
.map(|c| *c == addr)
.unwrap_or(false);
let current_shared = router.endpoint.load(Ordering::Relaxed, &guard);
let is_same = unsafe { current_shared.as_ref() }.map(|c| *c == addr).unwrap_or(false);
if !is_same {
let new_shared = crossbeam::epoch::Owned::new(addr).into_shared(&guard);
let old_shared =
writer.endpoint.swap(new_shared, Ordering::SeqCst, &guard);
unsafe {
guard.defer_destroy(old_shared);
let old_shared = router.endpoint.swap(new_shared, Ordering::Release, &guard);
unsafe { guard.defer_destroy(old_shared) }
}
}
let payload = &mut packet[offset..];
writer.decrypt(payload, &local_secret);
writer.tun_writer.write_all(payload)?;
writer.last_activity = std::time::Instant::now();
Ok::<(), Box<dyn Error>>(())
router.decrypt(payload, &local_secret);
router.tun.send(payload)?;
Ok(())
})();
}
});
}
for router in routers.values().filter(|&r| r.config.schema == TCP && r.config.dst_port != 0) {
s.spawn(|_| {
loop {
if let Ok(connection) = router.tcp_connect(config.local_id) {
let _ = handle_tcp(router, connection, &local_secret);
}
std::thread::sleep(Duration::from_secs(TCP_RECONNECT));
}
});
}
// tcp listeners
for router in routers
.values()
.filter(|&r| r.config.schema == TCP && r.config.dst_port == 0)
.unique_by(|r| r.config.src_port)
{
s.spawn(|_| {
// listen 或 accept 出错直接 panic
let listener = router.tcp_listen().unwrap();
loop {
let (connection, _) = listener.accept().unwrap();
let _ = (|| -> Result<()> {
// 为了写起来方便,每个 tcp connection 有两秒钟时间发送握手报文,如果没收到就关闭连接再响应下一个
// 正常的连接是 fast open 的,期望会瞬间连好
connection.set_read_timeout(Option::from(Duration::from_secs(2)))?;
let mut meta_bytes = [MaybeUninit::uninit(); META_SIZE];
Router::recv_exact(&connection, &mut meta_bytes)?;
let meta: &Meta = unsafe { &*(meta_bytes.as_ptr() as *const Meta) };
if meta.reversed == 0
&& meta.dst_id == config.local_id
&& let Some(router) = routers.get(&meta.src_id)
{
connection.set_read_timeout(None)?;
handle_tcp(router, connection, &local_secret)?;
}
Ok(())
})();
}
});
}
})
.map_err(|_| anyhow::anyhow!("Thread panicked"))?;
Ok(())
}
fn handle_tcp(router: &Arc<Router>, connection: socket2::Socket, local_secret: &[u8; 32]) -> Result<()> {
thread::scope(|s| {
s.spawn(|_| {
let _ = (|| -> Result<()> {
let mut buffer = [0u8; 1500];
loop {
let n = router.tun.recv(&mut buffer)?;
router.encrypt(&mut buffer[..n]);
Router::send_all(&connection, &buffer[..n])?;
}
})();
let _ = connection.shutdown(Shutdown::Both);
});
s.spawn(|_| {
let _ = (|| -> Result<()> {
let mut buf = [MaybeUninit::uninit(); 1500];
let packet: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(buf.as_mut_ptr() as *mut u8, buf.len()) };
loop {
Router::recv_exact(&connection, &mut buf[0..6])?;
router.decrypt2(packet, &local_secret, 0..6);
let version = packet[0] >> 4;
let total_len = match version {
4 => u16::from_be_bytes([packet[2], packet[3]]) as usize,
6 => u16::from_be_bytes([packet[4], packet[5]]) as usize + 40,
_ => bail!("Invalid IP version"),
};
ensure!(6 < total_len && total_len <= buf.len(), "Invalid total length");
Router::recv_exact(&connection, &mut buf[6..total_len])?;
router.decrypt2(packet, &local_secret, 6..total_len);
router.tun.send(&packet[..total_len])?;
}
})();
let _ = connection.shutdown(Shutdown::Both);
});
})
.map_err(|_| "Thread panicked")?;
.map_err(|_| anyhow::anyhow!("Thread panicked"))?;
Ok(())
}
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use socket2::{Domain, Protocol, SockAddr, SockFilter, Socket, Type};
use std::collections::HashMap;
use std::ffi::c_void;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, ToSocketAddrs};
use std::os::fd::AsRawFd;
use std::process::{Command, ExitStatus};
use std::sync::Arc;
use tun::{Reader, Writer};
pub const SECRET_LENGTH: usize = 32;
use crate::{ConfigRouter, Meta, Schema};
use crossbeam::epoch::Atomic;
use libc::{
sock_filter, sock_fprog, socklen_t, BPF_ABS, BPF_B, BPF_IND, BPF_JEQ, BPF_JMP, BPF_K, BPF_LD, BPF_LDX,
BPF_MSH, BPF_RET, BPF_W, SOL_SOCKET, SO_ATTACH_REUSEPORT_CBPF,
use std::{
ffi::c_void,
mem::MaybeUninit,
net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, ToSocketAddrs},
ops::Range,
os::fd::{AsRawFd, FromRawFd},
process::{Command, ExitStatus},
sync::atomic::Ordering,
sync::Arc,
};
// tun -> raw
pub struct RouterReader {
pub config: ConfigRouter,
pub secret: [u8; SECRET_LENGTH],
pub tun_reader: Reader,
pub socket: Arc<Socket>,
pub endpoint: Arc<Atomic<SockAddr>>,
}
impl RouterReader {
pub(crate) fn encrypt(&self, data: &mut [u8]) {
for (i, b) in data.iter_mut().enumerate() {
*b ^= self.secret[i % SECRET_LENGTH];
}
}
}
use crate::{ConfigRouter, Meta, Schema};
use anyhow::{bail, ensure, Result};
use tun::Device;
// raw -> tun
pub struct RouterWriter {
pub tun_writer: Writer,
pub socket: Arc<Socket>,
pub endpoint: Arc<Atomic<SockAddr>>,
pub last_activity: std::time::Instant,
}
use crossbeam::epoch::Atomic;
use libc::{sock_filter, sock_fprog, socklen_t, BPF_ABS, BPF_B, BPF_IND, BPF_JEQ, BPF_JMP, BPF_K, BPF_LD, BPF_LDX, BPF_MSH, BPF_RET, BPF_W, MSG_FASTOPEN, SOL_SOCKET, SO_ATTACH_REUSEPORT_CBPF};
impl RouterWriter {
pub(crate) fn decrypt(&self, data: &mut [u8], secret: &[u8; SECRET_LENGTH]) {
for (i, b) in data.iter_mut().enumerate() {
*b ^= secret[i % SECRET_LENGTH];
}
}
}
pub const SECRET_LENGTH: usize = 32;
pub const META_SIZE: usize = size_of::<Meta>();
pub struct Router {
pub config: ConfigRouter,
pub secret: [u8; SECRET_LENGTH],
pub tun_reader: Reader,
pub tun_writer: Writer,
pub tun: Device,
pub socket: Arc<Socket>,
pub endpoint: Arc<Atomic<SockAddr>>,
}
impl Router {
pub(crate) fn create_secret(
config: &str,
) -> Result<[u8; SECRET_LENGTH], Box<dyn std::error::Error>> {
pub(crate) fn create_secret(config: &str) -> Result<[u8; SECRET_LENGTH]> {
let mut secret = [0u8; SECRET_LENGTH];
let decoded = BASE64_STANDARD.decode(config)?;
let len = decoded.len().min(SECRET_LENGTH);
......@@ -69,73 +39,47 @@ impl Router {
Ok(secret)
}
fn create_socket(
config: &ConfigRouter,
local_id: u8,
groups: &mut HashMap<u16, (Arc<Socket>, Vec<u8>)>,
) -> Result<Arc<Socket>, Box<dyn std::error::Error>> {
pub(crate) fn decrypt(&self, data: &mut [u8], secret: &[u8; SECRET_LENGTH]) {
for (i, b) in data.iter_mut().enumerate() {
*b ^= secret[i % SECRET_LENGTH];
}
}
pub(crate) fn decrypt2(&self, data: &mut [u8], secret: &[u8; SECRET_LENGTH], range: Range<usize>) {
for i in range {
data[i] ^= secret[i % SECRET_LENGTH];
}
}
pub(crate) fn encrypt(&self, data: &mut [u8]) {
for (i, b) in data.iter_mut().enumerate() {
*b ^= self.secret[i % SECRET_LENGTH];
}
}
pub fn create_socket(config: &ConfigRouter, local_id: u8) -> Result<Arc<Socket>> {
match config.schema {
Schema::IP => {
let result = Socket::new(
config.family,
Type::RAW,
Some(Protocol::from(config.proto as i32)),
)?;
#[cfg(target_os = "linux")]
let result = Socket::new(config.family, Type::RAW, Some(Protocol::from(config.proto as i32)))?;
result.set_mark(config.mark)?;
Self::attach_filter_raw(config, local_id, &result)?;
Ok(Arc::new(result))
}
Schema::UDP => {
let result = Socket::new(config.family, Type::DGRAM, Some(Protocol::UDP))?;
result.set_mark(config.mark)?;
if config.src_port != 0 {
result.set_reuse_port(true)?;
let addr = match config.family {
Domain::IPV4 => SockAddr::from(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
config.src_port,
)),
Domain::IPV6 => SockAddr::from(SocketAddrV6::new(
Ipv6Addr::UNSPECIFIED,
config.src_port,
0,
0,
)),
_ => return Err("unsupported family".into()),
};
let addr = Self::bind_addr(config);
result.bind(&addr)?;
let result1 = Arc::new(result);
match groups.get_mut(&config.src_port) {
None => {
groups
.insert(config.src_port, (result1.clone(), vec![config.remote_id]));
}
Some((_, group)) => {
group.push(config.remote_id);
}
}
Ok(result1)
} else {
Ok(Arc::new(result))
}
}
Schema::TCP => {
let result = Socket::new(config.family, Type::STREAM, Some(Protocol::TCP))?;
Ok(Arc::new(result))
}
Schema::FakeTCP => {
let result = Socket::new(config.family, Type::STREAM, Some(Protocol::TCP))?;
Ok(Arc::new(result))
}
// TCP 不需要一开始创建 socket,在运行时管理
Schema::TCP => Ok(Arc::new(unsafe { Socket::from_raw_fd(0) })),
}
}
fn attach_filter_raw(
config: &ConfigRouter,
local_id: u8,
socket: &Socket,
) -> Result<(), Box<dyn std::error::Error>> {
fn attach_filter_raw(config: &ConfigRouter, local_id: u8, socket: &Socket) -> Result<()> {
// 由于多个对端可能会使用相同的 ipprpto 号,这里确保每个 socket 上只会收到自己对应的对端发来的消息
const META_SIZE: usize = size_of::<Meta>();
let meta = Meta {
......@@ -143,8 +87,7 @@ impl Router {
dst_id: local_id,
reversed: 0,
};
let meta_bytes: [u8; META_SIZE] =
unsafe { *(&meta as *const Meta as *const [u8; META_SIZE]) };
let meta_bytes: [u8; META_SIZE] = unsafe { *(&meta as *const Meta as *const [u8; META_SIZE]) };
let value = u32::from_be_bytes(meta_bytes);
// 如果你的协议是 UDP,这里必须是 8 (跳过 UDP 头: SrcPort(2)+DstPort(2)+Len(2)+Sum(2))
......@@ -174,29 +117,40 @@ impl Router {
// 【拒绝 (False 路径)】
bpf_stmt(BPF_RET | BPF_K, 0),
],
_ => Err("unsupported family")?,
_ => bail!("unsupported family"),
};
socket.attach_filter(filters)?;
Ok(())
}
pub fn attach_filter_udp(
socket: &Arc<Socket>,
group: &Vec<u8>,
local_id: u8,
) -> Result<(), Box<dyn std::error::Error>> {
pub(crate) fn recv_exact(sock: &Socket, mut buf: &mut [MaybeUninit<u8>]) -> Result<()> {
while !buf.is_empty() {
let n = sock.recv(buf)?;
ensure!(n != 0, std::io::ErrorKind::UnexpectedEof);
buf = &mut buf[n..];
}
Ok(())
}
pub(crate) fn send_all(sock: &Socket, mut buf: &[u8]) -> Result<()> {
while !buf.is_empty() {
let n = sock.send(buf)?;
buf = &buf[n..];
}
Ok(())
}
pub fn attach_filter_udp(group: Vec<&Arc<Router>>, local_id: u8) -> Result<()> {
let values: Vec<u32> = group
.iter()
.map(|&f| {
const META_SIZE: usize = size_of::<Meta>();
let meta = Meta {
src_id: f,
src_id: f.config.remote_id,
dst_id: local_id,
reversed: 0,
};
let meta_bytes: [u8; META_SIZE] =
unsafe { *(&meta as *const Meta as *const [u8; META_SIZE]) };
let meta_bytes: [u8; META_SIZE] = unsafe { *(&meta as *const Meta as *const [u8; META_SIZE]) };
u32::from_be_bytes(meta_bytes)
})
.collect();
......@@ -212,14 +166,11 @@ impl Router {
}
// If no match found after all comparisons, drop the packet
filters.push(bpf_stmt(BPF_RET | BPF_K, u32::MAX));
Self::attach_reuseport_cbpf(socket, &mut filters)?;
Self::attach_reuseport_cbpf(&group[0].socket, &mut filters)?;
Ok(())
}
fn attach_reuseport_cbpf(
sock: &Arc<Socket>,
code: &mut [SockFilter],
) -> Result<(), Box<dyn std::error::Error>> {
fn attach_reuseport_cbpf(sock: &Socket, code: &mut [SockFilter]) -> Result<()> {
let prog = sock_fprog {
len: code.len() as u16,
filter: code.as_mut_ptr() as *mut sock_filter,
......@@ -243,17 +194,15 @@ impl Router {
Ok(())
}
fn create_tun_device(
config: &ConfigRouter,
) -> Result<(Reader, Writer), Box<dyn std::error::Error>> {
fn create_tun_device(config: &ConfigRouter) -> Result<Device> {
let mut tun_config = tun::Configuration::default();
tun_config.tun_name(config.dev.as_str()).up();
let dev = tun::create(&tun_config)?;
Ok(dev.split())
Ok(dev)
}
fn run_up_script(config: &ConfigRouter) -> std::io::Result<ExitStatus> {
Command::new("sh").args(["-c", config.up.as_str()]).status()
fn run_up_script(config: &ConfigRouter) -> Result<ExitStatus> {
Ok(Command::new("sh").args(["-c", config.up.as_str()]).status()?)
}
fn create_endpoint(config: &ConfigRouter) -> Arc<Atomic<SockAddr>> {
......@@ -274,46 +223,50 @@ impl Router {
Arc::new(addr)
}
pub fn new(
config: ConfigRouter,
local_id: u8,
udp_count: &mut HashMap<u16, (Arc<Socket>, Vec<u8>)>,
) -> Result<Router, Box<dyn std::error::Error>> {
let secret = Self::create_secret(config.remote_secret.as_str())?;
let endpoint = Self::create_endpoint(&config);
let socket = Self::create_socket(&config, local_id, udp_count)?;
let (tun_reader, tun_writer) = Self::create_tun_device(&config)?;
Self::run_up_script(&config)?;
pub fn new(config: ConfigRouter, local_id: u8) -> Result<Router> {
let router = Router {
secret: Self::create_secret(config.remote_secret.as_str())?,
tun: Self::create_tun_device(&config)?,
endpoint: Self::create_endpoint(&config),
socket: Self::create_socket(&config, local_id)?,
config,
secret,
endpoint,
tun_reader,
tun_writer,
socket,
};
Self::run_up_script(&router.config)?;
Ok(router)
}
pub fn split(self) -> (RouterReader, RouterWriter) {
let writer = RouterWriter {
endpoint: self.endpoint.clone(),
tun_writer: self.tun_writer,
socket: self.socket.clone(),
last_activity: std::time::Instant::now(),
};
pub fn tcp_listen(&self) -> Result<Socket> {
let result = Socket::new(self.config.family, Type::STREAM, Some(Protocol::TCP))?;
let addr = Router::bind_addr(&self.config);
result.bind(&addr)?;
result.listen(100)?;
Ok(result)
}
pub fn tcp_connect(&self, local_id: u8) -> Result<Socket> {
let result = Socket::new(self.config.family, Type::STREAM, Some(Protocol::TCP))?;
result.set_mark(self.config.mark)?;
let guard = crossbeam::epoch::pin();
let endpoint_ref = self.endpoint.load(Ordering::Relaxed, &guard);
let endpoint = unsafe { endpoint_ref.as_ref() }.ok_or_else(|| anyhow::anyhow!("endpoint info load failed"))?;
let reader = RouterReader {
config: self.config,
secret: self.secret,
endpoint: self.endpoint,
tun_reader: self.tun_reader,
socket: self.socket,
let meta = Meta {
src_id: local_id,
dst_id: self.config.remote_id,
reversed: 0,
};
let buf = unsafe { std::slice::from_raw_parts(&meta as *const Meta as *const u8, META_SIZE) };
result.send_to_with_flags(buf, endpoint, MSG_FASTOPEN)?;
Ok(result)
}
(reader, writer)
fn bind_addr(config: &ConfigRouter) -> SockAddr {
match config.family {
Domain::IPV4 => SockAddr::from(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.src_port)),
Domain::IPV6 => SockAddr::from(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, config.src_port, 0, 0)),
_ => panic!("unsupported family"),
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment