Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BE2] Added vector clock #1945

Merged
merged 6 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.pattern.{AskableActorRef, ask}
import akka.stream.scaladsl.Flow
import ch.epfl.pop.decentralized.GossipManager.TriggerPullState
import ch.epfl.pop.model.network.MethodType.rumor_state
import ch.epfl.pop.model.network.method.Rumor
import ch.epfl.pop.model.network.method.{Rumor, RumorState}
import ch.epfl.pop.model.network.method.message.Message
import ch.epfl.pop.model.network.{JsonRpcRequest, JsonRpcResponse, MethodType}
import ch.epfl.pop.model.objects.{Channel, PublicKey, RumorData}
Expand Down Expand Up @@ -129,7 +129,15 @@ final case class GossipManager(dbActorRef: AskableActorRef, stopProbability: Dou
*/
private def startGossip(messages: Map[Channel, List[Message]]): Unit = {
if (publicKey.isDefined)
val rumor: Rumor = Rumor(publicKey.get, getRumorId(publicKey.get) + 1, messages)
var state: RumorState = RumorState(Map.empty)
val getRumorState = dbActorRef ? GetRumorState()
Await.result(getRumorState, duration) match
case DbActorGetRumorStateAck(rumorState) =>
state = rumorState
case _ =>
log.info(s"Actor (gossip) $self was not able to get its rumor state. Gossip has not started")
return
val rumor: Rumor = Rumor(publicKey.get, getRumorId(publicKey.get) + 1, messages, state)
val jsonRpcRequest = prepareRumor(rumor)
val writeRumor = dbActorRef ? DbActor.WriteRumor(rumor)
Await.result(writeRumor, duration) match
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import ch.epfl.pop.json.HighLevelProtocol.RumorStateFormat
import ch.epfl.pop.model.objects.PublicKey
import spray.json.*

final case class RumorState(state: Map[PublicKey, Int]) extends Params {
final case class RumorState(state: Map[PublicKey, Int]) extends Params, Ordered[RumorState] {

override def hasChannel: Boolean = false

Expand All @@ -27,6 +27,20 @@ final case class RumorState(state: Map[PublicKey, Int]) extends Params {
otherRumorState.state.filter((pk, _) => !this.state.contains(pk)).map((pk, rumorId) => pk -> List.range(0, rumorId + 1))
}
}

override def compare(that: RumorState): Int = {
val stateDiff: Map[PublicKey, Int] = state.map { (pk, id) =>
that.state.get(pk) match
case Some(value) => pk -> (id - value)
case None => pk -> id
} ++ that.state.removedAll(state.keySet).map((pk, value) => (pk, -value))
if stateDiff.values.forall(_ <= 0) && stateDiff.values.exists(_ < 0) then
-1
else if stateDiff.values.forall(_ >= 0) && stateDiff.values.exists(_ > 0) then
1
else
0
}
}

object RumorState extends Parsable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,23 @@ object ProcessMessagesHandler extends AskPatternConstants {
case msg @ Right(JsonRpcResponse(_, Some(resultObject), None, jsonId)) =>
resultObject.resultRumor match
case Some(rumorList) =>
val mergedMsg = rumorList
.flatMap(_.messages)
.groupBy(_._1)
.view.mapValues(_.flatMap(_._2).toSet).toMap
if (processMsgMap(mergedMsg, messageRegistry) && writeRumorsInDb(dbActorRef, rumorList))
msg
val orderedRumors = rumorList.sortBy(_.timestamp)
println(s"orderedRumors $orderedRumors")
K1li4nL marked this conversation as resolved.
Show resolved Hide resolved
var processedRumors: List[Rumor] = List.empty
var successful = true
for rumor <- orderedRumors if successful do {
successful = rumorHandler(messageRegistry, rumor) && writeRumorInDb(dbActorRef, rumor)
processedRumors = processedRumors.prepended(rumor)
}
if !successful then
system.log.info(s"Failed to process all rumors from rumorStateAnswer $jsonId. Processed rumors where ${processedRumors.map(rumor => (rumor.senderPk, rumor.rumorId)).tail}")
Left(PipelineError(
ErrorCodes.SERVER_ERROR.id,
s"Rumor state handler was not able to process all rumors from $msg",
jsonId
))
else
Left(PipelineError(ErrorCodes.SERVER_ERROR.id, s"Rumor state handler was not able to process all rumors from $msg", jsonId))
msg
case _ => Left(PipelineError(
ErrorCodes.SERVER_ERROR.id,
s"Rumor state handler received an unexpected type of result $msg",
Expand All @@ -75,14 +84,11 @@ object ProcessMessagesHandler extends AskPatternConstants {
))
}

private def writeRumorsInDb(dbActorRef: AskableActorRef, rumors: List[Rumor]): Boolean = {
rumors.foreach { rumor =>
val writeRumor = dbActorRef ? WriteRumor(rumor)
Await.result(writeRumor, duration) match
case DbActorAck() => /* DO NOTHING*/
case _ => false
}
true
private def writeRumorInDb(dbActorRef: AskableActorRef, rumor: Rumor): Boolean = {
val writeRumor = dbActorRef ? WriteRumor(rumor)
Await.result(writeRumor, duration) match
case DbActorAck() => true
case _ => false
}

def rumorHandler(messageRegistry: MessageRegistry, rumor: Rumor)(implicit system: ActorSystem): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,38 @@ class RumorStateSuite extends FunSuite with Matchers {
val diffResult = rumorState1.isMissingRumorsFrom(rumorState2)
diffResult shouldBe Map.empty
}

test("comparator works") {
val rumorState1: RumorState = RumorState(Map(
PublicKey(Base64Data.encode("1")) -> 3,
PublicKey(Base64Data.encode("2")) -> 5,
PublicKey(Base64Data.encode("3")) -> 3
))

val rumorState2: RumorState = RumorState(Map(
PublicKey(Base64Data.encode("1")) -> 1,
PublicKey(Base64Data.encode("2")) -> 2
))

rumorState1 > rumorState2 shouldBe true

val rumorState3: RumorState = RumorState(Map(
PublicKey(Base64Data.encode("1")) -> 4,
PublicKey(Base64Data.encode("2")) -> 6,
PublicKey(Base64Data.encode("3")) -> 3
))

rumorState1 < rumorState3 shouldBe true

val rumorState4: RumorState = RumorState(Map(
PublicKey(Base64Data.encode("1")) -> 2,
PublicKey(Base64Data.encode("2")) -> 6,
PublicKey(Base64Data.encode("3")) -> 3
))

rumorState1 < rumorState4 shouldBe false
rumorState1 > rumorState4 shouldBe false

List(rumorState1, rumorState2, rumorState3, rumorState4).sorted shouldBe List(rumorState2, rumorState1, rumorState4, rumorState3)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import ch.epfl.pop.pubsub.graph.validators.RpcValidator
import ch.epfl.pop.storage.DbActor.{DbActorReadRumorData, ReadRumorData}
import org.scalatest.matchers.should.Matchers
import org.scalatest.matchers.should.Matchers.{a, equal, should, shouldBe}
import util.examples.MessageExample

import scala.concurrent.Await

Expand All @@ -30,8 +31,7 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle
private var pubSubMediatorRef: ActorRef = _
private var dbActorRef: AskableActorRef = _
private var rumorStateAnsHandler: Flow[GraphMessage, GraphMessage, NotUsed] = _
private val rumorRequest = JsonRpcRequest.buildFromJson(IOHelper.readJsonFromPath("src/test/scala/util/examples/json/rumor/rumor_correct_msg.json"))
private val rumor = rumorRequest.getParams.asInstanceOf[Rumor]
private val rumorStateResponse = JsonRpcResponse.buildFromJson(IOHelper.readJsonFromPath("src/test/scala/util/examples/json/rumor_state/rumor_state_ans.json"))

override def beforeAll(): Unit = {
inMemoryStorage = InMemoryStorage()
Expand Down Expand Up @@ -59,15 +59,8 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle
}

test("rumor state ans processes all msg well") {
val splitMsg = rumor.messages.toList.flatMap((channel, msgList) => msgList.map(msg => (channel, List(msg))))
var rumorId = -1
val rumorList = splitMsg.map((channel, msgList) => {
rumorId += 1
Rumor(rumor.senderPk, rumorId, Map(channel -> msgList))
})
val rumorStateAnsMsg = Right(JsonRpcResponse(RpcValidator.JSON_RPC_VERSION, ResultObject(ResultRumor(rumorList)), Some(1)))

val output = Source.single(rumorStateAnsMsg).via(rumorStateAnsHandler).runWith(Sink.head)

val output = Source.single(Right(rumorStateResponse)).via(rumorStateAnsHandler).runWith(Sink.head)
Await.result(output, duration)

val ask = dbActorRef ? DbActor.GetAllChannels()
Expand All @@ -76,15 +69,16 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle
case err @ _ => Matchers.fail(err.toString)
}

val channelsInRumor = rumor.messages.keySet
val rumorList = rumorStateResponse.result.get.resultRumor.get
val channelsInRumor = rumorList.flatMap(rumor => rumor.messages.keySet).toSet
channelsInRumor.diff(channelsInDb) should equal(Set.empty)

val messagesInDb: Set[Message] = channelsInDb.foldLeft(Set.empty: Set[Message])((acc, channel) => acc ++ getMessages(channel))
val messagesInRumor = rumor.messages.values.foldLeft(Set.empty: Set[Message])((acc, set) => acc ++ set)
val messagesInRumor = rumorList.flatMap(rumor => rumor.messages.values).foldLeft(Set.empty: Set[Message])((acc, set) => acc ++ set)

messagesInRumor.diff(messagesInDb) should equal(Set.empty)

val readRumorData = dbActorRef ? ReadRumorData(rumor.senderPk)
val readRumorData = dbActorRef ? ReadRumorData(rumorList.head.senderPk)
Await.result(readRumorData, duration) match
case DbActorReadRumorData(rumorData: RumorData) =>
rumorData.rumorIds shouldBe rumorList.map(_.rumorId)
Expand All @@ -96,7 +90,8 @@ class RumorStateAnsHandlerSuite extends TestKit(ActorSystem("RumorStateAnsHandle
val outputResponseInt = Source.single(Right(responseInt)).via(rumorStateAnsHandler).runWith(Sink.head)
Await.result(outputResponseInt, duration) shouldBe a[Left[PipelineError, Nothing]]

val request = JsonRpcRequest(RpcValidator.JSON_RPC_VERSION, publish, Publish(rumor.messages.head._1, rumor.messages.head._2.head), None)
val rumorList = rumorStateResponse.result.get.resultRumor.get
val request = JsonRpcRequest(RpcValidator.JSON_RPC_VERSION, publish, Publish(rumorList.head.messages.head._1, rumorList.head.messages.head._2.head), None)
val outputRequest = Source.single(Right(request)).via(rumorStateAnsHandler).runWith(Sink.head)
Await.result(outputRequest, duration) shouldBe a[Left[PipelineError, Nothing]]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
{
"jsonrpc": "2.0",
"id": 6,
"result": [
{
"sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=",
"rumor_id": 0,
"messages": {
"/root": [
{
"data": "eyJvYmplY3QiOiJsYW8iLCJhY3Rpb24iOiJjcmVhdGUiLCJuYW1lIjoibmV3TGFvIiwiY3JlYXRpb24iOjE3MTkzMjE2NDgsIm9yZ2FuaXplciI6IlYxdEZoOTZUQVlrb3owLUtXdXo1cDhnNXljTGZJbWljY1Vod2VaR05DbzA9Iiwid2l0bmVzc2VzIjpbXSwiaWQiOiJPaGRHdXltVE9TS2kxSW9xSS1wYXV6bzZ2dHBqSXBmcDh1dU5RVlFScFlNPSJ9",
"message_id": "joRj4PrmoEHk0i4Ry9qKQkrQcJXLuM47OuN32JyYM0s=",
"sender": "V1tFh96TAYkoz0-KWuz5p8g5ycLfImiccUhweZGNCo0=",
"signature": "YZYnz7LkkR288REVuBPDxjO4hcOxf3lBsTrsv7k-u27LMJksD0NFYehic6jA_xNxpegCGOqM8g5-bfuEnK9ZBg==",
"witness_signatures": []
}
]
},
"timestamp": {}
},
{
"sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=",
"rumor_id": 1,
"messages": {
"/root/OhdGuymTOSKi1IoqI-pauzo6vtpjIpfp8uuNQVQRpYM=": [
{
"data": "eyJvYmplY3QiOiJtZWV0aW5nIiwiYWN0aW9uIjoiY3JlYXRlIiwibmFtZSI6ImhlbGxvIiwiY3JlYXRpb24iOjE3MTkzMjE2NjgsImxvY2F0aW9uIjoia2pkZiIsInN0YXJ0IjoxNzE5MzIxNjY4LCJlbmQiOjE3MTkzMjUyNjMsImlkIjoiMHpieEZyTnZHSHN5UnJZV0FjdlhSbEE0YlZHYWFhbmhTbGVPTDA3UkVSYz0ifQ==",
"message_id": "QPsTmFQggaOlcAd45_DknceMRjjz-pJTNzg_zaciuaA=",
"sender": "V1tFh96TAYkoz0-KWuz5p8g5ycLfImiccUhweZGNCo0=",
"signature": "3RiOapigtgKbExXAwuYHBei-rOIjIcVJlTmh5kzGLbOsBpAlAJRx9M50oMAUD_N4cu7axiWM0SjxzAh9v6QoBA==",
"witness_signatures": []
}
]
},
"timestamp": {
"723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=": 0
}
},
{
"sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=",
"rumor_id": 2,
"messages": {
"/root/OhdGuymTOSKi1IoqI-pauzo6vtpjIpfp8uuNQVQRpYM=": [
{
"data": "eyJvYmplY3QiOiJyb2xsX2NhbGwiLCJhY3Rpb24iOiJjcmVhdGUiLCJuYW1lIjoiY2JjdmIiLCJjcmVhdGlvbiI6MTcxOTMyMTY5OSwicHJvcG9zZWRfc3RhcnQiOjE3MTkzMjE2OTksInByb3Bvc2VkX2VuZCI6MTcxOTMyNTI5MywibG9jYXRpb24iOiJnZmRmZyIsImRlc2NyaXB0aW9uIjoiZ2RmZyIsImlkIjoiaGRLTUZ0dnE1Y3JROGExSzhjUjZuYXNCQS1Zek51dUtRbWpuZlpZOHRWWT0ifQ==",
"message_id": "UXPG4Fl7U9jva8XvCxCDudYrZ0DxX3i8ZupeeqqM8j4=",
"sender": "V1tFh96TAYkoz0-KWuz5p8g5ycLfImiccUhweZGNCo0=",
"signature": "BzuZ5tbIPmOU2XuRy0PBPil7Uz6ltjDU5Sap4yL4zr_zGWJtkBSjO9NNFG79OKhztnSJrmAWcxCZgtvpQX7PAg==",
"witness_signatures": []
}
]
},
"timestamp": {
"723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=": 1
}
},
{
"sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=",
"rumor_id": 3,
"messages": {
"/root/OhdGuymTOSKi1IoqI-pauzo6vtpjIpfp8uuNQVQRpYM=": [
{
"data": "eyJvYmplY3QiOiJyb2xsX2NhbGwiLCJhY3Rpb24iOiJvcGVuIiwib3BlbmVkX2F0IjoxNzE5MzIxNzAyLCJvcGVucyI6ImhkS01GdHZxNWNyUThhMUs4Y1I2bmFzQkEtWXpOdXVLUW1qbmZaWTh0Vlk9IiwidXBkYXRlX2lkIjoiNXVZelpNdVFTRVJkWVdhREE2VUVTZW5vaW1nZGxZYXpVR0RSVWl5VmtzND0ifQ==",
"message_id": "EGwWI8N15z39MroOUvJoa0_fGVUdVYUNGJmyKiv3oX4=",
"sender": "V1tFh96TAYkoz0-KWuz5p8g5ycLfImiccUhweZGNCo0=",
"signature": "fxfnImU04Ig3Eci134EzgBwBUAfiovajxCIyn5ZSJjQKm-o9A4L_qYsF_rgAWKrTMY6IvC5-LU4m5soNytpTCg==",
"witness_signatures": []
}
]
},
"timestamp": {
"723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=": 2
}
},
{
"sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=",
"rumor_id": 4,
"messages": {
"/root/OhdGuymTOSKi1IoqI-pauzo6vtpjIpfp8uuNQVQRpYM=": [
{
"data": "eyJvYmplY3QiOiJyb2xsX2NhbGwiLCJhY3Rpb24iOiJjbG9zZSIsImNsb3NlZF9hdCI6MTcxOTMyMTcwNSwiYXR0ZW5kZWVzIjpbImgwT3Z6NDgzS2ktWU5xZG1rTlp0ME1EVll4dnc5cXlTdEV6UEJzNGZWcmc9Il0sImNsb3NlcyI6IjV1WXpaTXVRU0VSZFlXYURBNlVFU2Vub2ltZ2RsWWF6VUdEUlVpeVZrczQ9IiwidXBkYXRlX2lkIjoiQVEybnM0Um5qZ1R6bTUxQ1ZIcEVOT3ZIS2xaZ0xyRndiNnhETWpPZzBjbz0ifQ==",
"message_id": "e2qLbMYAM_aekiN-87wTX2qDsG__qwpJswLiFkqPSRo=",
"sender": "V1tFh96TAYkoz0-KWuz5p8g5ycLfImiccUhweZGNCo0=",
"signature": "BGBOjVasCsUHWNTUsV_TvGlTCpDuxpwGeCWSA26CRogAzsMHzAOEwA0UDEqqMKN3NBOEuXgPWEKX4DrKTLLvAQ==",
"witness_signatures": []
}
]
},
"timestamp": {
"723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=": 3
}
},
{
"sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=",
"rumor_id": 5,
"messages": {
"/root/OhdGuymTOSKi1IoqI-pauzo6vtpjIpfp8uuNQVQRpYM=/social/h0Ovz483Ki-YNqdmkNZt0MDVYxvw9qyStEzPBs4fVrg=": [
{
"data": "eyJvYmplY3QiOiJjaGlycCIsImFjdGlvbiI6ImFkZCIsInRleHQiOiJoZWxsbG8iLCJ0aW1lc3RhbXAiOjE3MTkzMjE3MjB9",
"message_id": "IyDqOOxidwgO7EHp6SzH7tznn___PZWkMU3WsGlywRc=",
"sender": "h0Ovz483Ki-YNqdmkNZt0MDVYxvw9qyStEzPBs4fVrg=",
"signature": "keLO9dwyitIMaar7wlejoeg9vzQ-Vm_E4Uax85dyDVUrGfGsY31Vf2f_uNELsnO4ebiwQo0AQ_wNWo5F_9HJCA==",
"witness_signatures": []
}
]
},
"timestamp": {
"723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=": 4
}
},
{
"sender_id": "723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=",
"rumor_id": 6,
"messages": {
"/root/OhdGuymTOSKi1IoqI-pauzo6vtpjIpfp8uuNQVQRpYM=/social/reactions": [
{
"data": "eyJvYmplY3QiOiJyZWFjdGlvbiIsImFjdGlvbiI6ImFkZCIsInJlYWN0aW9uX2NvZGVwb2ludCI6IvCfkY0iLCJjaGlycF9pZCI6Ikl5RHFPT3hpZHdnTzdFSHA2U3pIN3R6bm5fX19QWldrTVUzV3NHbHl3UmM9IiwidGltZXN0YW1wIjoxNzE5MzIxNzIxfQ==",
"message_id": "kfLq_6gr5wgMH0_5DbW6ILVPMJU_EjhYV4UUb7ECkn4=",
"sender": "h0Ovz483Ki-YNqdmkNZt0MDVYxvw9qyStEzPBs4fVrg=",
"signature": "IsMRoWfpJUzkBSeyqaju_vysZ_UztMoNIoVyncTFHI9dwKmtaaKqmD6bHqdT09RuOI9cFt-iLCmhH31AHdU2DQ==",
"witness_signatures": []
}
]
},
"timestamp": {
"723zm3j-R47B8KHtMvun7yaHB02CWpEFyJj4ZZTt6IE=": 5
}
}
]
}
Loading