diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index f2b66fadc4..16f682d76a 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -115,10 +115,10 @@ jobs: working-directory: ./be1-go steps: - - name: Use go >= 1.19 + - name: Use go >= 1.21 uses: actions/setup-go@v3 with: - go-version: ">=1.19" + go-version: ">=1.21" - name: Setup repo uses: actions/checkout@v3 diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml index affccafc7c..232effb1a6 100644 --- a/.github/workflows/deploy.yaml +++ b/.github/workflows/deploy.yaml @@ -91,10 +91,10 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Use go >= 1.19 + - name: Use go >= 1.21 uses: actions/setup-go@v3 with: - go-version: ">=1.19" + go-version: ">=1.21" - name: build run: | diff --git a/.github/workflows/karate_be1-go.yaml b/.github/workflows/karate_be1-go.yaml index 18337a7b71..42cf6e7572 100644 --- a/.github/workflows/karate_be1-go.yaml +++ b/.github/workflows/karate_be1-go.yaml @@ -11,10 +11,10 @@ jobs: runs-on: ubuntu-latest steps: - - name: Use go >= 1.19 + - name: Use go >= 1.21 uses: actions/setup-go@v3 with: - go-version: ">=1.19" + go-version: ">=1.21" - name: Setup repo uses: actions/checkout@v3 diff --git a/be1-go/.gitignore b/be1-go/.gitignore index 96c0aa4fe9..c99d6a4e40 100644 --- a/be1-go/.gitignore +++ b/be1-go/.gitignore @@ -4,4 +4,6 @@ report.json coverage.out coverage.html configServer1.json -configServer2.json \ No newline at end of file +configServer2.json +configServer3.json +configServer4.json \ No newline at end of file diff --git a/be1-go/channel/lao/mod.go b/be1-go/channel/lao/mod.go index 58e73bde74..6394c52d06 100644 --- a/be1-go/channel/lao/mod.go +++ b/be1-go/channel/lao/mod.go @@ -702,12 +702,12 @@ func (c *Channel) createAndSendLAOGreet() error { return xerrors.Errorf("failed to marshal the organizer key: %v", err) } - peers := []messagedata.Peer{} + peersInfo := c.hub.GetPeersInfo() - for _, info := range c.hub.GetPeersInfo() { - peers = append(peers, messagedata.Peer{ - Address: info.ClientAddress, - }) + peers := make([]messagedata.Peer, 0, len(peersInfo)) + + for _, info := range peersInfo { + peers = append(peers, messagedata.Peer{Address: info.ClientAddress}) } msgData := messagedata.LaoGreet{ diff --git a/be1-go/configServer2.json b/be1-go/configServer2.json index 5238986b74..327ec28b77 100644 --- a/be1-go/configServer2.json +++ b/be1-go/configServer2.json @@ -11,4 +11,4 @@ "other-servers": [ "localhost:9001" ] -} \ No newline at end of file +} diff --git a/be1-go/go.mod b/be1-go/go.mod index f74e63ae84..1d52d3f62e 100644 --- a/be1-go/go.mod +++ b/be1-go/go.mod @@ -1,6 +1,6 @@ module popstellar -go 1.19 +go 1.21 require ( github.com/aaronarduino/goqrsvg v0.0.0-20220419053939-17e843f1dd40 @@ -20,7 +20,7 @@ require ( github.com/zitadel/oidc/v2 v2.1.2 go.dedis.ch/kyber/v3 v3.0.13 golang.org/x/exp v0.0.0-20230321023759-10a507213a29 - golang.org/x/sync v0.1.0 + golang.org/x/sync v0.4.0 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 gopkg.in/yaml.v2 v2.2.3 ) @@ -38,11 +38,11 @@ require ( github.com/russross/blackfriday/v2 v2.0.1 // indirect github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect go.dedis.ch/fixbuf v1.0.3 // indirect - golang.org/x/crypto v0.7.0 // indirect - golang.org/x/net v0.8.0 // indirect + golang.org/x/crypto v0.14.0 // indirect + golang.org/x/net v0.16.0 // indirect golang.org/x/oauth2 v0.6.0 // indirect - golang.org/x/sys v0.6.0 // indirect - golang.org/x/text v0.8.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.29.1 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect diff --git a/be1-go/go.sum b/be1-go/go.sum index 1498a8b6c6..30c5019d86 100644 --- a/be1-go/go.sum +++ b/be1-go/go.sum @@ -21,13 +21,16 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/schema v1.2.0 h1:YufUaxZYCKGFuAq3c96BOhjgd5nmXiOY9NGzF247Tsc= @@ -91,8 +94,8 @@ golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= -golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -102,15 +105,15 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos= +golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.6.0 h1:Lh8GPgSKBfWSwFvtuWOfeI3aAAnbXTSutYxJiOJFgIw= golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20190124100055-b90733256f2e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -120,14 +123,14 @@ golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= -golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= @@ -146,6 +149,7 @@ google.golang.org/protobuf v1.29.1 h1:7QBf+IK2gx70Ap/hDsOmam3GE0v9HicjfEdAxE62Uo google.golang.org/protobuf v1.29.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI= gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/yaml.v2 v2.2.3 h1:fvjTMHxHEw/mxHbtzPi3JCcKXQRAnQTBRo6YCJSVHKI= diff --git a/be1-go/hub/standard_hub/hub_state/Channels.go b/be1-go/hub/standard_hub/hub_state/Channels.go new file mode 100644 index 0000000000..29c3a0c95d --- /dev/null +++ b/be1-go/hub/standard_hub/hub_state/Channels.go @@ -0,0 +1,24 @@ +package hub_state + +import "popstellar/channel" + +// Channels stores channel ids with their corresponding channels +type Channels struct { + ThreadSafeMap[string, channel.Channel] +} + +// NewChannelsMap creates a new Channels structure +func NewChannelsMap() Channels { + return Channels{ + ThreadSafeMap: NewThreadSafeMap[string, channel.Channel](), + } +} + +// ForEach iterates over all channels and applies the given function +func (c *Channels) ForEach(f func(channel.Channel)) { + c.Lock() + defer c.Unlock() + for _, channel := range c.table { + f(channel) + } +} diff --git a/be1-go/hub/standard_hub/hub_state/MessageIds.go b/be1-go/hub/standard_hub/hub_state/MessageIds.go new file mode 100644 index 0000000000..ebd495c03a --- /dev/null +++ b/be1-go/hub/standard_hub/hub_state/MessageIds.go @@ -0,0 +1,39 @@ +package hub_state + +import ( + "golang.org/x/exp/slices" +) + +// MessageIds stores a channel id with its corresponding message ids +type MessageIds struct { + ThreadSafeMap[string, []string] +} + +// NewMessageIdsMap creates a new MessageIds structure +func NewMessageIdsMap() MessageIds { + return MessageIds{ + ThreadSafeMap: NewThreadSafeMap[string, []string](), + } +} + +// Add adds a message id to the slice of message ids of the channel +func (i *MessageIds) Add(channel string, id string) { + i.Lock() + defer i.Unlock() + messageIds, channelStored := i.table[channel] + if !channelStored { + i.table[channel] = append(i.table[channel], id) + return + } + alreadyStoredId := slices.Contains(messageIds, id) + if !alreadyStoredId { + i.table[channel] = append(i.table[channel], id) + } +} + +// AddAll adds a slice of message ids to the slice of message ids of the channel +func (i *MessageIds) AddAll(channel string, ids []string) { + i.Lock() + defer i.Unlock() + i.table[channel] = append(i.table[channel], ids...) +} diff --git a/be1-go/hub/standard_hub/hub_state/Peers.go b/be1-go/hub/standard_hub/hub_state/Peers.go new file mode 100644 index 0000000000..9950b50f33 --- /dev/null +++ b/be1-go/hub/standard_hub/hub_state/Peers.go @@ -0,0 +1,60 @@ +package hub_state + +import ( + "popstellar/message/query/method" + "sync" + + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" +) + +// Peers stores the peers' information +type Peers struct { + sync.RWMutex + // peersInfo stores the info of the peers: public key, client and server endpoints associated with the socket ID + peersInfo map[string]method.ServerInfo + // peersGreeted stores the peers that were greeted by the socket ID + peersGreeted map[string]struct{} +} + +// NewPeers creates a new Peers structure +func NewPeers() Peers { + return Peers{ + peersInfo: make(map[string]method.ServerInfo), + peersGreeted: make(map[string]struct{}), + } +} + +// AddPeerInfo adds a peer's info to the table +func (p *Peers) AddPeerInfo(socketId string, info method.ServerInfo) { + p.Lock() + defer p.Unlock() + p.peersInfo[socketId] = info +} + +// AddPeerGreeted adds a peer's socket ID to the slice of peers greeted +func (p *Peers) AddPeerGreeted(socketId string) { + p.Lock() + defer p.Unlock() + p.peersGreeted[socketId] = struct{}{} +} + +// GetAllPeersInfo returns a copy of the peers' info slice +func (p *Peers) GetAllPeersInfo() []method.ServerInfo { + p.RLock() + defer p.RUnlock() + peersInfo := make([]method.ServerInfo, 0, len(p.peersInfo)) + for _, info := range p.peersInfo { + if !slices.Contains(peersInfo, info) { + peersInfo = append(peersInfo, info) + } + } + return peersInfo +} + +// IsPeerGreeted returns true if the peer was greeted, otherwise it returns false +func (p *Peers) IsPeerGreeted(socketId string) bool { + p.RLock() + defer p.RUnlock() + return slices.Contains(maps.Keys(p.peersGreeted), socketId) +} diff --git a/be1-go/hub/standard_hub/hub_state/Queries.go b/be1-go/hub/standard_hub/hub_state/Queries.go new file mode 100644 index 0000000000..48aaed1707 --- /dev/null +++ b/be1-go/hub/standard_hub/hub_state/Queries.go @@ -0,0 +1,78 @@ +package hub_state + +import ( + "popstellar/message/query/method" + "sync" + + "golang.org/x/xerrors" +) + +// Queries let the hub remember all queries that it sent to other servers +type Queries struct { + sync.Mutex + // state stores the ID of the server's queries and their state. False for a + // query not yet answered, else true. + state map[int]bool + // getMessagesByIdQueries stores the server's getMessagesByIds queries by their ID. + getMessagesByIdQueries map[int]method.GetMessagesById + // nextID store the ID of the next query + nextID int +} + +// NewQueries creates a new queries struct +func NewQueries() Queries { + return Queries{ + state: make(map[int]bool), + getMessagesByIdQueries: make(map[int]method.GetMessagesById), + } +} + +// GetQueryState returns a given query's state +func (q *Queries) GetQueryState(id int) (bool, error) { + q.Lock() + defer q.Unlock() + + state, ok := q.state[id] + if !ok { + return false, xerrors.Errorf("query with id %d not found", id) + } + return state, nil +} + +// GetNextID returns the next query ID +func (q *Queries) GetNextID() int { + q.Lock() + defer q.Unlock() + + id := q.nextID + q.nextID++ + return id +} + +// SetQueryReceived sets the state of the query with the given ID as received +func (q *Queries) SetQueryReceived(id int) error { + q.Lock() + defer q.Unlock() + + currentState, ok := q.state[id] + + if !ok { + return xerrors.Errorf("query with id %d not found", id) + } + + if currentState { + return xerrors.Errorf("query with id %d already answered", id) + } + + q.state[id] = true + return nil +} + +// AddQuery adds the given query to the table +func (q *Queries) AddQuery(id int, query method.GetMessagesById) { + q.Lock() + defer q.Unlock() + + q.getMessagesByIdQueries[id] = query + q.state[id] = false +} diff --git a/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go b/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go new file mode 100644 index 0000000000..46f1c070a8 --- /dev/null +++ b/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go @@ -0,0 +1,44 @@ +package hub_state + +import "sync" + +type ThreadSafeMap[K comparable, V any] struct { + sync.RWMutex + table map[K]V +} + +func NewThreadSafeMap[K comparable, V any]() ThreadSafeMap[K, V] { + return ThreadSafeMap[K, V]{ + table: make(map[K]V), + } +} + +func (i *ThreadSafeMap[K, V]) Get(key K) (V, bool) { + i.RLock() + defer i.RUnlock() + val, ok := i.table[key] + return val, ok +} + +func (i *ThreadSafeMap[K, V]) Set(key K, val V) { + i.Lock() + defer i.Unlock() + i.table[key] = val +} + +// GetTable returns a shallow copy of the map +func (i *ThreadSafeMap[K, V]) GetTable() map[K]V { + i.RLock() + defer i.RUnlock() + tableCopy := make(map[K]V, len(i.table)) + for key, val := range i.table { + tableCopy[key] = val + } + return tableCopy +} + +func (i *ThreadSafeMap[K, V]) IsEmpty() bool { + i.RLock() + defer i.RUnlock() + return len(i.table) == 0 +} diff --git a/be1-go/hub/standard_hub/message_handling.go b/be1-go/hub/standard_hub/message_handling.go index e465427dd4..94fcbb681d 100644 --- a/be1-go/hub/standard_hub/message_handling.go +++ b/be1-go/hub/standard_hub/message_handling.go @@ -3,7 +3,6 @@ package standard_hub import ( "encoding/base64" "encoding/json" - "github.com/rs/zerolog/log" "popstellar/crypto" jsonrpc "popstellar/message" "popstellar/message/answer" @@ -14,6 +13,8 @@ import ( "popstellar/network/socket" "popstellar/validation" + "github.com/rs/zerolog/log" + "go.dedis.ch/kyber/v3/sign/schnorr" "golang.org/x/exp/slices" @@ -78,7 +79,7 @@ func (h *Hub) handleRootChannelPublishMessage(sock socket.Socket, publish method h.rootInbox.StoreMessage(publish.Params.Message) h.hubInbox.StoreMessage(publish.Params.Message) - h.addMessageId(publish.Params.Channel, publish.Params.Message.MessageID) + h.messageIdsByChannel.Add(publish.Params.Channel, publish.Params.Message.MessageID) return nil } @@ -143,7 +144,7 @@ func (h *Hub) handleRootChannelBroadcastMessage(sock socket.Socket, h.rootInbox.StoreMessage(broadcast.Params.Message) h.hubInbox.StoreMessage(broadcast.Params.Message) - h.addMessageId(broadcast.Params.Channel, broadcast.Params.Message.MessageID) + h.messageIdsByChannel.Add(broadcast.Params.Channel, broadcast.Params.Message.MessageID) return nil } @@ -189,22 +190,11 @@ func (h *Hub) handleAnswer(senderSocket socket.Socket, byteMessage []byte) error return nil } - h.queries.Lock() - - val := h.queries.state[*answerMsg.ID] - if val == nil { - h.queries.Unlock() - return xerrors.Errorf("no query sent with id %v", answerMsg.ID) - } - - if *val { - h.queries.Unlock() - return xerrors.Errorf("query %v already got an answer", answerMsg.ID) + err = h.queries.SetQueryReceived(*answerMsg.ID) + if err != nil { + return xerrors.Errorf("failed to set query state: %v", err) } - *h.queries.state[*answerMsg.ID] = true - h.queries.Unlock() - err = h.handleGetMessagesByIdAnswer(senderSocket, answerMsg) if err != nil { return err @@ -275,7 +265,7 @@ func (h *Hub) handlePublish(socket socket.Socket, byteMessage []byte) (int, erro return publish.ID, err } h.hubInbox.StoreMessage(publish.Params.Message) - h.addMessageId(publish.Params.Channel, publish.Params.Message.MessageID) + h.messageIdsByChannel.Add(publish.Params.Channel, publish.Params.Message.MessageID) return publish.ID, nil } @@ -295,7 +285,7 @@ func (h *Hub) handlePublish(socket socket.Socket, byteMessage []byte) (int, erro } h.hubInbox.StoreMessage(publish.Params.Message) - h.addMessageId(publish.Params.Channel, publish.Params.Message.MessageID) + h.messageIdsByChannel.Add(publish.Params.Channel, publish.Params.Message.MessageID) return publish.ID, nil } @@ -318,16 +308,13 @@ func (h *Hub) handleBroadcast(socket socket.Socket, byteMessage []byte) error { expectedMessageID, messageID) } - h.Lock() _, ok := h.hubInbox.GetMessage(broadcast.Params.Message.MessageID) if ok { h.log.Info().Msg("message was already received") return nil } h.hubInbox.StoreMessage(broadcast.Params.Message) - h.addMessageId(broadcast.Params.Channel, broadcast.Params.Message.MessageID) - - h.Unlock() + h.messageIdsByChannel.Add(broadcast.Params.Channel, broadcast.Params.Message.MessageID) if err != nil { return xerrors.Errorf("failed to broadcast message: %v", err) @@ -435,7 +422,7 @@ func (h *Hub) handleHeartbeat(socket socket.Socket, receivedIds := heartbeat.Params - missingIds := getMissingIds(receivedIds, h.messageIdsByChannel, h.blacklist) + missingIds := getMissingIds(receivedIds, h.messageIdsByChannel.GetTable(), h.blacklist) if len(missingIds) > 0 { err = h.sendGetMessagesByIdToServer(socket, missingIds) @@ -473,16 +460,12 @@ func (h *Hub) handleGreetServer(socket socket.Socket, byteMessage []byte) error return xerrors.Errorf("failed to unmarshal greetServer message: %v", err) } - h.Lock() // store information about the server - h.peersInfo[socket.ID()] = greetServer.Params + h.peers.AddPeerInfo(socket.ID(), greetServer.Params) - // check if the server is already greeted - if slices.Contains(h.peersGreeted, socket.ID()) { - h.Unlock() + if h.peers.IsPeerGreeted(socket.ID()) { return nil } - h.Unlock() err = h.SendGreetServer(socket) if err != nil { @@ -535,7 +518,6 @@ func (h *Hub) getMissingMessages(missingIds map[string][]string) (map[string][]m // handleReceivedMessage handle a message obtained by the server receiving a // getMessagesById result func (h *Hub) handleReceivedMessage(socket socket.Socket, messageData message.Message, targetChannel string) error { - h.Lock() signature := messageData.Signature messageID := messageData.MessageID data := messageData.Data @@ -543,7 +525,6 @@ func (h *Hub) handleReceivedMessage(socket socket.Socket, messageData message.Me expectedMessageID := messagedata.Hash(data, signature) if expectedMessageID != messageID { - h.Unlock() return xerrors.Errorf(wrongMessageIdError, expectedMessageID, messageID) } @@ -567,10 +548,8 @@ func (h *Hub) handleReceivedMessage(socket socket.Socket, messageData message.Me _, stored := h.hubInbox.GetMessage(publish.Params.Message.MessageID) if stored { h.log.Info().Msgf("Already stored message %s", publish.Params.Message.MessageID) - h.Unlock() return nil } - h.Unlock() if publish.Params.Channel == rootChannel { err := h.handleRootChannelPublishMessage(socket, publish) @@ -590,10 +569,8 @@ func (h *Hub) handleReceivedMessage(socket socket.Socket, messageData message.Me return xerrors.Errorf(publishError, err) } - h.Lock() h.hubInbox.StoreMessage(publish.Params.Message) - h.addMessageId(publish.Params.Channel, publish.Params.Message.MessageID) - h.Unlock() + h.messageIdsByChannel.Add(publish.Params.Channel, publish.Params.Message.MessageID) return nil } diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/mod.go index 76478906a2..116fec80c3 100644 --- a/be1-go/hub/standard_hub/mod.go +++ b/be1-go/hub/standard_hub/mod.go @@ -4,9 +4,9 @@ import ( "context" "encoding/base64" "encoding/json" - "golang.org/x/exp/slices" "popstellar/channel" "popstellar/crypto" + state "popstellar/hub/standard_hub/hub_state" "popstellar/inbox" jsonrpc "popstellar/message" "popstellar/message/answer" @@ -58,7 +58,7 @@ type Hub struct { messageChan chan socket.IncomingMessage sync.RWMutex - channelByID map[string]channel.Channel + channelByID state.Channels closedSockets chan string @@ -84,17 +84,14 @@ type Hub struct { // rootInbox and queries are used to help servers catchup to each other rootInbox inbox.Inbox - queries queries + queries state.Queries // messageIdsByChannel stores all the message ids and the corresponding channel ids // to help servers determine in which channel the message ids go - messageIdsByChannel map[string][]string + messageIdsByChannel state.MessageIds - // peersInfo stores the info of the peers: public key, client and server endpoints associated with the socket ID - peersInfo map[string]method.ServerInfo - - // peersGreeted stores the peers that were greeted by the socket ID - peersGreeted []string + // peers stores information about the peers + peers state.Peers // blacklist stores the IDs of the messages that failed to be processed by the hub // the server will not ask for them again in the heartbeat @@ -103,26 +100,6 @@ type Hub struct { blacklist []string } -// newQueries creates a new queries struct -func newQueries() queries { - return queries{ - state: make(map[int]*bool), - getMessagesByIdQueries: make(map[int]method.GetMessagesById), - } -} - -// queries let the hub remember all queries that it sent to other servers -type queries struct { - sync.Mutex - // state stores the ID of the server's queries and their state. False for a - // query not yet answered, else true. - state map[int]*bool - // getMessagesByIdQueries stores the server's getMessagesByIds queries by their ID. - getMessagesByIdQueries map[int]method.GetMessagesById - // nextID store the ID of the next query - nextID int -} - // NewHub returns a new Hub. func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAddress string, log zerolog.Logger, laoFac channel.LaoFactory) (*Hub, error) { @@ -140,7 +117,7 @@ func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAdd clientServerAddress: clientServerAddress, serverServerAddress: serverServerAddress, messageChan: make(chan socket.IncomingMessage), - channelByID: make(map[string]channel.Channel), + channelByID: state.NewChannelsMap(), closedSockets: make(chan string), pubKeyOwner: pubKeyOwner, pubKeyServ: pubServ, @@ -153,10 +130,9 @@ func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAdd serverSockets: channel.NewSockets(), hubInbox: *inbox.NewInbox(rootChannel), rootInbox: *inbox.NewInbox(rootChannel), - queries: newQueries(), - messageIdsByChannel: make(map[string][]string), - peersInfo: make(map[string]method.ServerInfo), - peersGreeted: make([]string, 0), + queries: state.NewQueries(), + messageIdsByChannel: state.NewMessageIdsMap(), + peers: state.NewPeers(), blacklist: make([]string, 0), } @@ -198,12 +174,10 @@ func (h *Hub) Start() { } }() case id := <-h.closedSockets: - h.RLock() - for _, channel := range h.channelByID { + h.channelByID.ForEach(func(c channel.Channel) { // dummy Unsubscribe message because it's only used for logging... - channel.Unsubscribe(id, method.Unsubscribe{}) - } - h.RUnlock() + c.Unsubscribe(id, method.Unsubscribe{}) + }) case <-h.stop: h.log.Info().Msg("stopping the hub") return @@ -264,9 +238,6 @@ func (h *Hub) OnSocketClose() chan<- string { // SendGreetServer implements hub.Hub func (h *Hub) SendGreetServer(socket socket.Socket) error { - h.Lock() - defer h.Unlock() - pk, err := h.pubKeyServ.MarshalBinary() if err != nil { return xerrors.Errorf("failed to marshal server public key: %v", err) @@ -295,7 +266,7 @@ func (h *Hub) SendGreetServer(socket socket.Socket) error { socket.Send(buf) - h.peersGreeted = append(h.peersGreeted, socket.ID()) + h.peers.AddPeerGreeted(socket.ID()) return nil } @@ -304,10 +275,7 @@ func (h *Hub) getChan(channelPath string) (channel.Channel, error) { return nil, xerrors.Errorf("channel not prefixed with '%s': %q", rootPrefix, channelPath) } - h.RLock() - defer h.RUnlock() - - channel, ok := h.channelByID[channelPath] + channel, ok := h.channelByID.Get(channelPath) if !ok { return nil, xerrors.Errorf("channel %s does not exist", channelPath) } @@ -495,12 +463,7 @@ func (h *Hub) handleIncomingMessage(incomingMessage *socket.IncomingMessage) err // sendGetMessagesByIdToServer sends a getMessagesById message to a server func (h *Hub) sendGetMessagesByIdToServer(socket socket.Socket, missingIds map[string][]string) error { - h.Lock() - defer h.Unlock() - - queryId := h.queries.nextID - baseValue := false - h.queries.state[queryId] = &baseValue + queryId := h.queries.GetNextID() getMessagesById := method.GetMessagesById{ Base: query.Base{ @@ -518,36 +481,33 @@ func (h *Hub) sendGetMessagesByIdToServer(socket socket.Socket, missingIds map[s return xerrors.Errorf("failed to marshal getMessagesById query: %v", err) } - h.queries.getMessagesByIdQueries[queryId] = getMessagesById - h.queries.nextID++ - socket.Send(buf) + h.queries.AddQuery(queryId, getMessagesById) + return nil } // sendHeartbeatToServers sends a heartbeat message to all servers func (h *Hub) sendHeartbeatToServers() { - h.Lock() - defer h.Unlock() - if len(h.messageIdsByChannel) > 0 { - heartbeatMessage := method.Heartbeat{ - Base: query.Base{ - JSONRPCBase: jsonrpc.JSONRPCBase{ - JSONRPC: "2.0", - }, - Method: "heartbeat", + if h.messageIdsByChannel.IsEmpty() { + return + } + heartbeatMessage := method.Heartbeat{ + Base: query.Base{ + JSONRPCBase: jsonrpc.JSONRPCBase{ + JSONRPC: "2.0", }, - Params: h.messageIdsByChannel, - } - - buf, err := json.Marshal(heartbeatMessage) - if err != nil { - h.log.Err(err).Msg("Failed to marshal and send heartbeat query") - } + Method: "heartbeat", + }, + Params: h.messageIdsByChannel.GetTable(), + } - h.serverSockets.SendToAll(buf) + buf, err := json.Marshal(heartbeatMessage) + if err != nil { + h.log.Err(err).Msg("Failed to marshal and send heartbeat query") } + h.serverSockets.SendToAll(buf) } // createLao creates a new LAO using the data in the publish parameter. @@ -556,7 +516,7 @@ func (h *Hub) createLao(msg message.Message, laoCreate messagedata.LaoCreate, laoChannelPath := rootPrefix + laoCreate.ID - if _, ok := h.channelByID[laoChannelPath]; ok { + if _, ok := h.channelByID.Get(laoChannelPath); ok { return answer.NewDuplicateResourceError("failed to create lao: duplicate lao path: %q", laoChannelPath) } @@ -637,28 +597,16 @@ func (h *Hub) GetSchemaValidator() validation.SchemaValidator { // NotifyNewChannel implements channel.HubFunctionalities func (h *Hub) NotifyNewChannel(channelID string, channel channel.Channel, sock socket.Socket) { - h.Lock() - h.channelByID[channelID] = channel - h.Unlock() + h.channelByID.Set(channelID, channel) } // NotifyWitnessMessage implements channel.HubFunctionalities func (h *Hub) NotifyWitnessMessage(messageId string, publicKey string, signature string) { - h.Lock() h.hubInbox.AddWitnessSignature(messageId, publicKey, signature) - h.Unlock() } func (h *Hub) GetPeersInfo() []method.ServerInfo { - h.Lock() - defer h.Unlock() - - var peersInfo []method.ServerInfo - for _, info := range h.peersInfo { - peersInfo = append(peersInfo, info) - } - - return peersInfo + return h.peers.GetAllPeersInfo() } func generateKeys() (kyber.Point, kyber.Scalar) { @@ -667,16 +615,3 @@ func generateKeys() (kyber.Point, kyber.Scalar) { return point, secret } - -// addMessageId adds a message ID to the map of messageIds by channel of the hub -func (h *Hub) addMessageId(channelId string, messageId string) { - messageIds, channelStored := h.messageIdsByChannel[channelId] - if !channelStored { - h.messageIdsByChannel[channelId] = append(h.messageIdsByChannel[channelId], messageId) - } else { - alreadyStored := slices.Contains(messageIds, messageId) - if !alreadyStored { - h.messageIdsByChannel[channelId] = append(h.messageIdsByChannel[channelId], messageId) - } - } -} diff --git a/be1-go/hub/standard_hub/mod_test.go b/be1-go/hub/standard_hub/mod_test.go index d15cbc8d5c..7c756497d1 100644 --- a/be1-go/hub/standard_hub/mod_test.go +++ b/be1-go/hub/standard_hub/mod_test.go @@ -4,7 +4,6 @@ import ( "encoding/base64" "encoding/json" "fmt" - "golang.org/x/exp/slices" "io" "os" "path/filepath" @@ -19,6 +18,8 @@ import ( "testing" "time" + "golang.org/x/exp/slices" + "github.com/stretchr/testify/assert" "github.com/rs/zerolog" @@ -729,8 +730,10 @@ func Test_Create_LAO(t *testing.T) { // the server should have saved the channel locally - require.Contains(t, hub.channelByID, rootPrefix+data.ID) - require.Equal(t, fakeChannelFac.c, hub.channelByID[rootPrefix+data.ID]) + require.Contains(t, hub.channelByID.GetTable(), rootPrefix+data.ID) + + channel, _ := hub.channelByID.Get(rootPrefix + data.ID) + require.Equal(t, fakeChannelFac.c, channel) } func Test_Wrong_Root_Publish(t *testing.T) { @@ -743,7 +746,7 @@ func Test_Wrong_Root_Publish(t *testing.T) { laoID := "/root" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Set(rootPrefix+laoID, c) data := messagedata.LaoState{ Object: messagedata.LAOObject, @@ -874,14 +877,12 @@ func Test_Handle_Answer(t *testing.T) { answerBisBuf, err := json.Marshal(serverAnswerBis) require.NoError(t, err) - queryState := false - hub.queries.state[1] = &queryState - hub.queries.getMessagesByIdQueries[1] = method.GetMessagesById{ + query := method.GetMessagesById{ Base: query.Base{}, ID: 1, Params: nil, } - + hub.queries.AddQuery(1, query) sock := &fakeSocket{} hub.handleMessageFromClient(&socket.IncomingMessage{ @@ -890,6 +891,8 @@ func Test_Handle_Answer(t *testing.T) { }) require.Error(t, sock.err, "rpc message sent by a client should be a query") sock.err = nil + queryState, err := hub.queries.GetQueryState(1) + require.NoError(t, err) require.False(t, queryState) hub.handleMessageFromServer(&socket.IncomingMessage{ @@ -897,6 +900,8 @@ func Test_Handle_Answer(t *testing.T) { Message: resultBuf, }) require.NoError(t, sock.err) + queryState, _ = hub.queries.GetQueryState(1) + require.NoError(t, err) require.False(t, queryState) hub.handleMessageFromServer(&socket.IncomingMessage{ @@ -904,6 +909,8 @@ func Test_Handle_Answer(t *testing.T) { Message: answerBuf, }) require.NoError(t, sock.err) + queryState, _ = hub.queries.GetQueryState(1) + require.NoError(t, err) require.True(t, queryState) hub.handleMessageFromServer(&socket.IncomingMessage{ @@ -932,7 +939,7 @@ func Test_Handle_Publish_From_Client(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Set(rootPrefix+laoID, c) signature, err := schnorr.Sign(suite, keypair.private, []byte("XXX")) require.NoError(t, err) @@ -999,7 +1006,7 @@ func Test_Handle_Publish_From_Server(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Set(rootPrefix+laoID, c) signature, err := schnorr.Sign(suite, keypair.private, []byte("XXX")) require.NoError(t, err) @@ -1066,7 +1073,7 @@ func Test_Receive_Publish_Twice(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Set(rootPrefix+laoID, c) signature, err := schnorr.Sign(suite, keypair.private, []byte("XXX")) require.NoError(t, err) @@ -1191,9 +1198,7 @@ func Test_Create_LAO_GetMessagesById_Result(t *testing.T) { Params: missingMessages, } - queryState := false - hub.queries.state[1] = &queryState - hub.queries.getMessagesByIdQueries[1] = getMessagesByIdQuery + hub.queries.AddQuery(1, getMessagesByIdQuery) ans := struct { JSONRPC string `json:"jsonrpc"` @@ -1228,8 +1233,9 @@ func Test_Create_LAO_GetMessagesById_Result(t *testing.T) { // the server should have saved the channel locally - require.Contains(t, hub.channelByID, rootPrefix+data.ID) - require.Equal(t, fakeChannelFac.c, hub.channelByID[rootPrefix+data.ID]) + require.Contains(t, hub.channelByID.GetTable(), rootPrefix+data.ID) + channel, _ := hub.channelByID.Get(rootPrefix + laoID) + require.Equal(t, fakeChannelFac.c, channel) } // Tests that an answer to a getMessagesById without a valid message id returns an error @@ -1294,9 +1300,7 @@ func Test_Create_LAO_GetMessagesById_Wrong_MessageID(t *testing.T) { Params: missingMessages, } - queryState := false - hub.queries.state[1] = &queryState - hub.queries.getMessagesByIdQueries[1] = getMessagesByIdQuery + hub.queries.AddQuery(1, getMessagesByIdQuery) ans := struct { JSONRPC string `json:"jsonrpc"` @@ -1334,7 +1338,7 @@ func Test_Handle_Subscribe(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Set(rootPrefix+laoID, c) subscribe := method.Subscribe{ Base: query.Base{ @@ -1397,7 +1401,7 @@ func TestServer_Handle_Unsubscribe(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Set(rootPrefix+laoID, c) unsubscribe := method.Unsubscribe{ Base: query.Base{ @@ -1471,7 +1475,7 @@ func TestServer_Handle_Catchup(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Set(rootPrefix+laoID, c) catchup := method.Catchup{ Base: query.Base{ @@ -1543,7 +1547,7 @@ func Test_Send_And_Handle_Message(t *testing.T) { laoID := "XXX" - hub.channelByID[rootPrefix+laoID] = c + hub.channelByID.Set(rootPrefix+laoID, c) signature, err := schnorr.Sign(suite, keypair.private, []byte("XXX")) require.NoError(t, err) @@ -1615,8 +1619,8 @@ func Test_Send_Heartbeat_Message(t *testing.T) { hub.hubInbox.StoreMessage(msg2) hub.hubInbox.StoreMessage(msg3) - hub.messageIdsByChannel["/root"] = idsRoot - hub.messageIdsByChannel["/root/channel1"] = idsChannel1 + hub.messageIdsByChannel.AddAll("/root", idsRoot) + hub.messageIdsByChannel.AddAll("/root/channel1", idsChannel1) hub.sendHeartbeatToServers() @@ -1630,7 +1634,7 @@ func Test_Send_Heartbeat_Message(t *testing.T) { messageIdsSent := heartbeat.Params //Check that all the stored messages where sent - for storedChannel, storedIds := range hub.messageIdsByChannel { + for storedChannel, storedIds := range hub.messageIdsByChannel.GetTable() { sentIds, exists := messageIdsSent[storedChannel] require.True(t, exists) for _, storedId := range storedIds { @@ -1649,7 +1653,7 @@ func Test_Handle_Heartbeat(t *testing.T) { hub.hubInbox.StoreMessage(msg1) - hub.messageIdsByChannel["/root"] = []string{msg1.MessageID} + hub.messageIdsByChannel.Add("/root", msg1.MessageID) sock := &fakeSocket{} @@ -1715,8 +1719,8 @@ func Test_Handle_GetMessagesById(t *testing.T) { hub.hubInbox.StoreMessage(msg2) hub.hubInbox.StoreMessage(msg3) - hub.messageIdsByChannel["/root"] = idsRoot - hub.messageIdsByChannel["/root/channel1"] = idsChannel1 + hub.messageIdsByChannel.AddAll("/root", idsRoot) + hub.messageIdsByChannel.AddAll("/root/channel1", idsChannel1) //The missing Ids requested by the server missingIds := make(map[string][]string) @@ -1846,7 +1850,7 @@ func Test_Handle_GreetServer_Already_Greeted(t *testing.T) { err = hub.SendGreetServer(sock) require.NoError(t, err) - require.True(t, slices.Contains(hub.peersGreeted, sock.ID())) + require.True(t, hub.peers.IsPeerGreeted(sock.ID())) //reset socket message sock.msg = nil diff --git a/be1-go/inbox/mod.go b/be1-go/inbox/mod.go index 0639e9c4de..e51cd43e44 100644 --- a/be1-go/inbox/mod.go +++ b/be1-go/inbox/mod.go @@ -38,9 +38,9 @@ func NewInbox(channelID string) *Inbox { // `messageID`. If the message was not yet received, the signature is added // to the pending signatures map. func (i *Inbox) AddWitnessSignature(messageID string, public string, signature string) { - msg, ok := i.GetMessage(messageID) i.mutex.Lock() defer i.mutex.Unlock() + msg, ok := i.msgsMap[messageID] if !ok { // Add the signature to the pending signatures i.pendingSignatures[messageID] = append(i.pendingSignatures[messageID], message.WitnessSignature{ @@ -48,7 +48,7 @@ func (i *Inbox) AddWitnessSignature(messageID string, public string, signature s Signature: signature, }) } else { - msg.WitnessSignatures = append(msg.WitnessSignatures, message.WitnessSignature{ + msg.message.WitnessSignatures = append(msg.message.WitnessSignatures, message.WitnessSignature{ Witness: public, Signature: signature, })