Skip to content

Commit

Permalink
Release v2.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
ChronosXYZ committed Sep 22, 2019
2 parents 9946feb + d329da4 commit 28ca0e9
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 77 deletions.
26 changes: 13 additions & 13 deletions api/protocol.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
package api

import "github.com/libp2p/go-libp2p-core/peer"

/*
Flags:
- 0x0: Generic message
- 0x1: Request to get existing PubSub topics at the network
- 0x2: Response to the request for topics (ack)
- 0x3: Request to ask peers for their MatrixID
- 0x4: Response to the request for peers identity
- 0x4: Response to the request for MatrixID
- 0x5: Greeting to users in the topic
- 0x6: Farewell to users in the topic
- 0x7: Same as 0x4, but for response to greeting in the topic
*/
const (
FlagGenericMessage int = 0x0
FlagTopicsRequest int = 0x1
FlagTopicsResponse int = 0x2
FlagIdentityRequest int = 0x3
FlagIdentityResponse int = 0x4
FlagGreeting int = 0x5
FlagFarewell int = 0x6
FlagGreetingRespond int = 0x7

ProtocolString string = "/moonshard/2.0.0"
)

// BaseMessage is the basic message format of our protocol
type BaseMessage struct {
Body string `json:"body"`
Flag int `json:"flag"`
Body string `json:"body"`
To string `json:"to"`
Flag int `json:"flag"`
FromMatrixID string `json:"fromMatrixID"`
}

// GetTopicsRespondMessage is the format of the message to answer of request for topics
Expand All @@ -30,11 +38,3 @@ type GetTopicsRespondMessage struct {
BaseMessage
Topics []string `json:"topics"`
}

// GetIdentityRespondMessage is the format of the message to answer of request for peer identity
// Flag: 0x4
type GetIdentityRespondMessage struct {
BaseMessage
PeerID peer.ID `json:"peer_id"`
MatrixID string `json:"matrix_id"`
}
161 changes: 97 additions & 64 deletions pkg/handler.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package pkg

import (
"context"
"encoding/json"
"log"
"sync"
"time"

"github.com/MoonSHRD/p2chat/api"
mapset "github.com/deckarep/golang-set"
Expand All @@ -26,9 +24,10 @@ type Handler struct {

// TextMessage is more end-user model of regular text messages
type TextMessage struct {
Topic string
Body string
From string
Topic string
Body string
FromPeerID string
FromMatrixID string
}

func NewHandler(pb *pubsub.PubSub, serviceTopic string, peerID peer.ID, networkTopics *mapset.Set) Handler {
Expand All @@ -41,8 +40,8 @@ func NewHandler(pb *pubsub.PubSub, serviceTopic string, peerID peer.ID, networkT
}
}

func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handleTextMessage func(TextMessage)) {
addr, err := peer.IDFromBytes(msg.From)
func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handleTextMessage func(TextMessage), handleMatch func(string, string), handleUnmatch func(string, string)) {
fromPeerID, err := peer.IDFromBytes(msg.From)
if err != nil {
log.Println("Error occurred when reading message from field...")
return
Expand All @@ -52,26 +51,29 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle
log.Println("Error occurred during unmarshalling the base message data")
return
}

if message.To != "" && message.To != string(h.peerID) {
return // Drop message, because it is not for us
}

switch message.Flag {
// Getting regular message
case api.FlagGenericMessage:
from := addr.String()
if h.matrixID != "" {
from = h.matrixID
}

textMessage := TextMessage{
Topic: topic,
Body: message.Body,
From: from,
Topic: topic,
Body: message.Body,
FromPeerID: fromPeerID.String(),
FromMatrixID: message.FromMatrixID,
}
handleTextMessage(textMessage)
// Getting topic request, answer topic response
case api.FlagTopicsRequest:
respond := &api.GetTopicsRespondMessage{
BaseMessage: api.BaseMessage{
Body: "",
Flag: api.FlagTopicsResponse,
Body: "",
Flag: api.FlagTopicsResponse,
FromMatrixID: h.matrixID,
To: fromPeerID.String(),
},
Topics: h.GetTopics(),
}
Expand All @@ -81,7 +83,6 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle
return
}
go func() {
// Lock for blocking "same-time-respond"
h.PbMutex.Lock()
h.pb.Publish(h.serviceTopic, sendData)
h.PbMutex.Unlock()
Expand All @@ -98,37 +99,45 @@ func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handle
}
// Getting identity request, answer identity response
case api.FlagIdentityRequest:
respond := &api.GetIdentityRespondMessage{
BaseMessage: api.BaseMessage{
Body: "",
Flag: api.FlagIdentityResponse,
},
PeerID: h.peerID,
MatrixID: h.matrixID,
}
sendData, err := json.Marshal(respond)
if err != nil {
log.Println("Error occurred during marshalling the respond from IdentityRequest")
return
}
go func() {
h.PbMutex.Lock()
h.pb.Publish(h.serviceTopic, sendData)
h.PbMutex.Unlock()
}()
h.sendIdentityResponse(h.serviceTopic, fromPeerID.String())
// Getting identity respond, mapping Multiaddress/MatrixID
case api.FlagIdentityResponse:
respond := &api.GetIdentityRespondMessage{}
if err := json.Unmarshal(msg.Data, respond); err != nil {
log.Println("Error occurred during unmarshalling the message data from IdentityResponse")
return
}
h.identityMap[respond.PeerID] = respond.MatrixID
h.identityMap[peer.ID(fromPeerID.String())] = message.FromMatrixID
case api.FlagGreeting:
handleMatch(topic, message.FromMatrixID)
h.sendIdentityResponse(topic, fromPeerID.String())
case api.FlagFarewell:
handleUnmatch(topic, message.FromMatrixID)
default:
log.Printf("\nUnknown message type: %#x\n", message.Flag)
}
}

