Commit 8c80b752 authored by Chunchi Che's avatar Chunchi Che Committed by GitHub

Merge pull request #5 from DarkNeos/channel

use channel
parents b5b9fe61 512f8de6
...@@ -6,7 +6,7 @@ import ( ...@@ -6,7 +6,7 @@ import (
"log" "log"
"net" "net"
"net/http" "net/http"
"sync" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
darkneos "github.com/sktt1ryze/ygopro-proxy/DarkNeos" darkneos "github.com/sktt1ryze/ygopro-proxy/DarkNeos"
...@@ -16,6 +16,7 @@ const TARGET_PORT = ":8000" ...@@ -16,6 +16,7 @@ const TARGET_PORT = ":8000"
const PROXY_PORT = ":3344" const PROXY_PORT = ":3344"
const CHANNEL_SIZE = 0x1000 const CHANNEL_SIZE = 0x1000
const BUFFER_SIZE = 0x1000 const BUFFER_SIZE = 0x1000
const TIME_OUT = 5
var upgrader = websocket.Upgrader{ var upgrader = websocket.Upgrader{
ReadBufferSize: 0x1000, ReadBufferSize: 0x1000,
...@@ -23,7 +24,7 @@ var upgrader = websocket.Upgrader{ ...@@ -23,7 +24,7 @@ var upgrader = websocket.Upgrader{
} }
func ygoEndpoint(w http.ResponseWriter, r *http.Request) { func ygoEndpoint(w http.ResponseWriter, r *http.Request) {
var wg sync.WaitGroup defer log.Println("ygoEndpoint finished")
upgrader.CheckOrigin = wsChecker upgrader.CheckOrigin = wsChecker
...@@ -31,81 +32,137 @@ func ygoEndpoint(w http.ResponseWriter, r *http.Request) { ...@@ -31,81 +32,137 @@ func ygoEndpoint(w http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer ws.Close()
log.Println("Websocket connected") log.Println("Connection to ws://localhost" + TARGET_PORT + " [websocket] succeeded!")
tcp, err := net.Dial("tcp", "127.0.0.1"+PROXY_PORT) tcp, err := net.Dial("tcp", "127.0.0.1"+PROXY_PORT)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
log.Println("Tcp connected") log.Println("Connection to " + "12.0.0.1" + PROXY_PORT + " [tcp] succeeded!")
defer tcp.Close() wsCh := make(chan []byte, CHANNEL_SIZE)
tcpCh := make(chan []byte, CHANNEL_SIZE)
wsStopCh := make(chan bool, CHANNEL_SIZE)
tcpStopCh := make(chan bool, CHANNEL_SIZE)
defer func() {
wsStopCh <- true
tcpStopCh <- true
close(wsStopCh)
close(tcpStopCh)
}()
go wsProxy(ws, wsCh, wsStopCh)
go tcpProxy(tcp, tcpCh, tcpStopCh)
for {
select {
case wsBuf, ok := <-wsCh:
if !ok {
return
}
if _, err = tcp.Write(wsBuf); err != nil {
log.Println(err)
return
}
case tcpBuf, ok := <-tcpCh:
if !ok {
return
}
wg.Add(2) if err = ws.WriteMessage(websocket.BinaryMessage, tcpBuf); err != nil {
go wsProxy(ws, &tcp, &wg) log.Println(err)
go tcpProxy(&tcp, ws, &wg) return
wg.Wait() }
}
}
} }
func wsProxy(ws *websocket.Conn, tcp *net.Conn, wg *sync.WaitGroup) { // todo: generic
defer wg.Done() func wsProxy(ws *websocket.Conn, Ch chan<- []byte, stopCh <-chan bool) {
defer ws.Close()
defer close(Ch)
for { for {
select {
case _, ok := <-stopCh:
log.Println("wsProxy recv stop singal, exit. channel closed: ", ok)
return
default:
// if err := ws.SetReadDeadline(time.Now().Add(time.Second * TIME_OUT)); err != nil {
// log.Println(err)
// return
// }
messageType, buffer, err := ws.ReadMessage() messageType, buffer, err := ws.ReadMessage()
if err != nil { if err != nil {
// if err, ok := err.(net.Error); ok && err.Timeout() {
// continue
// }
log.Println(err) log.Println(err)
break return
} }
if messageType == websocket.CloseMessage { if messageType == websocket.CloseMessage {
log.Println("Websocket closed") log.Println("Websocket closed")
break return
} }
buffer, err = darkneos.Transform(buffer, darkneos.ProtobufToRawBuf) buffer, err = darkneos.Transform(buffer, darkneos.ProtobufToRawBuf)
if err != nil { if err != nil {
log.Fatal(err) log.Println(err)
break return
} }
_, err = (*tcp).Write(buffer) Ch <- buffer
if err != nil {
log.Fatal(err)
break
} }
} }
} }
func tcpProxy(tcp *net.Conn, ws *websocket.Conn, wg *sync.WaitGroup) { func tcpProxy(tcp net.Conn, Ch chan<- []byte, stopCh <-chan bool) {
defer wg.Done() defer tcp.Close()
defer close(Ch)
reader := bufio.NewReader(*tcp) reader := bufio.NewReader(tcp)
buffer := make([]byte, BUFFER_SIZE) buffer := make([]byte, BUFFER_SIZE)
for { for {
select {
case _, ok := <-stopCh:
log.Println("tcpProxy recv stop singal, exit. channel closed: ", ok)
return
default:
if err := tcp.SetReadDeadline(time.Now().Add(time.Second * TIME_OUT)); err != nil {
log.Println(err)
return
}
_, err := reader.Read(buffer) _, err := reader.Read(buffer)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
continue continue
} }
log.Println("Tcp read message error: ", err) if err, ok := err.(net.Error); ok && err.Timeout() {
break continue
}
log.Println(err)
return
} }
buffer, err = darkneos.Transform(buffer, darkneos.RawBufToProtobuf) buffer, err = darkneos.Transform(buffer, darkneos.RawBufToProtobuf)
if err != nil { if err != nil {
log.Fatal(err) log.Println(err)
break return
} }
err = ws.WriteMessage(websocket.BinaryMessage, buffer) Ch <- buffer
if err != nil {
log.Fatal(err)
break
} }
} }
} }
...@@ -117,6 +174,8 @@ func setupRoutes() { ...@@ -117,6 +174,8 @@ func setupRoutes() {
} }
func main() { func main() {
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile | log.Llongfile)
setupRoutes() setupRoutes()
log.Fatal(http.ListenAndServe(TARGET_PORT, nil)) log.Fatal(http.ListenAndServe(TARGET_PORT, nil))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment