Skip to content

Commit

Permalink
Converted generic repository stats from absolute to percent utilizati…
Browse files Browse the repository at this point in the history
…on in repository-s3

Signed-off-by: vikasvb90 <[email protected]>
  • Loading branch information
vikasvb90 committed May 11, 2024
1 parent 58d730b commit 8c54c68
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ protected S3Repository createRepository(
null,
false,
null,
null,
null
) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ public class GenericStatsMetricPublisher {
private final AtomicInteger normalPriorityPermits = new AtomicInteger();
private final AtomicLong lowPriorityQSize = new AtomicLong();
private final AtomicInteger lowPriorityPermits = new AtomicInteger();
private final long normalPriorityQCapacity;
private final int maxNormalPriorityPermits;
private final long lowPriorityQCapacity;
private final int maxLowPriorityPermits;

public GenericStatsMetricPublisher(
long normalPriorityQCapacity,
int maxNormalPriorityPermits,
long lowPriorityQCapacity,
int maxLowPriorityPermits
) {
this.normalPriorityQCapacity = normalPriorityQCapacity;
this.maxNormalPriorityPermits = maxNormalPriorityPermits;
this.lowPriorityQCapacity = lowPriorityQCapacity;
this.maxLowPriorityPermits = maxLowPriorityPermits;
}

public void updateNormalPriorityQSize(long qSize) {
normalPriorityQSize.addAndGet(qSize);
Expand Down Expand Up @@ -65,10 +81,10 @@ public int getAcquiredLowPriorityPermits() {

Map<String, Long> stats() {
final Map<String, Long> results = new HashMap<>();
results.put("NormalPriorityQSize", normalPriorityQSize.get());
results.put("LowPriorityQSize", lowPriorityQSize.get());
results.put("AcquiredNormalPriorityPermits", (long) normalPriorityPermits.get());
results.put("AcquiredLowPriorityPermits", (long) lowPriorityPermits.get());
results.put("NormalPriorityQUtilization", (normalPriorityQSize.get() * 100) / normalPriorityQCapacity);
results.put("LowPriorityQUtilization", (lowPriorityQSize.get() * 100) / lowPriorityQCapacity);
results.put("NormalPriorityPermitsUtilization", (normalPriorityPermits.get() * 100L) / maxNormalPriorityPermits);
results.put("LowPriorityPermitsUtilization", (lowPriorityPermits.get() * 100L) / maxLowPriorityPermits);
return results;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ class S3Repository extends MeteredBlobStoreRepository {
final S3AsyncService s3AsyncService,
final boolean multipartUploadEnabled,
final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
final GenericStatsMetricPublisher genericStatsMetricPublisher
) {
this(
metadata,
Expand All @@ -331,7 +332,7 @@ class S3Repository extends MeteredBlobStoreRepository {
Path.of(""),
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ,
new GenericStatsMetricPublisher()
genericStatsMetricPublisher
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
private TransferSemaphoresHolder transferSemaphoresHolder;
private GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher();
private GenericStatsMetricPublisher genericStatsMetricPublisher;

public S3RepositoryPlugin(final Settings settings, final Path configPath) {
this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath));
Expand Down Expand Up @@ -231,28 +231,48 @@ public Collection<Object> createComponents(
threadPool.executor(STREAM_READER),
new AsyncTransferEventLoopGroup(normalEventLoopThreads)
);

this.lowTransferQConsumerService = threadPool.executor(LOW_TRANSFER_QUEUE_CONSUMER);
this.normalTransferQConsumerService = threadPool.executor(NORMAL_TRANSFER_QUEUE_CONSUMER);

// High number of permit allocation because each op acquiring permit performs disk IO, computation and network IO.
int availablePermits = Math.max(allocatedProcessors(clusterService.getSettings()) * 4, 10);
double priorityPermitAllocation = ((double) S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(clusterService.getSettings()))
/ 100;
int normalPriorityPermits = (int) (priorityPermitAllocation * availablePermits);
int lowPriorityPermits = availablePermits - normalPriorityPermits;

int normalPriorityConsumers = normalPriorityTransferQConsumers(clusterService.getSettings());
int lowPriorityConsumers = lowPriorityTransferQConsumers(clusterService.getSettings());

ByteSizeValue normalPriorityQCapacity = new ByteSizeValue(normalPriorityConsumers * 10L, ByteSizeUnit.GB);
ByteSizeValue lowPriorityQCapacity = new ByteSizeValue(lowPriorityConsumers * 20L, ByteSizeUnit.GB);

this.genericStatsMetricPublisher = new GenericStatsMetricPublisher(
normalPriorityQCapacity.getBytes(),
normalPriorityPermits,
lowPriorityQCapacity.getBytes(),
lowPriorityPermits
);

this.normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
new ByteSizeValue(normalPriorityConsumers * 10L, ByteSizeUnit.GB),
normalPriorityQCapacity,
normalTransferQConsumerService,
normalPriorityConsumers,
genericStatsMetricPublisher,
SizeBasedBlockingQ.QueueEventType.NORMAL
);
int lowPriorityConsumers = lowPriorityTransferQConsumers(clusterService.getSettings());

LowPrioritySizeBasedBlockingQ lowPrioritySizeBasedBlockingQ = new LowPrioritySizeBasedBlockingQ(
new ByteSizeValue(lowPriorityConsumers * 20L, ByteSizeUnit.GB),
lowPriorityQCapacity,
lowTransferQConsumerService,
lowPriorityConsumers,
genericStatsMetricPublisher
);
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
this.transferSemaphoresHolder = new TransferSemaphoresHolder(
// High number of permit allocation because each op acquiring permit performs disk IO, computation and network IO.
Math.max(allocatedProcessors(clusterService.getSettings()) * 4, 10),
((double) S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(clusterService.getSettings())) / 100,
normalPriorityPermits,
lowPriorityPermits,
S3Repository.S3_PERMIT_WAIT_DURATION_MIN.get(clusterService.getSettings()),
TimeUnit.MINUTES,
genericStatsMetricPublisher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ public class TransferSemaphoresHolder {
* Constructor to create semaphores holder.
*/
public TransferSemaphoresHolder(
int availablePermits,
double priorityPermitAllocation,
int normalPriorityPermits,
int lowPriorityPermits,
int acquireWaitDuration,
TimeUnit timeUnit,
GenericStatsMetricPublisher genericStatsPublisher
) {

this.normalPriorityPermits = (int) (priorityPermitAllocation * availablePermits);
this.normalPriorityPermits = normalPriorityPermits;
this.lowPriorityPermits = lowPriorityPermits;
this.normalPrioritySemaphore = new TypeSemaphore(
normalPriorityPermits,
TypeSemaphore.PermitType.NORMAL,
genericStatsPublisher::updateNormalPermits
);
this.lowPriorityPermits = availablePermits - normalPriorityPermits;
this.lowPrioritySemaphore = new TypeSemaphore(
lowPriorityPermits,
TypeSemaphore.PermitType.LOW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ protected S3Repository createRepository(
null,
false,
null,
null,
null
) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ public void setUp() throws Exception {
transferQueueConsumerService = Executors.newFixedThreadPool(20);
scheduler = new Scheduler.SafeScheduledThreadPoolExecutor(1);
transferNIOGroup = new AsyncTransferEventLoopGroup(1);
GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher();
GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 10L, ByteSizeUnit.GB),
transferQueueConsumerService,
Expand Down Expand Up @@ -431,7 +431,7 @@ private S3BlobStore createBlobStore() {
streamReaderService,
transferNIOGroup
);

GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
return new S3BlobStore(
null,
asyncService,
Expand All @@ -453,15 +453,15 @@ private S3BlobStore createBlobStore() {
Math.max(Runtime.getRuntime().availableProcessors() * 5, 10),
5,
TimeUnit.MINUTES,
new GenericStatsMetricPublisher()
genericStatsMetricPublisher
)
),
asyncExecutorContainer,
asyncExecutorContainer,
asyncExecutorContainer,
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ,
new GenericStatsMetricPublisher()
genericStatsMetricPublisher
);
}

Expand Down Expand Up @@ -637,7 +637,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
int numberOfParts = 20;
final ByteSizeValue partSize = new ByteSizeValue(capacity.getBytes() / numberOfParts + 1, ByteSizeUnit.BYTES);

GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher();
GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
SizeBasedBlockingQ sizeBasedBlockingQ = new SizeBasedBlockingQ(
capacity,
transferQueueConsumerService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void setUp() throws Exception {
remoteTransferRetry = Executors.newFixedThreadPool(20);
transferQueueConsumerService = Executors.newFixedThreadPool(2);
scheduler = new ScheduledThreadPoolExecutor(1);
GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher();
GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
normalPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ(
new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB),
transferQueueConsumerService,
Expand Down Expand Up @@ -238,7 +238,7 @@ protected AsyncMultiStreamBlobContainer createBlobContainer(
streamReaderService,
transferNIOGroup
);

GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
return new S3BlobContainer(
BlobPath.cleanPath(),
new S3BlobStore(
Expand All @@ -262,15 +262,15 @@ protected AsyncMultiStreamBlobContainer createBlobContainer(
Math.max(Runtime.getRuntime().availableProcessors() * 5, 10),
5,
TimeUnit.MINUTES,
new GenericStatsMetricPublisher()
genericStatsMetricPublisher
)
),
asyncExecutorContainer,
asyncExecutorContainer,
asyncExecutorContainer,
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ,
new GenericStatsMetricPublisher()
genericStatsMetricPublisher
)
) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ private S3Repository createS3Repo(RepositoryMetadata metadata) {
null,
null,
null,
new GenericStatsMetricPublisher()
null
) {
@Override
protected void assertSnapshotOrGenericThread() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class AsyncTransferManagerTests extends OpenSearchTestCase {
@Before
public void setUp() throws Exception {
s3AsyncClient = mock(S3AsyncClient.class);
GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
asyncTransferManager = new AsyncTransferManager(
ByteSizeUnit.MB.toBytes(5),
Executors.newSingleThreadExecutor(),
Expand All @@ -75,7 +76,7 @@ public void setUp() throws Exception {
Math.max(Runtime.getRuntime().availableProcessors() * 5, 10),
5,
TimeUnit.MINUTES,
new GenericStatsMetricPublisher()
genericStatsMetricPublisher
)
);
super.setUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void tearDown() throws Exception {
}

public void testProducerConsumerOfBulkItems() throws InterruptedException {
GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher();
GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
SizeBasedBlockingQ.QueueEventType queueEventType = randomBoolean()
? SizeBasedBlockingQ.QueueEventType.NORMAL
: SizeBasedBlockingQ.QueueEventType.LOW;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ public void testAllocation() {
double priorityAllocation = randomDoubleBetween(0.1, 0.9, true);
int normalPermits = (int) (availablePermits * priorityAllocation);
int lowPermits = availablePermits - normalPermits;
GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher();
GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
TransferSemaphoresHolder transferSemaphoresHolder = new TransferSemaphoresHolder(
availablePermits,
priorityAllocation,
normalPermits,
lowPermits,
1,
TimeUnit.NANOSECONDS,
genericStatsPublisher
Expand All @@ -48,10 +48,10 @@ public void testLowPriorityEventPermitAcquisition() throws InterruptedException
double priorityAllocation = randomDoubleBetween(0.1, 0.9, true);
int normalPermits = (int) (availablePermits * priorityAllocation);
int lowPermits = availablePermits - normalPermits;
GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher();
GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
TransferSemaphoresHolder transferSemaphoresHolder = new TransferSemaphoresHolder(
availablePermits,
priorityAllocation,
normalPermits,
lowPermits,
1,
TimeUnit.NANOSECONDS,
genericStatsPublisher
Expand Down Expand Up @@ -106,10 +106,10 @@ public void testNormalPermitEventAcquisition() throws InterruptedException {
double priorityAllocation = randomDoubleBetween(0.1, 0.9, true);
int normalPermits = (int) (availablePermits * priorityAllocation);
int lowPermits = availablePermits - normalPermits;
GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher();
GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
TransferSemaphoresHolder transferSemaphoresHolder = new TransferSemaphoresHolder(
availablePermits,
priorityAllocation,
normalPermits,
lowPermits,
1,
TimeUnit.NANOSECONDS,
genericStatsPublisher
Expand Down Expand Up @@ -169,13 +169,13 @@ private static class TestTransferSemaphoresHolder extends TransferSemaphoresHold
* Constructor to create semaphores holder.
*/
public TestTransferSemaphoresHolder(
int availablePermits,
double priorityPermitAllocation,
int normalPermits,
int lowPermits,
int acquireWaitDuration,
TimeUnit timeUnit,
GenericStatsMetricPublisher genericStatsMetricPublisher
) throws InterruptedException {
super(availablePermits, priorityPermitAllocation, acquireWaitDuration, timeUnit, genericStatsMetricPublisher);
super(normalPermits, lowPermits, acquireWaitDuration, timeUnit, genericStatsMetricPublisher);
TypeSemaphore executingNormalSemaphore = normalPrioritySemaphore;
TypeSemaphore executingLowSemaphore = lowPrioritySemaphore;

Expand All @@ -201,12 +201,13 @@ public void testNormalSemaphoreAcquiredWait() throws InterruptedException {
int availablePermits = randomIntBetween(10, 50);
double priorityAllocation = randomDoubleBetween(0.1, 0.9, true);
int normalPermits = (int) (availablePermits * priorityAllocation);
GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
TestTransferSemaphoresHolder transferSemaphoresHolder = new TestTransferSemaphoresHolder(
availablePermits,
priorityAllocation,
normalPermits,
availablePermits - normalPermits,
5,
TimeUnit.MINUTES,
new GenericStatsMetricPublisher()
genericStatsPublisher
);

TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext();
Expand Down Expand Up @@ -235,12 +236,13 @@ public void testLowSemaphoreAcquiredWait() throws InterruptedException {
double priorityAllocation = randomDoubleBetween(0.1, 0.9, true);
int normalPermits = (int) (availablePermits * priorityAllocation);
int lowPermits = availablePermits - normalPermits;
GenericStatsMetricPublisher genericStatsPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);
TestTransferSemaphoresHolder transferSemaphoresHolder = new TestTransferSemaphoresHolder(
availablePermits,
priorityAllocation,
normalPermits,
lowPermits,
5,
TimeUnit.MINUTES,
new GenericStatsMetricPublisher()
genericStatsPublisher
);

TransferSemaphoresHolder.RequestContext requestContext = transferSemaphoresHolder.createRequestContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ default void reload(RepositoryMetadata repositoryMetadata) {}
* Metrics for BlobStore interactions
*/
enum Metric {
GENERIC_STATS("generic_stats"),
REQUEST_SUCCESS("request_success_total"),
REQUEST_FAILURE("request_failures_total"),
REQUEST_LATENCY("request_time_in_millis"),
RETRY_COUNT("request_retry_count_total"),
GENERIC_STATS("generic_stats");
RETRY_COUNT("request_retry_count_total");

private String metricName;

Expand Down

0 comments on commit 8c54c68

Please sign in to comment.