Commit 4b856214 authored by ginuerzh's avatar ginuerzh

update udp transparent proxy

parent bd9fc764
...@@ -610,7 +610,7 @@ func (l *tcpRemoteForwardListener) Close() error { ...@@ -610,7 +610,7 @@ func (l *tcpRemoteForwardListener) Close() error {
type udpRemoteForwardListener struct { type udpRemoteForwardListener struct {
addr net.Addr addr net.Addr
chain *Chain chain *Chain
connMap udpConnMap connMap *udpConnMap
connChan chan net.Conn connChan chan net.Conn
ln *net.UDPConn ln *net.UDPConn
errChan chan error errChan chan error
...@@ -640,6 +640,7 @@ func UDPRemoteForwardListener(addr string, chain *Chain, cfg *UDPListenConfig) ( ...@@ -640,6 +640,7 @@ func UDPRemoteForwardListener(addr string, chain *Chain, cfg *UDPListenConfig) (
ln := &udpRemoteForwardListener{ ln := &udpRemoteForwardListener{
addr: laddr, addr: laddr,
chain: chain, chain: chain,
connMap: new(udpConnMap),
connChan: make(chan net.Conn, backlog), connChan: make(chan net.Conn, backlog),
errChan: make(chan error, 1), errChan: make(chan error, 1),
closed: make(chan struct{}), closed: make(chan struct{}),
......
...@@ -6,7 +6,9 @@ import ( ...@@ -6,7 +6,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"sync"
"syscall" "syscall"
"time"
"github.com/LiamHaworth/go-tproxy" "github.com/LiamHaworth/go-tproxy"
"github.com/go-log/log" "github.com/go-log/log"
...@@ -121,17 +123,15 @@ func (h *udpRedirectHandler) Init(options ...HandlerOption) { ...@@ -121,17 +123,15 @@ func (h *udpRedirectHandler) Init(options ...HandlerOption) {
} }
} }
func (h *udpRedirectHandler) Handle(c net.Conn) { func (h *udpRedirectHandler) Handle(conn net.Conn) {
defer c.Close() defer conn.Close()
conn, ok := c.(*udpRedirectServerConn) raddr, ok := conn.LocalAddr().(*net.UDPAddr)
if !ok { if !ok {
log.Log("wrong connection type") log.Log("[red-udp] wrong connection type")
return return
} }
raddr := conn.DstAddr()
var cc net.Conn var cc net.Conn
var err error var err error
if h.options.Chain.IsEmpty() { if h.options.Chain.IsEmpty() {
...@@ -167,11 +167,8 @@ func (h *udpRedirectHandler) Handle(c net.Conn) { ...@@ -167,11 +167,8 @@ func (h *udpRedirectHandler) Handle(c net.Conn) {
} }
type udpRedirectListener struct { type udpRedirectListener struct {
ln *net.UDPConn *net.UDPConn
connChan chan net.Conn config *UDPListenConfig
errChan chan error
connMap udpConnMap
config *UDPListenConfig
} }
// UDPRedirectListener creates a Listener for UDP transparent proxy server. // UDPRedirectListener creates a Listener for UDP transparent proxy server.
...@@ -189,103 +186,72 @@ func UDPRedirectListener(addr string, cfg *UDPListenConfig) (Listener, error) { ...@@ -189,103 +186,72 @@ func UDPRedirectListener(addr string, cfg *UDPListenConfig) (Listener, error) {
if cfg == nil { if cfg == nil {
cfg = &UDPListenConfig{} cfg = &UDPListenConfig{}
} }
return &udpRedirectListener{
UDPConn: ln,
config: cfg,
}, nil
}
backlog := cfg.Backlog func (l *udpRedirectListener) Accept() (conn net.Conn, err error) {
if backlog <= 0 { b := make([]byte, mediumBufferSize)
backlog = defaultBacklog
}
l := &udpRedirectListener{ n, raddr, dstAddr, err := tproxy.ReadFromUDP(l.UDPConn, b)
ln: ln, if err != nil {
connChan: make(chan net.Conn, backlog), log.Logf("[red-udp] %s : %s", l.Addr(), err)
errChan: make(chan error, 1), return
config: cfg,
} }
go l.listenLoop() log.Logf("[red-udp] %s: %s -> %s", l.Addr(), raddr, dstAddr)
return l, nil
}
func (l *udpRedirectListener) listenLoop() { c, err := tproxy.DialUDP("udp", dstAddr, raddr)
for { if err != nil {
b := make([]byte, mediumBufferSize) log.Logf("[red-udp] %s -> %s : %s", raddr, dstAddr, err)
n, raddr, dstAddr, err := tproxy.ReadFromUDP(l.ln, b) return
if err != nil { }
log.Logf("[red-udp] peer -> %s : %s", l.Addr(), err)
l.Close()
l.errChan <- err
close(l.errChan)
return
}
conn, ok := l.connMap.Get(raddr.String())
if !ok {
conn = newUDPServerConn(l.ln, raddr, &udpServerConnConfig{
ttl: l.config.TTL,
qsize: l.config.QueueSize,
onClose: func() {
l.connMap.Delete(raddr.String())
log.Logf("[red-udp] %s closed (%d)", raddr, l.connMap.Size())
},
})
cc := udpRedirectServerConn{
udpServerConn: conn,
dstAddr: dstAddr,
}
select {
case l.connChan <- cc:
l.connMap.Set(raddr.String(), conn)
log.Logf("[red-udp] %s -> %s (%d)", raddr, l.Addr(), l.connMap.Size())
default:
conn.Close()
log.Logf("[red-udp] %s - %s: connection queue is full (%d)",
raddr, l.Addr(), cap(l.connChan))
}
}
select { ttl := l.config.TTL
case conn.rChan <- b[:n]: if ttl <= 0 {
if Debug { ttl = defaultTTL
log.Logf("[red-udp] %s >>> %s : length %d", raddr, l.Addr(), n)
}
default:
log.Logf("[red-udp] %s -> %s : recv queue is full (%d)",
raddr, l.Addr(), cap(conn.rChan))
}
} }
}
func (l *udpRedirectListener) Accept() (conn net.Conn, err error) { conn = &udpRedirectServerConn{
var ok bool Conn: c,
select { buf: b[:n],
case conn = <-l.connChan: ttl: ttl,
case err, ok = <-l.errChan:
if !ok {
err = errors.New("accpet on closed listener")
}
} }
return return
} }
func (l *udpRedirectListener) Addr() net.Addr { func (l *udpRedirectListener) Addr() net.Addr {
return l.ln.LocalAddr() return l.UDPConn.LocalAddr()
} }
func (l *udpRedirectListener) Close() error { type udpRedirectServerConn struct {
err := l.ln.Close() net.Conn
l.connMap.Range(func(k interface{}, v *udpServerConn) bool { buf []byte
v.Close() ttl time.Duration
return true once sync.Once
})
return err
} }
type udpRedirectServerConn struct { func (c *udpRedirectServerConn) Read(b []byte) (n int, err error) {
*udpServerConn if c.ttl > 0 {
dstAddr *net.UDPAddr c.SetReadDeadline(time.Now().Add(c.ttl))
defer c.SetReadDeadline(time.Time{})
}
c.once.Do(func() {
n = copy(b, c.buf)
c.buf = nil
})
if n == 0 {
n, err = c.Conn.Read(b)
}
return
} }
func (c *udpRedirectServerConn) DstAddr() *net.UDPAddr { func (c *udpRedirectServerConn) Write(b []byte) (n int, err error) {
return c.dstAddr if c.ttl > 0 {
c.SetWriteDeadline(time.Now().Add(c.ttl))
defer c.SetWriteDeadline(time.Time{})
}
return c.Conn.Write(b)
} }
...@@ -54,7 +54,7 @@ type udpListener struct { ...@@ -54,7 +54,7 @@ type udpListener struct {
ln net.PacketConn ln net.PacketConn
connChan chan net.Conn connChan chan net.Conn
errChan chan error errChan chan error
connMap udpConnMap connMap *udpConnMap
config *UDPListenConfig config *UDPListenConfig
} }
...@@ -82,6 +82,7 @@ func UDPListener(addr string, cfg *UDPListenConfig) (Listener, error) { ...@@ -82,6 +82,7 @@ func UDPListener(addr string, cfg *UDPListenConfig) (Listener, error) {
ln: ln, ln: ln,
connChan: make(chan net.Conn, backlog), connChan: make(chan net.Conn, backlog),
errChan: make(chan error, 1), errChan: make(chan error, 1),
connMap: new(udpConnMap),
config: cfg, config: cfg,
} }
go l.listenLoop() go l.listenLoop()
...@@ -159,8 +160,8 @@ func (l *udpListener) Close() error { ...@@ -159,8 +160,8 @@ func (l *udpListener) Close() error {
} }
type udpConnMap struct { type udpConnMap struct {
m sync.Map
size int64 size int64
m sync.Map
} }
func (m *udpConnMap) Get(key interface{}) (conn *udpServerConn, ok bool) { func (m *udpConnMap) Get(key interface{}) (conn *udpServerConn, ok bool) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment