Commit 74904457 authored by 七海千秋's avatar 七海千秋 Committed by GitHub

Merge pull request #1 from nanamicat/codetector_patch

bugfix: multi threading
parents 38b5d2bf edba2c50
...@@ -83,24 +83,50 @@ fn main() -> Result<()> { ...@@ -83,24 +83,50 @@ fn main() -> Result<()> {
&& let Some(router) = routers.get(&meta.src_id) && let Some(router) = routers.get(&meta.src_id)
&& meta.dst_id == router.config.local_id && meta.dst_id == router.config.local_id
{ {
let connection = Arc::new(connection); // let connection = Arc::new(connection);
// tcp listener 只许一个连接,过来新连接就把前一个关掉。 // tcp listener 只许一个连接,过来新连接就把前一个关掉。
{ {
let guard = pin(); let guard = pin();
let new_shared = Owned::new(connection.clone()).into_shared(&guard); let new_shared = Owned::new(connection).into_shared(&guard);
let old_shared = router.tcp_listener_connection.swap(new_shared, Ordering::Release, &guard); let old_shared = router.tcp_listener_connection.swap(new_shared, Ordering::AcqRel, &guard);
unsafe { //
if let Some(old) = old_shared.as_ref() { // SAFETY: this is guaranteed to still point to valid connection because
let _ = old.shutdown(Shutdown::Both); // the guard is swapped with AcqRel so we are for sure tracked by the pin
// list
//
if let Some(old) = unsafe { old_shared.as_ref() } {
let _ = old.shutdown(Shutdown::Both);
// SAFETY: At this point old_shared is guaranteed
// to be non-null (above if let checks that)
// And since it is already swapped out of the
// `tcp_listener_connection` no other thread
// should have access to it.
unsafe {
guard.defer_destroy(old_shared);
} }
guard.defer_destroy(old_shared)
} }
} }
let _ = thread::scope(|s| { let _ = thread::scope(|s| {
s.spawn(|_| router.handle_outbound_tcp(&connection)); s.spawn(|_| {
s.spawn(|_| router.handle_inbound_tcp(&connection)); let guard = pin();
let shared = router.tcp_listener_connection.load(Ordering::Acquire, &guard);
// SAFETY: tcp_listener_connection shoud always either point to null or some valid thing
if let Some(connection) = unsafe { shared.as_ref() } {
router.handle_outbound_tcp(connection);
}
});
s.spawn(|_| {
let guard = pin();
let shared = router.tcp_listener_connection.load(Ordering::Acquire, &guard);
// SAFETY: tcp_listener_connection shoud always either point to null or some valid thing
if let Some(connection) = unsafe { shared.as_ref() } {
router.handle_inbound_tcp(&connection);
}
});
}); });
} }
}); });
......
...@@ -45,7 +45,7 @@ pub struct Router { ...@@ -45,7 +45,7 @@ pub struct Router {
pub socket: Socket, pub socket: Socket,
pub endpoint: Atomic<SockAddr>, pub endpoint: Atomic<SockAddr>,
pub tcp_listener_connection: Atomic<Arc<Socket>>, pub tcp_listener_connection: Atomic<Socket>,
} }
impl Router { impl Router {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment