Skip to content

Commit

Permalink
Merge pull request #3 from piaodazhu/dev
Browse files Browse the repository at this point in the history
suport rejecting unexpected user in concurrent control; improve servi…
  • Loading branch information
piaodazhu authored Aug 18, 2023
2 parents 8619b53 + fd833cc commit de59a5e
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 48 deletions.
2 changes: 1 addition & 1 deletion TODO
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
- [ ] Code Report
- [ ] Benchmark
- [x] Register arguments: maxConn, maxServeNum, maxServeTime
- [ ] Active user status
- [x] Active user status
6 changes: 5 additions & 1 deletion example/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ func main() {
Name: "redis",
Message: "redis kv",
},
proxylite.ControlInfo{},
proxylite.ControlInfo{
MaxServeConn: 2,
MaxServeCount: 4,
MaxServeTime: 600,
},
))

log.Print(client.AvaliablePorts())
Expand Down
11 changes: 11 additions & 0 deletions example/discovery/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module discovery

go 1.12

// require github.com/piaodazhu/proxylite v0.0.0-20230813091435-b2c8f474a9fc
replace github.com/piaodazhu/proxylite => ../../

require (
github.com/piaodazhu/proxylite v0.0.0-00010101000000-000000000000
github.com/tidwall/pretty v1.2.1
)
24 changes: 24 additions & 0 deletions example/discovery/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
18 changes: 18 additions & 0 deletions example/discovery/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package main

import (
"encoding/json"
"fmt"

"github.com/piaodazhu/proxylite"
"github.com/tidwall/pretty"
)

func main() {
infos, err := proxylite.DiscoverServices(":9933")
if err != nil {
panic(err)
}
raw, _ := json.Marshal(infos)
fmt.Println(string(pretty.Pretty(raw)))
}
11 changes: 8 additions & 3 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type RegisterInfo struct {

// ControlInfo Register Controlling information
type ControlInfo struct {
MaxServeTime uint32
MaxServeConn uint32
MaxServeTime uint32
MaxServeConn uint32
MaxServeCount uint32
}

Expand Down Expand Up @@ -73,8 +73,13 @@ type ServiceInfo struct {
Port uint32
Name string
Message string
Busy bool
Birth time.Time

Online uint32 // online user count
Capacity uint32 // max concurrency user count
AlreadyServe uint32 // already served user number
TotalServe uint32 // total user number can serve
DeadLine time.Time // time to close this port
}

// AskServiceRsp Service discovery response
Expand Down
18 changes: 2 additions & 16 deletions proxylite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,6 @@ func TestMultiplexMaxCountControl(t *testing.T) {
default:
t.Error("unexpected continue")
}
t.Log(len(recordmsg))
if len(recordmsg) != 20 {
t.Error("count control error")
}
Expand Down Expand Up @@ -505,20 +504,7 @@ func TestMultiplexMaxConnControl(t *testing.T) {
default:
}

length := len(recordmsg)
if length != 30 {
t.Fatal("not all user is served")
}
records := []int{}
for i := 0; i < length; i++ {
x := <-recordmsg
records = append(records, x)
}
lastItem := records[len(records)-1]
for i := length - 1; i >= length-10; i-- {
if records[i]/100 != lastItem/100 {
t.Error("fail to concurrency control")
break
}
if len(recordmsg) != 20 {
t.Error("count control error")
}
}
83 changes: 56 additions & 27 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
sem "golang.org/x/sync/semaphore"
)

type tunnel struct {
empty bool
busy bool
birth time.Time
empty bool
// busy bool
// birth time.Time
innerConn *net.Conn
info *RegisterInfo
ctrl *ControlInfo
// info *RegisterInfo
service *ServiceInfo
ctrl *ControlInfo
}

// ProxyLiteServer Public server that forwards traffic between user and inner client.
Expand Down Expand Up @@ -150,15 +152,9 @@ func (s *ProxyLiteServer) handleAskService(conn net.Conn, req *AskServiceReq) er

