diff --git a/be1-go/channel/authentication/mod.go b/be1-go/channel/authentication/authentication.go similarity index 100% rename from be1-go/channel/authentication/mod.go rename to be1-go/channel/authentication/authentication.go diff --git a/be1-go/channel/authentication/mod_test.go b/be1-go/channel/authentication/authentication_test.go similarity index 100% rename from be1-go/channel/authentication/mod_test.go rename to be1-go/channel/authentication/authentication_test.go diff --git a/be1-go/channel/mod.go b/be1-go/channel/channel.go similarity index 100% rename from be1-go/channel/mod.go rename to be1-go/channel/channel.go diff --git a/be1-go/channel/chirp/mod.go b/be1-go/channel/chirp/chirp.go similarity index 100% rename from be1-go/channel/chirp/mod.go rename to be1-go/channel/chirp/chirp.go diff --git a/be1-go/channel/chirp/mod_test.go b/be1-go/channel/chirp/chirp_test.go similarity index 100% rename from be1-go/channel/chirp/mod_test.go rename to be1-go/channel/chirp/chirp_test.go diff --git a/be1-go/channel/coin/mod.go b/be1-go/channel/coin/coin.go similarity index 100% rename from be1-go/channel/coin/mod.go rename to be1-go/channel/coin/coin.go diff --git a/be1-go/channel/coin/mod_test.go b/be1-go/channel/coin/coin_test.go similarity index 100% rename from be1-go/channel/coin/mod_test.go rename to be1-go/channel/coin/coin_test.go diff --git a/be1-go/channel/coin/uint53/mod.go b/be1-go/channel/coin/uint53/uint53.go similarity index 100% rename from be1-go/channel/coin/uint53/mod.go rename to be1-go/channel/coin/uint53/uint53.go diff --git a/be1-go/channel/coin/uint53/mod_test.go b/be1-go/channel/coin/uint53/uint53_test.go similarity index 100% rename from be1-go/channel/coin/uint53/mod_test.go rename to be1-go/channel/coin/uint53/uint53_test.go diff --git a/be1-go/channel/consensus/mod.go b/be1-go/channel/consensus/consensus.go similarity index 100% rename from be1-go/channel/consensus/mod.go rename to be1-go/channel/consensus/consensus.go diff --git a/be1-go/channel/consensus/mod_test.go b/be1-go/channel/consensus/consensus_test.go similarity index 100% rename from be1-go/channel/consensus/mod_test.go rename to be1-go/channel/consensus/consensus_test.go diff --git a/be1-go/channel/election/mod.go b/be1-go/channel/election/election.go similarity index 100% rename from be1-go/channel/election/mod.go rename to be1-go/channel/election/election.go diff --git a/be1-go/channel/election/mod_test.go b/be1-go/channel/election/election_test.go similarity index 100% rename from be1-go/channel/election/mod_test.go rename to be1-go/channel/election/election_test.go diff --git a/be1-go/channel/generalChirping/mod.go b/be1-go/channel/generalChirping/generalChirping.go similarity index 100% rename from be1-go/channel/generalChirping/mod.go rename to be1-go/channel/generalChirping/generalChirping.go diff --git a/be1-go/channel/generalChirping/mod_test.go b/be1-go/channel/generalChirping/generalChirping_test.go similarity index 100% rename from be1-go/channel/generalChirping/mod_test.go rename to be1-go/channel/generalChirping/generalChirping_test.go diff --git a/be1-go/channel/lao/mod.go b/be1-go/channel/lao/lao.go similarity index 100% rename from be1-go/channel/lao/mod.go rename to be1-go/channel/lao/lao.go diff --git a/be1-go/channel/lao/mod_test.go b/be1-go/channel/lao/lao_test.go similarity index 100% rename from be1-go/channel/lao/mod_test.go rename to be1-go/channel/lao/lao_test.go diff --git a/be1-go/channel/reaction/mod.go b/be1-go/channel/reaction/reaction.go similarity index 100% rename from be1-go/channel/reaction/mod.go rename to be1-go/channel/reaction/reaction.go diff --git a/be1-go/channel/reaction/mod_test.go b/be1-go/channel/reaction/reaction_test.go similarity index 100% rename from be1-go/channel/reaction/mod_test.go rename to be1-go/channel/reaction/reaction_test.go diff --git a/be1-go/channel/registry/mod.go b/be1-go/channel/registry/registry.go similarity index 100% rename from be1-go/channel/registry/mod.go rename to be1-go/channel/registry/registry.go diff --git a/be1-go/cli/mod.go b/be1-go/cli/cli.go similarity index 100% rename from be1-go/cli/mod.go rename to be1-go/cli/cli.go diff --git a/be1-go/cli/mod_test.go b/be1-go/cli/cli_test.go similarity index 100% rename from be1-go/cli/mod_test.go rename to be1-go/cli/cli_test.go diff --git a/be1-go/hub/mod.go b/be1-go/hub/hub.go similarity index 100% rename from be1-go/hub/mod.go rename to be1-go/hub/hub.go diff --git a/be1-go/hub/standard_hub/hub_state/Channels.go b/be1-go/hub/standard_hub/hub_state/channels.go similarity index 100% rename from be1-go/hub/standard_hub/hub_state/Channels.go rename to be1-go/hub/standard_hub/hub_state/channels.go diff --git a/be1-go/hub/standard_hub/hub_state/MessageIds.go b/be1-go/hub/standard_hub/hub_state/messageIds.go similarity index 100% rename from be1-go/hub/standard_hub/hub_state/MessageIds.go rename to be1-go/hub/standard_hub/hub_state/messageIds.go diff --git a/be1-go/hub/standard_hub/hub_state/Peers.go b/be1-go/hub/standard_hub/hub_state/peers.go similarity index 100% rename from be1-go/hub/standard_hub/hub_state/Peers.go rename to be1-go/hub/standard_hub/hub_state/peers.go diff --git a/be1-go/hub/standard_hub/hub_state/Queries.go b/be1-go/hub/standard_hub/hub_state/queries.go similarity index 100% rename from be1-go/hub/standard_hub/hub_state/Queries.go rename to be1-go/hub/standard_hub/hub_state/queries.go diff --git a/be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go b/be1-go/hub/standard_hub/hub_state/threadSafeMap.go similarity index 100% rename from be1-go/hub/standard_hub/hub_state/ThreadSafeMap.go rename to be1-go/hub/standard_hub/hub_state/threadSafeMap.go diff --git a/be1-go/hub/standard_hub/hub_state/ThreadSafeSlice.go b/be1-go/hub/standard_hub/hub_state/threadSafeSlice.go similarity index 100% rename from be1-go/hub/standard_hub/hub_state/ThreadSafeSlice.go rename to be1-go/hub/standard_hub/hub_state/threadSafeSlice.go diff --git a/be1-go/hub/standard_hub/mod.go b/be1-go/hub/standard_hub/standard_hub.go similarity index 100% rename from be1-go/hub/standard_hub/mod.go rename to be1-go/hub/standard_hub/standard_hub.go diff --git a/be1-go/hub/standard_hub/mod_test.go b/be1-go/hub/standard_hub/standard_hub_test.go similarity index 100% rename from be1-go/hub/standard_hub/mod_test.go rename to be1-go/hub/standard_hub/standard_hub_test.go diff --git a/be1-go/inbox/mod.go b/be1-go/inbox/inbox.go similarity index 100% rename from be1-go/inbox/mod.go rename to be1-go/inbox/inbox.go diff --git a/be1-go/inbox/mod_test.go b/be1-go/inbox/inbox_test.go similarity index 100% rename from be1-go/inbox/mod_test.go rename to be1-go/inbox/inbox_test.go diff --git a/be1-go/internal/depgraph/mod.go b/be1-go/internal/depgraph/depgraph.go similarity index 86% rename from be1-go/internal/depgraph/mod.go rename to be1-go/internal/depgraph/depgraph.go index 9691f17e62..09d601078b 100644 --- a/be1-go/internal/depgraph/mod.go +++ b/be1-go/internal/depgraph/depgraph.go @@ -36,7 +36,6 @@ type config struct { type bag map[string]struct{} func main() { - app := &cli.App{ Name: "depgraph", Usage: "generate a dot graph", @@ -219,8 +218,6 @@ func getWriter(config config) (io.Writer, error) { // folder func walkFn(config config, links map[string]bag) filepath.WalkFunc { return func(path string, f os.FileInfo, err error) error { - fset := token.NewFileSet() - if err != nil { return xerrors.Errorf("got an error while walking: %v", err) } @@ -237,47 +234,52 @@ func walkFn(config config, links map[string]bag) filepath.WalkFunc { return nil } - astFile, err := parser.ParseFile(fset, path, nil, parser.ImportsOnly) - if err != nil { - return xerrors.Errorf("failed to parse file: %v", err) - } + return walkTroughImports(config, links, path) + } +} - path = filepath.Dir(path) - // This is the full package path. From "mino" we want - // "go.dedis.ch/dela/mino" - packagePath := config.Modname + path +func walkTroughImports(config config, links map[string]bag, path string) error { + fset := token.NewFileSet() + astFile, err := parser.ParseFile(fset, path, nil, parser.ImportsOnly) + if err != nil { + return xerrors.Errorf("failed to parse file: %v", err) + } - if !isIncluded(packagePath, config.Includes) || - isExcluded(packagePath, config.Excludes) { - return nil - } + path = filepath.Dir(path) + // This is the full package path. From "mino" we want + // "go.dedis.ch/dela/mino" + packagePath := config.Modname + path - for _, s := range astFile.Imports { - // because an import path is always surrounded with "" we remove - // them - importPath := s.Path.Value[1 : len(s.Path.Value)-1] + if !isIncluded(packagePath, config.Includes) || + isExcluded(packagePath, config.Excludes) { + return nil + } - if !isIncluded(importPath, config.Includes) || - isExcluded(importPath, config.Excludes) { + for _, s := range astFile.Imports { + // because an import path is always surrounded with "" we remove + // them + importPath := s.Path.Value[1 : len(s.Path.Value)-1] - continue - } + if !isIncluded(importPath, config.Includes) || + isExcluded(importPath, config.Excludes) { - // in the case the package imports a package from the same module, - // we want to keep only the "relative" name. From - // "go.dedis.ch/dela/mino/minogrpc" we want only "mino/minogrpc". - importPath = strings.TrimPrefix(importPath, config.Modname) + continue + } - if links[packagePath[len(config.Modname):]] == nil { - links[packagePath[len(config.Modname):]] = make(bag) - } + // in the case the package imports a package from the same module, + // we want to keep only the "relative" name. From + // "go.dedis.ch/dela/mino/minogrpc" we want only "mino/minogrpc". + importPath = strings.TrimPrefix(importPath, config.Modname) - // add the dependency to the bag - links[packagePath[len(config.Modname):]][importPath] = struct{}{} + if links[packagePath[len(config.Modname):]] == nil { + links[packagePath[len(config.Modname):]] = make(bag) } - return nil + // add the dependency to the bag + links[packagePath[len(config.Modname):]][importPath] = struct{}{} } + + return nil } func displayGraph(out io.Writer, links map[string]bag, interfaces bag) { diff --git a/be1-go/mod.go b/be1-go/logger.go similarity index 100% rename from be1-go/mod.go rename to be1-go/logger.go diff --git a/be1-go/network/mod.go b/be1-go/network/network.go similarity index 100% rename from be1-go/network/mod.go rename to be1-go/network/network.go diff --git a/be1-go/network/socket/mod.go b/be1-go/network/socket/mod.go deleted file mode 100644 index 83a1b35324..0000000000 --- a/be1-go/network/socket/mod.go +++ /dev/null @@ -1,70 +0,0 @@ -// Package socket contains an implementation of the Socket interface which -// is responsible for low level communication over websockets, i.e. sending -// and receiving marshaled messages over the wire. -// -// The Socket interface has multiple concrete implementations - one for servers -// and one for clients. -package socket - -import ( - "popstellar/message/query/method/message" - "time" -) - -const ( - // maxMessageSize denotes a maximum possible message size in bytes - maxMessageSize = 256 * 1024 // 256K - - // writeWait denotes the timeout for writing. - writeWait = 10 * time.Second - - // pongWait is the timeout for reading a pong. - pongWait = 60 * time.Second - - // pingPeriod is the interval to send ping messages in. - pingPeriod = (pongWait * 9) / 10 -) - -// Socket is an interface which allows reading/writing messages to -// another client -type Socket interface { - // ID denotes a unique ID of the socket. This allows us to store - // sockets in maps - ID() string - - // Type denotes the type of socket. - Type() SocketType - - // ReadPump is a lower level method for reading messages from the socket. - ReadPump() - - // WritePump is a lower level method for writing messages to the socket. - WritePump() - - // Send is used to send a message to the client. - Send(msg []byte) - - // SendError is used to send an error to the client. Please refer to - // the Protocol Specification document for information on the error - // codes. id is a pointer type because an error might be for a - // message which does not have an ID. - SendError(id *int, err error) - - // SendResult is used to send a result message to the client. Res can be - // nil, empty, or filled if the result is a slice of messages. - // MissingMessagesByChannel can be nil or filled if the result is a map - // associating a channel to a slice of messages. In case both are nil - // it sends the "0" return value. You can either send res or missingMessagesByChannel, not both. - SendResult(id int, res []message.Message, missingMessagesByChannel map[string][]message.Message) -} - -// IncomingMessage wraps the raw message from the websocket connection and pairs -// it with a `Socket` instance. -type IncomingMessage struct { - // Socket denotes where the message is originating from - // and allows us to communicate with the other party. - Socket Socket - - // Message is the marshaled message - Message []byte -} diff --git a/be1-go/network/socket/socket.go b/be1-go/network/socket/socket.go index 9eb5d6a90a..83a1b35324 100644 --- a/be1-go/network/socket/socket.go +++ b/be1-go/network/socket/socket.go @@ -1,303 +1,70 @@ +// Package socket contains an implementation of the Socket interface which +// is responsible for low level communication over websockets, i.e. sending +// and receiving marshaled messages over the wire. +// +// The Socket interface has multiple concrete implementations - one for servers +// and one for clients. package socket import ( - "encoding/json" - "errors" - jsonrpc "popstellar/message" - - "popstellar/message/answer" "popstellar/message/query/method/message" - "sync" "time" - - "github.com/gorilla/websocket" - "github.com/rs/xid" - "github.com/rs/zerolog" ) -// SocketType represents different socket types -type SocketType string - const ( - // ClientSocketType denotes a client. - ClientSocketType SocketType = "client" - - // ServerSocketType denotes a server. - ServerSocketType SocketType = "server" -) - -// baseSocket represents a socket connected to the server. -type baseSocket struct { - id string - - socketType SocketType - - receiver chan<- IncomingMessage - - // Used to remove sockets which close unexpectedly. - closedSockets chan<- string - - conn *websocket.Conn - - send chan []byte - - wg *sync.WaitGroup - - done chan struct{} - - log zerolog.Logger -} - -func (s *baseSocket) ID() string { - return s.id -} - -func (s *baseSocket) Type() SocketType { - return s.socketType -} - -// ReadPump starts the reader loop for the socket. -func (s *baseSocket) ReadPump() { - defer func() { - s.conn.Close() - s.wg.Done() - - // it is safe to send a message on s.closedSockets after calling - // s.wg.Done() If the hub is still open then it will be processed and - // the client will be unsubscribed. Otherwise, since the hub is being - // shut down, this won't block because the process will exit. - s.closedSockets <- s.ID() - }() - - s.log.Info().Msgf("listening for messages from %s", s.socketType) - - s.conn.SetReadLimit(maxMessageSize) - s.conn.SetReadDeadline(time.Now().Add(pongWait)) - s.conn.SetPongHandler(func(string) error { - s.conn.SetReadDeadline(time.Now().Add(pongWait)) - return nil - }) - - for { - _, message, err := s.conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - s.log.Err(err). - Str("socket", s.conn.RemoteAddr().String()). - Msg("connection dropped unexpectedly") - } else { - s.log.Info().Msg("closing the read pump") - } - break - } - - msg := IncomingMessage{ - Socket: s, - Message: message, - } - - // return if we're done - select { - case <-s.done: - return - default: - s.receiver <- msg - } - } -} - -// WritePump starts the writer loop for the socket. -func (s *baseSocket) WritePump() { - ticker := time.NewTicker(pingPeriod) - defer func() { - ticker.Stop() - s.conn.Close() - s.wg.Done() - // it's safe to send a message on s.closedSockets after calling - // s.wg.Done() If the hub is still open then it will be processed and - // the client will be unsubscribed. Otherwise, since the hub is being - // shut down, this won't block because the process will exit. - s.closedSockets <- s.ID() - }() + // maxMessageSize denotes a maximum possible message size in bytes + maxMessageSize = 256 * 1024 // 256K - for { - select { - case message, ok := <-s.send: - s.conn.SetWriteDeadline(time.Now().Add(writeWait)) - if !ok { - s.conn.WriteMessage(websocket.CloseMessage, []byte{}) - return - } + // writeWait denotes the timeout for writing. + writeWait = 10 * time.Second - w, err := s.conn.NextWriter(websocket.TextMessage) - if err != nil { - s.log.Err(err).Msg("failed to retrieve writer") - return - } + // pongWait is the timeout for reading a pong. + pongWait = 60 * time.Second - w.Write(message) - - if err := w.Close(); err != nil { - s.log.Err(err).Msg("failed to close writer") - return - } - case <-ticker.C: - s.conn.SetWriteDeadline(time.Now().Add(writeWait)) - if err := s.conn.WriteMessage(websocket.PingMessage, nil); err != nil { - s.log.Err(err).Msg("failed to send ping") - return - } - case <-s.done: - s.log.Info().Msg("closing the write pump") - s.conn.WriteMessage(websocket.CloseGoingAway, []byte{}) - return - } - } -} - -// Send allows sending a serialized message to the socket. -func (s *baseSocket) Send(msg []byte) { - s.log.Info(). - Str("to", s.conn.RemoteAddr().String()). - Str("msg", string(msg)). - Msg("send generic msg") - s.send <- msg -} - -// SendError is a utility method that allows sending an `error` as a -// `message.Error` message to the socket. -func (s *baseSocket) SendError(id *int, err error) { - msgError := &answer.Error{} - - if !errors.As(err, &msgError) { - msgError = answer.NewError(-6, err.Error()) - } - - answer := answer.Answer{ - JSONRPCBase: jsonrpc.JSONRPCBase{ - JSONRPC: "2.0", - }, - ID: id, - Error: msgError, - } - - answerBuf, err := json.Marshal(answer) - if err != nil { - s.log.Err(err).Msg("failed to marshal answer") - return - } - - s.log.Info(). - Str("to", s.conn.RemoteAddr().String()). - Str("msg", string(answerBuf)). - Msg("send error") - - s.send <- answerBuf -} - -// SendResult is a utility method that allows sending a `message.Result` to the -// socket. -func (s *baseSocket) SendResult(id int, res []message.Message, missingMessagesByChannel map[string][]message.Message) { - var answer interface{} - - if res != nil && missingMessagesByChannel != nil { - s.log.Error().Msg("The result must be either a slice or a map of messages, not both.") - return - } - - if res == nil && missingMessagesByChannel == nil { - answer = struct { - JSONRPC string `json:"jsonrpc"` - ID int `json:"id"` - Result int `json:"result"` - }{ - "2.0", id, 0, - } - } else if res != nil { - for _, r := range res { - if r.WitnessSignatures == nil { - r.WitnessSignatures = []message.WitnessSignature{} - } - } - answer = struct { - JSONRPC string `json:"jsonrpc"` - ID int `json:"id"` - Result []message.Message `json:"result"` - }{ - "2.0", id, res, - } - } else if missingMessagesByChannel != nil { - answer = struct { - JSONRPC string `json:"jsonrpc"` - ID int `json:"id"` - Result map[string][]message.Message `json:"result"` - }{ - "2.0", id, missingMessagesByChannel, - } - } - - answerBuf, err := json.Marshal(&answer) - if err != nil { - s.log.Err(err).Msg("failed to marshal answer") - return - } - - s.log.Info(). - Str("to", s.id). - Str("msg", string(answerBuf)). - Msg("send result") - s.send <- answerBuf -} - -func newBaseSocket(socketType SocketType, receiver chan<- IncomingMessage, - closedSockets chan<- string, conn *websocket.Conn, wg *sync.WaitGroup, - done chan struct{}, log zerolog.Logger) *baseSocket { - - return &baseSocket{ - id: xid.New().String(), - socketType: socketType, - receiver: receiver, - closedSockets: closedSockets, - conn: conn, - send: make(chan []byte, 256), - wg: wg, - done: done, - log: log, - } -} - -// ClientSocket denotes a client socket and implements the Socket interface. -type ClientSocket struct { - *baseSocket -} - -// NewClientSocket returns an instance of a baseSocket. -func NewClientSocket(receiver chan<- IncomingMessage, - closedSockets chan<- string, conn *websocket.Conn, wg *sync.WaitGroup, - done chan struct{}, log zerolog.Logger) *ClientSocket { - - log = log.With().Str("role", "client socket").Logger() - - return &ClientSocket{ - baseSocket: newBaseSocket(ClientSocketType, receiver, closedSockets, - conn, wg, done, log), - } -} + // pingPeriod is the interval to send ping messages in. + pingPeriod = (pongWait * 9) / 10 +) -// ServerSocket denotes an organizer socket and implements the Socket interface. -type ServerSocket struct { - *baseSocket +// Socket is an interface which allows reading/writing messages to +// another client +type Socket interface { + // ID denotes a unique ID of the socket. This allows us to store + // sockets in maps + ID() string + + // Type denotes the type of socket. + Type() SocketType + + // ReadPump is a lower level method for reading messages from the socket. + ReadPump() + + // WritePump is a lower level method for writing messages to the socket. + WritePump() + + // Send is used to send a message to the client. + Send(msg []byte) + + // SendError is used to send an error to the client. Please refer to + // the Protocol Specification document for information on the error + // codes. id is a pointer type because an error might be for a + // message which does not have an ID. + SendError(id *int, err error) + + // SendResult is used to send a result message to the client. Res can be + // nil, empty, or filled if the result is a slice of messages. + // MissingMessagesByChannel can be nil or filled if the result is a map + // associating a channel to a slice of messages. In case both are nil + // it sends the "0" return value. You can either send res or missingMessagesByChannel, not both. + SendResult(id int, res []message.Message, missingMessagesByChannel map[string][]message.Message) } -// NewServerSocket returns a new ServerSocket. -func NewServerSocket(receiver chan<- IncomingMessage, - closedSockets chan<- string, conn *websocket.Conn, wg *sync.WaitGroup, - done chan struct{}, log zerolog.Logger) *ServerSocket { - - log = log.With().Str("role", "server socket").Logger() +// IncomingMessage wraps the raw message from the websocket connection and pairs +// it with a `Socket` instance. +type IncomingMessage struct { + // Socket denotes where the message is originating from + // and allows us to communicate with the other party. + Socket Socket - return &ServerSocket{ - baseSocket: newBaseSocket(ServerSocketType, receiver, closedSockets, - conn, wg, done, log), - } + // Message is the marshaled message + Message []byte } diff --git a/be1-go/network/socket/socket_impl.go b/be1-go/network/socket/socket_impl.go new file mode 100644 index 0000000000..ef49801f45 --- /dev/null +++ b/be1-go/network/socket/socket_impl.go @@ -0,0 +1,302 @@ +package socket + +import ( + "encoding/json" + "errors" + jsonrpc "popstellar/message" + "popstellar/message/answer" + "popstellar/message/query/method/message" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/rs/xid" + "github.com/rs/zerolog" +) + +// SocketType represents different socket types +type SocketType string + +const ( + // ClientSocketType denotes a client. + ClientSocketType SocketType = "client" + + // ServerSocketType denotes a server. + ServerSocketType SocketType = "server" +) + +// baseSocket represents a socket connected to the server. +type baseSocket struct { + id string + + socketType SocketType + + receiver chan<- IncomingMessage + + // Used to remove sockets which close unexpectedly. + closedSockets chan<- string + + conn *websocket.Conn + + send chan []byte + + wg *sync.WaitGroup + + done chan struct{} + + log zerolog.Logger +} + +func (s *baseSocket) ID() string { + return s.id +} + +func (s *baseSocket) Type() SocketType { + return s.socketType +} + +// ReadPump starts the reader loop for the socket. +func (s *baseSocket) ReadPump() { + defer func() { + s.conn.Close() + s.wg.Done() + + // it is safe to send a message on s.closedSockets after calling + // s.wg.Done() If the hub is still open then it will be processed and + // the client will be unsubscribed. Otherwise, since the hub is being + // shut down, this won't block because the process will exit. + s.closedSockets <- s.ID() + }() + + s.log.Info().Msgf("listening for messages from %s", s.socketType) + + s.conn.SetReadLimit(maxMessageSize) + s.conn.SetReadDeadline(time.Now().Add(pongWait)) + s.conn.SetPongHandler(func(string) error { + s.conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + for { + _, message, err := s.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + s.log.Err(err). + Str("socket", s.conn.RemoteAddr().String()). + Msg("connection dropped unexpectedly") + } else { + s.log.Info().Msg("closing the read pump") + } + break + } + + msg := IncomingMessage{ + Socket: s, + Message: message, + } + + // return if we're done + select { + case <-s.done: + return + default: + s.receiver <- msg + } + } +} + +// WritePump starts the writer loop for the socket. +func (s *baseSocket) WritePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + s.conn.Close() + s.wg.Done() + // it's safe to send a message on s.closedSockets after calling + // s.wg.Done() If the hub is still open then it will be processed and + // the client will be unsubscribed. Otherwise, since the hub is being + // shut down, this won't block because the process will exit. + s.closedSockets <- s.ID() + }() + + for { + select { + case message, ok := <-s.send: + s.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + s.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := s.conn.NextWriter(websocket.TextMessage) + if err != nil { + s.log.Err(err).Msg("failed to retrieve writer") + return + } + + w.Write(message) + + if err := w.Close(); err != nil { + s.log.Err(err).Msg("failed to close writer") + return + } + case <-ticker.C: + s.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := s.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + s.log.Err(err).Msg("failed to send ping") + return + } + case <-s.done: + s.log.Info().Msg("closing the write pump") + s.conn.WriteMessage(websocket.CloseGoingAway, []byte{}) + return + } + } +} + +// Send allows sending a serialized message to the socket. +func (s *baseSocket) Send(msg []byte) { + s.log.Info(). + Str("to", s.conn.RemoteAddr().String()). + Str("msg", string(msg)). + Msg("send generic msg") + s.send <- msg +} + +// SendError is a utility method that allows sending an `error` as a +// `message.Error` message to the socket. +func (s *baseSocket) SendError(id *int, err error) { + msgError := &answer.Error{} + + if !errors.As(err, &msgError) { + msgError = answer.NewError(-6, err.Error()) + } + + answer := answer.Answer{ + JSONRPCBase: jsonrpc.JSONRPCBase{ + JSONRPC: "2.0", + }, + ID: id, + Error: msgError, + } + + answerBuf, err := json.Marshal(answer) + if err != nil { + s.log.Err(err).Msg("failed to marshal answer") + return + } + + s.log.Info(). + Str("to", s.conn.RemoteAddr().String()). + Str("msg", string(answerBuf)). + Msg("send error") + + s.send <- answerBuf +} + +// SendResult is a utility method that allows sending a `message.Result` to the +// socket. +func (s *baseSocket) SendResult(id int, res []message.Message, missingMessagesByChannel map[string][]message.Message) { + var answer interface{} + + if res != nil && missingMessagesByChannel != nil { + s.log.Error().Msg("The result must be either a slice or a map of messages, not both.") + return + } + + if res == nil && missingMessagesByChannel == nil { + answer = struct { + JSONRPC string `json:"jsonrpc"` + ID int `json:"id"` + Result int `json:"result"` + }{ + "2.0", id, 0, + } + } else if res != nil { + for _, r := range res { + if r.WitnessSignatures == nil { + r.WitnessSignatures = []message.WitnessSignature{} + } + } + answer = struct { + JSONRPC string `json:"jsonrpc"` + ID int `json:"id"` + Result []message.Message `json:"result"` + }{ + "2.0", id, res, + } + } else if missingMessagesByChannel != nil { + answer = struct { + JSONRPC string `json:"jsonrpc"` + ID int `json:"id"` + Result map[string][]message.Message `json:"result"` + }{ + "2.0", id, missingMessagesByChannel, + } + } + + answerBuf, err := json.Marshal(&answer) + if err != nil { + s.log.Err(err).Msg("failed to marshal answer") + return + } + + s.log.Info(). + Str("to", s.id). + Str("msg", string(answerBuf)). + Msg("send result") + s.send <- answerBuf +} + +func newBaseSocket(socketType SocketType, receiver chan<- IncomingMessage, + closedSockets chan<- string, conn *websocket.Conn, wg *sync.WaitGroup, + done chan struct{}, log zerolog.Logger, +) *baseSocket { + return &baseSocket{ + id: xid.New().String(), + socketType: socketType, + receiver: receiver, + closedSockets: closedSockets, + conn: conn, + send: make(chan []byte, 256), + wg: wg, + done: done, + log: log, + } +} + +// ClientSocket denotes a client socket and implements the Socket interface. +type ClientSocket struct { + *baseSocket +} + +// NewClientSocket returns an instance of a baseSocket. +func NewClientSocket(receiver chan<- IncomingMessage, + closedSockets chan<- string, conn *websocket.Conn, wg *sync.WaitGroup, + done chan struct{}, log zerolog.Logger, +) *ClientSocket { + log = log.With().Str("role", "client socket").Logger() + + return &ClientSocket{ + baseSocket: newBaseSocket(ClientSocketType, receiver, closedSockets, + conn, wg, done, log), + } +} + +// ServerSocket denotes an organizer socket and implements the Socket interface. +type ServerSocket struct { + *baseSocket +} + +// NewServerSocket returns a new ServerSocket. +func NewServerSocket(receiver chan<- IncomingMessage, + closedSockets chan<- string, conn *websocket.Conn, wg *sync.WaitGroup, + done chan struct{}, log zerolog.Logger, +) *ServerSocket { + log = log.With().Str("role", "server socket").Logger() + + return &ServerSocket{ + baseSocket: newBaseSocket(ServerSocketType, receiver, closedSockets, + conn, wg, done, log), + } +} diff --git a/be1-go/network/socket/socket_test.go b/be1-go/network/socket/socket_impl_test.go similarity index 100% rename from be1-go/network/socket/socket_test.go rename to be1-go/network/socket/socket_impl_test.go diff --git a/be1-go/validation/mod.go b/be1-go/validation/validation.go similarity index 100% rename from be1-go/validation/mod.go rename to be1-go/validation/validation.go