Skip to content

Commit

Permalink
Send rumors state when connecting
Browse files Browse the repository at this point in the history
  • Loading branch information
arnauds5 committed Jun 27, 2024
1 parent e68e9ee commit 4ffcbf1
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type Config interface {
GetServerInfo() (string, string, string, error)
}

type RumorStateSender interface {
SendRumorStateTo(socket socket.Socket) error
}

type Repository interface {
// HasMessage returns true if the message already exists.
HasMessage(messageID string) (bool, error)
Expand Down Expand Up @@ -85,19 +89,21 @@ type Handler struct {
sockets Sockets
conf Config
db Repository
rumors RumorStateSender
schema *validation.SchemaValidator
log zerolog.Logger
}

func New(hub Hub, subs Subscribers, sockets Sockets, conf Config,
db Repository, schema *validation.SchemaValidator,
db Repository, rumors RumorStateSender, schema *validation.SchemaValidator,
log zerolog.Logger) *Handler {
return &Handler{
hub: hub,
subs: subs,
sockets: sockets,
conf: conf,
db: db,
rumors: rumors,
schema: schema,
log: log.With().Str("module", "federation").Logger(),
}
Expand Down Expand Up @@ -366,19 +372,28 @@ func (h *Handler) handleChallenge(msg mmessage.Message, channelPath string,
// called => broadcast the result to both federation channels directly.
_ = h.db.StoreMessageAndData(remoteChannel, resultMsg)
_ = h.subs.BroadcastToAllClients(resultMsg, remoteChannel)

h.log.Info().Msgf("A federation was created with the local LAO %s",
federationExpect.LaoId)
} else {
// Add the socket to the list of server sockets
h.sockets.Upsert(socket)

// Send the rumor state directly to avoid delay while syncing
err = h.rumors.SendRumorStateTo(socket)
if err != nil {
return err
}

// publish the FederationResult to the other server
err = h.publishTo(resultMsg, remoteChannel, socket)
if err != nil {
return err
}

// Add the socket to the list of server sockets
h.sockets.Upsert(socket)
h.log.Info().Msgf("A federation was created with the LAO %s from: %s",
federationExpect.LaoId, federationExpect.ServerAddress)
}

h.log.Info().Msgf("A federation was successfully")

// broadcast the FederationResult to the local organizer
return h.subs.BroadcastToAllClients(resultMsg, channelPath)
}
Expand Down Expand Up @@ -530,7 +545,7 @@ func (h *Handler) connectTo(serverAddress string) (socket.Socket, error) {
go server.WritePump()
go server.ReadPump()

return server, nil
return server, h.rumors.SendRumorStateTo(server)
}

func (h *Handler) createMessage(data channel.MessageData) (mmessage.Message, error) {
Expand Down
3 changes: 2 additions & 1 deletion be1-go/internal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ func New(dbPath string, ownerPubKey kyber.Point, clientAddress, serverAddress st
rumorStateHandler := hrumorstate.New(queries, sockets, &db, log)

// Create the federation handler
federationHandler := hfederation.New(hubParams, subs, sockets, conf, &db, schemaValidator, log)
federationHandler := hfederation.New(hubParams, subs, sockets, conf, &db,
rumorStateHandler, schemaValidator, log)

// Create the query handler
methodHandlers := make(hquery.MethodHandlers)
Expand Down

0 comments on commit 4ffcbf1

Please sign in to comment.