Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/staging' into 1926-indexing-of-m…
Browse files Browse the repository at this point in the history
…ap-3
  • Loading branch information
Pawel Czajka committed Jan 14, 2025
2 parents a750459 + 78115b0 commit 580a546
Show file tree
Hide file tree
Showing 19 changed files with 896 additions and 37 deletions.
52 changes: 35 additions & 17 deletions designer/client/src/containers/Notifications.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import Notification from "../components/notifications/Notification";
import CheckCircleOutlinedIcon from "@mui/icons-material/CheckCircleOutlined";
import DangerousOutlinedIcon from "@mui/icons-material/DangerousOutlined";
import { markBackendNotificationRead, updateBackendNotifications } from "../actions/nk/notifications";
import { loadProcessState } from "../actions/nk";
import { getProcessName, getProcessVersionId } from "../reducers/selectors/graph";
import { fetchProcessDefinition, loadProcessState } from "../actions/nk";
import { getProcessingType, getProcessName, getProcessVersionId, isFragment } from "../reducers/selectors/graph";
import { useChangeConnectionError } from "./connectionErrorProvider";
import i18next from "i18next";
import { ThunkAction } from "../actions/reduxTypes";
Expand Down Expand Up @@ -44,23 +44,37 @@ const prepareNotification =
};

