Skip to content

Commit

Permalink
realtime: поддержка ignore list
Browse files Browse the repository at this point in the history
  • Loading branch information
maxcom committed Dec 17, 2023
1 parent 42f5012 commit 8bfb47f
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 52 deletions.
111 changes: 59 additions & 52 deletions src/main/scala/ru/org/linux/realtime/RealtimeEventHub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import ru.org.linux.comment.CommentReadService
import ru.org.linux.realtime.RealtimeEventHub.*
import ru.org.linux.spring.SiteConfig
import ru.org.linux.topic.{TopicDao, TopicPermissionService}
import ru.org.linux.user.IgnoreListDao

import java.io.IOException
import scala.collection.mutable
Expand All @@ -42,19 +43,19 @@ import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*
import scala.util.control.NonFatal

// TODO ignore list support
// TODO fix face conditions on simultaneous posting comment, subscription and missing processing
object RealtimeEventHub {
sealed trait SessionProtocol

sealed trait Protocol

case class NewComment(msgid: Int, cid: Int) extends SessionProtocol with Protocol
case class NewComment(msgid: Int, cid: Int) extends Protocol
case class NewCommentOnly(cid: Int) extends SessionProtocol
case class RefreshEvents(users: Set[Int]) extends SessionProtocol with Protocol
private[realtime] case object Tick extends SessionProtocol with Protocol

private[realtime] case class SessionStarted(session: WebSocketSession, user: Option[Int], replyTo: ActorRef[Done.type]) extends Protocol
private[realtime] case class SubscribeTopic(session: WebSocketSession, topic: Int, replyTo: ActorRef[Done.type]) extends Protocol
private[realtime] case class SubscribeTopic(session: WebSocketSession, topic: Int, missedComments: Seq[Int],
replyTo: ActorRef[Done.type]) extends Protocol
private[realtime] case class SessionTerminated(session: String) extends Protocol

private[realtime] case object TerminateSession extends SessionProtocol
Expand All @@ -71,7 +72,7 @@ object RealtimeEventHub {
realtimeEventHub ! RefreshEvents(users.asScala.map(_.toInt).toSet)
}

def behavior: Behavior[Protocol] = Behaviors.setup { context =>
def behavior(ignoreListDao: IgnoreListDao): Behavior[Protocol] = Behaviors.setup { context =>
val topicSubscriptions: mutable.MultiDict[Int, ActorRef[SessionProtocol]] = mutable.MultiDict[Int, ActorRef[SessionProtocol]]()
val userSubscriptions: mutable.MultiDict[Int, ActorRef[SessionProtocol]] = mutable.MultiDict[Int, ActorRef[SessionProtocol]]()
val sessions = new mutable.HashMap[String, ActorRef[SessionProtocol]]
Expand All @@ -80,9 +81,11 @@ object RealtimeEventHub {
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(Tick, Tick, 5.minutes)

Behaviors.receiveMessage[Protocol] {
Behaviors.receiveMessagePartial[Protocol] {
case SessionStarted(session, user, replyTo) if !sessions.contains(session.getId) =>
val actor: ActorRef[SessionProtocol] = context.spawnAnonymous(RealtimeSessionActor.behavior(session))
val actor: ActorRef[SessionProtocol] =
context.spawnAnonymous(RealtimeSessionActor.behavior(ignoreListDao, session, user))

context.watch(actor)

sessions += (session.getId -> actor)
Expand All @@ -100,11 +103,16 @@ object RealtimeEventHub {
replyTo ! Done

Behaviors.same
case SubscribeTopic(session, topic, replyTo) if sessions.contains(session.getId) =>
case SubscribeTopic(session, topic, missed, replyTo) if sessions.contains(session.getId) =>
val actor = sessions(session.getId)

topicSubscriptions += (topic -> actor)

missed.foreach { cid =>
context.log.debug(s"Sending missed comment $cid")
actor ! NewCommentOnly(cid)
}

replyTo ! Done

Behaviors.same
Expand All @@ -116,11 +124,11 @@ object RealtimeEventHub {
}

Behaviors.same
case msg@NewComment(msgid, _) =>
case NewComment(msgid, cid) =>
context.log.debug(s"New comment in topic $msgid")

topicSubscriptions.sets.getOrElse(msgid, Set.empty).foreach {
_ ! msg
_ ! NewCommentOnly(cid)
}

Behaviors.same
Expand Down Expand Up @@ -162,43 +170,47 @@ object RealtimeEventHub {
}

object RealtimeSessionActor {
def behavior(session: WebSocketSession): Behavior[SessionProtocol] = Behaviors.setup { context =>
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(Tick, Tick, initialDelay = 5.seconds, delay = 1.minute)

def handleExceptions: PartialFunction[Throwable, Behavior[SessionProtocol]] = {
case ex: IOException =>
context.log.debug(s"Terminated by IOException ${ex.toString}")
Behaviors.stopped
}

Behaviors.receiveMessage[SessionProtocol] {
case NewComment(_, cid) =>
try {
notifyComment(session, cid)
Behaviors.same
} catch handleExceptions

case RefreshEvents(_) =>
try {
notifyEvent(session)
Behaviors.same
} catch handleExceptions
case Tick =>
// log.debug("Sending keepalive")
try {
session.sendMessage(new PingMessage())
def behavior(ignoreListDao: IgnoreListDao, session: WebSocketSession, userId: Option[Int]): Behavior[SessionProtocol] =
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(Tick, Tick, initialDelay = 5.seconds, delay = 1.minute)

def handleExceptions: PartialFunction[Throwable, Behavior[SessionProtocol]] = {
case ex: IOException =>
context.log.debug(s"Terminated by IOException ${ex.toString}")
Behaviors.stopped
}

Behaviors.receiveMessage[SessionProtocol] {
case NewCommentOnly(cid) =>
try {
if (userId.isEmpty || !ignoreListDao.isIgnored(userId.get, cid)) {
notifyComment(session, cid)
}

Behaviors.same
} catch handleExceptions

case RefreshEvents(_) =>
try {
notifyEvent(session)
Behaviors.same
} catch handleExceptions
case Tick =>
// log.debug("Sending keepalive")
try {
session.sendMessage(new PingMessage())
Behaviors.same
} catch handleExceptions
case TerminateSession =>
Behaviors.stopped
}.receiveSignal {
case (_, PostStop) =>
session.close()
Behaviors.same
} catch handleExceptions
case TerminateSession =>
Behaviors.stopped
}.receiveSignal {
case (_, PostStop) =>
session.close()
Behaviors.same
}
}
}
}
}

@Service
Expand Down Expand Up @@ -257,12 +269,7 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef[Protoco

val missed = comments.map(_.id).dropWhile(_ <= last).toVector

missed.foreach { cid =>
logger.debug(s"Sending missed comment $cid")
notifyComment(session, cid)
}

val result = hub.ask(SubscribeTopic(session, topic.id, _))
val result = hub.ask(SubscribeTopic(session, topic.id, missed, _))

Await.result(result, 10.seconds)
} catch {
Expand All @@ -282,10 +289,10 @@ class RealtimeWebsocketHandler(@Qualifier("realtimeHubWS") hub: ActorRef[Protoco
@Configuration
class RealtimeConfigurationBeans(actorSystem: ActorSystem) {
@Bean(Array("realtimeHubWS"))
def hub: ActorRef[Protocol] = {
def hub(ignoreListDao: IgnoreListDao): ActorRef[Protocol] = {
import akka.actor.typed.scaladsl.adapter.*

actorSystem.spawn(RealtimeEventHub.behavior, "realtimeHubWS")
actorSystem.spawn(RealtimeEventHub.behavior(ignoreListDao), "realtimeHubWS")
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/ru/org/linux/user/IgnoreListDao.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,9 @@ class IgnoreListDao(ds: DataSource) extends StrictLogging {
jdbcTemplate.queryForObject[Integer](
"SELECT count(*) as inum FROM ignore_list JOIN users ON ignore_list.userid = users.id" +
" WHERE ignored=? AND not blocked", ignoredUser.getId).get

def isIgnored(byUserId: Int, commentId: Int): Boolean =
jdbcTemplate.queryForObject[Boolean](
"select exists (select ignored from ignore_list where userid=? intersect select get_branch_authors(?))",
byUserId, commentId).get
}

0 comments on commit 8bfb47f

Please sign in to comment.