From d329da4a7b59b2e6681345e17979f9e87f492fd7 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Mon, 23 Sep 2019 01:51:02 +0400 Subject: [PATCH] feat: Implement greeting/farewell from node in topic (for match-system) --- api/protocol.go | 18 ++++---- pkg/handler.go | 106 ++++++++++++++++++++++++++++++++---------------- 2 files changed, 79 insertions(+), 45 deletions(-) diff --git a/api/protocol.go b/api/protocol.go index 6d4c0b1..90ee187 100644 --- a/api/protocol.go +++ b/api/protocol.go @@ -1,7 +1,5 @@ package api -import "github.com/libp2p/go-libp2p-core/peer" - /* Flags: - 0x0: Generic message @@ -9,6 +7,9 @@ Flags: - 0x2: Response to the request for topics (ack) - 0x3: Request to ask peers for their MatrixID - 0x4: Response to the request for MatrixID + - 0x5: Greeting to users in the topic + - 0x6: Farewell to users in the topic + - 0x7: Same as 0x4, but for response to greeting in the topic */ const ( FlagGenericMessage int = 0x0 @@ -16,6 +17,11 @@ const ( FlagTopicsResponse int = 0x2 FlagIdentityRequest int = 0x3 FlagIdentityResponse int = 0x4 + FlagGreeting int = 0x5 + FlagFarewell int = 0x6 + FlagGreetingRespond int = 0x7 + + ProtocolString string = "/moonshard/2.0.0" ) // BaseMessage is the basic message format of our protocol @@ -32,11 +38,3 @@ type GetTopicsRespondMessage struct { BaseMessage Topics []string `json:"topics"` } - -// GetIdentityRespondMessage is the format of the message to answer of request for peer identity -// Flag: 0x4 -type GetIdentityRespondMessage struct { - BaseMessage - PeerID peer.ID `json:"peer_id"` - MatrixID string `json:"matrix_id"` -} diff --git a/pkg/handler.go b/pkg/handler.go index 014a6e7..999aa92 100644 --- a/pkg/handler.go +++ b/pkg/handler.go @@ -40,8 +40,8 @@ func NewHandler(pb *pubsub.PubSub, serviceTopic string, peerID peer.ID, networkT } } -func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handleTextMessage func(TextMessage)) { - addr, err := peer.IDFromBytes(msg.From) +func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handleTextMessage func(TextMessage), handleMatch func(string, string), handleUnmatch func(string, string)) { + fromPeerID, err := peer.IDFromBytes(msg.From) if err != nil { log.Println("Error occurred when reading message from field...") return @@ -62,7 +62,7 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle textMessage := TextMessage{ Topic: topic, Body: message.Body, - FromPeerID: addr.String(), + FromPeerID: fromPeerID.String(), FromMatrixID: message.FromMatrixID, } handleTextMessage(textMessage) @@ -73,7 +73,7 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle Body: "", Flag: api.FlagTopicsResponse, FromMatrixID: h.matrixID, - To: addr.String(), + To: fromPeerID.String(), }, Topics: h.GetTopics(), } @@ -83,7 +83,6 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle return } go func() { - // Lock for blocking "same-time-respond" h.PbMutex.Lock() h.pb.Publish(h.serviceTopic, sendData) h.PbMutex.Unlock() @@ -100,39 +99,45 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle } // Getting identity request, answer identity response case api.FlagIdentityRequest: - respond := &api.GetIdentityRespondMessage{ - BaseMessage: api.BaseMessage{ - Body: "", - Flag: api.FlagIdentityResponse, - FromMatrixID: h.matrixID, - To: addr.String(), - }, - PeerID: h.peerID, - MatrixID: h.matrixID, - } - sendData, err := json.Marshal(respond) - if err != nil { - log.Println("Error occurred during marshalling the respond from IdentityRequest") - return - } - go func() { - h.PbMutex.Lock() - h.pb.Publish(h.serviceTopic, sendData) - h.PbMutex.Unlock() - }() + h.sendIdentityResponse(h.serviceTopic, fromPeerID.String()) // Getting identity respond, mapping Multiaddress/MatrixID case api.FlagIdentityResponse: - respond := &api.GetIdentityRespondMessage{} - if err := json.Unmarshal(msg.Data, respond); err != nil { - log.Println("Error occurred during unmarshalling the message data from IdentityResponse") - return - } - h.identityMap[respond.PeerID] = respond.MatrixID + h.identityMap[peer.ID(fromPeerID.String())] = message.FromMatrixID + case api.FlagGreeting: + handleMatch(topic, message.FromMatrixID) + h.sendIdentityResponse(topic, fromPeerID.String()) + case api.FlagFarewell: + handleUnmatch(topic, message.FromMatrixID) default: log.Printf("\nUnknown message type: %#x\n", message.Flag) } } +func (h *Handler) sendIdentityResponse(topic string, fromPeerID string) { + var flag int + if topic == h.serviceTopic { + flag = api.FlagIdentityResponse + } else { + flag = api.FlagGreetingRespond + } + respond := &api.BaseMessage{ + Body: "", + Flag: flag, + FromMatrixID: h.matrixID, + To: fromPeerID, + } + sendData, err := json.Marshal(respond) + if err != nil { + log.Println("Error occurred during marshalling the respond from IdentityRequest") + return + } + go func() { + h.PbMutex.Lock() + h.pb.Publish(topic, sendData) + h.PbMutex.Unlock() + }() +} + // Set Matrix ID func (h *Handler) SetMatrixID(matrixID string) { h.matrixID = matrixID @@ -173,7 +178,8 @@ func (h *Handler) RequestNetworkTopics() { } // Requests MatrixID from specific peer -func (h *Handler) RequestPeersIdentity(peerID string) { +// TODO: refactor with promise +func (h *Handler) RequestPeerIdentity(peerID string) { requestPeersIdentity := &api.BaseMessage{ Body: "", To: peerID, @@ -184,15 +190,45 @@ func (h *Handler) RequestPeersIdentity(peerID string) { h.sendMessageToServiceTopic(requestPeersIdentity) } +// TODO: refactor +func (h *Handler) SendGreetingInTopic(topic string) { + greetingMessage := &api.BaseMessage{ + Body: "", + To: "", + Flag: api.FlagGreeting, + FromMatrixID: h.matrixID, + } + + h.sendMessageToTopic(topic, greetingMessage) +} + +// TODO: refactor +func (h *Handler) SendFarewellInTopic(topic string) { + farewellMessage := &api.BaseMessage{ + Body: "", + To: "", + Flag: api.FlagFarewell, + FromMatrixID: h.matrixID, + } + + h.sendMessageToTopic(topic, farewellMessage) +} + // Sends marshaled message to the service topic func (h *Handler) sendMessageToServiceTopic(message *api.BaseMessage) { + h.sendMessageToTopic(h.serviceTopic, message) +} + +func (h *Handler) sendMessageToTopic(topic string, message *api.BaseMessage) { sendData, err := json.Marshal(message) if err != nil { log.Println(err.Error()) return } - h.PbMutex.Lock() - h.pb.Publish(h.serviceTopic, sendData) - h.PbMutex.Unlock() + go func() { + h.PbMutex.Lock() + h.pb.Publish(topic, sendData) + h.PbMutex.Unlock() + }() }