From d15a18d24674d5c0ac206d06218d3b4257603fee Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Tue, 21 Jan 2025 13:54:42 +0100 Subject: [PATCH 1/3] Add validation to segment size configuration --- .../protocol/spooling/SpoolingConfig.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingConfig.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingConfig.java index 3db34cf7e35d..c33caa700f66 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingConfig.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingConfig.java @@ -17,8 +17,12 @@ import io.airlift.configuration.ConfigDescription; import io.airlift.configuration.ConfigSecuritySensitive; import io.airlift.units.DataSize; +import io.airlift.units.MaxDataSize; +import io.airlift.units.MinDataSize; import io.trino.util.Ciphers; import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; import javax.crypto.SecretKey; import javax.crypto.spec.SecretKeySpec; @@ -68,6 +72,8 @@ public SpoolingConfig setRetrievalMode(SegmentRetrievalMode retrievalMode) return this; } + @MinDataSize("1kB") + @MaxDataSize("128MB") public DataSize getInitialSegmentSize() { return initialSegmentSize; @@ -81,6 +87,8 @@ public SpoolingConfig setInitialSegmentSize(DataSize initialSegmentSize) return this; } + @MinDataSize("1kB") + @MaxDataSize("128MB") public DataSize getMaximumSegmentSize() { return maximumSegmentSize; @@ -107,6 +115,8 @@ public SpoolingConfig setAllowInlining(boolean allowInlining) return this; } + @Min(1) + @Max(1_000_000) public long getMaximumInlinedRows() { return maximumInlinedRows; @@ -120,6 +130,8 @@ public SpoolingConfig setMaximumInlinedRows(long maximumInlinedRows) return this; } + @MinDataSize("1kB") + @MaxDataSize("1MB") public DataSize getMaximumInlinedSize() { return maximumInlinedSize; @@ -141,6 +153,12 @@ public boolean isSharedEncryptionKeyAes256() .orElse(true); } + @AssertTrue(message = "protocol.spooling.initial-segment-size must be smaller than protocol.spooling.maximum-segment-size") + public boolean areSegmentSizesCorrect() + { + return getInitialSegmentSize().compareTo(getMaximumSegmentSize()) < 0; + } + @AssertTrue(message = "protocol.spooling.shared-secret-key must be set") public boolean isSharedEncryptionKeySet() { From d856917f1c51e584d04407a14acc580eeed95651 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Tue, 21 Jan 2025 14:42:02 +0100 Subject: [PATCH 2/3] Add spooling session properties --- .../OutputSpoolingOperatorFactory.java | 15 ++- .../spooling/SpoolingServerModule.java | 4 + .../spooling/SpoolingSessionProperties.java | 126 ++++++++++++++++++ 3 files changed, 140 insertions(+), 5 deletions(-) create mode 100644 core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingSessionProperties.java diff --git a/core/trino-main/src/main/java/io/trino/operator/OutputSpoolingOperatorFactory.java b/core/trino-main/src/main/java/io/trino/operator/OutputSpoolingOperatorFactory.java index 9b392b5ff475..a25b3ff6dbcd 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OutputSpoolingOperatorFactory.java +++ b/core/trino-main/src/main/java/io/trino/operator/OutputSpoolingOperatorFactory.java @@ -56,6 +56,11 @@ import static io.trino.server.protocol.spooling.SpooledBlock.SPOOLING_METADATA_SYMBOL; import static io.trino.server.protocol.spooling.SpooledBlock.SPOOLING_METADATA_TYPE; import static io.trino.server.protocol.spooling.SpooledBlock.createNonSpooledPage; +import static io.trino.server.protocol.spooling.SpoolingSessionProperties.getInitialSegmentSize; +import static io.trino.server.protocol.spooling.SpoolingSessionProperties.getMaxInlinedRows; +import static io.trino.server.protocol.spooling.SpoolingSessionProperties.getMaxInlinedSize; +import static io.trino.server.protocol.spooling.SpoolingSessionProperties.getMaxSegmentSize; +import static io.trino.server.protocol.spooling.SpoolingSessionProperties.isAllowInlining; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -160,11 +165,11 @@ public OutputSpoolingOperator(OperatorContext operatorContext, QueryDataEncoder { this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); this.controller = new OutputSpoolingController( - spoolingConfig.isAllowInlining(), - spoolingConfig.getMaximumInlinedRows(), - spoolingConfig.getMaximumSegmentSize().toBytes(), - spoolingConfig.getInitialSegmentSize().toBytes(), - spoolingConfig.getMaximumSegmentSize().toBytes()); + isAllowInlining(operatorContext.getSession()), + getMaxInlinedRows(operatorContext.getSession()), + getMaxInlinedSize(operatorContext.getSession()).toBytes(), + getInitialSegmentSize(operatorContext.getSession()).toBytes(), + getMaxSegmentSize(operatorContext.getSession()).toBytes()); this.userMemoryContext = operatorContext.newLocalUserMemoryContext(OutputSpoolingOperator.class.getSimpleName()); this.queryDataEncoder = requireNonNull(queryDataEncoder, "queryDataEncoder is null"); this.spoolingManager = requireNonNull(spoolingManager, "spoolingManager is null"); diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingServerModule.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingServerModule.java index d54edd43c5f8..478c973891c7 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingServerModule.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingServerModule.java @@ -22,10 +22,12 @@ import com.google.inject.multibindings.OptionalBinder; import com.google.inject.multibindings.ProvidesIntoSet; import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.SystemSessionPropertiesProvider; import io.trino.server.ServerConfig; import io.trino.server.protocol.spooling.SpoolingConfig.SegmentRetrievalMode; import io.trino.spi.spool.SpoolingManager; +import static com.google.inject.multibindings.Multibinder.newSetBinder; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.airlift.jaxrs.JaxrsBinder.jaxrsBinder; import static io.trino.server.protocol.spooling.QueryDataEncoder.EncoderSelector.noEncoder; @@ -49,6 +51,8 @@ protected void setup(Binder binder) return; } + newSetBinder(binder, SystemSessionPropertiesProvider.class).addBinding().to(SpoolingSessionProperties.class).in(Scopes.SINGLETON); + boolean isCoordinator = buildConfigObject(ServerConfig.class).isCoordinator(); SpoolingConfig spoolingConfig = buildConfigObject(SpoolingConfig.class); binder.bind(QueryDataEncoder.EncoderSelector.class).to(PreferredQueryDataEncoderSelector.class).in(Scopes.SINGLETON); diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingSessionProperties.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingSessionProperties.java new file mode 100644 index 000000000000..1a49069182f7 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingSessionProperties.java @@ -0,0 +1,126 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.server.protocol.spooling; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; +import io.airlift.units.DataSize; +import io.trino.Session; +import io.trino.SystemSessionPropertiesProvider; +import io.trino.spi.TrinoException; +import io.trino.spi.session.PropertyMetadata; + +import java.util.List; +import java.util.function.Consumer; + +import static io.airlift.units.DataSize.Unit.KILOBYTE; +import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty; +import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY; +import static io.trino.spi.session.PropertyMetadata.booleanProperty; +import static io.trino.spi.session.PropertyMetadata.longProperty; + +public class SpoolingSessionProperties + implements SystemSessionPropertiesProvider +{ + // Spooled segments + public static final String INITIAL_SEGMENT_SIZE = "spooling_initial_segment_size"; + public static final String MAX_SEGMENT_SIZE = "spooling_max_segment_size"; + + // Inlined segments + public static final String ALLOW_INLINING = "spooling_inlining_enabled"; + public static final String MAX_INLINED_SIZE = "spooling_max_inlined_size"; + public static final String MAX_INLINED_ROWS = "spooling_max_inlined_rows"; + + private final List> sessionProperties; + + @Inject + public SpoolingSessionProperties(SpoolingConfig spoolingConfig) + { + sessionProperties = ImmutableList.>builder() + .add(dataSizeProperty( + INITIAL_SEGMENT_SIZE, + "Initial size of a spooled segment", + spoolingConfig.getInitialSegmentSize(), + isDataSizeBetween(INITIAL_SEGMENT_SIZE, DataSize.of(1, KILOBYTE), DataSize.of(128, MEGABYTE)), + false)) + .add(dataSizeProperty( + MAX_SEGMENT_SIZE, + "Maximum size of a spooled segment", + spoolingConfig.getMaximumSegmentSize(), + isDataSizeBetween(MAX_SEGMENT_SIZE, DataSize.of(1, KILOBYTE), DataSize.of(128, MEGABYTE)), + false)) + .add(booleanProperty( + ALLOW_INLINING, + "Allow inlining initial rows", + spoolingConfig.isAllowInlining(), + false)) + .add(dataSizeProperty( + MAX_INLINED_SIZE, + "Maximum size of inlined data", + spoolingConfig.getMaximumInlinedSize(), + isDataSizeBetween(MAX_INLINED_SIZE, DataSize.of(1, KILOBYTE), DataSize.of(1, MEGABYTE)), + false)) + .add(longProperty( + MAX_INLINED_ROWS, + "Maximum number of rows that are allowed to be inlined per worker", + spoolingConfig.getMaximumInlinedRows(), + false)) + .build(); + } + + private Consumer isDataSizeBetween(String property, DataSize min, DataSize max) + { + return value -> { + if (min.compareTo(value) > 0) { + throw new TrinoException(INVALID_SESSION_PROPERTY, "Session property '" + property + "' must be greater than " + min); + } + + if (max.compareTo(value) < 0) { + throw new TrinoException(INVALID_SESSION_PROPERTY, "Session property '" + property + "' must be smaller than " + max); + } + }; + } + + public static DataSize getInitialSegmentSize(Session session) + { + return session.getSystemProperty(INITIAL_SEGMENT_SIZE, DataSize.class); + } + + public static DataSize getMaxSegmentSize(Session session) + { + return session.getSystemProperty(MAX_SEGMENT_SIZE, DataSize.class); + } + + public static boolean isAllowInlining(Session session) + { + return session.getSystemProperty(ALLOW_INLINING, Boolean.class); + } + + public static DataSize getMaxInlinedSize(Session session) + { + return session.getSystemProperty(MAX_INLINED_SIZE, DataSize.class); + } + + public static long getMaxInlinedRows(Session session) + { + return session.getSystemProperty(MAX_INLINED_ROWS, Long.class); + } + + @Override + public List> getSessionProperties() + { + return sessionProperties; + } +} From 259c48275d79856492b247358f485291d8c9e132 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Tue, 21 Jan 2025 15:07:40 +0100 Subject: [PATCH 3/3] Rename session property for consistency --- .../src/main/java/io/trino/SystemSessionProperties.java | 8 ++++---- .../main/java/io/trino/execution/QueryStateMachine.java | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index 40e7f1b41625..d6cd30d41b3b 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -219,7 +219,7 @@ public final class SystemSessionProperties public static final String IDLE_WRITER_MIN_DATA_SIZE_THRESHOLD = "idle_writer_min_data_size_threshold"; public static final String CLOSE_IDLE_WRITERS_TRIGGER_DURATION = "close_idle_writers_trigger_duration"; public static final String COLUMNAR_FILTER_EVALUATION_ENABLED = "columnar_filter_evaluation_enabled"; - public static final String SPOOLING_PROTOCOL_ENABLED = "spooling_protocol_enabled"; + public static final String SPOOLING_ENABLED = "spooling_enabled"; private final List> sessionProperties; @@ -1131,7 +1131,7 @@ public SystemSessionProperties( optimizerConfig.isUnsafePushdownAllowed(), true), booleanProperty( - SPOOLING_PROTOCOL_ENABLED, + SPOOLING_ENABLED, "Enable client spooling protocol", true, true)); @@ -2024,9 +2024,9 @@ public static boolean isColumnarFilterEvaluationEnabled(Session session) return session.getSystemProperty(COLUMNAR_FILTER_EVALUATION_ENABLED, Boolean.class); } - public static boolean isSpoolingProtocolEnabled(Session session) + public static boolean isSpoolingEnabled(Session session) { - return session.getSystemProperty(SPOOLING_PROTOCOL_ENABLED, Boolean.class); + return session.getSystemProperty(SPOOLING_ENABLED, Boolean.class); } public static boolean isUnsafePushdownAllowed(Session session) diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java index 83a754e92193..7e84de77b97d 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java @@ -88,7 +88,7 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.units.DataSize.succinctBytes; import static io.trino.SystemSessionProperties.getRetryPolicy; -import static io.trino.SystemSessionProperties.isSpoolingProtocolEnabled; +import static io.trino.SystemSessionProperties.isSpoolingEnabled; import static io.trino.execution.BasicStageStats.EMPTY_STAGE_STATS; import static io.trino.execution.QueryState.DISPATCHING; import static io.trino.execution.QueryState.FAILED; @@ -310,7 +310,7 @@ static QueryStateMachine beginWithTicker( session = session.withExchangeEncryption(serializeAesEncryptionKey(createRandomAesEncryptionKey())); } - if (!queryType.map(SELECT::equals).orElse(false) || !isSpoolingProtocolEnabled(session)) { + if (!queryType.map(SELECT::equals).orElse(false) || !isSpoolingEnabled(session)) { session = session.withoutSpooling(); }