Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BE2] fix send state #1979

Merged
merged 1 commit into from
Jun 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.pattern.{AskableActorRef, ask}
import akka.stream.scaladsl.Flow
import ch.epfl.pop.decentralized.GossipManager.TriggerPullState
import ch.epfl.pop.model.network.MethodType.rumor_state
import ch.epfl.pop.model.network.method.{Rumor, RumorState}
import ch.epfl.pop.model.network.method.{GreetServer, Rumor, RumorState}
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.method.message.data.ActionType
import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, MethodType}
Expand Down Expand Up @@ -163,27 +163,36 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
case None => -1
}

private def sendRumorState(): Unit = {
val randomPeer = connectionMediatorRef ? ConnectionMediator.GetRandomPeer()
Await.result(randomPeer, duration) match {
case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) =>
val rumorStateGet = dbActorRef ? GetRumorState()
Await.result(rumorStateGet, duration) match
case DbActorGetRumorStateAck(rumorState) =>
log.info(s"Sending rumor_state ${rumorState.state} to ${greetServer.serverAddress}")
serverRef ! ClientAnswer(
Right(JsonRpcRequest(
RpcValidator.JSON_RPC_VERSION,
rumor_state,
rumorState,
Some(jsonId)
))
)
jsonId += 1
case _ => log.error(s"Actor $self failed on creating rumor state. State wasn't gossiped.")
case m @ _ =>
log.warning(s"Received an unexpected message $m waiting for a random peer")
}
private def sendRumorState(actorRefDest: ActorRef = ActorRef.noSender, greetServerDest: Option[GreetServer] = None): Unit = {
var serverRef = ActorRef.noSender
var greetServer_ : Option[GreetServer] = None
if actorRefDest == Actor.noSender then
val randomPeerGet = connectionMediatorRef ? ConnectionMediator.GetRandomPeer()
serverRef = Await.result(randomPeerGet, duration) match {
case ConnectionMediator.GetRandomPeerAck(serverRef, greetServer) =>
greetServer_ = Some(greetServer)
serverRef
case m @ _ =>
log.warning(s"Received an unexpected message $m waiting for a random peer")
serverRef
}
else
serverRef = actorRefDest
greetServer_ = greetServerDest
val rumorStateGet = dbActorRef ? GetRumorState()
Await.result(rumorStateGet, duration) match
case DbActorGetRumorStateAck(rumorState) =>
log.info(s"Sending rumor_state ${rumorState.state} to ${if greetServer_.isDefined then greetServer_.get.serverAddress else ""}")
serverRef ! ClientAnswer(
Right(JsonRpcRequest(
RpcValidator.JSON_RPC_VERSION,
rumor_state,
rumorState,
Some(jsonId)
))
)
jsonId += 1
case _ => log.error(s"Actor $self failed on creating rumor state. State wasn't gossiped.")
}

private def peersAlreadyReceived(jsonRpcRequest: JsonRpcRequest): Set[ActorRef] = {
Expand Down Expand Up @@ -225,6 +234,9 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
case Monitor.NoServerConnected =>
timers.cancel(periodicRumorStateKey)

case ConnectionMediator.NewServerConnected(serverRef, greetServer) =>
sendRumorState(serverRef, Some(greetServer))

case TriggerPullState() =>
sendRumorState()

Expand Down
Loading