From 9cf9255c7f51ed1e4d92a3edaab0cb015aba4c98 Mon Sep 17 00:00:00 2001
From: oliviarla
Date: Mon, 19 Aug 2024 15:32:42 +0900
Subject: [PATCH] INTERNAL: make lop piped operations process synchronously

---
 docs/user_guide/04-list-API.md                |  4 +
 .../java/net/spy/memcached/ArcusClient.java   | 77 ++++++++++++++++++-
 .../internal/PipedCollectionFuture.java       | 75 +++++++++---------
 .../CollectionPipedInsertOperationImpl.java   |  6 +-
 ...LopPipedInsertBulkMultipleValueTest.java}  | 46 ++++++-----
 5 files changed, 145 insertions(+), 63 deletions(-)
 rename src/test/manual/net/spy/memcached/bulkoperation/{LopInsertBulkMultipleValueTest.java => LopPipedInsertBulkMultipleValueTest.java} (69%)

diff --git a/docs/user_guide/04-list-API.md b/docs/user_guide/04-list-API.md
index 80c418934..daef6c601 100644
--- a/docs/user_guide/04-list-API.md
+++ b/docs/user_guide/04-list-API.md
@@ -332,6 +332,10 @@ asyncLopPipedInsertBulk(String key, int index, List<Object> valueList, Collectio
 - null: the element is not inserted.
 - attributes: an empty list item is created with the given attributes, and then the element is inserted.
 
+If some of the insert requests fail, the returned map is filled with their statuses. If the value of
+the last entry is a `CollectionOperationStatus` whose `CollectionResponse` is `CANCELED`, every
+request after that point was cancelled because of the preceding error.
+
 Second, there is a function that inserts one identical element into each of the lists referenced by multiple keys.
 
 ```java
diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java
index 473db5658..834ef31b1 100644
--- a/src/main/java/net/spy/memcached/ArcusClient.java
+++ b/src/main/java/net/spy/memcached/ArcusClient.java
@@ -26,6 +26,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,6 +39,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntFunction;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
@@ -1852,11 +1854,16 @@ public CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
       insertList.add(new ListPipedInsert<>(key, index, valueList, attributesForCreate, tc));
     } else {
       PartitionedList<T> list = new PartitionedList<>(valueList, MAX_PIPED_ITEM_COUNT);
+
       for (List<T> elementList : list) {
         insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc));
+        if (index >= 0) {
+          index += elementList.size();
+        }
       }
     }
-    return asyncCollectionPipedInsert(key, insertList);
+
+    return syncCollectionPipedInsert(key, Collections.unmodifiableList(insertList));
   }
 
   @Override
@@ -3114,6 +3121,74 @@ public void gotStatus(Integer index, OperationStatus status) {
     return rv;
   }
 
+  /**
+   * Pipe insert method for collection items.
+   *
+   * @param key arcus cache key
+   * @param insertList must not be empty.
+   * @return a future holding the map of element index to the reason why the insert operation failed
+   */
+  private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
+          final String key, final List<CollectionPipedInsert<T>> insertList) {
+    final CountDownLatch latch = new CountDownLatch(1);
+    final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
+            new PipedCollectionFuture<>(latch, operationTimeout);
+
+    final List<Operation> ops = new ArrayList<>(insertList.size());
+    IntFunction<CollectionPipedInsertOperation.Callback> makeCallback =
+        opIdx -> new CollectionPipedInsertOperation.Callback() {
+          // each result status
+          public void receivedStatus(OperationStatus status) {
+            CollectionOperationStatus cstatus;
+
+            if (status instanceof CollectionOperationStatus) {
+              cstatus = (CollectionOperationStatus) status;
+            } else {
+              getLogger().warn("Unhandled state: " + status);
+              cstatus = new CollectionOperationStatus(status);
+            }
+            rv.setOperationStatus(cstatus);
+          }
+
+          // complete
+          public void complete() {
+            if (opIdx == insertList.size() - 1 || rv.hasErrored() || rv.isCancelled()) {
+              latch.countDown();
+            } else if (!rv.getOperationStatus().isSuccess()) {
+              // If this operation failed, the remaining subsequent operations
+              // should not be added and should be marked as cancelled.
+              rv.addEachResult((opIdx + 1) * MAX_PIPED_ITEM_COUNT,
+                  new CollectionOperationStatus(false, "CANCELED", CollectionResponse.CANCELED));
+              latch.countDown();
+            } else {
+              // add the next operation if this is not the last op
+              Operation nextOp = ops.get(opIdx + 1);
+              rv.addOperation(nextOp);
+              addOp(key, nextOp);
+            }
+          }
+
+          // got status
+          public void gotStatus(Integer index, OperationStatus status) {
+            if (status instanceof CollectionOperationStatus) {
+              rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT),
+                  (CollectionOperationStatus) status);
+            } else {
+              rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT),
+                  new CollectionOperationStatus(status));
+            }
+          }
+        };
+
+    for (int i = 0; i < insertList.size(); i++) {
+      final CollectionPipedInsert<T> insert = insertList.get(i);
+      Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i));
+      ops.add(op);
+    }
+    rv.addOperation(ops.get(0));
+    addOp(key, ops.get(0));
+    return rv;
+  }
+
   @Override
   public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
       List<String> keyList, long bkey, byte[] eFlag, Object value,
diff --git a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
index 4cfc67c9c..6344332a7 100644
--- a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
+++ b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
@@ -1,7 +1,7 @@
 package net.spy.memcached.internal;
 
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -11,13 +11,15 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import net.spy.memcached.MemcachedConnection;
+import net.spy.memcached.collection.CollectionResponse;
 import net.spy.memcached.ops.CollectionOperationStatus;
 import net.spy.memcached.ops.Operation;
 import net.spy.memcached.ops.OperationState;
 
 public class PipedCollectionFuture<K, V> extends CollectionFuture<Map<K, V>> {
-  private final Collection<Operation> ops = new ArrayList<>();
+  // operations that are completed or in progress
+  private final List<Operation> ops = new ArrayList<>();
   private final AtomicReference<CollectionOperationStatus> operationStatus
           = new AtomicReference<>(null);
@@ -30,67 +32,60 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {
 
   @Override
   public boolean cancel(boolean ign) {
-    boolean rv = false;
-    for (Operation op : ops) {
-      rv |= op.cancel("by application.");
-    }
-    return rv;
+    return ops.get(ops.size() - 1).cancel("by application.");
   }
 
+  /**
+   * If the previous op is cancelled, the next ops are not added to the opQueue,
+   * so we only need to check the current op.
+   *
+   * @return true if the operation is cancelled.
+   */
   @Override
   public boolean isCancelled() {
-    for (Operation op : ops) {
-      if (op.isCancelled()) {
-        return true;
-      }
-    }
-    return false;
+    return operationStatus.get().getResponse() == CollectionResponse.CANCELED;
+  }
+
+  /**
+   * If the previous op threw an exception, the next ops are not added to the opQueue,
+   * so we only need to check the current op.
+   *
+   * @return true if the operation has errored by an exception.
+   */
+  public boolean hasErrored() {
+    return ops.get(ops.size() - 1).hasErrored();
   }
 
   @Override
   public boolean isDone() {
-    for (Operation op : ops) {
-      if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
-        return false;
-      }
-    }
-    return true;
+    return latch.getCount() == 0;
   }
 
   @Override
   public Map<K, V> get(long duration, TimeUnit unit)
       throws InterruptedException, TimeoutException, ExecutionException {
     long beforeAwait = System.currentTimeMillis();
+    Operation lastOp;
     if (!latch.await(duration, unit)) {
-      Collection<Operation> timedOutOps = new ArrayList<>();
-      for (Operation op : ops) {
-        if (op.getState() != OperationState.COMPLETE) {
-          timedOutOps.add(op);
-        } else {
-          MemcachedConnection.opSucceeded(op);
-        }
-      }
-      if (!timedOutOps.isEmpty()) {
-        // set timeout only once for piped ops requested to single node.
-        MemcachedConnection.opTimedOut(timedOutOps.iterator().next());
+      lastOp = ops.get(ops.size() - 1);
+      if (lastOp.getState() != OperationState.COMPLETE) {
+        MemcachedConnection.opTimedOut(lastOp);
         long elapsed = System.currentTimeMillis() - beforeAwait;
-        throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
+        throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastOp);
       }
     } else {
       // continuous timeout counter will be reset only once in pipe
-      MemcachedConnection.opSucceeded(ops.iterator().next());
+      lastOp = ops.get(ops.size() - 1);
+      MemcachedConnection.opSucceeded(lastOp);
     }
 
-    for (Operation op : ops) {
-      if (op != null && op.hasErrored()) {
-        throw new ExecutionException(op.getException());
-      }
+    if (lastOp != null && lastOp.hasErrored()) {
+      throw new ExecutionException(lastOp.getException());
+    }
 
-      if (op != null && op.isCancelled()) {
-        throw new ExecutionException(new RuntimeException(op.getCancelCause()));
-      }
+    if (lastOp != null && lastOp.isCancelled()) {
+      throw new ExecutionException(new RuntimeException(lastOp.getCancelCause()));
     }
 
     return failedResult;
diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java
index b52fd4fb3..90040e13d 100644
--- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java
+++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java
@@ -141,7 +141,7 @@ assert getState() == OperationState.READING
       \r\n
       END|PIPE_ERROR \r\n
     */
-    if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
+    if (line.startsWith("END")) {
      /* ENABLE_MIGRATION if */
      if (needRedirect()) {
        transitionState(OperationState.REDIRECT);
@@ -150,6 +150,10 @@ assert getState() == OperationState.READING
      /* ENABLE_MIGRATION end */
      cb.receivedStatus((successAll) ? END : FAILED_END);
      transitionState(OperationState.COMPLETE);
+    } else if (line.startsWith("PIPE_ERROR ")) {
+      // command flow / memory flow
+      cb.receivedStatus(FAILED_END);
+      transitionState(OperationState.COMPLETE);
     } else if (line.startsWith("RESPONSE ")) {
       getLogger().debug("Got line %s", line);
diff --git a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java b/src/test/manual/net/spy/memcached/bulkoperation/LopPipedInsertBulkMultipleValueTest.java
similarity index 69%
rename from src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java
rename to src/test/manual/net/spy/memcached/bulkoperation/LopPipedInsertBulkMultipleValueTest.java
index 2d8a5f8f0..21273546f 100644
--- a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java
+++ b/src/test/manual/net/spy/memcached/bulkoperation/LopPipedInsertBulkMultipleValueTest.java
@@ -16,6 +16,7 @@
  */
 package net.spy.memcached.bulkoperation;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -23,21 +24,24 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import net.spy.memcached.ArcusClient;
 import net.spy.memcached.collection.BaseIntegrationTest;
 import net.spy.memcached.collection.CollectionAttributes;
+import net.spy.memcached.collection.CollectionResponse;
 import net.spy.memcached.ops.CollectionOperationStatus;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
-class LopInsertBulkMultipleValueTest extends BaseIntegrationTest {
+class LopPipedInsertBulkMultipleValueTest extends BaseIntegrationTest {
 
-  private String key = "LopInsertBulkMultipleValueTest";
+  private String key = "LopPipedInsertBulkMultipleValueTest";
 
   @AfterEach
   @Override
   protected void tearDown() throws Exception {
 
   void testInsertAndGet() {
     String value = "MyValue";
 
-    int valueCount = 500;
-    Object[] valueList = new Object[valueCount];
-    for (int i = 0; i < valueList.length; i++) {
-      valueList[i] = "MyValue";
+    int valueCount = 510;
+    List<Object> valueList = new ArrayList<>(valueCount);
+    for (int i = 0; i < valueCount; i++) {
+      valueList.add("MyValue" + i);
     }
 
     try {
       // SET
       Future<Map<Integer, CollectionOperationStatus>> future = mc
-          .asyncLopPipedInsertBulk(key, 0, Arrays.asList(valueList),
-              new CollectionAttributes());
+          .asyncLopPipedInsertBulk(key, 0, valueList, new CollectionAttributes());
       try {
         Map<Integer, CollectionOperationStatus> errorList = future.get(
             20000L, TimeUnit.MILLISECONDS);
 
       // GET
       int errorCount = 0;
-      List<Object> list = null;
+      List<Object> resultList = null;
       Future<List<Object>> f = mc.asyncLopGet(key, 0, valueCount, false, false);
       try {
-        list = f.get();
+        resultList = f.get();
       } catch (Exception e) {
         f.cancel(true);
         e.printStackTrace();
         fail(e.getMessage());
       }
 
-      assertNotNull(list, "List is null.");
-      assertTrue(!list.isEmpty(), "Cached list is empty.");
-      assertEquals(valueCount, list.size());
+      assertNotNull(resultList);
+      assertFalse(resultList.isEmpty(), "Cached list is empty.");
+      assertEquals(valueCount, resultList.size());
 
-      for (Object o : list) {
-        if (!value.equals(o)) {
+      for (int i = 0; i < resultList.size(); i++) {
+        if (!resultList.get(i).equals(valueList.get(i))) {
           errorCount++;
         }
       }
 
       assertEquals(valueCount, resultList.size());
       assertEquals(0, errorCount);
 
       // REMOVE
 
   void testErrorCount() {
     int valueCount = 1200;
     Object[] valueList = new Object[valueCount];
-    for (int i = 0; i < valueList.length; i++) {
-      valueList[i] = "MyValue";
-    }
+    Arrays.fill(valueList, "MyValue");
 
     try {
       // SET
       Future<Map<Integer, CollectionOperationStatus>> future = mc
           .asyncLopPipedInsertBulk(key, 0, Arrays.asList(valueList),
               new CollectionAttributes());
 
       Map<Integer, CollectionOperationStatus> map = future.get(1000L, TimeUnit.MILLISECONDS);
 
-      assertEquals(valueCount, map.size());
-
+      assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size());
+      assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT - 1).getResponse(),
+          CollectionResponse.NOT_FOUND);
+      assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT).getResponse(),
+          CollectionResponse.CANCELED);
     } catch (Exception e) {
       e.printStackTrace();
       fail();
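
Usage note: the documentation hunk in `docs/user_guide/04-list-API.md` above describes how callers should read the result map when a piped insert partially fails. Entries are keyed by element index, and a trailing `CANCELED` status means everything after the previous failure was never attempted. The sketch below is a minimal, hypothetical caller and is not part of this patch; the class and method names (`LopPipedInsertUsage`, `insertAndReport`) are illustrative, while the `ArcusClient` calls are the same ones exercised in the tests above (`getMessage()` comes from the base `OperationStatus`).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import net.spy.memcached.ArcusClient;
import net.spy.memcached.collection.CollectionAttributes;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.internal.CollectionFuture;
import net.spy.memcached.ops.CollectionOperationStatus;

public class LopPipedInsertUsage {

  // Runs a piped list insert and reports why individual elements were not stored.
  static void insertAndReport(ArcusClient client, String key, List<Object> values)
      throws Exception {
    CollectionFuture<Map<Integer, CollectionOperationStatus>> future =
        client.asyncLopPipedInsertBulk(key, 0, values, new CollectionAttributes());

    // The returned map only holds entries for elements that were not inserted successfully.
    Map<Integer, CollectionOperationStatus> failures = future.get(1000L, TimeUnit.MILLISECONDS);

    for (Map.Entry<Integer, CollectionOperationStatus> entry : failures.entrySet()) {
      if (entry.getValue().getResponse() == CollectionResponse.CANCELED) {
        // Per the documented semantics, everything from this element index onward
        // was cancelled because an earlier piped operation failed.
        System.out.println("cancelled from element index " + entry.getKey());
      } else {
        System.out.println("element " + entry.getKey() + " failed: "
            + entry.getValue().getMessage());
      }
    }
  }
}
```

The 1000 ms timeout mirrors the value used in `testErrorCount`; real callers should choose a timeout appropriate to their pipe size.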