Skip to content

Commit

Permalink
INTERNAL: make lop piped operations process synchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviarla committed Oct 14, 2024
1 parent 69cfd20 commit e5edceb
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 63 deletions.
4 changes: 4 additions & 0 deletions docs/user_guide/04-list-API.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ asyncLopPipedInsertBulk(String key, int index, List<Object> valueList, Collectio
- null: element 삽입하지 않는다.
- attributes: 주어진 attributes를 가진 empty list item 생성 후에 element 삽입한다.

만약 삽입 요청 중 일부가 실패하면 filled map을 반환한다. 마지막 entry의 value에
`CollectionResponse가 CANCELED인 CollectionOperationStatus`가 존재하는 경우, 이전 오류로 인해
해당 요청 이후의 모든 요청이 취소되었다는 것을 의미한다.

둘째, 여러 key들이 가리키는 list들에 각각 동일한 하나의 element를 삽입하는 함수이다.

```java
Expand Down
77 changes: 76 additions & 1 deletion src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -38,6 +39,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

Expand Down Expand Up @@ -1864,11 +1866,16 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
} else {
PartitionedList<T> list = new PartitionedList<>(valueList,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);

for (List<T> elementList : list) {
insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc));
if (index >= 0) {
index += elementList.size();
}
}
}
return asyncCollectionPipedInsert(key, insertList);

return syncCollectionPipedInsert(key, Collections.unmodifiableList(insertList));
}

@Override
Expand Down Expand Up @@ -3139,6 +3146,74 @@ public void gotStatus(Integer index, OperationStatus status) {
return rv;
}

/**
* Pipe insert method for collection items.
*
* @param key arcus cache key
* @param insertList must not be empty.
* @return future holding the map of element index and the reason why insert operation failed
*/
private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
final String key, final List<CollectionPipedInsert<T>> insertList) {
final CountDownLatch latch = new CountDownLatch(1);
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<>(latch, operationTimeout);

final List<Operation> ops = new ArrayList<>(insertList.size());
IntFunction<OperationCallback> makeCallback = opIdx -> new CollectionPipedInsertOperation.Callback() {
// each result status
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;

if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}

// complete
public void complete() {
if (opIdx == insertList.size() - 1 || rv.hasErrored() || rv.isCancelled()) {
latch.countDown();
} else if (!rv.getOperationStatus().isSuccess()) {
// If this operation failed, remaining subsequent operation
// should not be added and should be marked as cancelled.
rv.addEachResult((opIdx + 1) * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT,
new CollectionOperationStatus(false, "CANCELED", CollectionResponse.CANCELED));
latch.countDown();
} else {
// add next operation if this is not last op
Operation nextOp = ops.get(opIdx + 1);
rv.addOperation(nextOp);
addOp(key, nextOp);
}
}

// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
};

for (int i = 0; i < insertList.size(); i++) {
final CollectionPipedInsert<T> insert = insertList.get(i);
Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i));
ops.add(op);
}
rv.addOperation(ops.get(0));
addOp(key, ops.get(0));
return rv;
}

@Override
public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
List<String> keyList, long bkey, byte[] eFlag, Object value,
Expand Down
75 changes: 35 additions & 40 deletions src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package net.spy.memcached.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand All @@ -11,13 +11,15 @@
import java.util.concurrent.atomic.AtomicReference;

import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;

public class PipedCollectionFuture<K, V>
extends CollectionFuture<Map<K, V>> {
private final Collection<Operation> ops = new ArrayList<>();
// operations that are completed or in progress
private final List<Operation> ops = new ArrayList<>();
private final AtomicReference<CollectionOperationStatus> operationStatus
= new AtomicReference<>(null);

Expand All @@ -30,67 +32,60 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
rv |= op.cancel("by application.");
}
return rv;
return ops.get(ops.size() - 1).cancel("by application.");
}

/**
* if previous op is cancelled, then next ops are not added to the opQueue.
* So we only need to check current op.
*
* @return true if operation is cancelled.
*/
@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
return operationStatus.get().getResponse() == CollectionResponse.CANCELED;
}

/**
* if previous op threw exception, then next ops are not added to the opQueue.
* So we only need to check current op.
*
* @return true if operation has errored by exception.
*/
public boolean hasErrored() {
return ops.get(ops.size() - 1).hasErrored();
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
return latch.getCount() == 0;
}

@Override
public Map<K, V> get(long duration, TimeUnit unit)
throws InterruptedException, TimeoutException, ExecutionException {

long beforeAwait = System.currentTimeMillis();
Operation lastOp;
if (!latch.await(duration, unit)) {
Collection<Operation> timedOutOps = new ArrayList<>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedOutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (!timedOutOps.isEmpty()) {
// set timeout only once for piped ops requested to single node.
MemcachedConnection.opTimedOut(timedOutOps.iterator().next());
lastOp = ops.get(ops.size() - 1);
if (lastOp.getState() != OperationState.COMPLETE) {
MemcachedConnection.opTimedOut(lastOp);

long elapsed = System.currentTimeMillis() - beforeAwait;
throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastOp);
}
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
lastOp = ops.get(ops.size() - 1);
MemcachedConnection.opSucceeded(lastOp);
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}
if (lastOp != null && lastOp.hasErrored()) {
throw new ExecutionException(lastOp.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
if (lastOp != null && lastOp.isCancelled()) {
throw new ExecutionException(new RuntimeException(lastOp.getCancelCause()));
}

return failedResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ assert getState() == OperationState.READING
<status of the last pipelined command>\r\n
END|PIPE_ERROR <error_string>\r\n
*/
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
if (line.startsWith("END")) {
/* ENABLE_MIGRATION if */
if (needRedirect()) {
transitionState(OperationState.REDIRECT);
Expand All @@ -145,6 +145,10 @@ assert getState() == OperationState.READING
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("PIPE_ERROR ")) {
// command flow / memory flow
cb.receivedStatus(FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
getLogger().debug("Got line %s", line);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package net.spy.memcached.bulkoperation;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand All @@ -25,19 +26,22 @@

import net.spy.memcached.collection.BaseIntegrationTest;
import net.spy.memcached.collection.CollectionAttributes;
import net.spy.memcached.collection.CollectionPipedInsert;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.ops.CollectionOperationStatus;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

class LopInsertBulkMultipleValueTest extends BaseIntegrationTest {
class LopPipedInsertBulkMultipleValueTest extends BaseIntegrationTest {

private String key = "LopInsertBulkMultipleValueTest";
private String key = "LopPipedInsertBulkMultipleValueTest";

@AfterEach
@Override
Expand All @@ -50,10 +54,10 @@ protected void tearDown() throws Exception {
void testInsertAndGet() {
String value = "MyValue";

int valueCount = 500;
Object[] valueList = new Object[valueCount];
for (int i = 0; i < valueList.length; i++) {
valueList[i] = "MyValue";
int valueCount = 510;
List<Object> valueList = new ArrayList<>(valueCount);
for (int i = 0; i < valueCount; i++) {
valueList.add("MyValue" + i);
}

try {
Expand All @@ -62,8 +66,7 @@ void testInsertAndGet() {

// SET
Future<Map<Integer, CollectionOperationStatus>> future = mc
.asyncLopPipedInsertBulk(key, 0, Arrays.asList(valueList),
new CollectionAttributes());
.asyncLopPipedInsertBulk(key, 0, valueList, new CollectionAttributes());
try {
Map<Integer, CollectionOperationStatus> errorList = future.get(
20000L, TimeUnit.MILLISECONDS);
Expand All @@ -76,27 +79,27 @@ void testInsertAndGet() {

// GET
int errorCount = 0;
List<Object> list = null;
List<Object> resultList = null;
Future<List<Object>> f = mc.asyncLopGet(key, 0, valueCount, false,
false);
try {
list = f.get();
resultList = f.get();
} catch (Exception e) {
f.cancel(true);
e.printStackTrace();
fail(e.getMessage());
}

assertNotNull(list, "List is null.");
assertTrue(!list.isEmpty(), "Cached list is empty.");
assertEquals(valueCount, list.size());
assertNotNull(resultList);
assertFalse(resultList.isEmpty(), "Cached list is empty.");
assertEquals(valueCount, resultList.size());

for (Object o : list) {
if (!value.equals(o)) {
for (int i = 0; i < resultList.size(); i++) {
if (!resultList.get(i).equals(valueList.get(i))) {
errorCount++;
}
}
assertEquals(valueCount, list.size());
assertEquals(valueCount, resultList.size());
assertEquals(0, errorCount);

// REMOVE
Expand All @@ -111,9 +114,7 @@ void testInsertAndGet() {
void testErrorCount() {
int valueCount = 1200;
Object[] valueList = new Object[valueCount];
for (int i = 0; i < valueList.length; i++) {
valueList[i] = "MyValue";
}
Arrays.fill(valueList, "MyValue");

try {
// SET
Expand All @@ -123,8 +124,11 @@ void testErrorCount() {

Map<Integer, CollectionOperationStatus> map = future.get(1000L,
TimeUnit.MILLISECONDS);
assertEquals(valueCount, map.size());

assertEquals(CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + 1, map.size());
assertEquals(map.get(CollectionPipedInsert.MAX_PIPED_ITEM_COUNT - 1).getResponse(),
CollectionResponse.NOT_FOUND);
assertEquals(map.get(CollectionPipedInsert.MAX_PIPED_ITEM_COUNT).getResponse(),
CollectionResponse.CANCELED);
} catch (Exception e) {
e.printStackTrace();
fail();
Expand Down

0 comments on commit e5edceb

Please sign in to comment.