From 076d02b7f2ea236fbd2281bda9b7ff51d1e61f71 Mon Sep 17 00:00:00 2001
From: oliviarla
Date: Mon, 19 Aug 2024 15:32:42 +0900
Subject: [PATCH] INTERNAL: make lop piped operations process synchronously

---
 .../java/net/spy/memcached/ArcusClient.java        | 84 ++++++++++++++++++-
 .../internal/PipedCollectionFuture.java            | 81 ++++++++----------
 ...LopPipedInsertBulkMultipleValueTest.java}       | 42 +++++-----
 3 files changed, 140 insertions(+), 67 deletions(-)
 rename src/test/manual/net/spy/memcached/bulkoperation/{LopInsertBulkMultipleValueTest.java => LopPipedInsertBulkMultipleValueTest.java} (70%)

diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java
index 26ff126af..f4bf4d1db 100644
--- a/src/main/java/net/spy/memcached/ArcusClient.java
+++ b/src/main/java/net/spy/memcached/ArcusClient.java
@@ -26,6 +26,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1863,11 +1864,16 @@ public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
     } else {
       PartitionedList<T> list = new PartitionedList<>(valueList,
           CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
+
       for (List<T> elementList : list) {
         insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc));
+        if (index >= 0) {
+          index += elementList.size();
+        }
       }
     }
-    return asyncCollectionPipedInsert(key, insertList);
+
+    return syncCollectionPipedInsert(key, Collections.unmodifiableList(insertList));
   }
 
@@ -3170,6 +3176,82 @@ public void gotStatus(Integer index, OperationStatus status) {
     return rv;
   }
 
+  /**
+   * Insert collection items using pipelined operations, processing the piped
+   * chunks synchronously: each chunk is sent only after the previous one has
+   * completed successfully.
+   *
+   * @param key arcus cache key
+   * @param insertList list of piped inserts; must not be empty.
+   * @return future holding the map of element index and the reason why the insert operation failed
+   */
+  private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
+          final String key, final List<CollectionPipedInsert<T>> insertList) {
+    final CountDownLatch latch = new CountDownLatch(1);
+    final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
+            new PipedCollectionFuture<>(latch, operationTimeout);
+
+    final List<Operation> ops = new ArrayList<>();
+    for (int i = 0; i < insertList.size(); i++) {
+      final CollectionPipedInsert<T> insert = insertList.get(i);
+      final int idx = i;
+      Operation op = opFact.collectionPipedInsert(key, insert,
+          new CollectionPipedInsertOperation.Callback() {
+            // each result status
+            public void receivedStatus(OperationStatus status) {
+              CollectionOperationStatus cstatus;
+
+              if (status instanceof CollectionOperationStatus) {
+                cstatus = (CollectionOperationStatus) status;
+              } else {
+                getLogger().warn("Unhandled state: " + status);
+                cstatus = new CollectionOperationStatus(status);
+              }
+              rv.setOperationStatus(cstatus);
+            }
+
+            // complete
+            public void complete() {
+              if (idx == insertList.size() - 1
+                  || rv.hasErrored()
+                  || rv.getOperationStatus().getResponse() == CollectionResponse.CANCELED) {
+                // count down if this is the last op
+                latch.countDown();
+              } else if (!rv.getOperationStatus().isSuccess()) {
+                // if this op failed or was cancelled, do not add the
+                // remaining ops and mark their items as cancelled
+                for (int chunkIdx = idx + 1; chunkIdx < insertList.size(); chunkIdx++) {
+                  for (int itemIdx = 0; itemIdx < insertList.get(chunkIdx).getItemCount(); itemIdx++) {
+                    rv.addEachResult(itemIdx + (chunkIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
+                        new CollectionOperationStatus(
+                            false, "CANCELED", CollectionResponse.CANCELED));
+                  }
+                }
+                latch.countDown();
+              } else {
+                // add the next op if this is not the last op
+                Operation nextOp = ops.get(idx + 1);
+                rv.addOperation(nextOp);
+                addOp(key, nextOp);
+              }
+            }
+
+            // got status
+            public void gotStatus(Integer index, OperationStatus status) {
+              if (status instanceof CollectionOperationStatus) {
+                rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
+                    (CollectionOperationStatus) status);
+              } else {
+                rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
+                    new CollectionOperationStatus(status));
+              }
+            }
+          });
+      ops.add(op);
+    }
+    rv.addOperation(ops.get(0));
+    addOp(key, ops.get(0));
+    return rv;
+  }
+
   @Override
   public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
       List<String> keyList, long bkey, byte[] eFlag, Object value,
diff --git a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
index 271f789d9..b7705c730 100644
--- a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
+++ b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
@@ -1,10 +1,9 @@
 package net.spy.memcached.internal;
 
-import java.util.Collection;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -18,7 +17,8 @@
 public class PipedCollectionFuture<K, V>
         extends CollectionFuture<Map<K, V>> {
-  private final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<>();
+  // operations that are completed or in progress
+  private final List<Operation> ops = new ArrayList<>();
 
   private final AtomicReference<OperationStatus> operationStatus =
          new AtomicReference<>(null);
@@ -31,67 +31,57 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {
 
   @Override
   public boolean cancel(boolean ign) {
-    boolean rv = false;
-    for (Operation op : ops) {
-      rv |= op.cancel("by application.");
-    }
-    return rv;
+    return ops.get(ops.size() - 1).cancel("by application.");
   }
 
+  /**
+   * If an op is cancelled, the following ops are never added to the op queue,
+   * so checking the last op is sufficient.
+   *
+   * @return true if the operation has been cancelled.
+   */
   @Override
   public boolean isCancelled() {
-    for (Operation op : ops) {
-      if (op.isCancelled()) {
-        return true;
-      }
-    }
-    return false;
+    return ops.get(ops.size() - 1).isCancelled();
+  }
+
+  /**
+   * If an op threw an exception, the following ops are never added to the op queue,
+   * so checking the last op is sufficient.
+   *
+   * @return true if the operation has errored by an exception.
+   */
+  public boolean hasErrored() {
+    return ops.get(ops.size() - 1).hasErrored();
   }
 
   @Override
   public boolean isDone() {
-    for (Operation op : ops) {
-      if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
-        return false;
-      }
-    }
-    return true;
+    return latch.getCount() == 0;
   }
 
   @Override
   public Map<K, V> get(long duration, TimeUnit unit)
       throws InterruptedException, TimeoutException, ExecutionException {
     long beforeAwait = System.currentTimeMillis();
-    if (!latch.await(duration, unit)) {
-      Collection<Operation> timedOutOps = new HashSet<>();
-      for (Operation op : ops) {
-        if (op.getState() != OperationState.COMPLETE) {
-          timedOutOps.add(op);
-        } else {
-          MemcachedConnection.opSucceeded(op);
-        }
-      }
-      if (!timedOutOps.isEmpty()) {
-        // set timeout only once for piped ops requested to single node.
-        MemcachedConnection.opTimedOut(timedOutOps.iterator().next());
-
-        long elapsed = System.currentTimeMillis() - beforeAwait;
-        throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
-      }
+    Operation lastOp = ops.get(ops.size() - 1);
+    if (!latch.await(duration, unit) && lastOp.getState() != OperationState.COMPLETE) {
+      MemcachedConnection.opTimedOut(lastOp);
+
+      long elapsed = System.currentTimeMillis() - beforeAwait;
+      throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastOp);
     } else {
       // continuous timeout counter will be reset only once in pipe
-      MemcachedConnection.opSucceeded(ops.iterator().next());
+      MemcachedConnection.opSucceeded(lastOp);
     }
 
-    for (Operation op : ops) {
-      if (op != null && op.hasErrored()) {
-        throw new ExecutionException(op.getException());
-      }
+    if (lastOp != null && lastOp.hasErrored()) {
+      throw new ExecutionException(lastOp.getException());
+    }
 
-      if (op != null && op.isCancelled()) {
-        throw new ExecutionException(new RuntimeException(op.getCancelCause()));
-      }
+    if (lastOp != null && lastOp.isCancelled()) {
+      throw new ExecutionException(new RuntimeException(lastOp.getCancelCause()));
     }
 
     return failedResult;
diff --git a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java b/src/test/manual/net/spy/memcached/bulkoperation/LopPipedInsertBulkMultipleValueTest.java
similarity index 70%
rename from src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java
rename to src/test/manual/net/spy/memcached/bulkoperation/LopPipedInsertBulkMultipleValueTest.java
index eb1ffc006..2ea969446 100644
--- a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java
+++ b/src/test/manual/net/spy/memcached/bulkoperation/LopPipedInsertBulkMultipleValueTest.java
@@ -16,6 +16,7 @@
  */
 package net.spy.memcached.bulkoperation;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -25,13 +26,14 @@
 
 import net.spy.memcached.collection.BaseIntegrationTest;
 import net.spy.memcached.collection.CollectionAttributes;
+import net.spy.memcached.collection.CollectionResponse;
 import net.spy.memcached.ops.CollectionOperationStatus;
 
 import org.junit.Assert;
 
-public class LopInsertBulkMultipleValueTest extends BaseIntegrationTest {
+public class LopPipedInsertBulkMultipleValueTest extends BaseIntegrationTest {
 
-  private String key = "LopInsertBulkMultipleValueTest";
+  private String key = "LopPipedInsertBulkMultipleValueTest";
 
   @Override
   protected void tearDown() throws Exception {
@@ -42,10 +44,10 @@ protected void tearDown() throws Exception {
   public void testInsertAndGet() {
     String value = "MyValue";
 
-    int valueCount = 500;
-    Object[] valueList = new Object[valueCount];
-    for (int i = 0; i < valueList.length; i++) {
-      valueList[i] = "MyValue";
+    int valueCount = 510;
+    List<Object> valueList = new ArrayList<>(valueCount);
+    for (int i = 0; i < valueCount; i++) {
+      valueList.add("MyValue" + i);
     }
 
     try {
@@ -54,8 +56,7 @@ public void testInsertAndGet() {
       // SET
       Future<Map<Integer, CollectionOperationStatus>> future = mc
-          .asyncLopPipedInsertBulk(key, 0, Arrays.asList(valueList),
-              new CollectionAttributes());
+          .asyncLopPipedInsertBulk(key, 0, valueList, new CollectionAttributes());
       try {
         Map<Integer, CollectionOperationStatus> errorList = future.get(
             20000L, TimeUnit.MILLISECONDS);
@@ -68,27 +69,27 @@ public void testInsertAndGet() {
 
       // GET
       int errorCount = 0;
-      List<Object> list = null;
+      List<Object> resultList = null;
       Future<List<Object>> f = mc.asyncLopGet(key, 0, valueCount, false, false);
       try {
-        list = f.get();
+        resultList = f.get();
       } catch (Exception e) {
         f.cancel(true);
         e.printStackTrace();
         Assert.fail(e.getMessage());
       }
-      Assert.assertNotNull("List is null.", list);
-      Assert.assertTrue("Cached list is empty.", !list.isEmpty());
-      Assert.assertEquals(valueCount, list.size());
+      Assert.assertNotNull("List is null.", resultList);
+      Assert.assertTrue("Cached resultList is empty.", !resultList.isEmpty());
+      Assert.assertEquals(valueCount, resultList.size());
 
-      for (Object o : list) {
-        if (!value.equals(o)) {
+      for (int i = 0; i < resultList.size(); i++) {
+        if (!resultList.get(i).equals(valueList.get(i))) {
           errorCount++;
         }
       }
 
-      Assert.assertEquals(valueCount, list.size());
+      Assert.assertEquals(valueCount, resultList.size());
       Assert.assertEquals(0, errorCount);
 
       // REMOVE
@@ -102,9 +103,7 @@ public void testInsertAndGet() {
   public void testErrorCount() {
     int valueCount = 1200;
     Object[] valueList = new Object[valueCount];
-    for (int i = 0; i < valueList.length; i++) {
-      valueList[i] = "MyValue";
-    }
+    Arrays.fill(valueList, "MyValue");
 
     try {
       // SET
@@ -115,7 +114,10 @@ public void testErrorCount() {
 
       Map<Integer, CollectionOperationStatus> map = future.get(1000L,
           TimeUnit.MILLISECONDS);
       assertEquals(valueCount, map.size());
-
+      assertEquals(CollectionResponse.NOT_FOUND,
+          map.get(mc.getMaxPipedItemCount() - 1).getResponse());
+      assertEquals(CollectionResponse.CANCELED,
+          map.get(mc.getMaxPipedItemCount()).getResponse());
     } catch (Exception e) {
       e.printStackTrace();
      Assert.fail();
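
Usage note: the sketch below shows how the synchronous piped insert surfaces per-element failures after this change. It is a minimal illustration, not part of the patch; it assumes an already-connected ArcusClient named mc, and the class name, key, and value strings are hypothetical. Only APIs that appear in the patch itself (asyncLopPipedInsertBulk, getMaxPipedItemCount, CollectionResponse, CollectionOperationStatus) are used.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import net.spy.memcached.ArcusClient;
import net.spy.memcached.collection.CollectionAttributes;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.ops.CollectionOperationStatus;

// Hypothetical example class; "sketch:key" and the values are illustrative only.
public class LopPipedInsertUsageSketch {
  public static void run(ArcusClient mc) throws Exception {
    // 1200 values span three piped chunks of MAX_PIPED_ITEM_COUNT (500) elements.
    List<Object> values = new ArrayList<>();
    for (int i = 0; i < 1200; i++) {
      values.add("value" + i);
    }

    // With this patch, chunk N+1 is sent only after chunk N completes;
    // if a chunk fails, all later elements are reported as CANCELED.
    Future<Map<Integer, CollectionOperationStatus>> future =
        mc.asyncLopPipedInsertBulk("sketch:key", 0, values, new CollectionAttributes());

    // The returned map holds only the failed element indexes and their statuses.
    Map<Integer, CollectionOperationStatus> failed =
        future.get(20000L, TimeUnit.MILLISECONDS);
    for (Map.Entry<Integer, CollectionOperationStatus> entry : failed.entrySet()) {
      if (entry.getValue().getResponse() == CollectionResponse.CANCELED) {
        // this element was never attempted because an earlier chunk failed
        continue;
      }
      // other responses carry the per-element failure reason
      System.out.println(entry.getKey() + ": " + entry.getValue().getResponse());
    }
  }
}

The testErrorCount case above exercises exactly this behavior: with no attributesForCreate, the first chunk fails with NOT_FOUND and every element from index getMaxPipedItemCount() onward is marked CANCELED.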