diff --git a/docs/user_guide/04-list-API.md b/docs/user_guide/04-list-API.md index 2998621a2..70dfc4d68 100644 --- a/docs/user_guide/04-list-API.md +++ b/docs/user_guide/04-list-API.md @@ -332,6 +332,10 @@ asyncLopPipedInsertBulk(String key, int index, List valueList, Collectio - null: element 삽입하지 않는다. - attributes: 주어진 attributes를 가진 empty list item 생성 후에 element 삽입한다. +만약 삽입 요청 중 일부가 실패하면 filled map을 반환한다. 마지막 entry의 value에 +`CollectionResponse가 CANCELED인 CollectionOperationStatus`가 존재하는 경우, 이전 오류로 인해 +해당 요청 이후의 모든 요청이 취소되었다는 것을 의미한다. + 둘째, 여러 key들이 가리키는 list들에 각각 동일한 하나의 element를 삽입하는 함수이다. ```java diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index fd28c31b2..910ea83cc 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; @@ -38,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntFunction; import java.util.jar.JarFile; import java.util.jar.Manifest; @@ -1862,11 +1864,16 @@ public CollectionFuture> asyncLopPip } else { PartitionedList list = new PartitionedList<>(valueList, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT); + for (List elementList : list) { insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc)); + if (index >= 0) { + index += elementList.size(); + } } } - return asyncCollectionPipedInsert(key, insertList); + + return syncCollectionPipedInsert(key, Collections.unmodifiableList(insertList)); } @Override @@ -3137,6 +3144,74 @@ public void gotStatus(Integer index, OperationStatus status) { return rv; } + /** + * Pipe insert method for collection items. + * + * @param key arcus cache key + * @param insertList must not be empty. + * @return future holding the map of element index and the reason why insert operation failed + */ + private CollectionFuture> syncCollectionPipedInsert( + final String key, final List> insertList) { + final CountDownLatch latch = new CountDownLatch(1); + final PipedCollectionFuture rv = + new PipedCollectionFuture<>(latch, operationTimeout); + + final List ops = new ArrayList<>(insertList.size()); + IntFunction makeCallback = idx -> new CollectionPipedInsertOperation.Callback() { + // each result status + public void receivedStatus(OperationStatus status) { + CollectionOperationStatus cstatus; + + if (status instanceof CollectionOperationStatus) { + cstatus = (CollectionOperationStatus) status; + } else { + getLogger().warn("Unhandled state: " + status); + cstatus = new CollectionOperationStatus(status); + } + rv.setOperationStatus(cstatus); + } + + // complete + public void complete() { + if (idx == insertList.size() - 1 || rv.hasErrored() || rv.isCancelled()) { + latch.countDown(); + } else if (!rv.getOperationStatus().isSuccess()) { + // If this operation failed, remaining subsequent operation + // should not be added and should be marked as cancelled. + rv.addEachResult((idx + 1) * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT, + new CollectionOperationStatus(false, "CANCELED", CollectionResponse.CANCELED)); + latch.countDown(); + } else { + // add next operation if this is not last op + Operation nextOp = ops.get(idx + 1); + rv.addOperation(nextOp); + addOp(key, nextOp); + } + } + + // got status + public void gotStatus(Integer index, OperationStatus status) { + if (status instanceof CollectionOperationStatus) { + rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT), + (CollectionOperationStatus) status); + } else { + rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT), + new CollectionOperationStatus(status)); + } + } + }; + + for (int i = 0; i < insertList.size(); i++) { + final CollectionPipedInsert insert = insertList.get(i); + Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i)); + ops.add(op); + } + rv.addOperation(ops.get(0)); + addOp(key, ops.get(0)); + return rv; + } + @Override public Future> asyncBopInsertBulk( List keyList, long bkey, byte[] eFlag, Object value, diff --git a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java index 4cfc67c9c..906e760b2 100644 --- a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java +++ b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java @@ -1,7 +1,7 @@ package net.spy.memcached.internal; import java.util.ArrayList; -import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -11,13 +11,15 @@ import java.util.concurrent.atomic.AtomicReference; import net.spy.memcached.MemcachedConnection; +import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; public class PipedCollectionFuture extends CollectionFuture> { - private final Collection ops = new ArrayList<>(); + // operations that are completed or in progress + private final List ops = new ArrayList<>(); private final AtomicReference operationStatus = new AtomicReference<>(null); @@ -30,67 +32,56 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) { @Override public boolean cancel(boolean ign) { - boolean rv = false; - for (Operation op : ops) { - rv |= op.cancel("by application."); - } - return rv; + return ops.get(ops.size() - 1).cancel("by application."); } + /** + * if previous op is cancelled, then next ops are not added to the opQueue. + * So we only need to check current op. + * + * @return true if operation is cancelled. + */ @Override public boolean isCancelled() { - for (Operation op : ops) { - if (op.isCancelled()) { - return true; - } - } - return false; + return operationStatus.get().getResponse() == CollectionResponse.CANCELED; + } + + /** + * if previous op threw exception, then next ops are not added to the opQueue. + * So we only need to check current op. + * + * @return true if operation has errored by exception. + */ + public boolean hasErrored() { + return ops.get(ops.size() - 1).hasErrored(); } @Override public boolean isDone() { - for (Operation op : ops) { - if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) { - return false; - } - } - return true; + return latch.getCount() == 0; } @Override public Map get(long duration, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { - long beforeAwait = System.currentTimeMillis(); - if (!latch.await(duration, unit)) { - Collection timedOutOps = new ArrayList<>(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - timedOutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (!timedOutOps.isEmpty()) { - // set timeout only once for piped ops requested to single node. - MemcachedConnection.opTimedOut(timedOutOps.iterator().next()); - - long elapsed = System.currentTimeMillis() - beforeAwait; - throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps); - } + Operation lastOp = ops.get(ops.size() - 1); + if (!latch.await(duration, unit) && lastOp.getState() != OperationState.COMPLETE) { + MemcachedConnection.opTimedOut(lastOp); + + long elapsed = System.currentTimeMillis() - beforeAwait; + throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastOp); } else { // continuous timeout counter will be reset only once in pipe - MemcachedConnection.opSucceeded(ops.iterator().next()); + MemcachedConnection.opSucceeded(lastOp); } - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); - } + if (lastOp != null && lastOp.hasErrored()) { + throw new ExecutionException(lastOp.getException()); + } - if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); - } + if (lastOp != null && lastOp.isCancelled()) { + throw new ExecutionException(new RuntimeException(lastOp.getCancelCause())); } return failedResult; diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java index fddba8383..4f7f4323e 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java @@ -136,7 +136,7 @@ assert getState() == OperationState.READING \r\n END|PIPE_ERROR \r\n */ - if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { + if (line.startsWith("END")) { /* ENABLE_MIGRATION if */ if (needRedirect()) { transitionState(OperationState.REDIRECT); @@ -145,6 +145,10 @@ assert getState() == OperationState.READING /* ENABLE_MIGRATION end */ cb.receivedStatus((successAll) ? END : FAILED_END); transitionState(OperationState.COMPLETE); + } else if (line.startsWith("PIPE_ERROR ")) { + // command flow / memory flow + cb.receivedStatus(FAILED_END); + transitionState(OperationState.COMPLETE); } else if (line.startsWith("RESPONSE ")) { getLogger().debug("Got line %s", line); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java b/src/test/manual/net/spy/memcached/bulkoperation/LopPipedInsertBulkMultipleValueTest.java similarity index 74% rename from src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java rename to src/test/manual/net/spy/memcached/bulkoperation/LopPipedInsertBulkMultipleValueTest.java index e4efa33ea..6f0cbeb2a 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/LopPipedInsertBulkMultipleValueTest.java @@ -16,6 +16,7 @@ */ package net.spy.memcached.bulkoperation; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -25,6 +26,8 @@ import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; +import net.spy.memcached.collection.CollectionPipedInsert; +import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.CollectionOperationStatus; import org.junit.jupiter.api.AfterEach; @@ -35,9 +38,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -public class LopInsertBulkMultipleValueTest extends BaseIntegrationTest { +public class LopPipedInsertBulkMultipleValueTest extends BaseIntegrationTest { - private String key = "LopInsertBulkMultipleValueTest"; + private String key = "LopPipedInsertBulkMultipleValueTest"; @AfterEach @Override @@ -50,10 +53,10 @@ protected void tearDown() throws Exception { public void testInsertAndGet() { String value = "MyValue"; - int valueCount = 500; - Object[] valueList = new Object[valueCount]; - for (int i = 0; i < valueList.length; i++) { - valueList[i] = "MyValue"; + int valueCount = 510; + List valueList = new ArrayList<>(valueCount); + for (int i = 0; i < valueCount; i++) { + valueList.add("MyValue" + i); } try { @@ -62,8 +65,7 @@ public void testInsertAndGet() { // SET Future> future = mc - .asyncLopPipedInsertBulk(key, 0, Arrays.asList(valueList), - new CollectionAttributes()); + .asyncLopPipedInsertBulk(key, 0, valueList, new CollectionAttributes()); try { Map errorList = future.get( 20000L, TimeUnit.MILLISECONDS); @@ -76,11 +78,11 @@ public void testInsertAndGet() { // GET int errorCount = 0; - List list = null; + List resultList = null; Future> f = mc.asyncLopGet(key, 0, valueCount, false, false); try { - list = f.get(); + resultList = f.get(); } catch (Exception e) { f.cancel(true); e.printStackTrace(); @@ -91,8 +93,8 @@ public void testInsertAndGet() { assertTrue(!list.isEmpty(), "Cached list is empty."); assertEquals(valueCount, list.size()); - for (Object o : list) { - if (!value.equals(o)) { + for (int i = 0; i < resultList.size(); i++) { + if (!resultList.get(i).equals(valueList.get(i))) { errorCount++; } } @@ -111,9 +113,7 @@ public void testInsertAndGet() { public void testErrorCount() { int valueCount = 1200; Object[] valueList = new Object[valueCount]; - for (int i = 0; i < valueList.length; i++) { - valueList[i] = "MyValue"; - } + Arrays.fill(valueList, "MyValue"); try { // SET @@ -123,8 +123,11 @@ public void testErrorCount() { Map map = future.get(1000L, TimeUnit.MILLISECONDS); - assertEquals(valueCount, map.size()); - + assertEquals(CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + 1, map.size()); + assertEquals(map.get(CollectionPipedInsert.MAX_PIPED_ITEM_COUNT - 1).getResponse(), + CollectionResponse.NOT_FOUND); + assertEquals(map.get(CollectionPipedInsert.MAX_PIPED_ITEM_COUNT).getResponse(), + CollectionResponse.CANCELED); } catch (Exception e) { e.printStackTrace(); fail();