Skip to content

Commit

Permalink
[NU-1724] Statistics fixes (#6277)
Browse files Browse the repository at this point in the history
* Introducing empty maps, so we will send 0 if non-existent
Also max and min fix

* wip - some renaming and added emptyMap for ScenarioStatistics
Also added a test to check if all statistics are sent even without created scenarios

* Added CorrelationId based on current timestamp, which is added to all URLs in case the data is too large for one URL and has to be split across several

* Added DesignerUptime statistics, which show the number of seconds since the designer was initialized

* wip - need to fully and correctly replace instant with clock
made some changes to CorrelationId and extracted empty maps in the top of the file

* Added test to check if all statistics are included even without created scenarios

* Review fixes - some renaming and added test to check if all statistics are given even without created scenarios

---------

Co-authored-by: Szymon Bogusz <[email protected]>
  • Loading branch information
2 people authored and Łukasz Bigorajski committed Jul 2, 2024
1 parent 2814790 commit 9f4f738
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ class NussknackerAppFactory(processingTypeDataStateFactory: ProcessingTypeDataSt
db <- DbRef.create(config.resolved)
feStatisticsRepository <- QuestDbFEStatisticsRepository.create(system, clock, config.resolved)
server = new NussknackerHttpServer(
new AkkaHttpBasedRouteProvider(db, metricsRegistry, processingTypeDataStateFactory, feStatisticsRepository)(
new AkkaHttpBasedRouteProvider(
db,
metricsRegistry,
processingTypeDataStateFactory,
feStatisticsRepository,
clock
)(
system,
materializer
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ class AkkaHttpBasedRouteProvider(
dbRef: DbRef,
metricsRegistry: MetricRegistry,
processingTypeDataStateFactory: ProcessingTypeDataStateFactory,
feStatisticsRepository: FEStatisticsRepository[Future]
feStatisticsRepository: FEStatisticsRepository[Future],
designerClock: Clock
)(implicit system: ActorSystem, materializer: Materializer)
extends RouteProvider[Route]
with Directives
Expand Down Expand Up @@ -494,6 +495,7 @@ class AkkaHttpBasedRouteProvider(
.values
.flatten
.toList,
designerClock
)

val statisticsApiHttpService = new StatisticsApiHttpService(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package pl.touk.nussknacker.ui.statistics

import java.util.UUID

/** Value class wrapping the string form of a correlation identifier. */
class CorrelationId(val value: String) extends AnyVal

object CorrelationId {

  /** Creates a correlation id whose value is a freshly generated random UUID rendered as a string. */
  def apply(): CorrelationId = {
    val randomUuid = UUID.randomUUID()
    new CorrelationId(randomUuid.toString)
  }

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package pl.touk.nussknacker.ui.statistics

import cats.implicits.toFoldableOps
import pl.touk.nussknacker.engine.api.component.{BuiltInComponentId, ComponentType, ProcessingMode}
import pl.touk.nussknacker.engine.api.component.{ComponentType, ProcessingMode}
import pl.touk.nussknacker.engine.api.deployment.simple.SimpleStateStatus
import pl.touk.nussknacker.engine.definition.component.ComponentDefinitionWithImplementation
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
Expand All @@ -26,16 +26,64 @@ object ScenarioStatistics {

private val componentStatisticPrefix = "c_"

/** Zero-valued entry for every per-scenario statistic key; used as the base that computed scenario statistics are merged onto. */
private[statistics] val emptyScenarioStatistics: Map[String, String] =
  List(
    ScenarioCount,
    FragmentCount,
    UnboundedStreamCount,
    BoundedStreamCount,
    RequestResponseCount,
    FlinkDMCount,
    LiteK8sDMCount,
    LiteEmbeddedDMCount,
    UnknownDMCount,
    ActiveScenarioCount
  ).map(statisticKey => statisticKey.toString -> 0.toString).toMap

/** Zero-valued entry for every activity (attachments/comments) statistic key; returned when there are no activities. */
private[statistics] val emptyActivityStatistics: Map[String, String] =
  List(
    AttachmentsAverage,
    AttachmentsTotal,
    CommentsTotal,
    CommentsAverage
  ).map(statisticKey => statisticKey.toString -> 0.toString).toMap

/** Component statistics reported when the component list is empty: only the components count, set to zero. */
private[statistics] val emptyComponentStatistics: Map[String, String] = {
  val zeroComponentsEntry = (ComponentsCount.toString, 0.toString)
  Map(zeroComponentsEntry)
}

/** Zero-valued entry for every scenario-uptime statistic key; used when no uptime samples are available. */
private[statistics] val emptyUptimeStats: Map[String, String] =
  List(
    UptimeInSecondsAverage,
    UptimeInSecondsMax,
    UptimeInSecondsMin
  ).map(statisticKey => statisticKey.toString -> 0.toString).toMap

/**
 * Zero-valued entry for every general statistic key; returned by `getGeneralStatistics`
 * when there is no scenario input data.
 *
 * The uptime keys are taken from `emptyUptimeStats` instead of being listed again, so the two
 * defaults cannot drift apart. `emptyUptimeStats` is declared earlier in this object, so it is
 * already initialized when this value is computed.
 */
private[statistics] val emptyGeneralStatistics: Map[String, String] = Map(
  NodesMedian          -> 0,
  NodesAverage         -> 0,
  NodesMax             -> 0,
  NodesMin             -> 0,
  CategoriesCount      -> 0,
  VersionsMedian       -> 0,
  VersionsAverage      -> 0,
  VersionsMax          -> 0,
  VersionsMin          -> 0,
  AuthorsCount         -> 0,
  FragmentsUsedMedian  -> 0,
  FragmentsUsedAverage -> 0
).map { case (k, v) => (k.toString, v.toString) } ++ emptyUptimeStats

/**
 * Builds the per-scenario statistics map.
 *
 * The result always starts from `emptyScenarioStatistics`, so every scenario statistic key is
 * present (as "0") even when `scenariosInputData` is empty; values computed from the input
 * override those defaults because the right-hand side of `++` wins on key collisions.
 *
 * @param scenariosInputData input data for each scenario to be included in the statistics
 * @return statistic name mapped to its value rendered as a string
 */
def getScenarioStatistics(scenariosInputData: List[ScenarioStatisticsInputData]): Map[String, String] = {
  // Computed exactly once; the previous body also evaluated this chain as a discarded statement.
  val computedStatistics = scenariosInputData
    .map(ScenarioStatistics.determineStatisticsForScenario)
    .combineAll
    .mapValuesNow(_.toString)

  emptyScenarioStatistics ++ computedStatistics
}

def getGeneralStatistics(scenariosInputData: List[ScenarioStatisticsInputData]): Map[String, String] = {
if (scenariosInputData.isEmpty) {
Map.empty
emptyGeneralStatistics
} else {
// Nodes stats
val sortedNodes = scenariosInputData.map(_.nodesCount).sorted
Expand Down Expand Up @@ -64,21 +112,16 @@ object ScenarioStatistics {
}.sorted
val uptimeStatsMap = {
if (sortedUptimes.isEmpty) {
Map(
UptimeInSecondsAverage -> 0,
UptimeInSecondsMax -> 0,
UptimeInSecondsMin -> 0,
)
emptyUptimeStats
} else {
Map(
UptimeInSecondsAverage -> calculateAverage(sortedUptimes),
UptimeInSecondsMax -> getMax(sortedUptimes),
UptimeInSecondsMin -> getMin(sortedUptimes)
)
).map { case (k, v) => (k.toString, v.toString) }
}
}

(Map(
Map(
NodesMedian -> nodesMedian,
NodesAverage -> nodesAverage,
NodesMax -> nodesMax,
Expand All @@ -91,16 +134,17 @@ object ScenarioStatistics {
AuthorsCount -> authorsCount,
FragmentsUsedMedian -> fragmentsUsedMedian,
FragmentsUsedAverage -> fragmentsUsedAverage
) ++ uptimeStatsMap)
.map { case (k, v) => (k.toString, v.toString) }
)
.map { case (k, v) => (k.toString, v.toString) } ++
uptimeStatsMap
}
}

def getActivityStatistics(
listOfActivities: List[DbProcessActivityRepository.ProcessActivity]
): Map[String, String] = {
if (listOfActivities.isEmpty) {
Map.empty
emptyActivityStatistics
} else {
// Attachment stats
val sortedAttachmentCountList = listOfActivities.map(_.attachments.length)
Expand All @@ -126,7 +170,7 @@ object ScenarioStatistics {
components: List[ComponentDefinitionWithImplementation]
): Map[String, String] = {
if (componentList.isEmpty) {
Map.empty
emptyComponentStatistics
} else {

// Get number of available components to check how many custom components created
Expand Down Expand Up @@ -204,15 +248,15 @@ object ScenarioStatistics {

/**
 * Returns the largest element of a list that is already sorted in ascending order,
 * i.e. its last element.
 *
 * @param orderedList values sorted ascending (callers pass the result of `.sorted`)
 * @return the last element, or the numeric zero of `T` when the list is empty
 */
private def getMax[T: Numeric](orderedList: List[T]): T =
  orderedList.lastOption.getOrElse(implicitly[Numeric[T]].zero)

/**
 * Returns the smallest element of a list that is already sorted in ascending order,
 * i.e. its first element.
 *
 * @param orderedList values sorted ascending (callers pass the result of `.sorted`)
 * @return the first element, or the numeric zero of `T` when the list is empty
 */
private def getMin[T: Numeric](orderedList: List[T]): T =
  orderedList.headOption.getOrElse(implicitly[Numeric[T]].zero)

def mapNameToStat(componentId: String): String = {
private def mapNameToStat(componentId: String): String = {
val shortenedName = componentId.replaceAll(vowelsRegex, "").toLowerCase

componentStatisticPrefix + shortenedName
Expand Down Expand Up @@ -254,6 +298,9 @@ case object LiteK8sDMCount extends StatisticKey("s_dm_l")
case object LiteEmbeddedDMCount extends StatisticKey("s_dm_e")
case object UnknownDMCount extends StatisticKey("s_dm_c")
case object ActiveScenarioCount extends StatisticKey("s_a")
// Not scenario related statistics
// Each key below is declared exactly once; a second declaration of the same case object would not compile.
case object NuSource extends StatisticKey("source") // e.g. docker, helmchart, docker-quickstart, binaries
case object NuFingerprint extends StatisticKey("fingerprint")
case object NuVersion extends StatisticKey("version")
// Query parameter carrying the CorrelationId value that is appended to every statistics URL
case object CorrelationIdStat extends StatisticKey("co_id")
// Seconds elapsed on the designer clock since the statistics service captured its start time
case object DesignerUptimeInSeconds extends StatisticKey("d_u")
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ import java.nio.charset.StandardCharsets

class StatisticsUrls(cfg: StatisticUrlConfig) extends LazyLogging {

/**
 * Turns raw statistics into a list of statistics URLs.
 *
 * Entries are sorted by key, URL-encoded as `key=value` pairs and grouped with
 * `groupByMaxChunkSize(cfg.urlBytesSizeLimit)` (presumably so each URL stays within the
 * configured size limit - confirm against that helper). Every resulting URL additionally
 * carries the fingerprint and correlation id query parameters.
 *
 * @param fingerprint   fingerprint value added to every URL
 * @param correlationId correlation id added to every URL
 * @param rawStatistics statistic name mapped to its value
 * @return the prepared URL strings
 */
def prepare(
    fingerprint: Fingerprint,
    correlationId: CorrelationId,
    rawStatistics: Map[String, String]
): List[String] =
  rawStatistics.toList
    // Sorting for purpose of easier testing
    .sortBy(_._1)
    .map(encodeQueryParam)
    .groupByMaxChunkSize(cfg.urlBytesSizeLimit)
    .flatMap(queryParams => prepareUrlString(queryParams, queryParamsForEveryURL(fingerprint, correlationId)))

/**
 * Renders a key/value pair as a single `key=value` query parameter, URL-encoding
 * both sides as UTF-8.
 */
private def encodeQueryParam(entry: (String, String)): String = {
  val (paramName, paramValue) = entry
  val encodedName             = URLEncoder.encode(paramName, StandardCharsets.UTF_8)
  val encodedValue            = URLEncoder.encode(paramValue, StandardCharsets.UTF_8)
  s"$encodedName=$encodedValue"
}
Expand All @@ -28,8 +32,9 @@ class StatisticsUrls(cfg: StatisticUrlConfig) extends LazyLogging {
}
}

/**
 * Query parameters that are attached to every generated statistics URL:
 * the fingerprint and the correlation id, each already URL-encoded.
 */
private def queryParamsForEveryURL(fingerprint: Fingerprint, correlationId: CorrelationId): List[String] = List(
  encodeQueryParam(NuFingerprint.name -> fingerprint.value),
  encodeQueryParam(CorrelationIdStat.name -> correlationId.value)
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import pl.touk.nussknacker.ui.process.{ProcessService, ScenarioQuery}
import pl.touk.nussknacker.ui.security.api.{LoggedUser, NussknackerInternalUser}
import pl.touk.nussknacker.ui.statistics.UsageStatisticsReportsSettingsService.nuFingerprintFileName

import java.time.Clock
import scala.concurrent.{ExecutionContext, Future}

object UsageStatisticsReportsSettingsService extends LazyLogging {
Expand All @@ -38,7 +39,8 @@ object UsageStatisticsReportsSettingsService extends LazyLogging {
// TODO: Should not depend on DTO, need to extract usageCount and check if all available components are present using processingTypeDataProvider
componentService: ComponentService,
statisticsRepository: FEStatisticsRepository[Future],
componentList: List[ComponentDefinitionWithImplementation]
componentList: List[ComponentDefinitionWithImplementation],
designerClock: Clock
)(implicit ec: ExecutionContext): UsageStatisticsReportsSettingsService = {
val ignoringErrorsFEStatisticsRepository = new IgnoringErrorsFEStatisticsRepository(statisticsRepository)
implicit val user: LoggedUser = NussknackerInternalUser.instance
Expand Down Expand Up @@ -93,7 +95,8 @@ object UsageStatisticsReportsSettingsService extends LazyLogging {
fetchActivity,
fetchComponentList,
() => ignoringErrorsFEStatisticsRepository.read(),
componentList
componentList,
designerClock
)

}
Expand Down Expand Up @@ -121,16 +124,21 @@ class UsageStatisticsReportsSettingsService(
],
fetchComponentList: () => Future[Either[StatisticError, List[ComponentListElement]]],
fetchFeStatistics: () => Future[Map[String, Long]],
components: List[ComponentDefinitionWithImplementation]
components: List[ComponentDefinitionWithImplementation],
designerClock: Clock
)(implicit ec: ExecutionContext) {
// Builds the statistics URLs from the collected query parameters (declared once).
private val statisticsUrls = new StatisticsUrls(urlConfig)
// Captured once when this service is constructed; reference point for the designer uptime statistic.
private val designerStartTime = designerClock.instant()

def prepareStatisticsUrl(): Future[Either[StatisticError, List[String]]] = {
if (config.enabled) {
val maybeUrls = for {
queryParams <- determineQueryParams()
fingerprint <- new EitherT(fingerprintService.fingerprint(config, nuFingerprintFileName))
urls <- EitherT.pure[Future, StatisticError](statisticsUrls.prepare(fingerprint, queryParams))
correlationId = CorrelationId.apply()
urls <- EitherT.pure[Future, StatisticError](
statisticsUrls.prepare(fingerprint, correlationId, queryParams)
)
} yield urls
maybeUrls.value
} else {
Expand All @@ -149,11 +157,13 @@ class UsageStatisticsReportsSettingsService(
componentList <- new EitherT(fetchComponentList())
componentStatistics = ScenarioStatistics.getComponentStatistic(componentList, components)
feStatistics <- EitherT.liftF(fetchFeStatistics())
designerUptimeStatistics = getDesignerUptimeStatistics
} yield basicStatistics ++
scenariosStatistics ++
generalStatistics ++
activityStatistics ++
componentStatistics ++
designerUptimeStatistics ++
feStatistics.map { case (k, v) =>
k -> v.toString
}
Expand All @@ -168,6 +178,14 @@ class UsageStatisticsReportsSettingsService(
NuVersion.name -> BuildInfo.version
)

/**
 * Single-entry map with the designer uptime: the difference, in whole epoch seconds,
 * between the current instant of `designerClock` and `designerStartTime`.
 */
private def getDesignerUptimeStatistics: Map[String, String] = {
  val currentEpochSecond = designerClock.instant().getEpochSecond
  val uptimeInSeconds    = currentEpochSecond - designerStartTime.getEpochSecond
  Map(DesignerUptimeInSeconds.name -> uptimeInSeconds.toString)
}

}

private[statistics] case class ScenarioStatisticsInputData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class StatisticsApiHttpServiceBusinessSpec
// Lookup from a position in statisticsNames to the statistic name at that position
private val statisticsByIndex = statisticsNames.zipWithIndex.map(p => p._2 -> p._1).toMap
// Double-quote character kept as a Char value
private val quote = '"'
private val random = new Random()
// Lower-case hex UUID layout: 8 hex chars, four "-xxxx" groups, then 8 more hex chars
// (the last "-xxxx" group plus the trailing 8 chars together form the final 12-char segment)
private val uuidRegex = "[0-9a-f]{8}(-[a-f0-9]{4}){4}[a-f0-9]{8}"

private val mockedClock = mock[Clock](new Answer[Instant] {
override def answer(invocation: InvocationOnMock): Instant = Instant.now()
Expand Down Expand Up @@ -97,10 +98,41 @@ class StatisticsApiHttpServiceBusinessSpec
.Then()
.statusCode(200)
.bodyWithStatisticsURL(
(AuthorsCount.name, equalTo("0")),
(AttachmentsTotal.name, equalTo("0")),
(AttachmentsAverage.name, equalTo("0")),
(CategoriesCount.name, equalTo("0")),
(ComponentsCount.name, new GreaterThanOrEqualToLongMatcher(62L)),
(CommentsTotal.name, equalTo("0")),
(CommentsAverage.name, equalTo("0")),
(FragmentsUsedMedian.name, equalTo("0")),
(FragmentsUsedAverage.name, equalTo("0")),
(NuFingerprint.name, matchesRegex("[\\w-]+?")),
(NodesMedian.name, equalTo("0")),
(NodesMax.name, equalTo("0")),
(NodesMin.name, equalTo("0")),
(NodesAverage.name, equalTo("0")),
(ActiveScenarioCount.name, equalTo("0")),
(UnknownDMCount.name, equalTo("0")),
(LiteEmbeddedDMCount.name, equalTo("0")),
(FlinkDMCount.name, equalTo("0")),
(LiteK8sDMCount.name, equalTo("0")),
(FragmentCount.name, equalTo("0")),
(BoundedStreamCount.name, equalTo("0")),
(RequestResponseCount.name, equalTo("0")),
(UnboundedStreamCount.name, equalTo("0")),
(ScenarioCount.name, equalTo("0")),
(NuSource.name, equalTo("sources")),
(UptimeInSecondsMax.name, equalTo("0")),
(UptimeInSecondsMin.name, equalTo("0")),
(UptimeInSecondsAverage.name, equalTo("0")),
(VersionsMedian.name, equalTo("0")),
(VersionsMax.name, equalTo("0")),
(VersionsMin.name, equalTo("0")),
(VersionsAverage.name, equalTo("0")),
(NuVersion.name, equalTo(nuVersion)),
(CorrelationIdStat.name, matchesRegex(uuidRegex)),
(DesignerUptimeInSeconds.name, matchesRegex("\\d+"))
)
}

Expand Down Expand Up @@ -148,6 +180,8 @@ class StatisticsApiHttpServiceBusinessSpec
(VersionsMin.name, equalTo("1")),
(VersionsAverage.name, equalTo("1")),
(NuVersion.name, equalTo(nuVersion)),
(CorrelationIdStat.name, matchesRegex(uuidRegex)),
(DesignerUptimeInSeconds.name, matchesRegex("\\d+")),
// TODO: Should make a proper test for component mapping
("c_bltnfltr", equalTo("1"))
)
Expand Down
Loading

0 comments on commit 9f4f738

Please sign in to comment.