const handleRefresh =
({ scenarioName, toRefresh }: BackendNotification, currentScenarioName: string, processVersionId: number): ThunkAction =>
(
{ scenarioName, toRefresh }: BackendNotification,
currentScenarioName: string,
processVersionId: number,
currentProcessingType: string,
currentIsFragment: boolean,
): ThunkAction =>
(dispatch) => {
if (!scenarioName || scenarioName !== currentScenarioName) {
if (scenarioName && scenarioName !== currentScenarioName) {
return;
}
toRefresh.forEach((data) => {
switch (data) {
case "activity":
return dispatch(getScenarioActivities(scenarioName));
case "state":
return dispatch(loadProcessState(scenarioName, processVersionId));
}
});
if (toRefresh.indexOf("activity") >= 0 && currentScenarioName) {
dispatch(getScenarioActivities(currentScenarioName));
}
if (toRefresh.indexOf("state") >= 0 && currentScenarioName && processVersionId) {
dispatch(loadProcessState(currentScenarioName, processVersionId));
}
if (toRefresh.indexOf("creator") >= 0 && currentProcessingType && currentIsFragment != null) {
dispatch(fetchProcessDefinition(currentProcessingType, currentIsFragment));
}
return;
};

const prepareNotifications =
(notifications: BackendNotification[], scenarioName: string, processVersionId: number): ThunkAction =>
(
notifications: BackendNotification[],
scenarioName: string,
processVersionId: number,
currentProcessingType: string,
currentIsFragment: boolean,
): ThunkAction =>
(dispatch, getState) => {
const state = getState();
const { processedNotificationIds } = getBackendNotifications(state);
Expand All @@ -74,7 +88,7 @@ const prepareNotifications =

notifications.filter(onlyUnreadPredicate).forEach((notification) => {
dispatch(prepareNotification(notification));
dispatch(handleRefresh(notification, scenarioName, processVersionId));
dispatch(handleRefresh(notification, scenarioName, processVersionId, currentProcessingType, currentIsFragment));
});
};

Expand All @@ -87,13 +101,17 @@ export function Notifications(): JSX.Element {

const currentScenarioName = useSelector(getProcessName);
const processVersionId = useSelector(getProcessVersionId);
const currentProcessingType = useSelector(getProcessingType);
const currentIsFragment = useSelector(isFragment);

const refresh = useCallback(() => {
HttpService.loadBackendNotifications(currentScenarioName)
.then((notifications) => {
handleChangeConnectionError(null);
dispatch(updateBackendNotifications(notifications.map(({ id }) => id)));
dispatch(prepareNotifications(notifications, currentScenarioName, processVersionId));
dispatch(
prepareNotifications(notifications, currentScenarioName, processVersionId, currentProcessingType, currentIsFragment),
);
})
.catch((error) => {
const isNetworkAccess = navigator.onLine;
Expand All @@ -111,7 +129,7 @@ export function Notifications(): JSX.Element {
);
}
});
}, [currentScenarioName, dispatch, handleChangeConnectionError]);
}, [currentScenarioName, processVersionId, currentProcessingType, currentIsFragment, dispatch, handleChangeConnectionError]);
useInterval(refresh, {
refreshTime: 2000,
ignoreFirst: true,
Expand All @@ -123,7 +141,7 @@ export function Notifications(): JSX.Element {

type NotificationType = "info" | "error" | "success";

type DataToRefresh = "activity" | "state";
type DataToRefresh = "activity" | "state" | "creator";

export type BackendNotification = {
id: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import pl.touk.nussknacker.ui.util.ScenarioActivityUtils.ScenarioActivityOps
import sttp.tapir.Schema
import sttp.tapir.derevo.schema

import java.util.UUID

@derive(encoder, decoder, schema)
final case class Notification(
id: String,
Expand Down Expand Up @@ -92,6 +94,16 @@ object Notification {
)
}

def configurationReloaded: Notification =
Notification(
id = UUID.randomUUID().toString,
scenarioName = None,
message = "Configuration reloaded",
// We don't want to show this notification to other users because they might be not interested, and it can only introduce a confusion
`type` = None,
toRefresh = List(DataToRefresh.creator)
)

private def displayableActionName(actionName: ScenarioActionName): String =
actionName match {
case ScenarioActionName.Deploy => "Deployment"
Expand All @@ -116,5 +128,5 @@ object DataToRefresh extends Enumeration {
implicit val typeDecoder: Decoder[DataToRefresh.Value] = Decoder.decodeEnumeration(DataToRefresh)

type DataToRefresh = Value
val activity, state = Value
val activity, state, creator = Value
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import pl.touk.nussknacker.ui.notifications.NotificationService.NotificationsSco
import pl.touk.nussknacker.ui.process.repository.{DBIOActionRunner, ScenarioActionRepository}
import pl.touk.nussknacker.ui.process.scenarioactivity.FetchScenarioActivityService
import pl.touk.nussknacker.ui.security.api.LoggedUser
import pl.touk.nussknacker.ui.util.InMemoryTimeseriesRepository

import java.time.{Clock, Instant}
import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -43,6 +44,7 @@ object NotificationService {
class NotificationServiceImpl(
fetchScenarioActivityService: FetchScenarioActivityService,
scenarioActionRepository: ScenarioActionRepository,
globalNotificationRepository: InMemoryTimeseriesRepository[Notification],
dbioRunner: DBIOActionRunner,
config: NotificationConfig,
clock: Clock = Clock.systemUTC()
Expand All @@ -53,14 +55,19 @@ class NotificationServiceImpl(
)(implicit ec: ExecutionContext): Future[List[Notification]] = {
val now = clock.instant()
val limit = now.minusMillis(config.duration.toMillis)
def fetchUserAndGlobalNotifications(user: LoggedUser) =
for {
notificationsForUserActions <- notificationsForUserActions(user, limit)
globalNotifications = fetchGlobalNotificationsAndTriggerEviction(limit)
} yield notificationsForUserActions ++ globalNotifications
scope match {
case NotificationsScope.NotificationsForLoggedUser(user) =>
notificationsForUserActions(user, limit)
fetchUserAndGlobalNotifications(user)
case NotificationsScope.NotificationsForLoggedUserAndScenario(user, processName) =>
for {
notificationsForUserActions <- notificationsForUserActions(user, limit)
userAndGlobalNotifications <- fetchUserAndGlobalNotifications(user)
notificationsForScenarioActivities <- notificationsForScenarioActivities(user, processName, limit)
} yield notificationsForUserActions ++ notificationsForScenarioActivities
} yield userAndGlobalNotifications ++ notificationsForScenarioActivities
}

}
Expand Down Expand Up @@ -107,4 +114,10 @@ class NotificationServiceImpl(
} yield notificationsForScenarioActivities
}

private def fetchGlobalNotificationsAndTriggerEviction(limit: Instant) = {
val globalNotifications = globalNotificationRepository.fetchEntries(limit)
globalNotificationRepository.evictOldEntries()
globalNotifications
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import pl.touk.nussknacker.ui.config.scenariotoolbar.CategoriesScenarioToolbarsC
import pl.touk.nussknacker.ui.config.{
AttachmentsConfig,
ComponentLinksConfigExtractor,
DesignerConfig,
FeatureTogglesConfig,
UsageStatisticsReportsConfig
}
import pl.touk.nussknacker.ui.config.DesignerConfig
import pl.touk.nussknacker.ui.db.DbRef
import pl.touk.nussknacker.ui.db.timeseries.FEStatisticsRepository
import pl.touk.nussknacker.ui.definition.component.{ComponentServiceProcessingTypeData, DefaultComponentService}
Expand All @@ -43,7 +43,7 @@ import pl.touk.nussknacker.ui.listener.ProcessChangeListenerLoader
import pl.touk.nussknacker.ui.listener.services.NussknackerServices
import pl.touk.nussknacker.ui.metrics.RepositoryGauges
import pl.touk.nussknacker.ui.migrations.{MigrationApiAdapterService, MigrationService}
import pl.touk.nussknacker.ui.notifications.{NotificationConfig, NotificationServiceImpl}
import pl.touk.nussknacker.ui.notifications.{Notification, NotificationConfig, NotificationServiceImpl}
import pl.touk.nussknacker.ui.process._
import pl.touk.nussknacker.ui.process.deployment.{
ActionService,
Expand Down Expand Up @@ -94,7 +94,7 @@ import pl.touk.nussknacker.ui.validation.{
}
import sttp.client3.SttpBackend

import java.time.Clock
import java.time.{Clock, Duration}
import java.util.concurrent.atomic.AtomicReference
import java.util.function.Supplier
import scala.concurrent.Future
Expand Down Expand Up @@ -130,13 +130,16 @@ class AkkaHttpBasedRouteProvider(
deploymentRepository = new DeploymentRepository(dbRef, Clock.systemDefaultZone())
scenarioActivityRepository = DbScenarioActivityRepository.create(dbRef, designerClock)
dbioRunner = DBIOActionRunner(dbRef)
// 1 hour is the delay to propagate all global notifications for all users
globalNotificationRepository = InMemoryTimeseriesRepository[Notification](Duration.ofHours(1), Clock.systemUTC())
processingTypeDataProvider <- prepareProcessingTypeDataReload(
additionalUIConfigProvider,
actionServiceSupplier,
scenarioActivityRepository,
dbioRunner,
sttpBackend,
featureTogglesConfig,
globalNotificationRepository
)

deploymentsStatusesSynchronizer = new DeploymentsStatusesSynchronizer(
Expand Down Expand Up @@ -325,6 +328,7 @@ class AkkaHttpBasedRouteProvider(
val notificationService = new NotificationServiceImpl(
fetchScenarioActivityService,
actionRepository,
globalNotificationRepository,
dbioRunner,
notificationsConfig
)
Expand Down Expand Up @@ -699,7 +703,8 @@ class AkkaHttpBasedRouteProvider(
scenarioActivityRepository: ScenarioActivityRepository,
dbioActionRunner: DBIOActionRunner,
sttpBackend: SttpBackend[Future, Any],
featureTogglesConfig: FeatureTogglesConfig
featureTogglesConfig: FeatureTogglesConfig,
globalNotificationRepository: InMemoryTimeseriesRepository[Notification]
): Resource[IO, ReloadableProcessingTypeDataProvider] = {
Resource
.make(
Expand All @@ -719,7 +724,12 @@ class AkkaHttpBasedRouteProvider(
_
),
)
new ReloadableProcessingTypeDataProvider(laodProcessingTypeDataIO)
val loadAndNotifyIO = laodProcessingTypeDataIO
.map { state =>
globalNotificationRepository.saveEntry(Notification.configurationReloaded)
state
}
new ReloadableProcessingTypeDataProvider(loadAndNotifyIO)
}
)(
release = _.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package pl.touk.nussknacker.ui.util

import java.time.{Clock, Duration, Instant}
import java.util.concurrent.ConcurrentSkipListMap
import scala.jdk.CollectionConverters._

class InMemoryTimeseriesRepository[EntryT](
timeline: ConcurrentSkipListMap[Instant, EntryT],
evictionDelay: Duration,
clock: Clock
) {

def saveEntry(entry: EntryT): Unit = {
timeline.put(clock.instant(), entry)
}

def fetchEntries(lowerLimit: Instant): List[EntryT] = {
timeline.tailMap(lowerLimit).values().asScala.toList
}

def evictOldEntries(): Unit = {
timeline.headMap(clock.instant().minus(evictionDelay), true).clear()
}

}

object InMemoryTimeseriesRepository {

def apply[EntryT](retentionDelay: Duration, clock: Clock): InMemoryTimeseriesRepository[EntryT] = {
new InMemoryTimeseriesRepository[EntryT](
new ConcurrentSkipListMap[Instant, EntryT],
retentionDelay,
clock
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,53 @@ class NotificationApiHttpServiceBusinessSpec
with PatientScalaFutures {

"The endpoint for getting notifications should" - {
"return empty list if no notifications are present" in {
// We can't easily recognize if configuration was changed between restarts so just in case we send this notification
"return initial notification about configuration reload just after start of application" in {
given()
.when()
.basicAuthAdmin()
.get(s"$nuDesignerHttpAddress/api/notifications")
.Then()
.statusCode(200)
.body(
equalTo("[]")
matchJsonWithRegexValues(
s"""[{
| "id": "^\\\\w{8}-\\\\w{4}-\\\\w{4}-\\\\w{4}-\\\\w{12}$$",
| "scenarioName": null,
| "message": "Configuration reloaded",
| "type": null,
| "toRefresh": [ "creator" ]
|}]""".stripMargin
)
)
}
"return notification when processing type data are reloaded" in {
given()
.when()
.applicationState {
reloadConfiguration()
}
.basicAuthAdmin()
.get(s"$nuDesignerHttpAddress/api/notifications")
.Then()
.statusCode(200)
.body(
matchJsonWithRegexValues(
s"""[{
| "id": "^\\\\w{8}-\\\\w{4}-\\\\w{4}-\\\\w{4}-\\\\w{12}$$",
| "scenarioName": null,
| "message": "Configuration reloaded",
| "type": null,
| "toRefresh": [ "creator" ]
|},
|{
| "id": "^\\\\w{8}-\\\\w{4}-\\\\w{4}-\\\\w{4}-\\\\w{12}$$",
| "scenarioName": null,
| "message": "Configuration reloaded",
| "type": null,
| "toRefresh": [ "creator" ]
|}]""".stripMargin
)
)
}
"return a list of notifications" in {
Expand Down Expand Up @@ -62,6 +100,20 @@ class NotificationApiHttpServiceBusinessSpec
| "message": "Cancel finished",
| "type": null,
| "toRefresh": [ "activity", "state" ]
|},
|{
| "id": "^\\\\w{8}-\\\\w{4}-\\\\w{4}-\\\\w{4}-\\\\w{12}$$",
| "scenarioName": null,
| "message": "Configuration reloaded",
| "type": null,
| "toRefresh": [ "creator" ]
|},
|{
| "id": "^\\\\w{8}-\\\\w{4}-\\\\w{4}-\\\\w{4}-\\\\w{12}$$",
| "scenarioName": null,
| "message": "Configuration reloaded",
| "type": null,
| "toRefresh": [ "creator" ]
|}]""".stripMargin
)
)
Expand All @@ -81,4 +133,13 @@ class NotificationApiHttpServiceBusinessSpec
}
}

private def reloadConfiguration(): Unit = {
given()
.when()
.basicAuthAdmin()
.post(s"$nuDesignerHttpAddress/api/app/processingtype/reload")
.Then()
.statusCode(204)
}

}
Loading

0 comments on commit 580a546

Please sign in to comment.