func (h *Handler) sendIdentityResponse(topic string, fromPeerID string) {
var flag int
if topic == h.serviceTopic {
flag = api.FlagIdentityResponse
} else {
flag = api.FlagGreetingRespond
}
respond := &api.BaseMessage{
Body: "",
Flag: flag,
FromMatrixID: h.matrixID,
To: fromPeerID,
}
sendData, err := json.Marshal(respond)
if err != nil {
log.Println("Error occurred during marshalling the respond from IdentityRequest")
return
}
go func() {
h.PbMutex.Lock()
h.pb.Publish(topic, sendData)
h.PbMutex.Unlock()
}()
}

// Set Matrix ID
func (h *Handler) SetMatrixID(matrixID string) {
h.matrixID = matrixID
Expand Down Expand Up @@ -157,45 +166,69 @@ func (h *Handler) BlacklistPeer(pid peer.ID) {
}

// Requesting topics from **other** peers
func (h *Handler) RequestNetworkTopics(ctx context.Context) {
func (h *Handler) RequestNetworkTopics() {
requestTopicsMessage := &api.BaseMessage{
Body: "",
Flag: api.FlagTopicsRequest,
Body: "",
Flag: api.FlagTopicsRequest,
To: "",
FromMatrixID: h.matrixID,
}

h.sendMessageToServiceTopic(ctx, requestTopicsMessage)
h.sendMessageToServiceTopic(requestTopicsMessage)
}

// Requests MatrixID from other peers
func (h *Handler) RequestPeersIdentity(ctx context.Context) {
// Requests MatrixID from specific peer
// TODO: refactor with promise
func (h *Handler) RequestPeerIdentity(peerID string) {
requestPeersIdentity := &api.BaseMessage{
Body: "",
Flag: api.FlagIdentityRequest,
Body: "",
To: peerID,
Flag: api.FlagIdentityRequest,
FromMatrixID: h.matrixID,
}

h.sendMessageToServiceTopic(ctx, requestPeersIdentity)
h.sendMessageToServiceTopic(requestPeersIdentity)
}

// TODO: refactor
func (h *Handler) SendGreetingInTopic(topic string) {
greetingMessage := &api.BaseMessage{
Body: "",
To: "",
Flag: api.FlagGreeting,
FromMatrixID: h.matrixID,
}

h.sendMessageToTopic(topic, greetingMessage)
}

// TODO: refactor
func (h *Handler) SendFarewellInTopic(topic string) {
farewellMessage := &api.BaseMessage{
Body: "",
To: "",
Flag: api.FlagFarewell,
FromMatrixID: h.matrixID,
}

h.sendMessageToTopic(topic, farewellMessage)
}

// Sends marshaled message to the service topic
func (h *Handler) sendMessageToServiceTopic(ctx context.Context, message *api.BaseMessage) {
func (h *Handler) sendMessageToServiceTopic(message *api.BaseMessage) {
h.sendMessageToTopic(h.serviceTopic, message)
}

func (h *Handler) sendMessageToTopic(topic string, message *api.BaseMessage) {
sendData, err := json.Marshal(message)
if err != nil {
log.Println(err.Error())
return
}

ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()

for range ticker.C {
select {
case <-ctx.Done():
return
default:
}

go func() {
h.PbMutex.Lock()
h.pb.Publish(h.serviceTopic, sendData)
h.pb.Publish(topic, sendData)
h.PbMutex.Unlock()
}
}()
}

0 comments on commit 28ca0e9

Please sign in to comment.