From 218c655e4b97141501009f2995be3feca0ad6eae Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Tue, 25 Jun 2024 14:06:39 +0200 Subject: [PATCH 1/5] added order to rumorState --- .../pop/decentralized/GossipManager.scala | 12 +++++-- .../pop/model/network/method/RumorState.scala | 16 ++++++++- .../network/method/RumorStateSuite.scala | 34 +++++++++++++++++++ 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala index 7269be0d11..c0016030d2 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/decentralized/GossipManager.scala @@ -4,7 +4,7 @@ import akka.NotUsed import akka.actor.{Actor, ActorLogging, ActorRef, Props} import akka.pattern.{AskableActorRef, ask} import akka.stream.scaladsl.Flow -import ch.epfl.pop.model.network.method.Rumor +import ch.epfl.pop.model.network.method.{Rumor, RumorState} import ch.epfl.pop.model.network.method.message.Message import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, MethodType} import ch.epfl.pop.model.objects.{Channel, PublicKey, RumorData} @@ -110,7 +110,15 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou */ private def startGossip(messages: Map[Channel, List[Message]]): Unit = { if (publicKey.isDefined) - val rumor: Rumor = Rumor(publicKey.get, getRumorId(publicKey.get) + 1, messages) + var state: RumorState = RumorState(Map.empty) + val getRumorState = dbActorRef ? GetRumorState() + Await.result(getRumorState, duration) match + case DbActorGetRumorStateAck(rumorState) => + state = rumorState + case _ => + log.info(s"Actor (gossip) $self was not able to get its rumor state. Gossip has not started") + return + val rumor: Rumor = Rumor(publicKey.get, getRumorId(publicKey.get) + 1, messages, state) val jsonRpcRequest = prepareRumor(rumor) val writeRumor = dbActorRef ? DbActor.WriteRumor(rumor) Await.result(writeRumor, duration) match diff --git a/be2-scala/src/main/scala/ch/epfl/pop/model/network/method/RumorState.scala b/be2-scala/src/main/scala/ch/epfl/pop/model/network/method/RumorState.scala index 0ac4a6b77f..3bfd9722b3 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/model/network/method/RumorState.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/model/network/method/RumorState.scala @@ -5,7 +5,7 @@ import ch.epfl.pop.json.HighLevelProtocol.RumorStateFormat import ch.epfl.pop.model.objects.PublicKey import spray.json.* -final case class RumorState(state: Map[PublicKey, Int]) extends Params { +final case class RumorState(state: Map[PublicKey, Int]) extends Params, Ordered[RumorState] { override def hasChannel: Boolean = false @@ -27,6 +27,20 @@ final case class RumorState(state: Map[PublicKey, Int]) extends Params { otherRumorState.state.filter((pk, _) => !this.state.contains(pk)).map((pk, rumorId) => pk -> List.range(0, rumorId + 1)) } } + + override def compare(that: RumorState): Int = { + val stateDiff: Map[PublicKey, Int] = state.map { (pk, id) => + that.state.get(pk) match + case Some(value) => pk -> (id - value) + case None => pk -> id + } ++ that.state.removedAll(state.keySet).map((pk, value) => (pk, -value)) + if stateDiff.values.forall(_ <= 0) && stateDiff.values.exists(_ < 0) then + -1 + else if stateDiff.values.forall(_ >= 0) && stateDiff.values.exists(_ > 0) then + 1 + else + 0 + } } object RumorState extends Parsable { diff --git a/be2-scala/src/test/scala/ch/epfl/pop/model/network/method/RumorStateSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/model/network/method/RumorStateSuite.scala index 9dfc4a71be..0d33571c4e 100644 --- a/be2-scala/src/test/scala/ch/epfl/pop/model/network/method/RumorStateSuite.scala +++ b/be2-scala/src/test/scala/ch/epfl/pop/model/network/method/RumorStateSuite.scala @@ -56,4 +56,38 @@ class RumorStateSuite extends FunSuite with Matchers { val diffResult = rumorState1.isMissingRumorsFrom(rumorState2) diffResult shouldBe Map.empty } + + test("comparator works") { + val rumorState1: RumorState = RumorState(Map( + PublicKey(Base64Data.encode("1")) -> 3, + PublicKey(Base64Data.encode("2")) -> 5, + PublicKey(Base64Data.encode("3")) -> 3 + )) + + val rumorState2: RumorState = RumorState(Map( + PublicKey(Base64Data.encode("1")) -> 1, + PublicKey(Base64Data.encode("2")) -> 2 + )) + + rumorState1 > rumorState2 shouldBe true + + val rumorState3: RumorState = RumorState(Map( + PublicKey(Base64Data.encode("1")) -> 4, + PublicKey(Base64Data.encode("2")) -> 6, + PublicKey(Base64Data.encode("3")) -> 3 + )) + + rumorState1 < rumorState3 shouldBe true + + val rumorState4: RumorState = RumorState(Map( + PublicKey(Base64Data.encode("1")) -> 2, + PublicKey(Base64Data.encode("2")) -> 6, + PublicKey(Base64Data.encode("3")) -> 3 + )) + + rumorState1 < rumorState4 shouldBe false + rumorState1 > rumorState4 shouldBe false + + List(rumorState1, rumorState2, rumorState3, rumorState4).sorted shouldBe List(rumorState2, rumorState1, rumorState4, rumorState3) + } } From c358e0ed16634894d4db520e4b053728b92ba399 Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Tue, 25 Jun 2024 14:57:13 +0200 Subject: [PATCH 2/5] added processing order --- .../handlers/ProcessMessagesHandler.scala | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala index 80dbe97908..77f87c2cd8 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala @@ -55,14 +55,22 @@ object ProcessMessagesHandler extends AskPatternConstants { case msg @ Right(JsonRpcResponse(_, Some(resultObject), None, jsonId)) => resultObject.resultRumor match case Some(rumorList) => - val mergedMsg = rumorList - .flatMap(_.messages) - .groupBy(_._1) - .view.mapValues(_.flatMap(_._2).toSet).toMap - if (processMsgMap(mergedMsg, messageRegistry) && writeRumorsInDb(dbActorRef, rumorList)) - msg + val orderedRumors = rumorList.sortBy(_.timestamp) + var processedRumors: List[Rumor] = List.empty + var failed = false + for rumor <- orderedRumors if !failed do { + failed = rumorHandler(messageRegistry, rumor) && writeRumorInDb(dbActorRef, rumor) + processedRumors = processedRumors.appended(rumor) + } + if failed then + system.log.info(s"Failed to process all rumors from rumorStateAnswer $jsonId. Processed rumors where ${processedRumors.map(rumor => (rumor.senderPk, rumor.rumorId))}") + Left(PipelineError( + ErrorCodes.SERVER_ERROR.id, + s"Rumor state handler was not able to process all rumors from $msg", + jsonId + )) else - Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"Rumor state handler was not able to process all rumors from $msg", jsonId)) + msg case _ => Left(PipelineError( ErrorCodes.SERVER_ERROR.id, s"Rumor state handler received an unexpected type of result $msg", @@ -75,14 +83,11 @@ object ProcessMessagesHandler extends AskPatternConstants { )) } - private def writeRumorsInDb(dbActorRef: AskableActorRef, rumors: List[Rumor]): Boolean = { - rumors.foreach { rumor => - val writeRumor = dbActorRef ? WriteRumor(rumor) - Await.result(writeRumor, duration) match - case DbActorAck() => /* DO NOTHING*/ - case _ => false - } - true + private def writeRumorInDb(dbActorRef: AskableActorRef, rumor: Rumor): Boolean = { + val writeRumor = dbActorRef ? WriteRumor(rumor) + Await.result(writeRumor, duration) match + case DbActorAck() => true + case _ => false } def rumorHandler(messageRegistry: MessageRegistry, rumor: Rumor)(implicit system: ActorSystem): Boolean = { From d3721edf557c7243661ab2147ee810cf5e989c0b Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Tue, 25 Jun 2024 15:55:27 +0200 Subject: [PATCH 3/5] add new rumorStateAns --- .../handlers/ProcessMessagesHandler.scala | 1 + .../handlers/RumorStateAnsHandlerSuite.scala | 28 ++-- .../json/rumor_state/rumor_state_ans.json | 130 ++++++++++++++++++ 3 files changed, 144 insertions(+), 15 deletions(-) create mode 100644 be2-scala/src/test/scala/util/examples/json/rumor_state/rumor_state_ans.json diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala index 77f87c2cd8..8663696ff4 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala @@ -56,6 +56,7 @@ object ProcessMessagesHandler extends AskPatternConstants { resultObject.resultRumor match case Some(rumorList) => val orderedRumors = rumorList.sortBy(_.timestamp) + println(s"orderedRumors $orderedRumors") var processedRumors: List[Rumor] = List.empty var failed = false for rumor <- orderedRumors if !failed do { diff --git a/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala index ecf083e858..e0162d0e1f 100644 --- a/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala +++ b/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala @@ -21,6 +21,7 @@ import ch.epfl.pop.pubsub.graph.validators.RpcValidator import ch.epfl.pop.storage.DbActor.{DbActorReadRumorData, ReadRumorData} import org.scalatest.matchers.should.Matchers import org.scalatest.matchers.should.Matchers.{a, equal, should, shouldBe} +import util.examples.MessageExample import scala.concurrent.Await @@ -30,8 +31,7 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle private var pubSubMediatorRef: ActorRef = _ private var dbActorRef: AskableActorRef = _ private var rumorStateAnsHandler: Flow[GraphMessage, GraphMessage, NotUsed] = _ - private val rumorRequest = JsonRpcRequest.buildFromJson(IOHelper.readJsonFromPath("src/test/scala/util/examples/json/rumor/rumor_correct_msg.json")) - private val rumor = rumorRequest.getParams.asInstanceOf[Rumor] + private val rumorStateResponse = JsonRpcResponse.buildFromJson(IOHelper.readJsonFromPath("src/test/scala/util/examples/json/rumor_state/rumor_state_ans.json")) override def beforeAll(): Unit = { inMemoryStorage = InMemoryStorage() @@ -59,15 +59,9 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle } test("rumor state ans processes all msg well") { - val splitMsg = rumor.messages.toList.flatMap((channel, msgList) => msgList.map(msg => (channel, List(msg)))) - var rumorId = -1 - val rumorList = splitMsg.map((channel, msgList) => { - rumorId += 1 - Rumor(rumor.senderPk, rumorId, Map(channel -> msgList)) - }) - val rumorStateAnsMsg = Right(JsonRpcResponse(RpcValidator.JSON_RPC_VERSION, ResultObject(ResultRumor(rumorList)), Some(1))) - - val output = Source.single(rumorStateAnsMsg).via(rumorStateAnsHandler).runWith(Sink.head) + + + val output = Source.single(Right(rumorStateResponse)).via(rumorStateAnsHandler).runWith(Sink.head) Await.result(output, duration) val ask = dbActorRef ? DbActor.GetAllChannels() @@ -76,15 +70,17 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle case err @ _ => Matchers.fail(err.toString) } - val channelsInRumor = rumor.messages.keySet + val rumorList = rumorStateResponse.result.get.resultRumor.get + println(rumorList) + val channelsInRumor = rumorList.flatMap(rumor => rumor.messages.keySet).toSet channelsInRumor.diff(channelsInDb) should equal(Set.empty) val messagesInDb: Set[Message] = channelsInDb.foldLeft(Set.empty: Set[Message])((acc, channel) => acc ++ getMessages(channel)) - val messagesInRumor = rumor.messages.values.foldLeft(Set.empty: Set[Message])((acc, set) => acc ++ set) + val messagesInRumor = rumorList.flatMap(rumor => rumor.messages.values).foldLeft(Set.empty: Set[Message])((acc, set) => acc ++ set) messagesInRumor.diff(messagesInDb) should equal(Set.empty) - val readRumorData = dbActorRef ? ReadRumorData(rumor.senderPk) + val readRumorData = dbActorRef ? ReadRumorData(rumorList.head.senderPk) Await.result(readRumorData, duration) match case DbActorReadRumorData(rumorData: RumorData) => rumorData.rumorIds shouldBe rumorList.map(_.rumorId) @@ -96,7 +92,9 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle val outputResponseInt = Source.single(Right(responseInt)).via(rumorStateAnsHandler).runWith(Sink.head) Await.result(outputResponseInt, duration) shouldBe a[Left[PipelineError, Nothing]] - val request = JsonRpcRequest(RpcValidator.JSON_RPC_VERSION, publish, Publish(rumor.messages.head._1, rumor.messages.head._2.head), None) + + val rumorList = rumorStateResponse.result.get.resultRumor.get + val request = JsonRpcRequest(RpcValidator.JSON_RPC_VERSION, publish, Publish(rumorList.head.messages.head._1, rumorList.head.messages.head._2.head), None) val outputRequest = Source.single(Right(request)).via(rumorStateAnsHandler).runWith(Sink.head) Await.result(outputRequest, duration) shouldBe a[Left[PipelineError, Nothing]] diff --git a/be2-scala/src/test/scala/util/examples/json/rumor_state/rumor_state_ans.json b/be2-scala/src/test/scala/util/examples/json/rumor_state/rumor_state_ans.json new file mode 100644 index 0000000000..469e328179 --- /dev/null +++ b/be2-scala/src/test/scala/util/examples/json/rumor_state/rumor_state_ans.json @@ -0,0 +1,130 @@ +{ + "jsonrpc": "2.0", + "id": 6, + "result": [ + { + "sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=", + "rumor_id": 0, + "messages": { + "/root": [ + { + "data": "eyJvYmplY3QiOiJsYW8iLCJhY3Rpb24iOiJjcmVhdGUiLCJuYW1lIjoibmV3TGFvIiwiY3JlYXRpb24iOjE3MTkzMjE2NDgsIm9yZ2FuaXplciI6IlYxdEZoOTZUQVlrb3owLUtXdXo1cDhnNXljTGZJbWljY1Vod2VaR05DbzA9Iiwid2l0bmVzc2VzIjpbXSwiaWQiOiJPaGRHdXltVE9TS2kxSW9xSS1wYXV6bzZ2dHBqSXBmcDh1dU5RVlFScFlNPSJ9", + "message_id": "joRj4PrmoEHk0i4Ry9qKQkrQcJXLuM47OuN32JyYM0s=", + "sender": "V1tFh96TAYkoz0-KWuz5p8g5ycLfImiccUhweZGNCo0=", + "signature": "YZYnz7LkkR288REVuBPDxjO4hcOxf3lBsTrsv7k-u27LMJksD0NFYehic6jA_xNxpegCGOqM8g5-bfuEnK9ZBg==", + "witness_signatures": [] + } + ] + }, + "timestamp": {} + }, + { + "sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=", + "rumor_id": 1, + "messages": { + "/root/OhdGuymTOSKi1IoqI-pauzo6vtpjIpfp8uuNQVQRpYM=": [ + { + "data": "eyJvYmplY3QiOiJtZWV0aW5nIiwiYWN0aW9uIjoiY3JlYXRlIiwibmFtZSI6ImhlbGxvIiwiY3JlYXRpb24iOjE3MTkzMjE2NjgsImxvY2F0aW9uIjoia2pkZiIsInN0YXJ0IjoxNzE5MzIxNjY4LCJlbmQiOjE3MTkzMjUyNjMsImlkIjoiMHpieEZyTnZHSHN5UnJZV0FjdlhSbEE0YlZHYWFhbmhTbGVPTDA3UkVSYz0ifQ==", + "message_id": "QPsTmFQggaOlcAd45_DknceMRjjz-pJTNzg_zaciuaA=", + "sender": "V1tFh96TAYkoz0-KWuz5p8g5ycLfImiccUhweZGNCo0=", + "signature": "3RiOapigtgKbExXAwuYHBei-rOIjIcVJlTmh5kzGLbOsBpAlAJRx9M50oMAUD_N4cu7axiWM0SjxzAh9v6QoBA==", + "witness_signatures": [] + } + ] + }, + "timestamp": { + "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=": 0 + } + }, + { + "sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=", + "rumor_id": 2, + "messages": { + "/root/OhdGuymTOSKi1IoqI-pauzo6vtpjIpfp8uuNQVQRpYM=": [ + { + "data": "eyJvYmplY3QiOiJyb2xsX2NhbGwiLCJhY3Rpb24iOiJjcmVhdGUiLCJuYW1lIjoiY2JjdmIiLCJjcmVhdGlvbiI6MTcxOTMyMTY5OSwicHJvcG9zZWRfc3RhcnQiOjE3MTkzMjE2OTksInByb3Bvc2VkX2VuZCI6MTcxOTMyNTI5MywibG9jYXRpb24iOiJnZmRmZyIsImRlc2NyaXB0aW9uIjoiZ2RmZyIsImlkIjoiaGRLTUZ0dnE1Y3JROGExSzhjUjZuYXNCQS1Zek51dUtRbWpuZlpZOHRWWT0ifQ==", + "message_id": "UXPG4Fl7U9jva8XvCxCDudYrZ0DxX3i8ZupeeqqM8j4=", + "sender": "V1tFh96TAYkoz0-KWuz5p8g5ycLfImiccUhweZGNCo0=", + "signature": "BzuZ5tbIPmOU2XuRy0PBPil7Uz6ltjDU5Sap4yL4zr_zGWJtkBSjO9NNFG79OKhztnSJrmAWcxCZgtvpQX7PAg==", + "witness_signatures": [] + } + ] + }, + "timestamp": { + "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=": 1 + } + }, + { + "sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=", + "rumor_id": 3, + "messages": { + "/root/OhdGuymTOSKi1IoqI-pauzo6vtpjIpfp8uuNQVQRpYM=": [ + { + "data": "eyJvYmplY3QiOiJyb2xsX2NhbGwiLCJhY3Rpb24iOiJvcGVuIiwib3BlbmVkX2F0IjoxNzE5MzIxNzAyLCJvcGVucyI6ImhkS01GdHZxNWNyUThhMUs4Y1I2bmFzQkEtWXpOdXVLUW1qbmZaWTh0Vlk9IiwidXBkYXRlX2lkIjoiNXVZelpNdVFTRVJkWVdhREE2VUVTZW5vaW1nZGxZYXpVR0RSVWl5VmtzND0ifQ==", + "message_id": "EGwWI8N15z39MroOUvJoa0_fGVUdVYUNGJmyKiv3oX4=", + "sender": "V1tFh96TAYkoz0-KWuz5p8g5ycLfImiccUhweZGNCo0=", + "signature": "fxfnImU04Ig3Eci134EzgBwBUAfiovajxCIyn5ZSJjQKm-o9A4L_qYsF_rgAWKrTMY6IvC5-LU4m5soNytpTCg==", + "witness_signatures": [] + } + ] + }, + "timestamp": { + "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=": 2 + } + }, + { + "sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=", + "rumor_id": 4, + "messages": { + "/root/OhdGuymTOSKi1IoqI-pauzo6vtpjIpfp8uuNQVQRpYM=": [ + { + "data": "eyJvYmplY3QiOiJyb2xsX2NhbGwiLCJhY3Rpb24iOiJjbG9zZSIsImNsb3NlZF9hdCI6MTcxOTMyMTcwNSwiYXR0ZW5kZWVzIjpbImgwT3Z6NDgzS2ktWU5xZG1rTlp0ME1EVll4dnc5cXlTdEV6UEJzNGZWcmc9Il0sImNsb3NlcyI6IjV1WXpaTXVRU0VSZFlXYURBNlVFU2Vub2ltZ2RsWWF6VUdEUlVpeVZrczQ9IiwidXBkYXRlX2lkIjoiQVEybnM0Um5qZ1R6bTUxQ1ZIcEVOT3ZIS2xaZ0xyRndiNnhETWpPZzBjbz0ifQ==", + "message_id": "e2qLbMYAM_aekiN-87wTX2qDsG__qwpJswLiFkqPSRo=", + "sender": "V1tFh96TAYkoz0-KWuz5p8g5ycLfImiccUhweZGNCo0=", + "signature": "BGBOjVasCsUHWNTUsV_TvGlTCpDuxpwGeCWSA26CRogAzsMHzAOEwA0UDEqqMKN3NBOEuXgPWEKX4DrKTLLvAQ==", + "witness_signatures": [] + } + ] + }, + "timestamp": { + "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=": 3 + } + }, + { + "sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=", + "rumor_id": 5, + "messages": { + "/root/OhdGuymTOSKi1IoqI-pauzo6vtpjIpfp8uuNQVQRpYM=/social/h0Ovz483Ki-YNqdmkNZt0MDVYxvw9qyStEzPBs4fVrg=": [ + { + "data": "eyJvYmplY3QiOiJjaGlycCIsImFjdGlvbiI6ImFkZCIsInRleHQiOiJoZWxsbG8iLCJ0aW1lc3RhbXAiOjE3MTkzMjE3MjB9", + "message_id": "IyDqOOxidwgO7EHp6SzH7tznn___PZWkMU3WsGlywRc=", + "sender": "h0Ovz483Ki-YNqdmkNZt0MDVYxvw9qyStEzPBs4fVrg=", + "signature": "keLO9dwyitIMaar7wlejoeg9vzQ-Vm_E4Uax85dyDVUrGfGsY31Vf2f_uNELsnO4ebiwQo0AQ_wNWo5F_9HJCA==", + "witness_signatures": [] + } + ] + }, + "timestamp": { + "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=": 4 + } + }, + { + "sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=", + "rumor_id": 6, + "messages": { + "/root/OhdGuymTOSKi1IoqI-pauzo6vtpjIpfp8uuNQVQRpYM=/social/reactions": [ + { + "data": "eyJvYmplY3QiOiJyZWFjdGlvbiIsImFjdGlvbiI6ImFkZCIsInJlYWN0aW9uX2NvZGVwb2ludCI6IvCfkY0iLCJjaGlycF9pZCI6Ikl5RHFPT3hpZHdnTzdFSHA2U3pIN3R6bm5fX19QWldrTVUzV3NHbHl3UmM9IiwidGltZXN0YW1wIjoxNzE5MzIxNzIxfQ==", + "message_id": "kfLq_6gr5wgMH0_5DbW6ILVPMJU_EjhYV4UUb7ECkn4=", + "sender": "h0Ovz483Ki-YNqdmkNZt0MDVYxvw9qyStEzPBs4fVrg=", + "signature": "IsMRoWfpJUzkBSeyqaju_vysZ_UztMoNIoVyncTFHI9dwKmtaaKqmD6bHqdT09RuOI9cFt-iLCmhH31AHdU2DQ==", + "witness_signatures": [] + } + ] + }, + "timestamp": { + "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=": 5 + } + } + ] +} From d949097b3e532092c77a0a981eb6d149520463b5 Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Tue, 25 Jun 2024 17:08:20 +0200 Subject: [PATCH 4/5] small fix and adapt test --- .../graph/handlers/ProcessMessagesHandler.scala | 12 ++++++------ .../graph/handlers/RumorStateAnsHandlerSuite.scala | 3 --- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala index 8663696ff4..23d8605fa4 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala @@ -58,13 +58,13 @@ object ProcessMessagesHandler extends AskPatternConstants { val orderedRumors = rumorList.sortBy(_.timestamp) println(s"orderedRumors $orderedRumors") var processedRumors: List[Rumor] = List.empty - var failed = false - for rumor <- orderedRumors if !failed do { - failed = rumorHandler(messageRegistry, rumor) && writeRumorInDb(dbActorRef, rumor) - processedRumors = processedRumors.appended(rumor) + var successful = true + for rumor <- orderedRumors if successful do { + successful = rumorHandler(messageRegistry, rumor) && writeRumorInDb(dbActorRef, rumor) + processedRumors = processedRumors.prepended(rumor) } - if failed then - system.log.info(s"Failed to process all rumors from rumorStateAnswer $jsonId. Processed rumors where ${processedRumors.map(rumor => (rumor.senderPk, rumor.rumorId))}") + if !successful then + system.log.info(s"Failed to process all rumors from rumorStateAnswer $jsonId. Processed rumors where ${processedRumors.map(rumor => (rumor.senderPk, rumor.rumorId)).tail}") Left(PipelineError( ErrorCodes.SERVER_ERROR.id, s"Rumor state handler was not able to process all rumors from $msg", diff --git a/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala b/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala index e0162d0e1f..0c58dcd594 100644 --- a/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala +++ b/be2-scala/src/test/scala/ch/epfl/pop/pubsub/graph/handlers/RumorStateAnsHandlerSuite.scala @@ -60,7 +60,6 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle test("rumor state ans processes all msg well") { - val output = Source.single(Right(rumorStateResponse)).via(rumorStateAnsHandler).runWith(Sink.head) Await.result(output, duration) @@ -71,7 +70,6 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle } val rumorList = rumorStateResponse.result.get.resultRumor.get - println(rumorList) val channelsInRumor = rumorList.flatMap(rumor => rumor.messages.keySet).toSet channelsInRumor.diff(channelsInDb) should equal(Set.empty) @@ -92,7 +90,6 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle val outputResponseInt = Source.single(Right(responseInt)).via(rumorStateAnsHandler).runWith(Sink.head) Await.result(outputResponseInt, duration) shouldBe a[Left[PipelineError, Nothing]] - val rumorList = rumorStateResponse.result.get.resultRumor.get val request = JsonRpcRequest(RpcValidator.JSON_RPC_VERSION, publish, Publish(rumorList.head.messages.head._1, rumorList.head.messages.head._2.head), None) val outputRequest = Source.single(Right(request)).via(rumorStateAnsHandler).runWith(Sink.head) From 98aa6bd0f6feb1f82dae60742485f6a1bcca26ed Mon Sep 17 00:00:00 2001 From: Daniel Tavares Agostinho Date: Tue, 25 Jun 2024 17:37:06 +0200 Subject: [PATCH 5/5] removed print --- .../epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala index 23d8605fa4..9899587525 100644 --- a/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala +++ b/be2-scala/src/main/scala/ch/epfl/pop/pubsub/graph/handlers/ProcessMessagesHandler.scala @@ -56,7 +56,6 @@ object ProcessMessagesHandler extends AskPatternConstants { resultObject.resultRumor match case Some(rumorList) => val orderedRumors = rumorList.sortBy(_.timestamp) - println(s"orderedRumors $orderedRumors") var processedRumors: List[Rumor] = List.empty var successful = true for rumor <- orderedRumors if successful do {