From 58d730befde68cab529ac7cf2e5fc2e0745ab1d4 Mon Sep 17 00:00:00 2001 From: vikasvb90 Date: Fri, 3 May 2024 12:09:52 +0530 Subject: [PATCH] Added stats for permits and queue in repository-s3 for permit backed transfers Signed-off-by: vikasvb90 --- .../s3/GenericStatsMetricPublisher.java | 74 ++++++++++ .../repositories/s3/S3BlobContainer.java | 9 +- .../repositories/s3/S3BlobStore.java | 19 ++- .../repositories/s3/S3Repository.java | 20 ++- .../repositories/s3/S3RepositoryPlugin.java | 26 +++- .../s3/async/SizeBasedBlockingQ.java | 49 ++++++- .../s3/async/TransferSemaphoresHolder.java | 105 +++++++++----- .../s3/S3BlobContainerMockClientTests.java | 29 +++- .../s3/S3BlobContainerRetriesTests.java | 20 ++- .../repositories/s3/S3RepositoryTests.java | 3 +- .../s3/async/AsyncTransferManagerTests.java | 9 +- .../s3/async/SizeBasedBlockingQTests.java | 12 +- .../async/TransferSemaphoresHolderTests.java | 131 ++++++++++-------- .../common/blobstore/BlobStore.java | 3 +- 14 files changed, 384 insertions(+), 125 deletions(-) create mode 100644 plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java new file mode 100644 index 0000000000000..8a37dc06033fa --- /dev/null +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Generic stats of repository-s3 plugin. + */ +public class GenericStatsMetricPublisher { + + private final AtomicLong normalPriorityQSize = new AtomicLong(); + private final AtomicInteger normalPriorityPermits = new AtomicInteger(); + private final AtomicLong lowPriorityQSize = new AtomicLong(); + private final AtomicInteger lowPriorityPermits = new AtomicInteger(); + + public void updateNormalPriorityQSize(long qSize) { + normalPriorityQSize.addAndGet(qSize); + } + + public void updateLowPriorityQSize(long qSize) { + lowPriorityQSize.addAndGet(qSize); + } + + public void updateNormalPermits(boolean increment) { + if (increment) { + normalPriorityPermits.incrementAndGet(); + } else { + normalPriorityPermits.decrementAndGet(); + } + } + + public void updateLowPermits(boolean increment) { + if (increment) { + lowPriorityPermits.incrementAndGet(); + } else { + lowPriorityPermits.decrementAndGet(); + } + } + + public long getNormalPriorityQSize() { + return normalPriorityQSize.get(); + } + + public int getAcquiredNormalPriorityPermits() { + return normalPriorityPermits.get(); + } + + public long getLowPriorityQSize() { + return lowPriorityQSize.get(); + } + + public int getAcquiredLowPriorityPermits() { + return lowPriorityPermits.get(); + } + + Map stats() { + final Map results = new HashMap<>(); + results.put("NormalPriorityQSize", normalPriorityQSize.get()); + results.put("LowPriorityQSize", lowPriorityQSize.get()); + results.put("AcquiredNormalPriorityPermits", (long) normalPriorityPermits.get()); + results.put("AcquiredLowPriorityPermits", (long) lowPriorityPermits.get()); + return results; + } +} diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 3c8a3f127a295..acf0c5e83a17b 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -222,10 +222,11 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp // If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload. // Therefore, redirecting it to slow client. if ((uploadRequest.getWritePriority() == WritePriority.LOW - && blobStore.getLowPrioritySizeBasedBlockingQ().isBelowCapacity(uploadRequest.getContentLength()) == false) + && blobStore.getLowPrioritySizeBasedBlockingQ().isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false) || (uploadRequest.getWritePriority() != WritePriority.HIGH && uploadRequest.getWritePriority() != WritePriority.URGENT - && blobStore.getNormalPrioritySizeBasedBlockingQ().isBelowCapacity(uploadRequest.getContentLength()) == false)) { + && blobStore.getNormalPrioritySizeBasedBlockingQ() + .isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)) { StreamContext streamContext = SocketAccess.doPrivileged( () -> writeContext.getStreamProvider(uploadRequest.getContentLength()) ); @@ -266,7 +267,9 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp s3AsyncClient = amazonS3Reference.get().client(); } - if (writeContext.getWritePriority() == WritePriority.URGENT || writeContext.getWritePriority() == WritePriority.HIGH) { + if (writeContext.getWritePriority() == WritePriority.URGENT + || writeContext.getWritePriority() == WritePriority.HIGH + || blobStore.isPermitBackedTransferEnabled() == false) { createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener); } else if (writeContext.getWritePriority() == WritePriority.LOW) { blobStore.getLowPrioritySizeBasedBlockingQ() diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java index 414e6eeb9369b..de815f9202f44 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java @@ -57,6 +57,7 @@ import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING; import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE; import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING; +import static org.opensearch.repositories.s3.S3Repository.PERMIT_BACKED_TRANSFER_ENABLED; import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD; import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING; import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING; @@ -78,6 +79,8 @@ class S3BlobStore implements BlobStore { private volatile boolean uploadRetryEnabled; + private volatile boolean permitBackedTransferEnabled; + private volatile boolean serverSideEncryption; private volatile ObjectCannedACL cannedACL; @@ -97,6 +100,7 @@ class S3BlobStore implements BlobStore { private final boolean multipartUploadEnabled; private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; + private final GenericStatsMetricPublisher genericStatsMetricPublisher; S3BlobStore( S3Service service, @@ -114,7 +118,8 @@ class S3BlobStore implements BlobStore { AsyncExecutorContainer priorityExecutorBuilder, AsyncExecutorContainer normalExecutorBuilder, SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ, - SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ + SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ, + GenericStatsMetricPublisher genericStatsMetricPublisher ) { this.service = service; this.s3AsyncService = s3AsyncService; @@ -135,6 +140,8 @@ class S3BlobStore implements BlobStore { this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings()); this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ; this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ; + this.genericStatsMetricPublisher = genericStatsMetricPublisher; + this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings()); } @Override @@ -148,6 +155,7 @@ public void reload(RepositoryMetadata repositoryMetadata) { this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings()); this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings()); this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings()); + this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings()); } @Override @@ -175,6 +183,10 @@ public boolean isUploadRetryEnabled() { return uploadRetryEnabled; } + public boolean isPermitBackedTransferEnabled() { + return permitBackedTransferEnabled; + } + public String bucket() { return bucket; } @@ -216,7 +228,9 @@ public void close() throws IOException { @Override public Map stats() { - return statsMetricPublisher.getStats().toMap(); + Map stats = statsMetricPublisher.getStats().toMap(); + stats.putAll(genericStatsMetricPublisher.stats()); + return stats; } @Override @@ -226,6 +240,7 @@ public Map> extendedStats() { } Map> extendedStats = new HashMap<>(); statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap())); + extendedStats.put(Metric.GENERIC_STATS, genericStatsMetricPublisher.stats()); return extendedStats; } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index 269135a15d411..80be255b33e9b 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -158,6 +158,15 @@ class S3Repository extends MeteredBlobStoreRepository { Setting.Property.NodeScope ); + /** + * Whether large uploads need to be redirected to slow sync s3 client. + */ + static final Setting PERMIT_BACKED_TRANSFER_ENABLED = Setting.boolSetting( + "permit_backed_transfer_enabled", + true, + Setting.Property.NodeScope + ); + /** * Whether retry on uploads are enabled. This setting wraps inputstream with buffered stream to enable retries. */ @@ -287,6 +296,7 @@ class S3Repository extends MeteredBlobStoreRepository { private final Path pluginConfigPath; private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; + private final GenericStatsMetricPublisher genericStatsMetricPublisher; private volatile int bulkDeletesSize; @@ -320,7 +330,8 @@ class S3Repository extends MeteredBlobStoreRepository { multipartUploadEnabled, Path.of(""), normalPrioritySizeBasedBlockingQ, - lowPrioritySizeBasedBlockingQ + lowPrioritySizeBasedBlockingQ, + new GenericStatsMetricPublisher() ); } @@ -341,7 +352,8 @@ class S3Repository extends MeteredBlobStoreRepository { final boolean multipartUploadEnabled, Path pluginConfigPath, final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ, - final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ + final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ, + final GenericStatsMetricPublisher genericStatsMetricPublisher ) { super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata)); this.service = service; @@ -354,6 +366,7 @@ class S3Repository extends MeteredBlobStoreRepository { this.normalExecutorBuilder = normalExecutorBuilder; this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ; this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ; + this.genericStatsMetricPublisher = genericStatsMetricPublisher; validateRepositoryMetadata(metadata); readRepositoryMetadata(); @@ -418,7 +431,8 @@ protected S3BlobStore createBlobStore() { priorityExecutorBuilder, normalExecutorBuilder, normalPrioritySizeBasedBlockingQ, - lowPrioritySizeBasedBlockingQ + lowPrioritySizeBasedBlockingQ, + genericStatsMetricPublisher ); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java index b9065a52601c6..cbb1488b520c2 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java @@ -105,6 +105,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ; private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; private TransferSemaphoresHolder transferSemaphoresHolder; + private GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(); public S3RepositoryPlugin(final Settings settings, final Path configPath) { this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath)); @@ -236,13 +237,16 @@ public Collection createComponents( this.normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( new ByteSizeValue(normalPriorityConsumers * 10L, ByteSizeUnit.GB), normalTransferQConsumerService, - normalPriorityConsumers + normalPriorityConsumers, + genericStatsMetricPublisher, + SizeBasedBlockingQ.QueueEventType.NORMAL ); int lowPriorityConsumers = lowPriorityTransferQConsumers(clusterService.getSettings()); LowPrioritySizeBasedBlockingQ lowPrioritySizeBasedBlockingQ = new LowPrioritySizeBasedBlockingQ( new ByteSizeValue(lowPriorityConsumers * 20L, ByteSizeUnit.GB), lowTransferQConsumerService, - lowPriorityConsumers + lowPriorityConsumers, + genericStatsMetricPublisher ); this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ; this.transferSemaphoresHolder = new TransferSemaphoresHolder( @@ -250,7 +254,8 @@ public Collection createComponents( Math.max(allocatedProcessors(clusterService.getSettings()) * 4, 10), ((double) S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(clusterService.getSettings())) / 100, S3Repository.S3_PERMIT_WAIT_DURATION_MIN.get(clusterService.getSettings()), - TimeUnit.MINUTES + TimeUnit.MINUTES, + genericStatsMetricPublisher ); return CollectionUtils.arrayAsArrayList(this.normalPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ); @@ -259,8 +264,13 @@ public Collection createComponents( // New class because in core, components are injected via guice only by instance creation due to which // same binding types fail. private static final class LowPrioritySizeBasedBlockingQ extends SizeBasedBlockingQ { - public LowPrioritySizeBasedBlockingQ(ByteSizeValue capacity, ExecutorService executorService, int consumers) { - super(capacity, executorService, consumers); + public LowPrioritySizeBasedBlockingQ( + ByteSizeValue capacity, + ExecutorService executorService, + int consumers, + GenericStatsMetricPublisher genericStatsMetricPublisher + ) { + super(capacity, executorService, consumers, genericStatsMetricPublisher, QueueEventType.LOW); } } @@ -293,7 +303,8 @@ protected S3Repository createRepository( S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()), configPath, normalPrioritySizeBasedBlockingQ, - lowPrioritySizeBasedBlockingQ + lowPrioritySizeBasedBlockingQ, + genericStatsMetricPublisher ); } @@ -339,7 +350,8 @@ public List> getSettings() { S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING, S3Repository.REDIRECT_LARGE_S3_UPLOAD, S3Repository.UPLOAD_RETRY_ENABLED, - S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT + S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT, + S3Repository.PERMIT_BACKED_TRANSFER_ENABLED ); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java index ace8003d0fe55..170c80f5d4db6 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java @@ -13,6 +13,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.repositories.s3.GenericStatsMetricPublisher; import org.opensearch.repositories.s3.S3TransferRejectedException; import java.util.concurrent.ExecutorService; @@ -39,11 +40,19 @@ public class SizeBasedBlockingQ extends AbstractLifecycleComponent { protected final AtomicBoolean closed; protected final ExecutorService executorService; protected final int consumers; + private final GenericStatsMetricPublisher genericStatsMetricPublisher; + private final QueueEventType queueEventType; /** * Constructor to create sized based blocking queue. */ - public SizeBasedBlockingQ(ByteSizeValue capacity, ExecutorService executorService, int consumers) { + public SizeBasedBlockingQ( + ByteSizeValue capacity, + ExecutorService executorService, + int consumers, + GenericStatsMetricPublisher genericStatsMetricPublisher, + QueueEventType queueEventType + ) { this.queue = new LinkedBlockingQueue<>(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); @@ -52,12 +61,19 @@ public SizeBasedBlockingQ(ByteSizeValue capacity, ExecutorService executorServic this.closed = new AtomicBoolean(); this.executorService = executorService; this.consumers = consumers; + this.genericStatsMetricPublisher = genericStatsMetricPublisher; + this.queueEventType = queueEventType; + } + + public enum QueueEventType { + NORMAL, + LOW; } @Override protected void doStart() { for (int worker = 0; worker < consumers; worker++) { - Thread consumer = new Consumer(queue, currentSize, lock, notEmpty, closed); + Thread consumer = new Consumer(queue, currentSize, lock, notEmpty, closed, genericStatsMetricPublisher, queueEventType); executorService.submit(consumer); } } @@ -88,16 +104,26 @@ public void produce(Item item) throws InterruptedException { queue.put(item); currentSize.addAndGet(item.size); notEmpty.signalAll(); + updateStats(item.size, queueEventType, genericStatsMetricPublisher); } finally { lock.unlock(); } + + } + + private static void updateStats(long itemSize, QueueEventType queueEventType, GenericStatsMetricPublisher genericStatsMetricPublisher) { + if (queueEventType == QueueEventType.NORMAL) { + genericStatsMetricPublisher.updateNormalPriorityQSize(itemSize); + } else if (queueEventType == QueueEventType.LOW) { + genericStatsMetricPublisher.updateLowPriorityQSize(itemSize); + } } public int getSize() { return queue.size(); } - public boolean isBelowCapacity(long contentLength) { + public boolean isMaxCapacityBelowContentLength(long contentLength) { return contentLength < capacity.getBytes(); } @@ -107,13 +133,25 @@ protected static class Consumer extends Thread { private final Condition notEmpty; private final AtomicLong currentSize; private final AtomicBoolean closed; - - public Consumer(LinkedBlockingQueue queue, AtomicLong currentSize, Lock lock, Condition notEmpty, AtomicBoolean closed) { + private final GenericStatsMetricPublisher genericStatsMetricPublisher; + private final QueueEventType queueEventType; + + public Consumer( + LinkedBlockingQueue queue, + AtomicLong currentSize, + Lock lock, + Condition notEmpty, + AtomicBoolean closed, + GenericStatsMetricPublisher genericStatsMetricPublisher, + QueueEventType queueEventType + ) { this.queue = queue; this.lock = lock; this.notEmpty = notEmpty; this.currentSize = currentSize; this.closed = closed; + this.genericStatsMetricPublisher = genericStatsMetricPublisher; + this.queueEventType = queueEventType; } @Override @@ -147,6 +185,7 @@ private void consume() throws InterruptedException { item = queue.take(); currentSize.addAndGet(-item.size); + updateStats(-item.size, queueEventType, genericStatsMetricPublisher); } finally { lock.unlock(); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java index 77c2a77fb5c12..ce3b8929c35f4 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolder.java @@ -11,10 +11,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.repositories.s3.GenericStatsMetricPublisher; import java.util.Objects; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * Transfer semaphore holder for controlled transfer of data to remote. @@ -23,8 +25,8 @@ public class TransferSemaphoresHolder { private static final Logger log = LogManager.getLogger(TransferSemaphoresHolder.class); // For tests protected TypeSemaphore lowPrioritySemaphore; - protected TypeSemaphore highPrioritySemaphore; - private final int highPriorityPermits; + protected TypeSemaphore normalPrioritySemaphore; + private final int normalPriorityPermits; private final int lowPriorityPermits; private final int acquireWaitDuration; private final TimeUnit acquireWaitDurationUnit; @@ -32,12 +34,26 @@ public class TransferSemaphoresHolder { /** * Constructor to create semaphores holder. */ - public TransferSemaphoresHolder(int availablePermits, double priorityPermitAllocation, int acquireWaitDuration, TimeUnit timeUnit) { - - this.highPriorityPermits = (int) (priorityPermitAllocation * availablePermits); - this.highPrioritySemaphore = new TypeSemaphore(highPriorityPermits, "high"); - this.lowPriorityPermits = availablePermits - highPriorityPermits; - this.lowPrioritySemaphore = new TypeSemaphore(lowPriorityPermits, "low"); + public TransferSemaphoresHolder( + int availablePermits, + double priorityPermitAllocation, + int acquireWaitDuration, + TimeUnit timeUnit, + GenericStatsMetricPublisher genericStatsPublisher + ) { + + this.normalPriorityPermits = (int) (priorityPermitAllocation * availablePermits); + this.normalPrioritySemaphore = new TypeSemaphore( + normalPriorityPermits, + TypeSemaphore.PermitType.NORMAL, + genericStatsPublisher::updateNormalPermits + ); + this.lowPriorityPermits = availablePermits - normalPriorityPermits; + this.lowPrioritySemaphore = new TypeSemaphore( + lowPriorityPermits, + TypeSemaphore.PermitType.LOW, + genericStatsPublisher::updateLowPermits + ); this.acquireWaitDuration = acquireWaitDuration; this.acquireWaitDurationUnit = timeUnit; } @@ -46,15 +62,46 @@ public TransferSemaphoresHolder(int availablePermits, double priorityPermitAlloc * Overridden semaphore to identify transfer semaphores among all other semaphores for triaging. */ public static class TypeSemaphore extends Semaphore { - private final String type; + private final PermitType permitType; + private final Consumer permitChangeConsumer; - public TypeSemaphore(int permits, String type) { + public enum PermitType { + NORMAL, + LOW; + } + + public TypeSemaphore(int permits, PermitType permitType, Consumer permitChangeConsumer) { super(permits); - this.type = type; + this.permitType = permitType; + this.permitChangeConsumer = permitChangeConsumer; + } + + public PermitType getType() { + return permitType; + } + + @Override + public boolean tryAcquire() { + boolean acquired = super.tryAcquire(); + if (acquired) { + permitChangeConsumer.accept(true); + } + return acquired; } - public String getType() { - return type; + @Override + public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { + boolean acquired = super.tryAcquire(timeout, unit); + if (acquired) { + permitChangeConsumer.accept(true); + } + return acquired; + } + + @Override + public void release() { + super.release(); + permitChangeConsumer.accept(false); } } @@ -94,7 +141,7 @@ public TypeSemaphore acquirePermit(WritePriority writePriority, RequestContext r () -> "Acquire permit request for transfer type: " + writePriority + ". Available high priority permits: " - + highPrioritySemaphore.availablePermits() + + normalPrioritySemaphore.availablePermits() + " and low priority permits: " + lowPrioritySemaphore.availablePermits() ); @@ -103,8 +150,8 @@ public TypeSemaphore acquirePermit(WritePriority writePriority, RequestContext r if (Objects.requireNonNull(writePriority) == WritePriority.LOW) { if (lowPrioritySemaphore.tryAcquire()) { return lowPrioritySemaphore; - } else if (highPrioritySemaphore.availablePermits() > 0.4 * highPriorityPermits && highPrioritySemaphore.tryAcquire()) { - return highPrioritySemaphore; + } else if (normalPrioritySemaphore.availablePermits() > 0.4 * normalPriorityPermits && normalPrioritySemaphore.tryAcquire()) { + return normalPrioritySemaphore; } else if (lowPrioritySemaphore.tryAcquire(acquireWaitDuration, acquireWaitDurationUnit)) { return lowPrioritySemaphore; } @@ -113,12 +160,12 @@ public TypeSemaphore acquirePermit(WritePriority writePriority, RequestContext r // Try acquiring high priority permit or low priority permit immediately if available. // Otherwise, we wait for high priority permit. - if (highPrioritySemaphore.tryAcquire()) { - return highPrioritySemaphore; + if (normalPrioritySemaphore.tryAcquire()) { + return normalPrioritySemaphore; } else if (requestContext.lowPriorityPermitsConsumable && lowPrioritySemaphore.tryAcquire()) { return lowPrioritySemaphore; - } else if (highPrioritySemaphore.tryAcquire(acquireWaitDuration, acquireWaitDurationUnit)) { - return highPrioritySemaphore; + } else if (normalPrioritySemaphore.tryAcquire(acquireWaitDuration, acquireWaitDurationUnit)) { + return normalPrioritySemaphore; } return null; } @@ -126,8 +173,8 @@ public TypeSemaphore acquirePermit(WritePriority writePriority, RequestContext r /** * Used in tests. */ - public int getHighPriorityPermits() { - return highPriorityPermits; + public int getNormalPriorityPermits() { + return normalPriorityPermits; } /** @@ -136,18 +183,4 @@ public int getHighPriorityPermits() { public int getLowPriorityPermits() { return lowPriorityPermits; } - - /** - * Used in tests. - */ - public int getAvailableHighPriorityPermits() { - return highPrioritySemaphore.availablePermits(); - } - - /** - * Used in tests. - */ - public int getAvailableLowPriorityPermits() { - return lowPrioritySemaphore.availablePermits(); - } } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index d5f664b1f0608..9b15db358dbb9 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -377,15 +377,20 @@ public void setUp() throws Exception { transferQueueConsumerService = Executors.newFixedThreadPool(20); scheduler = new Scheduler.SafeScheduledThreadPoolExecutor(1); transferNIOGroup = new AsyncTransferEventLoopGroup(1); + GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(); normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 10L, ByteSizeUnit.GB), transferQueueConsumerService, - 10 + 10, + genericStatsMetricPublisher, + SizeBasedBlockingQ.QueueEventType.NORMAL ); lowPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 20L, ByteSizeUnit.GB), transferQueueConsumerService, - 5 + 5, + genericStatsMetricPublisher, + SizeBasedBlockingQ.QueueEventType.NORMAL ); normalPrioritySizeBasedBlockingQ.start(); lowPrioritySizeBasedBlockingQ.start(); @@ -443,13 +448,20 @@ private S3BlobStore createBlobStore() { asyncExecutorContainer.getStreamReader(), asyncExecutorContainer.getStreamReader(), asyncExecutorContainer.getStreamReader(), - new TransferSemaphoresHolder(3, Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), 5, TimeUnit.MINUTES) + new TransferSemaphoresHolder( + 3, + Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), + 5, + TimeUnit.MINUTES, + new GenericStatsMetricPublisher() + ) ), asyncExecutorContainer, asyncExecutorContainer, asyncExecutorContainer, normalPrioritySizeBasedBlockingQ, - lowPrioritySizeBasedBlockingQ + lowPrioritySizeBasedBlockingQ, + new GenericStatsMetricPublisher() ); } @@ -625,7 +637,14 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W int numberOfParts = 20; final ByteSizeValue partSize = new ByteSizeValue(capacity.getBytes() / numberOfParts + 1, ByteSizeUnit.BYTES); - SizeBasedBlockingQ sizeBasedBlockingQ = new SizeBasedBlockingQ(capacity, transferQueueConsumerService, 10); + GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(); + SizeBasedBlockingQ sizeBasedBlockingQ = new SizeBasedBlockingQ( + capacity, + transferQueueConsumerService, + 10, + genericStatsMetricPublisher, + SizeBasedBlockingQ.QueueEventType.NORMAL + ); final long lastPartSize = new ByteSizeValue(200, ByteSizeUnit.MB).getBytes(); final long blobSize = ((numberOfParts - 1) * partSize.getBytes()) + lastPartSize; diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index f2f87231e9620..b65aee5fb8028 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -137,15 +137,20 @@ public void setUp() throws Exception { remoteTransferRetry = Executors.newFixedThreadPool(20); transferQueueConsumerService = Executors.newFixedThreadPool(2); scheduler = new ScheduledThreadPoolExecutor(1); + GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(); normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB), transferQueueConsumerService, - 2 + 2, + genericStatsMetricPublisher, + SizeBasedBlockingQ.QueueEventType.NORMAL ); lowPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB), transferQueueConsumerService, - 2 + 2, + genericStatsMetricPublisher, + SizeBasedBlockingQ.QueueEventType.LOW ); normalPrioritySizeBasedBlockingQ.start(); lowPrioritySizeBasedBlockingQ.start(); @@ -252,13 +257,20 @@ protected AsyncMultiStreamBlobContainer createBlobContainer( asyncExecutorContainer.getStreamReader(), asyncExecutorContainer.getStreamReader(), asyncExecutorContainer.getStreamReader(), - new TransferSemaphoresHolder(3, Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), 5, TimeUnit.MINUTES) + new TransferSemaphoresHolder( + 3, + Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), + 5, + TimeUnit.MINUTES, + new GenericStatsMetricPublisher() + ) ), asyncExecutorContainer, asyncExecutorContainer, asyncExecutorContainer, normalPrioritySizeBasedBlockingQ, - lowPrioritySizeBasedBlockingQ + lowPrioritySizeBasedBlockingQ, + new GenericStatsMetricPublisher() ) ) { @Override diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java index cc7aad76fcd0c..26ce3a29f36fa 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java @@ -172,7 +172,8 @@ private S3Repository createS3Repo(RepositoryMetadata metadata) { false, null, null, - null + null, + new GenericStatsMetricPublisher() ) { @Override protected void assertSnapshotOrGenericThread() { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index 3a2712c0feaaf..f874f1b43849a 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -33,6 +33,7 @@ import org.opensearch.common.io.InputStreamContainer; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.repositories.blobstore.ZeroInputStream; +import org.opensearch.repositories.s3.GenericStatsMetricPublisher; import org.opensearch.repositories.s3.StatsMetricPublisher; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -69,7 +70,13 @@ public void setUp() throws Exception { Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor(), - new TransferSemaphoresHolder(3, Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), 5, TimeUnit.MINUTES) + new TransferSemaphoresHolder( + 3, + Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), + 5, + TimeUnit.MINUTES, + new GenericStatsMetricPublisher() + ) ); super.setUp(); } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQTests.java index 53f507c72784a..6dc86b6d5b227 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQTests.java @@ -10,6 +10,7 @@ import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.repositories.s3.GenericStatsMetricPublisher; import org.opensearch.repositories.s3.S3TransferRejectedException; import org.opensearch.test.OpenSearchTestCase; import org.junit.After; @@ -40,11 +41,16 @@ public void tearDown() throws Exception { } public void testProducerConsumerOfBulkItems() throws InterruptedException { - + GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(); + SizeBasedBlockingQ.QueueEventType queueEventType = randomBoolean() + ? SizeBasedBlockingQ.QueueEventType.NORMAL + : SizeBasedBlockingQ.QueueEventType.LOW; SizeBasedBlockingQ sizeBasedBlockingQ = new SizeBasedBlockingQ( new ByteSizeValue(ByteSizeUnit.BYTES.toBytes(10)), consumerService, - 10 + 10, + genericStatsMetricPublisher, + queueEventType ); sizeBasedBlockingQ.start(); int numOfItems = randomIntBetween(100, 1000); @@ -76,6 +82,8 @@ public void testProducerConsumerOfBulkItems() throws InterruptedException { latch.await(); sizeBasedBlockingQ.close(); assertFalse(unknownError.get()); + assertEquals(0L, genericStatsMetricPublisher.getNormalPriorityQSize()); + assertEquals(0L, genericStatsMetricPublisher.getLowPriorityQSize()); } static class TestItemToStr extends SizeBasedBlockingQ.Item { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolderTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolderTests.java index bcec8ece062cf..01510aac86330 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolderTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/TransferSemaphoresHolderTests.java @@ -9,6 +9,7 @@ package org.opensearch.repositories.s3.async; import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.repositories.s3.GenericStatsMetricPublisher; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; @@ -19,39 +20,47 @@ import org.mockito.Mockito; +import static org.opensearch.repositories.s3.async.TransferSemaphoresHolder.TypeSemaphore.PermitType; + public class TransferSemaphoresHolderTests extends OpenSearchTestCase { public void testAllocation() { int availablePermits = randomIntBetween(5, 20); double priorityAllocation = randomDoubleBetween(0.1, 0.9, true); - int highPermits = (int) (availablePermits * priorityAllocation); - int lowPermits = availablePermits - highPermits; + int normalPermits = (int) (availablePermits * priorityAllocation); + int lowPermits = availablePermits - normalPermits; + GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(); TransferSemaphoresHolder transferSemaphoresHolder = new TransferSemaphoresHolder( availablePermits, priorityAllocation, 1, - TimeUnit.NANOSECONDS + TimeUnit.NANOSECONDS, + genericStatsPublisher ); - assertEquals(highPermits, transferSemaphoresHolder.getHighPriorityPermits()); + assertEquals(normalPermits, transferSemaphoresHolder.getNormalPriorityPermits()); assertEquals(lowPermits, transferSemaphoresHolder.getLowPriorityPermits()); + assertEquals(0, genericStatsPublisher.getAcquiredNormalPriorityPermits()); + assertEquals(0, genericStatsPublisher.getAcquiredLowPriorityPermits()); } public void testLowPriorityEventPermitAcquisition() throws InterruptedException { int availablePermits = randomIntBetween(5, 50); double priorityAllocation = randomDoubleBetween(0.1, 0.9, true); - int highPermits = (int) (availablePermits * priorityAllocation); - int lowPermits = availablePermits - highPermits; + int normalPermits = (int) (availablePermits * priorityAllocation); + int lowPermits = availablePermits - normalPermits; + GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(); TransferSemaphoresHolder transferSemaphoresHolder = new TransferSemaphoresHolder( availablePermits, priorityAllocation, 1, - TimeUnit.NANOSECONDS + TimeUnit.NANOSECONDS, + genericStatsPublisher ); List semaphores = new ArrayList<>(); - int highPermitsEligibleForLowEvents = highPermits - (int) (highPermits * 0.4); + int normalPermitsEligibleForLowEvents = normalPermits - (int) (normalPermits * 0.4); - int lowAcquisitionsExpected = (highPermitsEligibleForLowEvents + lowPermits); + int lowAcquisitionsExpected = (normalPermitsEligibleForLowEvents + lowPermits); for (int i = 0; i < lowAcquisitionsExpected; i++) { TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext(); TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( @@ -60,19 +69,19 @@ public void testLowPriorityEventPermitAcquisition() throws InterruptedException ); semaphores.add(acquiredSemaphore); if (i >= lowPermits) { - assertEquals("high", acquiredSemaphore.getType()); + assertEquals(PermitType.NORMAL, acquiredSemaphore.getType()); } else { - assertEquals("low", acquiredSemaphore.getType()); + assertEquals(PermitType.LOW, acquiredSemaphore.getType()); } } - for (int i = 0; i < highPermits - highPermitsEligibleForLowEvents; i++) { + for (int i = 0; i < normalPermits - normalPermitsEligibleForLowEvents; i++) { TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext(); TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.HIGH, + WritePriority.NORMAL, requestContext ); - assertEquals("high", acquiredSemaphore.getType()); + assertEquals(PermitType.NORMAL, acquiredSemaphore.getType()); semaphores.add(acquiredSemaphore); } @@ -85,44 +94,51 @@ public void testLowPriorityEventPermitAcquisition() throws InterruptedException assertEquals(availablePermits, semaphores.size()); semaphores.forEach(Semaphore::release); - assertEquals(highPermits, transferSemaphoresHolder.getHighPriorityPermits()); + assertEquals(normalPermits, transferSemaphoresHolder.getNormalPriorityPermits()); assertEquals(lowPermits, transferSemaphoresHolder.getLowPriorityPermits()); + assertEquals(0, genericStatsPublisher.getAcquiredNormalPriorityPermits()); + assertEquals(0, genericStatsPublisher.getAcquiredLowPriorityPermits()); } - public void testHighPermitEventAcquisition() throws InterruptedException { + public void testNormalPermitEventAcquisition() throws InterruptedException { int availablePermits = randomIntBetween(5, 50); double priorityAllocation = randomDoubleBetween(0.1, 0.9, true); - int highPermits = (int) (availablePermits * priorityAllocation); - int lowPermits = availablePermits - highPermits; + int normalPermits = (int) (availablePermits * priorityAllocation); + int lowPermits = availablePermits - normalPermits; + GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(); TransferSemaphoresHolder transferSemaphoresHolder = new TransferSemaphoresHolder( availablePermits, priorityAllocation, 1, - TimeUnit.NANOSECONDS + TimeUnit.NANOSECONDS, + genericStatsPublisher ); List semaphores = new ArrayList<>(); List lowSemaphores = new ArrayList<>(); - int highAcquisitionsExpected = highPermits + lowPermits; + int normalAcquisitionsExpected = normalPermits + lowPermits; TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext(); - for (int i = 0; i < highAcquisitionsExpected; i++) { + for (int i = 0; i < normalAcquisitionsExpected; i++) { TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.HIGH, + WritePriority.NORMAL, requestContext ); semaphores.add(acquiredSemaphore); - if (i >= highPermits) { - assertEquals("low", acquiredSemaphore.getType()); + if (i >= normalPermits) { + assertEquals(PermitType.LOW, acquiredSemaphore.getType()); lowSemaphores.add(acquiredSemaphore); } else { - assertEquals("high", acquiredSemaphore.getType()); + assertEquals(PermitType.NORMAL, acquiredSemaphore.getType()); } } assertEquals(availablePermits, semaphores.size()); int lowAcquired = lowPermits; - lowSemaphores.get(0).release(); + + Semaphore removedLowSemaphore = lowSemaphores.remove(0); + removedLowSemaphore.release(); + semaphores.remove(removedLowSemaphore); requestContext = transferSemaphoresHolder.createRequestContext(); TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( @@ -133,18 +149,20 @@ public void testHighPermitEventAcquisition() throws InterruptedException { lowSemaphores.add(acquiredSemaphore); while (lowAcquired > 1) { requestContext = transferSemaphoresHolder.createRequestContext(); - acquiredSemaphore = transferSemaphoresHolder.acquirePermit(WritePriority.HIGH, requestContext); + acquiredSemaphore = transferSemaphoresHolder.acquirePermit(WritePriority.NORMAL, requestContext); assertNull(acquiredSemaphore); lowAcquired--; } semaphores.forEach(Semaphore::release); - assertEquals(highPermits, transferSemaphoresHolder.getHighPriorityPermits()); + assertEquals(normalPermits, transferSemaphoresHolder.getNormalPriorityPermits()); assertEquals(lowPermits, transferSemaphoresHolder.getLowPriorityPermits()); + assertEquals(0, genericStatsPublisher.getAcquiredNormalPriorityPermits()); + assertEquals(0, genericStatsPublisher.getAcquiredLowPriorityPermits()); } private static class TestTransferSemaphoresHolder extends TransferSemaphoresHolder { - AtomicInteger highWaitCount = new AtomicInteger(); + AtomicInteger normalWaitCount = new AtomicInteger(); AtomicInteger lowWaitCount = new AtomicInteger(); /** @@ -154,20 +172,21 @@ public TestTransferSemaphoresHolder( int availablePermits, double priorityPermitAllocation, int acquireWaitDuration, - TimeUnit timeUnit + TimeUnit timeUnit, + GenericStatsMetricPublisher genericStatsMetricPublisher ) throws InterruptedException { - super(availablePermits, priorityPermitAllocation, acquireWaitDuration, timeUnit); - TypeSemaphore executingHighSemaphore = highPrioritySemaphore; + super(availablePermits, priorityPermitAllocation, acquireWaitDuration, timeUnit, genericStatsMetricPublisher); + TypeSemaphore executingNormalSemaphore = normalPrioritySemaphore; TypeSemaphore executingLowSemaphore = lowPrioritySemaphore; - this.highPrioritySemaphore = Mockito.spy(highPrioritySemaphore); + this.normalPrioritySemaphore = Mockito.spy(normalPrioritySemaphore); this.lowPrioritySemaphore = Mockito.spy(lowPrioritySemaphore); Mockito.doAnswer(invocation -> { - highWaitCount.incrementAndGet(); + normalWaitCount.incrementAndGet(); return false; - }).when(highPrioritySemaphore).tryAcquire(Mockito.anyLong(), Mockito.any(TimeUnit.class)); - Mockito.doAnswer(invocation -> executingHighSemaphore.availablePermits()).when(highPrioritySemaphore).availablePermits(); - Mockito.doAnswer(invocation -> executingHighSemaphore.tryAcquire()).when(highPrioritySemaphore).tryAcquire(); + }).when(normalPrioritySemaphore).tryAcquire(Mockito.anyLong(), Mockito.any(TimeUnit.class)); + Mockito.doAnswer(invocation -> executingNormalSemaphore.availablePermits()).when(normalPrioritySemaphore).availablePermits(); + Mockito.doAnswer(invocation -> executingNormalSemaphore.tryAcquire()).when(normalPrioritySemaphore).tryAcquire(); Mockito.doAnswer(invocation -> { lowWaitCount.incrementAndGet(); @@ -178,58 +197,60 @@ public TestTransferSemaphoresHolder( } } - public void testHighSemaphoreAcquiredWait() throws InterruptedException { + public void testNormalSemaphoreAcquiredWait() throws InterruptedException { int availablePermits = randomIntBetween(10, 50); double priorityAllocation = randomDoubleBetween(0.1, 0.9, true); - int highPermits = (int) (availablePermits * priorityAllocation); + int normalPermits = (int) (availablePermits * priorityAllocation); TestTransferSemaphoresHolder transferSemaphoresHolder = new TestTransferSemaphoresHolder( availablePermits, priorityAllocation, 5, - TimeUnit.MINUTES + TimeUnit.MINUTES, + new GenericStatsMetricPublisher() ); TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext(); TransferSemaphoresHolder.TypeSemaphore lowSemaphore = transferSemaphoresHolder.acquirePermit(WritePriority.LOW, requestContext); - assertEquals("low", lowSemaphore.getType()); - for (int i = 0; i < highPermits; i++) { + assertEquals(PermitType.LOW, lowSemaphore.getType()); + for (int i = 0; i < normalPermits; i++) { requestContext = transferSemaphoresHolder.createRequestContext(); TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.HIGH, + WritePriority.NORMAL, requestContext ); - assertEquals("high", acquiredSemaphore.getType()); + assertEquals(PermitType.NORMAL, acquiredSemaphore.getType()); } TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.HIGH, + WritePriority.NORMAL, requestContext ); assertNull(acquiredSemaphore); - assertEquals(1, transferSemaphoresHolder.highWaitCount.get()); + assertEquals(1, transferSemaphoresHolder.normalWaitCount.get()); assertEquals(0, transferSemaphoresHolder.lowWaitCount.get()); } public void testLowSemaphoreAcquiredWait() throws InterruptedException { int availablePermits = randomIntBetween(10, 50); double priorityAllocation = randomDoubleBetween(0.1, 0.9, true); - int highPermits = (int) (availablePermits * priorityAllocation); - int lowPermits = availablePermits - highPermits; + int normalPermits = (int) (availablePermits * priorityAllocation); + int lowPermits = availablePermits - normalPermits; TestTransferSemaphoresHolder transferSemaphoresHolder = new TestTransferSemaphoresHolder( availablePermits, priorityAllocation, 5, - TimeUnit.MINUTES + TimeUnit.MINUTES, + new GenericStatsMetricPublisher() ); TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext(); - int highPermitsEligibleForLowEvents = highPermits - (int) (highPermits * 0.4); - for (int i = 0; i < highPermitsEligibleForLowEvents; i++) { + int normalPermitsEligibleForLowEvents = normalPermits - (int) (normalPermits * 0.4); + for (int i = 0; i < normalPermitsEligibleForLowEvents; i++) { TransferSemaphoresHolder.TypeSemaphore lowSemaphore = transferSemaphoresHolder.acquirePermit( - WritePriority.HIGH, + WritePriority.NORMAL, requestContext ); - assertEquals("high", lowSemaphore.getType()); + assertEquals(PermitType.NORMAL, lowSemaphore.getType()); } for (int i = 0; i < lowPermits; i++) { @@ -238,7 +259,7 @@ public void testLowSemaphoreAcquiredWait() throws InterruptedException { WritePriority.LOW, requestContext ); - assertEquals("low", acquiredSemaphore.getType()); + assertEquals(PermitType.LOW, acquiredSemaphore.getType()); } TransferSemaphoresHolder.TypeSemaphore acquiredSemaphore = transferSemaphoresHolder.acquirePermit( @@ -247,7 +268,7 @@ public void testLowSemaphoreAcquiredWait() throws InterruptedException { ); assertNull(acquiredSemaphore); assertEquals(1, transferSemaphoresHolder.lowWaitCount.get()); - assertEquals(0, transferSemaphoresHolder.highWaitCount.get()); + assertEquals(0, transferSemaphoresHolder.normalWaitCount.get()); } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java index 0f6646d37f950..d702b2776c3e7 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java @@ -78,7 +78,8 @@ enum Metric { REQUEST_SUCCESS("request_success_total"), REQUEST_FAILURE("request_failures_total"), REQUEST_LATENCY("request_time_in_millis"), - RETRY_COUNT("request_retry_count_total"); + RETRY_COUNT("request_retry_count_total"), + GENERIC_STATS("generic_stats"); private String metricName;