diff --git a/plugin/trino-loki/pom.xml b/plugin/trino-loki/pom.xml index c88ce05e8a60..3853ae965712 100644 --- a/plugin/trino-loki/pom.xml +++ b/plugin/trino-loki/pom.xml @@ -56,7 +56,7 @@ io.github.jeschkies loki-client - 0.0.2 + 0.0.3 diff --git a/plugin/trino-loki/src/main/java/io/trino/plugin/loki/LokiRecordSet.java b/plugin/trino-loki/src/main/java/io/trino/plugin/loki/LokiRecordSet.java index abc730c02ec8..a565ccc66e9e 100644 --- a/plugin/trino-loki/src/main/java/io/trino/plugin/loki/LokiRecordSet.java +++ b/plugin/trino-loki/src/main/java/io/trino/plugin/loki/LokiRecordSet.java @@ -49,7 +49,11 @@ public LokiRecordSet(LokiClient lokiClient, LokiSplit split, List splits = ImmutableList.of(new LokiSplit(table.query(), table.start(), table.end())); + List splits = ImmutableList.of(new LokiSplit(table.query(), table.start(), table.end(), table.step())); log.debug("created %d splits", splits.size()); return new FixedSplitSource(splits); diff --git a/plugin/trino-loki/src/main/java/io/trino/plugin/loki/LokiTableHandle.java b/plugin/trino-loki/src/main/java/io/trino/plugin/loki/LokiTableHandle.java index d752f9a81813..2b2e0da26f3d 100644 --- a/plugin/trino-loki/src/main/java/io/trino/plugin/loki/LokiTableHandle.java +++ b/plugin/trino-loki/src/main/java/io/trino/plugin/loki/LokiTableHandle.java @@ -22,7 +22,7 @@ import static java.util.Objects.requireNonNull; -public record LokiTableHandle(String query, Instant start, Instant end, List columnHandles) +public record LokiTableHandle(String query, Instant start, Instant end, Long step, List columnHandles) implements ConnectorTableHandle { public LokiTableHandle diff --git a/plugin/trino-loki/src/main/java/io/trino/plugin/loki/QueryRangeTableFunction.java b/plugin/trino-loki/src/main/java/io/trino/plugin/loki/QueryRangeTableFunction.java index 6d0766d7743f..84cc618ebdb4 100644 --- a/plugin/trino-loki/src/main/java/io/trino/plugin/loki/QueryRangeTableFunction.java +++ b/plugin/trino-loki/src/main/java/io/trino/plugin/loki/QueryRangeTableFunction.java @@ -41,6 +41,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT; import static io.trino.spi.function.table.ReturnTypeSpecification.GenericTable.GENERIC_TABLE; +import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_NANOS; import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_SECOND; import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MILLISECOND; @@ -74,6 +75,11 @@ public QueryRangeTableFunction(LokiMetadata metadata) ScalarArgumentSpecification.builder() .name("END") .type(TIMESTAMP_TZ_NANOS) + .build(), + ScalarArgumentSpecification.builder() + .name("STEP") + .type(INTEGER) + .defaultValue(0L) .build()), GENERIC_TABLE); @@ -89,6 +95,11 @@ public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransact LongTimestampWithTimeZone startArgument = (LongTimestampWithTimeZone) ((ScalarArgument) arguments.get("START")).getValue(); LongTimestampWithTimeZone endArgument = (LongTimestampWithTimeZone) ((ScalarArgument) arguments.get("END")).getValue(); + Long step = (Long) ((ScalarArgument) arguments.get("STEP")).getValue(); + if (step == null) { + step = 0L; + } + if (Strings.isNullOrEmpty(query)) { throw new TrinoException(INVALID_FUNCTION_ARGUMENT, query); } @@ -118,6 +129,7 @@ public TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransact query, start, end, + step, columnHandles); return TableFunctionAnalysis.builder() diff --git a/plugin/trino-loki/src/test/java/io/trino/plugin/loki/TestLokiIntegration.java b/plugin/trino-loki/src/test/java/io/trino/plugin/loki/TestLokiIntegration.java index 34f7a8ffa42b..050ba473d1b8 100644 --- a/plugin/trino-loki/src/test/java/io/trino/plugin/loki/TestLokiIntegration.java +++ b/plugin/trino-loki/src/test/java/io/trino/plugin/loki/TestLokiIntegration.java @@ -21,13 +21,11 @@ import java.time.Duration; import java.time.Instant; -import java.time.LocalDate; import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; -import static io.trino.type.DateTimes.MILLISECONDS_PER_DAY; import static java.lang.String.format; final class TestLokiIntegration @@ -37,6 +35,7 @@ final class TestLokiIntegration private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC); private static final DateTimeFormatter timestampFormatterAtEasternTime = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss'-05:00'").withZone(ZoneId.of("US/Eastern")); + private static final DateTimeFormatter isoTimestampFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC); @Override protected QueryRunner createQueryRunner() @@ -155,7 +154,6 @@ void testLabelsComplex() void testSelectTimestampLogsQuery() throws Exception { - DateTimeFormatter isoTimestampFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC); Instant start = Instant.now().truncatedTo(ChronoUnit.DAYS).minus(Duration.ofHours(12)); Instant end = start.plus(Duration.ofHours(4)); Instant firstLineTimestamp = start.truncatedTo(ChronoUnit.MILLIS); @@ -184,24 +182,24 @@ void testSelectTimestampLogsQuery() void testTimestampMetricsQuery() throws Exception { - LocalDate baseLineDate = LocalDate.now(); - Instant start = Instant.ofEpochMilli(baseLineDate.toEpochDay() * MILLISECONDS_PER_DAY); - Instant end = start.plus(Duration.ofHours(4)); + Instant start = Instant.now().truncatedTo(ChronoUnit.HOURS).minus(Duration.ofHours(4)); + Instant end = start.plus(Duration.ofHours(3)); - this.client.pushLogLine("line 1", start.plus(Duration.ofHours(1)), ImmutableMap.of("test", "timestamp_metrics_query")); + this.client.pushLogLine("line 1", start.plus(Duration.ofMinutes(4)), ImmutableMap.of("test", "timestamp_metrics_query")); this.client.pushLogLine("line 2", start.plus(Duration.ofHours(2)), ImmutableMap.of("test", "timestamp_metrics_query")); this.client.pushLogLine("line 3", start.plus(Duration.ofHours(3)), ImmutableMap.of("test", "timestamp_metrics_query")); this.client.flush(); assertQuery(format(""" - SELECT CAST(timestamp AS DATE) FROM + SELECT to_iso8601(timestamp), value FROM TABLE(system.query_range( 'count_over_time({test="timestamp_metrics_query"}[5m])', TIMESTAMP '%s', - TIMESTAMP '%s' + TIMESTAMP '%s', + 300 )) LIMIT 1 """, timestampFormatter.format(start), timestampFormatter.format(end)), - "VALUES DATE '%s'".formatted(baseLineDate)); + "VALUES ('%s', 1.0)".formatted(isoTimestampFormatter.format(start.plus(Duration.ofMinutes(5))))); } @Test