From 0f9d6b22ec45b9238bec268161eadeb3647ddb49 Mon Sep 17 00:00:00 2001 From: Donal Evans Date: Fri, 23 Oct 2020 12:05:42 -0700 Subject: [PATCH] GEODE-8536: Allow limited retries when creating Lucene IndexWriter (#5659) Authored-by: Donal Evans (cherry picked from commit 872718ec9d119e332c328caf4493bdf8e8a83dcf) --- ...IndexRepositoryFactoryDistributedTest.java | 2 - ...IndexRepositoryFactoryIntegrationTest.java | 123 ++++++++++++++++++ .../internal/IndexRepositoryFactory.java | 41 ++++-- .../internal/IndexRepositoryFactoryTest.java | 45 +++++-- 4 files changed, 193 insertions(+), 18 deletions(-) create mode 100644 geode-lucene/src/integrationTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryIntegrationTest.java diff --git a/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java b/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java index a3f245dcfb82..bfe921feebb1 100644 --- a/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java +++ b/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; -import junitparams.Parameters; import org.apache.commons.lang3.RandomStringUtils; import org.awaitility.core.ConditionTimeoutException; import org.junit.Before; @@ -111,7 +110,6 @@ private BucketRegion getFileAndChunkBucket() { } @Test - @Parameters() public void lockedBucketShouldPreventPrimaryFromMoving() { dataStore1.invoke(this::initDataStoreAndLuceneIndex); dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache())); diff --git a/geode-lucene/src/integrationTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryIntegrationTest.java b/geode-lucene/src/integrationTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryIntegrationTest.java new file mode 100644 index 000000000000..5e79bcfbd2f8 --- /dev/null +++ b/geode-lucene/src/integrationTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryIntegrationTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.lucene.internal; + +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.geode.InternalGemFireError; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.execute.FunctionException; +import org.apache.geode.cache.lucene.LuceneQuery; +import org.apache.geode.cache.lucene.LuceneQueryException; +import org.apache.geode.cache.lucene.LuceneServiceProvider; +import org.apache.geode.internal.cache.InternalCache; + +public class IndexRepositoryFactoryIntegrationTest { + private Cache cache; + public static final String INDEX_NAME = "testIndex"; + public static final String REGION_NAME = "testRegion"; + public static final int NUMBER_OF_BUCKETS = 4; + private IndexRepositoryFactory spyFactory; + private LuceneQuery luceneQuery; + + @Before + public void setUp() { + cache = new CacheFactory().create(); + String fieldName = "field1"; + LuceneServiceProvider.get(cache) + .createIndexFactory() + .setFields(fieldName) + .create(INDEX_NAME, REGION_NAME); + + cache.createRegionFactory(RegionShortcut.PARTITION) + .setPartitionAttributes(new PartitionAttributesFactory<>() + .setTotalNumBuckets(NUMBER_OF_BUCKETS) + .create()) + .create(REGION_NAME); + + spyFactory = spy(new IndexRepositoryFactory()); + PartitionedRepositoryManager.indexRepositoryFactory = spyFactory; + + luceneQuery = LuceneServiceProvider.get(cache) + .createLuceneQueryFactory() + .create(INDEX_NAME, REGION_NAME, "hello", fieldName); + } + + @After + public void tearDown() { + ExecutorService lonerDistributionThreads = + ((InternalCache) cache).getDistributionManager().getExecutors().getThreadPool(); + PartitionedRepositoryManager.indexRepositoryFactory = new IndexRepositoryFactory(); + if (cache != null) { + cache.close(); + } + // Wait until the thread pool that uses the modified IndexRepositoryFactory behaviour has + // terminated before allowing further tests, to prevent mocking exceptions + await().until(lonerDistributionThreads::isTerminated); + } + + @Test + public void shouldRetryWhenIOExceptionEncounteredOnceDuringComputingRepository() + throws IOException, LuceneQueryException { + // To ensure that the specific bucket used in the query throws the IOException to trigger the + // retry, throw once for every bucket in the region + + doAnswer(new Answer() { + private AtomicInteger times = new AtomicInteger(0); + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + if (times.getAndIncrement() < NUMBER_OF_BUCKETS) { + throw new IOException(); + } + return invocation.callRealMethod(); + } + }).when(spyFactory).getIndexWriter(any(), any()); + + luceneQuery.findKeys(); + + // The invocation should throw once for each bucket, then retry once for each bucket + verify(spyFactory, times(NUMBER_OF_BUCKETS * 2)).getIndexWriter(any(), any()); + } + + @Test + public void shouldThrowInternalGemfireErrorWhenIOExceptionEncounteredConsistentlyDuringComputingRepository() + throws IOException { + doThrow(new IOException()).when(spyFactory).getIndexWriter(any(), any()); + + assertThatThrownBy(luceneQuery::findKeys).isInstanceOf(FunctionException.class) + .hasCauseInstanceOf(InternalGemFireError.class); + } +} diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java index 7674a454527d..7db8b96f204b 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java @@ -44,6 +44,7 @@ public class IndexRepositoryFactory { private static final Logger logger = LogService.getLogger(); public static final String FILE_REGION_LOCK_FOR_BUCKET_ID = "FileRegionLockForBucketId:"; public static final String APACHE_GEODE_INDEX_COMPLETE = "APACHE_GEODE_INDEX_COMPLETE"; + protected static final int GET_INDEX_WRITER_MAX_ATTEMPTS = 200; public IndexRepositoryFactory() {} @@ -74,7 +75,8 @@ public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSeri * This is a util function just to not let computeIndexRepository be a huge chunk of code. */ protected IndexRepository finishComputingRepository(Integer bucketId, LuceneSerializer serializer, - PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index) { + PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index) + throws IOException { LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index; final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion(); BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId); @@ -129,7 +131,7 @@ protected IndexRepository finishComputingRepository(Integer bucketId, LuceneSeri } catch (IOException e) { logger.warn("Exception thrown while constructing Lucene Index for bucket:" + bucketId + " for file region:" + fileAndChunkBucket.getFullPath(), e); - return null; + throw e; } catch (CacheClosedException e) { logger.info("CacheClosedException thrown while constructing Lucene Index for bucket:" + bucketId + " for file region:" + fileAndChunkBucket.getFullPath()); @@ -144,11 +146,34 @@ protected IndexRepository finishComputingRepository(Integer bucketId, LuceneSeri protected IndexWriter buildIndexWriter(int bucketId, BucketRegion fileAndChunkBucket, LuceneIndexForPartitionedRegion indexForPR) throws IOException { - // bucketTargetingMap handles partition resolver (via bucketId as callbackArg) - Map bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId); - RegionDirectory dir = new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats()); - IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer()); + int attempts = 0; + // IOExceptions can occur if the fileAndChunk region is being modified while the IndexWriter is + // being initialized, so allow limited retries here to account for that timing window + while (true) { + // bucketTargetingMap handles partition resolver (via bucketId as callbackArg) + Map bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId); + RegionDirectory dir = + new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats()); + IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer()); + try { + attempts++; + return getIndexWriter(dir, config); + } catch (IOException e) { + if (attempts >= GET_INDEX_WRITER_MAX_ATTEMPTS) { + throw e; + } + logger.info("Encountered {} while attempting to get IndexWriter for index {}. Retrying...", + e, indexForPR.getName()); + try { + Thread.sleep(5); + } catch (InterruptedException ignore) { + } + } + } + } + protected IndexWriter getIndexWriter(RegionDirectory dir, IndexWriterConfig config) + throws IOException { return new IndexWriter(dir, config); } @@ -186,8 +211,8 @@ private Object getValue(Region.Entry entry) { return value; } - protected Map getBucketTargetingMap(BucketRegion region, int bucketId) { - return new BucketTargetingMap(region, bucketId); + protected Map getBucketTargetingMap(BucketRegion region, int bucketId) { + return new BucketTargetingMap<>(region, bucketId); } protected String getLockName(final BucketRegion fileAndChunkBucket) { diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java index e301dcfa72a9..38e63551857d 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java @@ -14,6 +14,7 @@ */ package org.apache.geode.cache.lucene.internal; +import static org.apache.geode.cache.lucene.internal.IndexRepositoryFactory.GET_INDEX_WRITER_MAX_ATTEMPTS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; @@ -22,11 +23,13 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import org.apache.lucene.index.IndexWriter; import org.junit.Before; import org.junit.Test; @@ -77,7 +80,8 @@ public void setUp() { } @Test - public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNull() { + public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNull() + throws IOException { doReturn(null).when(indexRepositoryFactory).getMatchingBucket(fileRegion, bucketId); IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0, @@ -87,7 +91,8 @@ public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFi } @Test - public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNotPrimary() { + public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNotPrimary() + throws IOException { when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(false); IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0, @@ -97,7 +102,8 @@ public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFi } @Test - public void finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotClosed() { + public void finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotClosed() + throws IOException { when(oldRepository.isClosed()).thenReturn(false); when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true); @@ -108,7 +114,8 @@ public void finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotC } @Test - public void finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAndFileAndChunkBucketIsNotPrimary() { + public void finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAndFileAndChunkBucketIsNotPrimary() + throws IOException { when(oldRepository.isClosed()).thenReturn(true); when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true).thenReturn(false); when(distributedLockService.lock(any(), anyLong(), anyLong())).thenReturn(false); @@ -119,7 +126,7 @@ public void finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAnd } @Test - public void finishComputingRepositoryShouldReturnNullAndReleaseLockWhenIOExceptionIsThrownWhileBuildingTheIndex() + public void finishComputingRepositoryShouldThrowExceptionAndReleaseLockWhenIOExceptionIsThrownWhileBuildingTheIndex() throws IOException { when(oldRepository.isClosed()).thenReturn(true); when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true); @@ -127,9 +134,8 @@ public void finishComputingRepositoryShouldReturnNullAndReleaseLockWhenIOExcepti doThrow(new IOException("Test Exception")).when(indexRepositoryFactory) .buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex); - IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0, - serializer, userRegion, oldRepository, luceneIndex); - assertThat(indexRepository).isNull(); + assertThatThrownBy(() -> indexRepositoryFactory.finishComputingRepository(0, + serializer, userRegion, oldRepository, luceneIndex)).isInstanceOf(IOException.class); verify(distributedLockService).unlock(any()); } @@ -146,4 +152,27 @@ public void finishComputingRepositoryShouldThrowExceptionAndReleaseLockWhenCache userRegion, oldRepository, luceneIndex)).isInstanceOf(CacheClosedException.class); verify(distributedLockService).unlock(any()); } + + @Test + public void buildIndexWriterRetriesCreatingIndexWriterWhenIOExceptionEncountered() + throws IOException { + IndexWriter writer = mock(IndexWriter.class); + doThrow(new IOException()).doReturn(writer).when(indexRepositoryFactory).getIndexWriter(any(), + any()); + assertThat(indexRepositoryFactory.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex)) + .isEqualTo(writer); + verify(indexRepositoryFactory, times(2)).getIndexWriter(any(), any()); + } + + @Test + public void buildIndexWriterThrowsExceptionWhenIOExceptionConsistentlyEncountered() + throws IOException { + IOException testException = new IOException("Test exception"); + doThrow(testException).when(indexRepositoryFactory).getIndexWriter(any(), any()); + assertThatThrownBy( + () -> indexRepositoryFactory.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex)) + .isEqualTo(testException); + verify(indexRepositoryFactory, times(GET_INDEX_WRITER_MAX_ATTEMPTS)).getIndexWriter(any(), + any()); + } }