From ae9cd450d486d9eaf6af849a8cce2f14129bcc3c Mon Sep 17 00:00:00 2001 From: ziyang Date: Mon, 16 Dec 2024 15:32:43 +0800 Subject: [PATCH 1/3] [orc][format] add cache for OrcFileFormat --- .../apache/paimon/fs/ObjectCacheManager.java | 57 +++++++++++++++++ .../paimon/format/orc/OrcFileFormat.java | 27 ++++++-- .../paimon/format/orc/OrcFileFormatTest.java | 64 +++++++++++++++++++ 3 files changed, 144 insertions(+), 4 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java b/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java new file mode 100644 index 000000000000..f157578107eb --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; + +import java.time.Duration; +import java.util.function.Function; + +/** + * Sample Object Cache Manager . + * + * @param + * @param + */ +public class ObjectCacheManager { + private final Cache cache; + + private ObjectCacheManager(Duration timeout, int maxSize) { + this.cache = Caffeine.newBuilder().maximumSize(maxSize).expireAfterWrite(timeout).build(); + } + + public static ObjectCacheManager newObjectCacheManager( + Duration timeout, int maxSize) { + return new ObjectCacheManager<>(timeout, maxSize); + } + + public ObjectCacheManager put(K k, V v) { + this.cache.put(k, v); + return this; + } + + public V get(K k, Function creator) { + return this.cache.get(k, creator); + } + + public V getIfPresent(K k) { + return this.cache.getIfPresent(k); + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index c3521c6f1a37..5ed10bdfdd1e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -30,6 +30,7 @@ import org.apache.paimon.format.orc.filter.OrcSimpleStatsExtractor; import org.apache.paimon.format.orc.writer.RowDataVectorizer; import org.apache.paimon.format.orc.writer.Vectorizer; +import org.apache.paimon.fs.ObjectCacheManager; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; @@ -43,12 +44,14 @@ import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; +import org.apache.hadoop.conf.Configuration; import org.apache.orc.OrcConf; import org.apache.orc.TypeDescription; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -71,13 +74,29 @@ public class OrcFileFormat extends FileFormat { private final int writeBatchSize; private final boolean deletionVectorsEnabled; + private static final org.apache.hadoop.conf.Configuration emptyConf = + new org.apache.hadoop.conf.Configuration(); + private static final ObjectCacheManager configCache = + ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); + + static { + emptyConf.set("paimon.empty.configuration", "paimon.empty.configuration"); + } + public OrcFileFormat(FormatContext formatContext) { super(IDENTIFIER); this.orcProperties = getOrcProperties(formatContext.options(), formatContext); - this.readerConf = new org.apache.hadoop.conf.Configuration(); - this.orcProperties.forEach((k, v) -> readerConf.set(k.toString(), v.toString())); - this.writerConf = new org.apache.hadoop.conf.Configuration(); - this.orcProperties.forEach((k, v) -> writerConf.set(k.toString(), v.toString())); + Configuration conf; + Configuration cachedConf = configCache.getIfPresent(orcProperties); + if (cachedConf != null) { + conf = cachedConf; + } else { + conf = new org.apache.hadoop.conf.Configuration(emptyConf); + this.orcProperties.forEach((k, v) -> conf.set(k.toString(), v.toString())); + configCache.put(orcProperties, conf); + } + this.readerConf = conf; + this.writerConf = conf; this.readBatchSize = formatContext.readBatchSize(); this.writeBatchSize = formatContext.writeBatchSize(); this.deletionVectorsEnabled = formatContext.options().get(DELETION_VECTORS_ENABLED); diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java index 46bf6afe6613..9e5769595c32 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java @@ -83,4 +83,68 @@ public void testSupportedDataTypes() { dataFields.add(new DataField(index++, "decimal_type", DataTypes.DECIMAL(10, 3))); orc.validateDataFields(new RowType(dataFields)); } + + @Test + public void testCreateCost() { + double createConfCost = createConfigCost(); + for (int i = 0; i < 1000; i++) { + create(); + } + int times = 10_000; + long start = System.nanoTime(); + for (int i = 0; i < times; i++) { + create(); + } + double cost = ((double) (System.nanoTime() - start)) / 1000_000 / times; + assertThat(cost * 500 < createConfCost).isTrue(); + } + + @Test + public void testCreateCostWithRandomConfig() { + double createConfCost = createConfigCost(); + for (int i = 0; i < 1000; i++) { + createRandomConfig(); + } + int times = 10_000; + long start = System.nanoTime(); + for (int i = 0; i < times; i++) { + createRandomConfig(); + } + double cost = ((double) (System.nanoTime() - start)) / 1000_000 / times; + assertThat(cost * 10 < createConfCost).isTrue(); + } + + private double createConfigCost() { + for (int i = 0; i < 1000; i++) { + createConfig(); + } + int times = 10_000; + long start = System.nanoTime(); + for (int i = 0; i < times; i++) { + createConfig(); + } + return ((double) (System.nanoTime() - start)) / 1000_000 / times; + } + + private void createConfig() { + org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); + conf.set("a", "a"); + } + + private void create() { + Options options = new Options(); + options.setString("haha", "1"); + options.setString("compress", "zlib"); + OrcFileFormat orcFileFormat = + new OrcFileFormatFactory().create(new FormatContext(options, 1024, 1024)); + } + + private void createRandomConfig() { + Options options = new Options(); + options.setString("haha", "1"); + options.setString("compress", "zlib"); + options.setString("a", Math.random() + ""); + OrcFileFormat orcFileFormat = + new OrcFileFormatFactory().create(new FormatContext(options, 1024, 1024)); + } } From 56115d4a7bec202a1c48bf5c6831440fc5e1fc67 Mon Sep 17 00:00:00 2001 From: ziyang Date: Mon, 16 Dec 2024 19:09:53 +0800 Subject: [PATCH 2/3] [format] Switch to using the Configuration(boolean) constructor, switch to a regular cache. --- .../apache/paimon/fs/ObjectCacheManager.java | 57 ------------------- .../paimon/format/orc/OrcFileFormat.java | 15 ++--- 2 files changed, 5 insertions(+), 67 deletions(-) delete mode 100644 paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java b/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java deleted file mode 100644 index f157578107eb..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.fs; - -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; - -import java.time.Duration; -import java.util.function.Function; - -/** - * Sample Object Cache Manager . - * - * @param - * @param - */ -public class ObjectCacheManager { - private final Cache cache; - - private ObjectCacheManager(Duration timeout, int maxSize) { - this.cache = Caffeine.newBuilder().maximumSize(maxSize).expireAfterWrite(timeout).build(); - } - - public static ObjectCacheManager newObjectCacheManager( - Duration timeout, int maxSize) { - return new ObjectCacheManager<>(timeout, maxSize); - } - - public ObjectCacheManager put(K k, V v) { - this.cache.put(k, v); - return this; - } - - public V get(K k, Function creator) { - return this.cache.get(k, creator); - } - - public V getIfPresent(K k) { - return this.cache.getIfPresent(k); - } -} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index 5ed10bdfdd1e..f9e70cd82870 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -30,10 +30,11 @@ import org.apache.paimon.format.orc.filter.OrcSimpleStatsExtractor; import org.apache.paimon.format.orc.writer.RowDataVectorizer; import org.apache.paimon.format.orc.writer.Vectorizer; -import org.apache.paimon.fs.ObjectCacheManager; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; @@ -74,14 +75,8 @@ public class OrcFileFormat extends FileFormat { private final int writeBatchSize; private final boolean deletionVectorsEnabled; - private static final org.apache.hadoop.conf.Configuration emptyConf = - new org.apache.hadoop.conf.Configuration(); - private static final ObjectCacheManager configCache = - ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); - - static { - emptyConf.set("paimon.empty.configuration", "paimon.empty.configuration"); - } + private static final Cache configCache = + Caffeine.newBuilder().maximumSize(100).expireAfterWrite(Duration.ofMinutes(30)).build(); public OrcFileFormat(FormatContext formatContext) { super(IDENTIFIER); @@ -91,7 +86,7 @@ public OrcFileFormat(FormatContext formatContext) { if (cachedConf != null) { conf = cachedConf; } else { - conf = new org.apache.hadoop.conf.Configuration(emptyConf); + conf = new org.apache.hadoop.conf.Configuration(false); this.orcProperties.forEach((k, v) -> conf.set(k.toString(), v.toString())); configCache.put(orcProperties, conf); } From 1cf3b74884a3095fb058151885b26580974f7620 Mon Sep 17 00:00:00 2001 From: ziyang Date: Mon, 16 Dec 2024 19:52:18 +0800 Subject: [PATCH 3/3] [format] Switch to using the Configuration(boolean) constructor --- .../paimon/format/orc/OrcFileFormat.java | 22 ++++--------------- 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index f9e70cd82870..9acea56ab393 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -33,8 +33,6 @@ import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; @@ -45,14 +43,12 @@ import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; -import org.apache.hadoop.conf.Configuration; import org.apache.orc.OrcConf; import org.apache.orc.TypeDescription; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -75,23 +71,13 @@ public class OrcFileFormat extends FileFormat { private final int writeBatchSize; private final boolean deletionVectorsEnabled; - private static final Cache configCache = - Caffeine.newBuilder().maximumSize(100).expireAfterWrite(Duration.ofMinutes(30)).build(); - public OrcFileFormat(FormatContext formatContext) { super(IDENTIFIER); this.orcProperties = getOrcProperties(formatContext.options(), formatContext); - Configuration conf; - Configuration cachedConf = configCache.getIfPresent(orcProperties); - if (cachedConf != null) { - conf = cachedConf; - } else { - conf = new org.apache.hadoop.conf.Configuration(false); - this.orcProperties.forEach((k, v) -> conf.set(k.toString(), v.toString())); - configCache.put(orcProperties, conf); - } - this.readerConf = conf; - this.writerConf = conf; + this.readerConf = new org.apache.hadoop.conf.Configuration(false); + this.orcProperties.forEach((k, v) -> readerConf.set(k.toString(), v.toString())); + this.writerConf = new org.apache.hadoop.conf.Configuration(false); + this.orcProperties.forEach((k, v) -> writerConf.set(k.toString(), v.toString())); this.readBatchSize = formatContext.readBatchSize(); this.writeBatchSize = formatContext.writeBatchSize(); this.deletionVectorsEnabled = formatContext.options().get(DELETION_VECTORS_ENABLED);