for _, t := range s.used {
if !t.empty &&
len(t.info.Name) >= len(req.Prefix) &&
t.info.Name[:len(req.Prefix)] == req.Prefix {
Services = append(Services, ServiceInfo{
Port: t.info.OuterPort,
Name: t.info.Name,
Message: t.info.Message,
Busy: t.busy,
Birth: t.birth,
})
len(t.service.Name) >= len(req.Prefix) &&
t.service.Name[:len(req.Prefix)] == req.Prefix {
Services = append(Services, *t.service)
}
}
s.lock.RUnlock()
Expand Down Expand Up @@ -186,24 +182,43 @@ func (s *ProxyLiteServer) handleRegisterService(conn net.Conn, req *RegisterServ
if !found {
tn = &tunnel{
empty: false,
busy: false,
birth: time.Now(),
innerConn: &conn,
info: &req.Info,
ctrl: &req.Ctrl,
}
s.used[req.Info.OuterPort] = tn
} else if tn.empty { // reuse
tn.empty = false
tn.busy = false
tn.birth = time.Now()
tn.innerConn = &conn
tn.info = &req.Info
tn.ctrl = &req.Ctrl
} else {
return s.registerServiceResponse(conn, req, RegisterRspPortOccupied)
}

tn.service = &ServiceInfo{
Port: req.Info.OuterPort,
Name: req.Info.Name,
Message: req.Info.Message,
Birth: time.Now(),
}

if req.Ctrl.MaxServeConn != 0 {
tn.service.Capacity = req.Ctrl.MaxServeConn
} else {
tn.service.Capacity = 10000
}

if req.Ctrl.MaxServeCount != 0 {
tn.service.TotalServe = req.Ctrl.MaxServeCount
} else {
tn.service.TotalServe = 100000
}

if req.Ctrl.MaxServeTime != 0 {
tn.service.DeadLine = time.Now().Add(time.Second * time.Duration(req.Ctrl.MaxServeTime))
} else {
tn.service.DeadLine = time.Now().Add(time.Hour * 24 * 7 * 365)
}

s.registerServiceResponse(conn, req, RegisterRspOK)
go s.startTunnel(tn)
return nil
Expand Down Expand Up @@ -247,10 +262,10 @@ func (s *ProxyLiteServer) startTunnel(tn *tunnel) {
var totalOut, totalIn uint64

// Now, register is OK, we want to map outer port to the inner client's socket
s.logTunnelMessage(tn.info.Name, "REGISTER", fmt.Sprintf("New inner client register port %d ok. Listening for outer client...", tn.info.OuterPort))
s.logTunnelMessage(tn.service.Name, "REGISTER", fmt.Sprintf("New inner client register port %d ok. Listening for outer client...", tn.service.Port))

// First, listen on registered outer port.
listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", tn.info.OuterPort))
listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", tn.service.Port))
if err != nil {
(*tn.innerConn).Close()
return
Expand All @@ -261,7 +276,7 @@ func (s *ProxyLiteServer) startTunnel(tn *tunnel) {
(*tn.innerConn).Close()
listener.Close()
binder.closeAll()
s.logTunnelMessage(tn.info.Name, "CLOSE", fmt.Sprintf("tunnel closed, port=%d, Out %dB, In %dB", tn.info.OuterPort, totalOut, totalIn))
s.logTunnelMessage(tn.service.Name, "CLOSE", fmt.Sprintf("tunnel closed, port=%d, Out %dB, In %dB", tn.service.Port, totalOut, totalIn))
}

// if MaxServeTime is set, close all conn after MaxServeTime s
Expand Down Expand Up @@ -294,7 +309,7 @@ func (s *ProxyLiteServer) startTunnel(tn *tunnel) {
// make sure tunnel closed and tn set empty before this function return
defer func() {
if err := recover(); err != nil {
s.logTunnelMessage(tn.info.Name, "PANIC", fmt.Sprint("tunnel panic, err=", err))
s.logTunnelMessage(tn.service.Name, "PANIC", fmt.Sprint("tunnel panic, err=", err))
}
if timeoutTimer != nil && !timeoutTimer.Stop() {
select { // timer may not be triggered. If so we drain it out.
Expand Down Expand Up @@ -374,20 +389,32 @@ func (s *ProxyLiteServer) startTunnel(tn *tunnel) {
}
if binder.freeUidIfExists(GenConnId(&outerConn)) {
outerConn.Close()
s.logTunnelMessage(tn.info.Name, "FINISH", fmt.Sprintf("in->out finish: %v, [%v]", outerConn.RemoteAddr(), err))
s.logTunnelMessage(tn.service.Name, "FINISH", fmt.Sprintf("in->out finish: %v, [%v]", outerConn.RemoteAddr(), err))
}
concurrency.Release(1)
// if MaxServeCount is defined and the last one user leave
if finiteServeCount && !leaving.TryAcquire(1) {
doOnce.Do(closeTunnel)
lastFinish.Done() // can finish
}
atomic.AddUint32(&tn.service.Online, ^uint32(0)) // sub 1
}

// accept new user and alloc uid and start send loop
for {
// Have no timeout. Just block.
concurrency.Acquire(context.Background(), 1)
// concurrent control
if !concurrency.TryAcquire(1) {
// if online user count reach maxConn, step into this branch
// we close listener to 'reject' unexpected users
listener.Close()
// Then block until at lease one user finish his session.
concurrency.Acquire(context.Background(), 1)
// Then we recover the listener
listener, err = net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", tn.service.Port))
if err != nil {
break
}
}
if finiteServeCount {
// All users have been come.
if !comming.TryAcquire(1) {
Expand All @@ -401,6 +428,8 @@ func (s *ProxyLiteServer) startTunnel(tn *tunnel) {
}
connId := GenConnId(&conn)
if uid, ok := binder.allocUid(connId, &conn); ok {
atomic.AddUint32(&tn.service.AlreadyServe, 1)
atomic.AddUint32(&tn.service.Online, 1)
go readFromUser(conn, connId, uid)
} else {
conn.Close()
Expand Down

0 comments on commit de59a5e

Please sign in to comment.