Skip to content

Commit

Permalink
feat: Implement greeting/farewell from node in topic (for match-system)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChronosXYZ committed Sep 22, 2019
1 parent 9590443 commit d329da4
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 45 deletions.
18 changes: 8 additions & 10 deletions api/protocol.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package api

import "github.com/libp2p/go-libp2p-core/peer"

/*
Flags:
- 0x0: Generic message
- 0x1: Request to get existing PubSub topics at the network
- 0x2: Response to the request for topics (ack)
- 0x3: Request to ask peers for their MatrixID
- 0x4: Response to the request for MatrixID
- 0x5: Greeting to users in the topic
- 0x6: Farewell to users in the topic
- 0x7: Same as 0x4, but for response to greeting in the topic
*/
const (
FlagGenericMessage int = 0x0
FlagTopicsRequest int = 0x1
FlagTopicsResponse int = 0x2
FlagIdentityRequest int = 0x3
FlagIdentityResponse int = 0x4
FlagGreeting int = 0x5
FlagFarewell int = 0x6
FlagGreetingRespond int = 0x7

ProtocolString string = "/moonshard/2.0.0"
)

// BaseMessage is the basic message format of our protocol
Expand All @@ -32,11 +38,3 @@ type GetTopicsRespondMessage struct {
BaseMessage
Topics []string `json:"topics"`
}

// GetIdentityRespondMessage is the format of the message to answer of request for peer identity
// Flag: 0x4
type GetIdentityRespondMessage struct {
BaseMessage
PeerID peer.ID `json:"peer_id"`
MatrixID string `json:"matrix_id"`
}
106 changes: 71 additions & 35 deletions pkg/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func NewHandler(pb *pubsub.PubSub, serviceTopic string, peerID peer.ID, networkT
}
}

func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handleTextMessage func(TextMessage)) {
addr, err := peer.IDFromBytes(msg.From)
func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handleTextMessage func(TextMessage), handleMatch func(string, string), handleUnmatch func(string, string)) {
fromPeerID, err := peer.IDFromBytes(msg.From)
if err != nil {
log.Println("Error occurred when reading message from field...")
return
Expand All @@ -62,7 +62,7 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle
textMessage := TextMessage{
Topic: topic,
Body: message.Body,
FromPeerID: addr.String(),
FromPeerID: fromPeerID.String(),
FromMatrixID: message.FromMatrixID,
}
handleTextMessage(textMessage)
Expand All @@ -73,7 +73,7 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle
Body: "",
Flag: api.FlagTopicsResponse,
FromMatrixID: h.matrixID,
To: addr.String(),
To: fromPeerID.String(),
},
Topics: h.GetTopics(),
}
Expand All @@ -83,7 +83,6 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle
return
}
go func() {
// Lock for blocking "same-time-respond"
h.PbMutex.Lock()
h.pb.Publish(h.serviceTopic, sendData)
h.PbMutex.Unlock()
Expand All @@ -100,39 +99,45 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle
}
// Getting identity request, answer identity response
case api.FlagIdentityRequest:
respond := &api.GetIdentityRespondMessage{
BaseMessage: api.BaseMessage{
Body: "",
Flag: api.FlagIdentityResponse,
FromMatrixID: h.matrixID,
To: addr.String(),
},
PeerID: h.peerID,
MatrixID: h.matrixID,
}
sendData, err := json.Marshal(respond)
if err != nil {
log.Println("Error occurred during marshalling the respond from IdentityRequest")
return
}
go func() {
h.PbMutex.Lock()
h.pb.Publish(h.serviceTopic, sendData)
h.PbMutex.Unlock()
}()
h.sendIdentityResponse(h.serviceTopic, fromPeerID.String())
// Getting identity respond, mapping Multiaddress/MatrixID
case api.FlagIdentityResponse:
respond := &api.GetIdentityRespondMessage{}
if err := json.Unmarshal(msg.Data, respond); err != nil {
log.Println("Error occurred during unmarshalling the message data from IdentityResponse")
return
}
h.identityMap[respond.PeerID] = respond.MatrixID
h.identityMap[peer.ID(fromPeerID.String())] = message.FromMatrixID
case api.FlagGreeting:
handleMatch(topic, message.FromMatrixID)
h.sendIdentityResponse(topic, fromPeerID.String())
case api.FlagFarewell:
handleUnmatch(topic, message.FromMatrixID)
default:
log.Printf("\nUnknown message type: %#x\n", message.Flag)
}
}

func (h *Handler) sendIdentityResponse(topic string, fromPeerID string) {
var flag int
if topic == h.serviceTopic {
flag = api.FlagIdentityResponse
} else {
flag = api.FlagGreetingRespond
}
respond := &api.BaseMessage{
Body: "",
Flag: flag,
FromMatrixID: h.matrixID,
To: fromPeerID,
}
sendData, err := json.Marshal(respond)
if err != nil {
log.Println("Error occurred during marshalling the respond from IdentityRequest")
return
}
go func() {
h.PbMutex.Lock()
h.pb.Publish(topic, sendData)
h.PbMutex.Unlock()
}()
}

// Set Matrix ID
func (h *Handler) SetMatrixID(matrixID string) {
h.matrixID = matrixID
Expand Down Expand Up @@ -173,7 +178,8 @@ func (h *Handler) RequestNetworkTopics() {
}

// Requests MatrixID from specific peer
func (h *Handler) RequestPeersIdentity(peerID string) {
// TODO: refactor with promise
func (h *Handler) RequestPeerIdentity(peerID string) {
requestPeersIdentity := &api.BaseMessage{
Body: "",
To: peerID,
Expand All @@ -184,15 +190,45 @@ func (h *Handler) RequestPeersIdentity(peerID string) {
h.sendMessageToServiceTopic(requestPeersIdentity)
}

// TODO: refactor
func (h *Handler) SendGreetingInTopic(topic string) {
greetingMessage := &api.BaseMessage{
Body: "",
To: "",
Flag: api.FlagGreeting,
FromMatrixID: h.matrixID,
}

h.sendMessageToTopic(topic, greetingMessage)
}

// TODO: refactor
func (h *Handler) SendFarewellInTopic(topic string) {
farewellMessage := &api.BaseMessage{
Body: "",
To: "",
Flag: api.FlagFarewell,
FromMatrixID: h.matrixID,
}

h.sendMessageToTopic(topic, farewellMessage)
}

// Sends marshaled message to the service topic
func (h *Handler) sendMessageToServiceTopic(message *api.BaseMessage) {
h.sendMessageToTopic(h.serviceTopic, message)
}

func (h *Handler) sendMessageToTopic(topic string, message *api.BaseMessage) {
sendData, err := json.Marshal(message)
if err != nil {
log.Println(err.Error())
return
}

h.PbMutex.Lock()
h.pb.Publish(h.serviceTopic, sendData)
h.PbMutex.Unlock()
go func() {
h.PbMutex.Lock()
h.pb.Publish(topic, sendData)
h.PbMutex.Unlock()
}()
}

0 comments on commit d329da4

Please sign in to comment.