diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java index ec5c1f0a029924..e42186395edac2 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java @@ -226,7 +226,6 @@ private void printResult(VisorDefragmentationTaskResult res, Logger log) { private VisorDefragmentationTaskArg convertArguments() { return new VisorDefragmentationTaskArg( convertSubcommand(args.subcommand()), - args.nodeIds(), args.cacheNames() ); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 56f976534fe84d..08c93823346405 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentation; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; @@ -452,6 +453,13 @@ public interface GridKernalContext extends Iterable { */ public GridEncryptionManager encryption(); + /** + * Gets defragmentation manager. + * + * @return Defragmentation manager. + */ + public IgniteDefragmentation defragmentation(); + /** * Gets workers registry. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index ae589adff86e3e..78d88a945ebe87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -58,6 +58,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentation; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentationImpl; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; @@ -174,6 +176,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable @GridToStringExclude private GridEncryptionManager encryptionMgr; + /** */ + @GridToStringExclude + private IgniteDefragmentation defragMgr; + /** */ @GridToStringExclude private GridTracingManager tracingMgr; @@ -557,6 +563,8 @@ protected GridKernalContextImpl( marshCtx = new MarshallerContextImpl(plugins, clsFilter); + defragMgr = new IgniteDefragmentationImpl(this); + try { spring = SPRING.create(false); } @@ -906,6 +914,11 @@ public void addHelper(Object helper) { return encryptionMgr; } + /** {@inheritDoc} */ + @Override public IgniteDefragmentation defragmentation() { + return defragMgr; + } + /** {@inheritDoc} */ @Override public WorkersRegistry workersRegistry() { return workersRegistry; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java index 550b60bf0eaaef..81d84657bced37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.TransactionsMXBeanImpl; import org.apache.ignite.internal.managers.encryption.EncryptionMXBeanImpl; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMXBeanImpl; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationMXBeanImpl; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMXBeanImpl; import org.apache.ignite.internal.processors.cache.warmup.WarmUpMXBeanImpl; import org.apache.ignite.internal.processors.cluster.BaselineAutoAdjustMXBeanImpl; @@ -51,6 +52,7 @@ import org.apache.ignite.mxbean.ClusterMetricsMXBean; import org.apache.ignite.mxbean.ComputeMXBean; import org.apache.ignite.mxbean.DataStorageMXBean; +import org.apache.ignite.mxbean.DefragmentationMXBean; import org.apache.ignite.mxbean.EncryptionMXBean; import org.apache.ignite.mxbean.FailureHandlingMxBean; import org.apache.ignite.mxbean.IgniteMXBean; @@ -185,6 +187,10 @@ public void registerMBeansAfterNodeStarted( SnapshotMXBean snpMXBean = new SnapshotMXBeanImpl(ctx); registerMBean("Snapshot", snpMXBean.getClass().getSimpleName(), snpMXBean, SnapshotMXBean.class); + // Defragmentation. + DefragmentationMXBean defragMXBean = new DefragmentationMXBeanImpl(ctx); + registerMBean("Defragmentation", defragMXBean.getClass().getSimpleName(), defragMXBean, DefragmentationMXBean.class); + // Metrics configuration MetricsMxBean metricsMxBean = new MetricsMxBeanImpl(ctx.metric(), log); registerMBean("Metrics", metricsMxBean.getClass().getSimpleName(), metricsMxBean, MetricsMxBean.class); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java index 6a9875f23ba7d1..73468ca79f2885 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java @@ -19,12 +19,10 @@ import java.io.File; import java.nio.file.Path; -import java.text.DecimalFormat; -import java.text.DecimalFormatSymbols; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -46,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheType; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.CheckpointState; @@ -165,7 +164,7 @@ public class CachePartitionDefragmentationManager { private final AtomicBoolean cancel = new AtomicBoolean(); /** */ - private final DefragmentationStatus status = new DefragmentationStatus(); + private final Status status = new Status(); /** */ private final GridFutureAdapter completionFut = new GridFutureAdapter<>(); @@ -232,7 +231,30 @@ public void beforeDefragmentation() throws IgniteCheckedException { /** */ public void executeDefragmentation() throws IgniteCheckedException { - status.onStart(cacheGrpCtxsForDefragmentation); + Map> oldStores = new HashMap<>(); + + for (CacheGroupContext oldGrpCtx : cacheGrpCtxsForDefragmentation) { + int grpId = oldGrpCtx.groupId(); + + final IgniteCacheOffheapManager offheap = oldGrpCtx.offheap(); + + List oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false) + .filter(store -> { + try { + return filePageStoreMgr.exists(grpId, store.partId()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + }) + .collect(Collectors.toList()); + + oldStores.put(grpId, oldCacheDataStores); + } + + int partitionCount = oldStores.values().stream().mapToInt(List::size).sum(); + + status.onStart(cacheGrpCtxsForDefragmentation, partitionCount); try { // Now the actual process starts. @@ -246,8 +268,10 @@ public void executeDefragmentation() throws IgniteCheckedException { File workDir = filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), oldGrpCtx.cacheOrGroupName()); + List oldCacheDataStores = oldStores.get(grpId); + if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log)) { - status.onCacheGroupSkipped(oldGrpCtx); + status.onCacheGroupSkipped(oldGrpCtx, oldCacheDataStores.size()); continue; } @@ -257,17 +281,6 @@ public void executeDefragmentation() throws IgniteCheckedException { GridSpinBusyLock busyLock = offheap.busyLock(); - List oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false) - .filter(store -> { - try { - return filePageStoreMgr.exists(grpId, store.partId()); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - }) - .collect(Collectors.toList()); - status.onCacheGroupStart(oldGrpCtx, oldCacheDataStores.size()); if (workDir == null || oldCacheDataStores.isEmpty()) { @@ -619,8 +632,8 @@ private void checkCancellation() throws DefragmentationCancelledException { } /** */ - public String status() { - return status.toString(); + public Status status() { + return status; } /** */ @@ -1013,7 +1026,7 @@ private static class DefragmentationCancelledException extends RuntimeException } /** */ - private class DefragmentationStatus { + class Status { /** */ private long startTs; @@ -1021,49 +1034,78 @@ private class DefragmentationStatus { private long finishTs; /** */ - private final Set scheduledGroups = new TreeSet<>(); + private int totalPartitionCount; + + /** */ + private int defragmentedPartitionCount; /** */ - private final Map progressGroups - = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName)); + private final Set scheduledGroups; /** */ - private final Map finishedGroups - = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName)); + private final Map progressGroups; /** */ - private final Set skippedGroups = new TreeSet<>(); + private final Map finishedGroups; /** */ - public synchronized void onStart(Set scheduledGroups) { + private final Set skippedGroups; + + public Status() { + scheduledGroups = new TreeSet<>(); + progressGroups = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName)); + finishedGroups = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName)); + skippedGroups = new TreeSet<>(); + } + + public Status( + long startTs, + long finishTs, + Set scheduledGroups, + Map progressGroups, + Map finishedGroups, + Set skippedGroups + ) { + this.startTs = startTs; + this.finishTs = finishTs; + this.scheduledGroups = scheduledGroups; + this.progressGroups = progressGroups; + this.finishedGroups = finishedGroups; + this.skippedGroups = skippedGroups; + } + + /** */ + public synchronized void onStart(Set scheduledGroups, int partitions) { startTs = System.currentTimeMillis(); + totalPartitionCount = partitions; - for (CacheGroupContext grp : scheduledGroups) { + for (CacheGroupContext grp : scheduledGroups) this.scheduledGroups.add(grp.cacheOrGroupName()); - } log.info("Defragmentation started."); } /** */ - public synchronized void onCacheGroupStart(CacheGroupContext grpCtx, int parts) { + private synchronized void onCacheGroupStart(CacheGroupContext grpCtx, int parts) { scheduledGroups.remove(grpCtx.cacheOrGroupName()); progressGroups.put(grpCtx, new DefragmentationCacheGroupProgress(parts)); } /** */ - public synchronized void onPartitionDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) { + private synchronized void onPartitionDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) { progressGroups.get(grpCtx).onPartitionDefragmented(oldSize, newSize); + + defragmentedPartitionCount++; } /** */ - public synchronized void onIndexDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) { + private synchronized void onIndexDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) { progressGroups.get(grpCtx).onIndexDefragmented(oldSize, newSize); } /** */ - public synchronized void onCacheGroupFinish(CacheGroupContext grpCtx) { + private synchronized void onCacheGroupFinish(CacheGroupContext grpCtx) { DefragmentationCacheGroupProgress progress = progressGroups.remove(grpCtx); progress.onFinish(); @@ -1072,61 +1114,76 @@ public synchronized void onCacheGroupFinish(CacheGroupContext grpCtx) { } /** */ - public synchronized void onCacheGroupSkipped(CacheGroupContext grpCtx) { + private synchronized void onCacheGroupSkipped(CacheGroupContext grpCtx, int partitions) { scheduledGroups.remove(grpCtx.cacheOrGroupName()); skippedGroups.add(grpCtx.cacheOrGroupName()); + + defragmentedPartitionCount += partitions; } /** */ - public synchronized void onFinish() { + private synchronized void onFinish() { finishTs = System.currentTimeMillis(); log.info("Defragmentation process completed. Time: " + (finishTs - startTs) * 1e-3 + "s."); } - /** {@inheritDoc} */ - @Override public synchronized String toString() { - StringBuilder sb = new StringBuilder(); - - if (!finishedGroups.isEmpty()) { - sb.append("Defragmentation is completed for cache groups:\n"); + /** */ + private synchronized Status copy() { + return new Status( + startTs, + finishTs, + new HashSet<>(scheduledGroups), + new HashMap<>(progressGroups), + new HashMap<>(finishedGroups), + new HashSet<>(skippedGroups) + ); + } - for (Map.Entry entry : finishedGroups.entrySet()) { - sb.append(" ").append(entry.getKey().cacheOrGroupName()).append(" - "); + /** */ + public long getStartTs() { + return startTs; + } - sb.append(entry.getValue().toString()).append('\n'); - } - } + /** */ + public long getFinishTs() { + return finishTs; + } - if (!progressGroups.isEmpty()) { - sb.append("Defragmentation is in progress for cache groups:\n"); + /** */ + public Set getScheduledGroups() { + return scheduledGroups; + } - for (Map.Entry entry : progressGroups.entrySet()) { - sb.append(" ").append(entry.getKey().cacheOrGroupName()).append(" - "); + /** */ + public Map getProgressGroups() { + return progressGroups; + } - sb.append(entry.getValue().toString()).append('\n'); - } - } + /** */ + public Map getFinishedGroups() { + return finishedGroups; + } - if (!skippedGroups.isEmpty()) - sb.append("Skipped cache groups: ").append(String.join(", ", skippedGroups)).append('\n'); + /** */ + public Set getSkippedGroups() { + return skippedGroups; + } - if (!scheduledGroups.isEmpty()) - sb.append("Awaiting defragmentation: ").append(String.join(", ", scheduledGroups)).append('\n'); + /** */ + public int getTotalPartitionCount() { + return totalPartitionCount; + } - return sb.toString(); + /** */ + public int getDefragmentedPartitionCount() { + return defragmentedPartitionCount; } } /** */ - private static class DefragmentationCacheGroupProgress { - /** */ - private static final DecimalFormat MB_FORMAT = new DecimalFormat( - "#.##", - DecimalFormatSymbols.getInstance(Locale.US) - ); - + static class DefragmentationCacheGroupProgress { /** */ private final int partsTotal; @@ -1173,43 +1230,38 @@ public void onIndexDefragmented(long oldSize, long newSize) { } /** */ - public void onFinish() { - finishTs = System.currentTimeMillis(); + public long getOldSize() { + return oldSize; } - /** {@inheritDoc} */ - @Override public String toString() { - StringBuilder sb = new StringBuilder(); - - if (finishTs == 0) { - sb.append("partitions processed/all: ").append(partsCompleted).append("/").append(partsTotal); - - sb.append(", time elapsed: "); - - appendDuration(sb, System.currentTimeMillis()); - } - else { - double mb = 1024 * 1024; - - sb.append("size before/after: ").append(MB_FORMAT.format(oldSize / mb)).append("MB/"); - sb.append(MB_FORMAT.format(newSize / mb)).append("MB"); - - sb.append(", time took: "); + /** */ + public long getNewSize() { + return newSize; + } - appendDuration(sb, finishTs); - } + /** */ + public long getStartTs() { + return startTs; + } - return sb.toString(); + /** */ + public long getFinishTs() { + return finishTs; } /** */ - private void appendDuration(StringBuilder sb, long end) { - long duration = Math.round((end - startTs) * 1e-3); + public int getPartsTotal() { + return partsTotal; + } - long mins = duration / 60; - long secs = duration % 60; + /** */ + public int getPartsCompleted() { + return partsCompleted; + } - sb.append(mins).append(" mins ").append(secs).append(" secs"); + /** */ + public void onFinish() { + finishTs = System.currentTimeMillis(); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanImpl.java new file mode 100644 index 00000000000000..cfb503e616a953 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanImpl.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.defragmentation; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.mxbean.DefragmentationMXBean; + +/** + * Defragmentation MX bean implementation. + */ +public class DefragmentationMXBeanImpl implements DefragmentationMXBean { + /** Defragmentation manager. */ + private final IgniteDefragmentation defragmentation; + + public DefragmentationMXBeanImpl(GridKernalContext ctx) { + this.defragmentation = ctx.defragmentation(); + } + + /** {@inheritDoc} + * @return*/ + @Override public boolean schedule(String cacheNames) { + final List caches = Arrays.stream(cacheNames.split(",")) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + + try { + defragmentation.schedule(caches); + + return true; + } + catch (IgniteCheckedException e) { + return false; + } + } + + /** {@inheritDoc} + * @return*/ + @Override public boolean cancel() { + try { + defragmentation.cancel(); + + return true; + } + catch (IgniteCheckedException e) { + return false; + } + } + + @Override public boolean inProgress() { + return defragmentation.inProgress(); + } + + @Override public int processedPartitions() { + return defragmentation.processedPartitions(); + } + + @Override public int totalPartitions() { + return defragmentation.totalPartitions(); + } + + @Override public long startTime() { + return defragmentation.startTime(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentation.java new file mode 100644 index 00000000000000..a5dc811f908803 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentation.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.defragmentation; + +import java.text.DecimalFormat; +import java.text.DecimalFormatSymbols; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; + +/** + * Defragmentation operation service. + */ +public interface IgniteDefragmentation { + /** + * Schedule defragmentaton on next start of the node. + * + * @param cacheNames Names of caches to run defragmentation on. + * @return Result of the scheduling. + * @throws IgniteCheckedException If failed. + */ + ScheduleResult schedule(List cacheNames) throws IgniteCheckedException; + + /** + * Cancel scheduled or ongoing defragmentation. + * @return Result of the cancellation. + * @throws IgniteCheckedException If failed. + */ + CancelResult cancel() throws IgniteCheckedException; + + /** + * Get the status of the ongoing defragmentation. + * @return Defragmentation status. + * @throws IgniteCheckedException If failed. + */ + DefragmentationStatus status() throws IgniteCheckedException; + + /** + * @return {@code true} if there is an ongoing defragmentation. + */ + boolean inProgress(); + + /** + * @return Number of processed partitions, or 0 if there is no ongoing defragmentation. + */ + int processedPartitions(); + + /** + * @return Number of total partitions, or 0 if there is no ongoing defragmentation. + */ + int totalPartitions(); + + /** + * @return Timestamp of the beginning of the ongoing defragmentation or 0 if there is none. + */ + long startTime(); + + /** Result of the scheduling. */ + public enum ScheduleResult { + /** + * Successfully scheduled. + */ + SUCCESS, + + /** + * Successfuly scheduled, superseding previously scheduled defragmentation. + */ + SUCCESS_SUPERSEDED_PREVIOUS + } + + /** Result of the cancellation. */ + public enum CancelResult { + /** + * Cancelled scheduled defragmentation. + */ + CANCELLED_SCHEDULED, + + /** + * Nothing to cancel, no ongoing defragmentation. + */ + SCHEDULED_NOT_FOUND, + + /** + * Cancelled ongoing defragmentation. + */ + CANCELLED, + + /** + * Defragmentation is already completed or cancelled. + */ + COMPLETED_OR_CANCELLED + } + + /** */ + public static class DefragmentationStatus { + /** */ + private final Map completedCaches; + + /** */ + private final Map inProgressCaches; + + /** */ + private final Set awaitingCaches; + + /** */ + private final Set skippedCaches; + + /** */ + private final int totalPartitions; + + /** */ + private final int processedPartitions; + + /** */ + private final long startTs; + + /** */ + private final long totalElapsedTime; + + public DefragmentationStatus( + Map completedCaches, + Map inProgressCaches, + Set awaitingCaches, + Set skippedCaches, + int totalPartitions, + int processedPartitions, + long startTs, + long totalElapsedTime + ) { + this.completedCaches = completedCaches; + this.inProgressCaches = inProgressCaches; + this.awaitingCaches = awaitingCaches; + this.skippedCaches = skippedCaches; + this.totalPartitions = totalPartitions; + this.processedPartitions = processedPartitions; + this.startTs = startTs; + this.totalElapsedTime = totalElapsedTime; + } + + /** {@inheritDoc} */ + @Override public String toString() { + StringBuilder sb = new StringBuilder(); + + if (!completedCaches.isEmpty()) { + sb.append("Defragmentation is completed for cache groups:\n"); + + for (Map.Entry entry : completedCaches.entrySet()) { + sb.append(" ").append(entry.getKey()).append(" - "); + + sb.append(entry.getValue().toString()).append('\n'); + } + } + + if (!inProgressCaches.isEmpty()) { + sb.append("Defragmentation is in progress for cache groups:\n"); + + for (Map.Entry entry : inProgressCaches.entrySet()) { + sb.append(" ").append(entry.getKey()).append(" - "); + + sb.append(entry.getValue().toString()).append('\n'); + } + } + + if (!skippedCaches.isEmpty()) + sb.append("Skipped cache groups: ").append(String.join(", ", skippedCaches)).append('\n'); + + if (!awaitingCaches.isEmpty()) + sb.append("Awaiting defragmentation: ").append(String.join(", ", awaitingCaches)).append('\n'); + + return sb.toString(); + } + + /** */ + public Map getCompletedCaches() { + return completedCaches; + } + + /** */ + public Map getInProgressCaches() { + return inProgressCaches; + } + + /** */ + public Set getAwaitingCaches() { + return awaitingCaches; + } + + /** */ + public Set getSkippedCaches() { + return skippedCaches; + } + + /** */ + public long getTotalElapsedTime() { + return totalElapsedTime; + } + + /** */ + public int getTotalPartitions() { + return totalPartitions; + } + + /** */ + public int getProcessedPartitions() { + return processedPartitions; + } + + /** */ + public long getStartTs() { + return startTs; + } + } + + /** */ + abstract class DefragmentationInfo { + /** */ + long elapsedTime; + + public DefragmentationInfo(long elapsedTime) { + this.elapsedTime = elapsedTime; + } + + /** */ + void appendDuration(StringBuilder sb, long elapsedTime) { + long duration = Math.round(elapsedTime * 1e-3); + + long mins = duration / 60; + long secs = duration % 60; + + sb.append(mins).append(" mins ").append(secs).append(" secs"); + } + + /** */ + public long getElapsedTime() { + return elapsedTime; + } + } + + /** */ + public static class CompletedDefragmentationInfo extends DefragmentationInfo { + /** */ + private static final DecimalFormat MB_FORMAT = new DecimalFormat( + "#.##", + DecimalFormatSymbols.getInstance(Locale.US) + ); + + /** */ + long sizeBefore; + + /** */ + long sizeAfter; + + public CompletedDefragmentationInfo(long elapsedTime, long sizeBefore, long sizeAfter) { + super(elapsedTime); + this.sizeBefore = sizeBefore; + this.sizeAfter = sizeAfter; + } + + /** {@inheritDoc} */ + @Override public String toString() { + StringBuilder sb = new StringBuilder(); + + double mb = 1024 * 1024; + + sb.append("size before/after: ").append(MB_FORMAT.format(sizeBefore / mb)).append("MB/"); + sb.append(MB_FORMAT.format(sizeAfter / mb)).append("MB"); + + sb.append(", time took: "); + + appendDuration(sb, elapsedTime); + + return sb.toString(); + } + + /** */ + public long getSizeBefore() { + return sizeBefore; + } + + /** */ + public long getSizeAfter() { + return sizeAfter; + } + } + + /** */ + public static class InProgressDefragmentationInfo extends DefragmentationInfo { + /** */ + int processedPartitions; + + /** */ + int totalPartitions; + + public InProgressDefragmentationInfo(long elapsedTime, int processedPartitions, int totalPartitions) { + super(elapsedTime); + this.processedPartitions = processedPartitions; + this.totalPartitions = totalPartitions; + } + + /** {@inheritDoc} */ + @Override public String toString() { + StringBuilder sb = new StringBuilder(); + + sb.append("partitions processed/all: ").append(processedPartitions).append("/").append(totalPartitions); + + sb.append(", time elapsed: "); + + appendDuration(sb, elapsedTime); + + return sb.toString(); + } + + /** */ + public int getProcessedPartitions() { + return processedPartitions; + } + + /** */ + public int getTotalPartitions() { + return totalPartitions; + } + } + +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentationImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentationImpl.java new file mode 100644 index 00000000000000..5c443baac411bc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentationImpl.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.defragmentation; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.Status; +import org.apache.ignite.maintenance.MaintenanceAction; +import org.apache.ignite.maintenance.MaintenanceRegistry; +import org.apache.ignite.maintenance.MaintenanceTask; + +import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.DEFRAGMENTATION_MNTC_TASK_NAME; +import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters.toStore; + +/** + * Defragmentation operation service implementation. + */ +public class IgniteDefragmentationImpl implements IgniteDefragmentation { + /** Kernal context. */ + private final GridKernalContext ctx; + + public IgniteDefragmentationImpl(GridKernalContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public ScheduleResult schedule(List cacheNames) throws IgniteCheckedException { + final MaintenanceRegistry maintenanceRegistry = ctx.maintenanceRegistry(); + + MaintenanceTask oldTask; + + try { + oldTask = maintenanceRegistry.registerMaintenanceTask(toStore(cacheNames != null ? cacheNames : Collections.emptyList())); + } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException("Scheduling failed: " + e.getMessage()); + } + + return oldTask != null ? ScheduleResult.SUCCESS_SUPERSEDED_PREVIOUS : ScheduleResult.SUCCESS; + } + + /** {@inheritDoc} */ + @Override public CancelResult cancel() throws IgniteCheckedException { + final MaintenanceRegistry maintenanceRegistry = ctx.maintenanceRegistry(); + + if (!maintenanceRegistry.isMaintenanceMode()) { + boolean deleted = maintenanceRegistry.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); + + return deleted ? CancelResult.CANCELLED_SCHEDULED : CancelResult.SCHEDULED_NOT_FOUND; + } + else { + List> actions; + + try { + actions = maintenanceRegistry.actionsForMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); + } + catch (IgniteException e) { + return CancelResult.COMPLETED_OR_CANCELLED; + } + + Optional> stopAct = actions.stream().filter(a -> "stop".equals(a.name())).findAny(); + + assert stopAct.isPresent(); + + try { + Object res = stopAct.get().execute(); + + assert res instanceof Boolean; + + boolean cancelled = (Boolean)res; + + return cancelled ? CancelResult.CANCELLED : CancelResult.COMPLETED_OR_CANCELLED; + } + catch (Exception e) { + throw new IgniteCheckedException("Exception occurred: " + e.getMessage(), e); + } + } + } + + /** {@inheritDoc} */ + @Override public DefragmentationStatus status() throws IgniteCheckedException { + final MaintenanceRegistry maintenanceRegistry = ctx.maintenanceRegistry(); + + if (!maintenanceRegistry.isMaintenanceMode()) + throw new IgniteCheckedException("Node is not in maintenance mode."); + + IgniteCacheDatabaseSharedManager dbMgr = ctx.cache().context().database(); + + assert dbMgr instanceof GridCacheDatabaseSharedManager; + + CachePartitionDefragmentationManager defrgMgr = ((GridCacheDatabaseSharedManager)dbMgr) + .defragmentationManager(); + + if (defrgMgr == null) + throw new IgniteCheckedException("There's no active defragmentation process on the node."); + + final Status status = defrgMgr.status(); + + final long startTs = status.getStartTs(); + final long finishTs = status.getFinishTs(); + final long elapsedTime = finishTs != 0 ? finishTs - startTs : System.currentTimeMillis() - startTs; + + Map completedCaches = new HashMap<>(); + Map progressCaches = new HashMap<>(); + + status.getFinishedGroups().forEach((context, progress) -> { + final String name = context.cacheOrGroupName(); + + final long oldSize = progress.getOldSize(); + final long newSize = progress.getNewSize(); + final long cgElapsedTime = progress.getFinishTs() - progress.getStartTs(); + + final CompletedDefragmentationInfo info = new CompletedDefragmentationInfo(cgElapsedTime, oldSize, newSize); + completedCaches.put(name, info); + }); + + status.getProgressGroups().forEach((context, progress) -> { + final String name = context.cacheOrGroupName(); + + final long cgElapsedTime = System.currentTimeMillis() - progress.getStartTs(); + final int partsTotal = progress.getPartsTotal(); + final int partsCompleted = progress.getPartsCompleted(); + + final InProgressDefragmentationInfo info = new InProgressDefragmentationInfo(cgElapsedTime, partsCompleted, partsTotal); + progressCaches.put(name, info); + }); + + return new DefragmentationStatus( + completedCaches, + progressCaches, + status.getScheduledGroups(), + status.getSkippedGroups(), + status.getTotalPartitionCount(), + status.getDefragmentedPartitionCount(), + startTs, + elapsedTime + ); + } + + /** {@inheritDoc} */ + @Override public boolean inProgress() { + final Status status = getStatus(); + + return status != null && status.getFinishTs() == 0; + } + + /** {@inheritDoc} */ + @Override public int processedPartitions() { + final Status status = getStatus(); + + if (status == null) + return 0; + + return status.getDefragmentedPartitionCount(); + } + + /** {@inheritDoc} */ + @Override public int totalPartitions() { + final CachePartitionDefragmentationManager.Status status = getStatus(); + + if (status == null) + return 0; + + return status.getTotalPartitionCount(); + } + + /** {@inheritDoc} */ + @Override public long startTime() { + final CachePartitionDefragmentationManager.Status status = getStatus(); + + if (status == null) + return 0; + + return status.getStartTs(); + } + + /** + * Get defragmentation status. + * @return Defragmentation status or {@code null} if there is no ongoing defragmentation. + */ + private Status getStatus() { + final MaintenanceRegistry maintenanceRegistry = ctx.maintenanceRegistry(); + + if (!maintenanceRegistry.isMaintenanceMode()) + return null; + + IgniteCacheDatabaseSharedManager dbMgr = ctx.cache().context().database(); + + assert dbMgr instanceof GridCacheDatabaseSharedManager; + + CachePartitionDefragmentationManager defrgMgr = ((GridCacheDatabaseSharedManager) dbMgr) + .defragmentationManager(); + + if (defrgMgr == null) + return null; + + return defrgMgr.status(); + } + +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index d5a9ed05f59560..e0e2092d770f5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentation; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; @@ -490,6 +491,11 @@ protected IgniteConfiguration prepareIgniteConfiguration() { return null; } + /** {@inheritDoc} */ + @Override public IgniteDefragmentation defragmentation() { + return null; + } + /** {@inheritDoc} */ @Override public WorkersRegistry workersRegistry() { return null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java index 14cea626e7724c..88fde8b5af6f6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java @@ -17,27 +17,17 @@ package org.apache.ignite.internal.visor.defragmentation; -import java.util.Collections; import java.util.List; -import java.util.Optional; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeJobResult; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; -import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; -import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentation; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.processors.task.GridVisorManagementTask; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorMultiNodeTask; -import org.apache.ignite.maintenance.MaintenanceAction; -import org.apache.ignite.maintenance.MaintenanceRegistry; -import org.apache.ignite.maintenance.MaintenanceTask; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.DEFRAGMENTATION_MNTC_TASK_NAME; -import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters.toStore; - /** */ @GridInternal @GridVisorManagementTask @@ -120,91 +110,71 @@ protected VisorDefragmentationJob(@Nullable VisorDefragmentationTaskArg arg, boo /** */ private VisorDefragmentationTaskResult runSchedule(VisorDefragmentationTaskArg arg) { - MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry(); + final IgniteDefragmentation defragmentation = ignite.context().defragmentation(); - MaintenanceTask oldTask; + final IgniteDefragmentation.ScheduleResult scheduleResult; try { - List cacheNames = arg.cacheNames(); - - oldTask = mntcReg.registerMaintenanceTask(toStore(cacheNames == null ? Collections.emptyList() : cacheNames)); + scheduleResult = defragmentation.schedule(arg.cacheNames()); } catch (IgniteCheckedException e) { - return new VisorDefragmentationTaskResult(false, "Scheduling failed: " + e.getMessage()); + return new VisorDefragmentationTaskResult(false, e.getMessage()); } - return new VisorDefragmentationTaskResult( - true, - "Scheduling completed successfully." + - (oldTask == null ? "" : " Previously scheduled task has been removed.") - ); + String message; + + switch (scheduleResult) { + case SUCCESS_SUPERSEDED_PREVIOUS: + message = "Scheduling completed successfully. Previously scheduled task has been removed."; + break; + case SUCCESS: + default: + message = "Scheduling completed successfully."; + break; + } + + return new VisorDefragmentationTaskResult(true, message); } /** */ private VisorDefragmentationTaskResult runStatus(VisorDefragmentationTaskArg arg) { - MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry(); - - if (!mntcReg.isMaintenanceMode()) - return new VisorDefragmentationTaskResult(false, "Node is not in maintenance node."); - - IgniteCacheDatabaseSharedManager dbMgr = ignite.context().cache().context().database(); + final IgniteDefragmentation defragmentation = ignite.context().defragmentation(); - assert dbMgr instanceof GridCacheDatabaseSharedManager; - - CachePartitionDefragmentationManager defrgMgr = ((GridCacheDatabaseSharedManager)dbMgr) - .defragmentationManager(); - - if (defrgMgr == null) - return new VisorDefragmentationTaskResult(true, "There's no active defragmentation process on the node."); - - return new VisorDefragmentationTaskResult(true, defrgMgr.status()); + try { + return new VisorDefragmentationTaskResult(true, defragmentation.status().toString()); + } catch (IgniteCheckedException e) { + return new VisorDefragmentationTaskResult(false, e.getMessage()); + } } /** */ private VisorDefragmentationTaskResult runCancel(VisorDefragmentationTaskArg arg) { - assert arg.cacheNames() == null : "Cancelling specific caches is not yet implemented"; - - MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry(); - - if (!mntcReg.isMaintenanceMode()) { - boolean deleted = mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); + final IgniteDefragmentation defragmentation = ignite.context().defragmentation(); - String msg = deleted - ? "Scheduled defragmentation task cancelled successfully." - : "Scheduled defragmentation task is not found."; - - return new VisorDefragmentationTaskResult(true, msg); - } - else { - List> actions; - - try { - actions = mntcReg.actionsForMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); - } - catch (IgniteException e) { - return new VisorDefragmentationTaskResult(true, "Defragmentation is already completed or has been cancelled previously."); + try { + final IgniteDefragmentation.CancelResult cancelResult = defragmentation.cancel(); + + String message; + + switch (cancelResult) { + case SCHEDULED_NOT_FOUND: + message = "Scheduled defragmentation task is not found."; + break; + case CANCELLED: + message = "Defragmentation cancelled successfully."; + break; + case COMPLETED_OR_CANCELLED: + message = "Defragmentation is already completed or has been cancelled previously."; + break; + case CANCELLED_SCHEDULED: + default: + message = "Scheduled defragmentation task cancelled successfully."; + break; } - Optional> stopAct = actions.stream().filter(a -> "stop".equals(a.name())).findAny(); - - assert stopAct.isPresent(); - - try { - Object res = stopAct.get().execute(); - - assert res instanceof Boolean; - - boolean cancelled = (Boolean)res; - - String msg = cancelled - ? "Defragmentation cancelled successfully." - : "Defragmentation is already completed or has been cancelled previously."; - - return new VisorDefragmentationTaskResult(true, msg); - } - catch (Exception e) { - return new VisorDefragmentationTaskResult(false, "Exception occurred: " + e.getMessage()); - } + return new VisorDefragmentationTaskResult(true, message); + } catch (IgniteCheckedException e) { + return new VisorDefragmentationTaskResult(false, e.getMessage()); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java index 1b1c8b12ba0238..9e6ec53f5e48bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java @@ -33,9 +33,6 @@ public class VisorDefragmentationTaskArg extends IgniteDataTransferObject { /** */ private VisorDefragmentationOperation operation; - /** */ - private List nodeIds; - /** */ private List cacheNames; @@ -47,12 +44,10 @@ public VisorDefragmentationTaskArg() { /** */ public VisorDefragmentationTaskArg( VisorDefragmentationOperation operation, - List nodeIds, List cacheNames ) { this.operation = operation; - this.nodeIds = nodeIds; this.cacheNames = cacheNames; } @@ -61,11 +56,6 @@ public VisorDefragmentationOperation operation() { return operation; } - /** */ - public List nodeIds() { - return nodeIds; - } - /** */ public List cacheNames() { return cacheNames; @@ -75,8 +65,6 @@ public List cacheNames() { @Override protected void writeExternalData(ObjectOutput out) throws IOException { U.writeEnum(out, operation); - U.writeCollection(out, nodeIds); - U.writeCollection(out, cacheNames); } @@ -84,8 +72,6 @@ public List cacheNames() { @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { operation = U.readEnum(in, VisorDefragmentationOperation.class); - nodeIds = U.readList(in); - cacheNames = U.readList(in); } } diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/DefragmentationMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/DefragmentationMXBean.java new file mode 100644 index 00000000000000..22a5e2de9c39a8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/DefragmentationMXBean.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mxbean; + +/** + * JMX bean for defragmentation manager. + */ +@MXBeanDescription("MBean that provides access for defragmentation features.") +public interface DefragmentationMXBean { + /** + * Schedule defragmentation for given caches. + * + * @param cacheNames Names of caches to run defragmentation on, comma separated. + * @return {@code true} if defragmentation is scheduled, {@code false} otherwise. + */ + @MXBeanDescription("Schedule defragmentation.") + public boolean schedule(@MXBeanParameter(name = "cacheNames", description = "Names of caches to run defragmentation on.") String cacheNames); + + /** + * Cancel defragmentation. + * + * @return {@code true} if defragmentation was canceled, {@code false} otherwise. + */ + @MXBeanDescription("Cancel current defragmentation.") + public boolean cancel(); + + /** + * Get defragmentation status. + * + * @return {@code true} if defragmentation is in progress right now. + */ + @MXBeanDescription("Cancel current defragmentation.") + public boolean inProgress(); + + /** + * Get count of processed partitions. + * + * @return {@code true} if defragmentation is in progress right now. + */ + @MXBeanDescription("Processed partitions.") + public int processedPartitions(); + + /** + * Get total count of partitions. + * + * @return {@code true} if defragmentation is in progress right now. + */ + @MXBeanDescription("Total partitions.") + public int totalPartitions(); + + /** + * Get defragmentation's start time. + * + * @return {@code true} if defragmentation is in progress right now. + */ + @MXBeanDescription("Start time.") + public long startTime(); +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanTest.java new file mode 100644 index 00000000000000..f50d3dfdca6984 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanTest.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.defragmentation; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.function.UnaryOperator; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.maintenance.MaintenanceTask; +import org.apache.ignite.mxbean.DefragmentationMXBean; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cluster.ClusterState.ACTIVE; + +public class DefragmentationMXBeanTest extends GridCommonAbstractTest { + /** */ + private static CountDownLatch blockCdl; + + /** */ + private static CountDownLatch waitCdl; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + final DataStorageConfiguration dsCfg = new DataStorageConfiguration(); + + dsCfg.setWalSegmentSize(512 * 1024).setWalSegments(3); + dsCfg.setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setMaxSize(50L * 1024 * 1024).setPersistenceEnabled(true) + ); + + return cfg.setDataStorageConfiguration(dsCfg); + } + + @Test + public void testDefragmentationSchedule() throws Exception { + Ignite ignite = startGrids(2); + + ignite.cluster().state(ACTIVE); + + DefragmentationMXBean mxBean = defragmentationMXBean(ignite.name()); + + assertTrue(mxBean.schedule("")); + + MaintenanceTask mntcTask = DefragmentationParameters.toStore(Collections.emptyList()); + + assertNotNull(grid(0).context().maintenanceRegistry().registerMaintenanceTask(mntcTask)); + assertNull(grid(1).context().maintenanceRegistry().registerMaintenanceTask(mntcTask)); + + stopGrid(0); + startGrid(0); + + // node is already in defragmentation mode, hence scheduling is not possible + assertFalse(mxBean.schedule("")); + } + + @Test + public void testDefragmentationCancel() throws Exception { + Ignite ignite = startGrids(2); + + ignite.cluster().state(ACTIVE); + + final DefragmentationMXBean mxBean = defragmentationMXBean(ignite.name()); + + assertTrue(mxBean.cancel()); + + // subsequent cancel call should be successful + assertTrue(mxBean.cancel()); + } + + @Test + public void testDefragmentationCancelInProgress() throws Exception { + IgniteEx ig = startGrid(0); + + ig.cluster().state(ClusterState.ACTIVE); + + IgniteCache cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 1024; i++) + cache.put(i, i); + + forceCheckpoint(ig); + + DefragmentationMXBean mxBean = defragmentationMXBean(ig.name()); + + mxBean.schedule(""); + + stopGrid(0); + + blockCdl = new CountDownLatch(128); + + UnaryOperator cfgOp = cfg -> { + DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); + + FileIOFactory delegate = dsCfg.getFileIOFactory(); + + dsCfg.setFileIOFactory((file, modes) -> { + if (file.getName().contains("dfrg")) { + if (blockCdl.getCount() == 0) { + try { + // Slow down defragmentation process. + // This'll be enough for the test since we have, like, 900 partitions left. + Thread.sleep(100); + } + catch (InterruptedException ignore) { + // No-op. + } + } + else + blockCdl.countDown(); + } + + return delegate.create(file, modes); + }); + + return cfg; + }; + + IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { + try { + startGrid(0, cfgOp); + } + catch (Exception e) { + // No-op. + throw new RuntimeException(e); + } + }); + + blockCdl.await(); + + mxBean = defragmentationMXBean(ig.name()); + + assertTrue(mxBean.cancel()); + + fut.get(); + + assertTrue(mxBean.cancel()); + } + + @Test + public void testDefragmentationStatus() throws Exception { + IgniteEx ig = startGrid(0); + + ig.cluster().state(ClusterState.ACTIVE); + + ig.getOrCreateCache(DEFAULT_CACHE_NAME + "1"); + + IgniteCache cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME + "2"); + + ig.getOrCreateCache(DEFAULT_CACHE_NAME + "3"); + + for (int i = 0; i < 1024; i++) + cache.put(i, i); + + forceCheckpoint(ig); + + DefragmentationMXBean mxBean = defragmentationMXBean(ig.name()); + + mxBean.schedule(""); + + stopGrid(0); + + blockCdl = new CountDownLatch(128); + waitCdl = new CountDownLatch(1); + + UnaryOperator cfgOp = cfg -> { + DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); + + FileIOFactory delegate = dsCfg.getFileIOFactory(); + + dsCfg.setFileIOFactory((file, modes) -> { + if (file.getName().contains("dfrg")) { + if (blockCdl.getCount() == 0) { + try { + waitCdl.await(); + } + catch (InterruptedException ignore) { + // No-op. + } + } + else + blockCdl.countDown(); + } + + return delegate.create(file, modes); + }); + + return cfg; + }; + + IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { + try { + startGrid(0, cfgOp); + } + catch (Exception e) { + // No-op. + throw new RuntimeException(e); + } + }); + + blockCdl.await(); + + mxBean = defragmentationMXBean(ig.name()); + + final IgniteKernal gridx = IgnitionEx.gridx(ig.name()); + final IgniteDefragmentation defragmentation = gridx.context().defragmentation(); + final IgniteDefragmentation.DefragmentationStatus status1 = defragmentation.status(); + + assertEquals(status1.getStartTs(), mxBean.startTime()); + + assertTrue(mxBean.inProgress()); + assertEquals(126, mxBean.processedPartitions()); + final int totalPartitions = status1.getTotalPartitions(); + assertEquals(totalPartitions, mxBean.totalPartitions()); + + waitCdl.countDown(); + + fut.get(); + + ((GridCacheDatabaseSharedManager) grid(0).context().cache().context().database()) + .defragmentationManager() + .completionFuture() + .get(); + + assertFalse(mxBean.inProgress()); + assertEquals(totalPartitions, mxBean.processedPartitions()); + } + + private DefragmentationMXBean defragmentationMXBean(String name) { + return getMxBean( + name, + "Defragmentation", + DefragmentationMXBeanImpl.class, + DefragmentationMXBean.class + ); + } + +}