Skip to content

Commit

Permalink
INTERNAL: make lop piped operations process synchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviarla committed Aug 22, 2024
1 parent e99869f commit 9eb5496
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 66 deletions.
82 changes: 81 additions & 1 deletion src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1863,11 +1864,16 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
} else {
PartitionedList<T> list = new PartitionedList<>(valueList,
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);

for (List<T> elementList : list) {
insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc));
if (index >= 0) {
index += elementList.size();
}
}
}
return asyncCollectionPipedInsert(key, insertList);

return syncCollectionPipedInsert(key, Collections.unmodifiableList(insertList));
}

@Override
Expand Down Expand Up @@ -3170,6 +3176,80 @@ public void gotStatus(Integer index, OperationStatus status) {
return rv;
}

/**
 * Inserts collection items by issuing the piped-insert chunks one at a time:
 * the next chunk's operation is only submitted after the previous chunk's
 * {@code complete()} callback fires, so at most one piped operation per key is
 * in flight at any moment ("synchronous" pipe processing).
 *
 * @param key        arcus cache key
 * @param insertList pre-partitioned piped-insert chunks; must not be empty
 * @return future holding a map of element index to the reason the insert of
 *         that element failed (empty map when every element succeeded)
 */
private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
    final String key, final List<CollectionPipedInsert<T>> insertList) {
  // Single latch for the whole chain: it is counted down exactly once, by
  // whichever operation ends the chain (last chunk, error, or cancellation).
  final CountDownLatch latch = new CountDownLatch(1);
  final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
      new PipedCollectionFuture<>(latch, operationTimeout);

  for (int i = 0; i < insertList.size(); i++) {
    final CollectionPipedInsert<T> insert = insertList.get(i);
    final int idx = i; // effectively-final chunk index captured by the callback
    Operation op = opFact.collectionPipedInsert(key, insert,
        new CollectionPipedInsertOperation.Callback() {
          // Overall status of this chunk's operation; the last one recorded
          // becomes the future's operation status.
          public void receivedStatus(OperationStatus status) {
            CollectionOperationStatus cstatus;

            if (status instanceof CollectionOperationStatus) {
              cstatus = (CollectionOperationStatus) status;
            } else {
              getLogger().warn("Unhandled state: " + status);
              cstatus = new CollectionOperationStatus(status);
            }
            rv.setOperationStatus(cstatus);
          }

          // Called once this chunk's operation finishes; decides whether to
          // end the chain or submit the next chunk.
          // NOTE(review): getOperationStatus() is assumed non-null here, i.e.
          // receivedStatus() is assumed to run before complete() even on
          // cancellation — confirm the operation callback contract.
          public void complete() {
            if (idx == insertList.size() - 1
                || rv.hasErrored()
                || rv.getOperationStatus().getResponse() == CollectionResponse.CANCELED) {
              // countdown if this is the last op, or the chain already
              // errored / was cancelled
              latch.countDown();
            } else if (!rv.getOperationStatus().isSuccess()) {
              // if error or cancel occurred by this operation,
              // do not add all remaining operations and mark as cancelled
              for (int chunkIdx = idx + 1; chunkIdx < insertList.size(); chunkIdx++) {
                for (int itemIdx = 0; itemIdx < insertList.get(chunkIdx).getItemCount(); itemIdx++) {
                  // element index = position within chunk + chunk offset;
                  // only the last chunk may hold fewer than MAX items, so the
                  // offset arithmetic is exact for all skipped chunks.
                  // NOTE(review): the CollectionOperationStatus is wrapped in
                  // another CollectionOperationStatus — the inner value alone
                  // looks sufficient; confirm the copy constructor preserves
                  // the CANCELED response before simplifying.
                  rv.addEachResult(itemIdx + (chunkIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
                      new CollectionOperationStatus(new CollectionOperationStatus(
                          false, "CANCELED", CollectionResponse.CANCELED)));
                }
              }
              latch.countDown();
            } else {
              // success and not last: submit the next chunk, keeping the
              // future's current-op index in step with what is in flight
              rv.incrCurrentOpIdx();
              Operation nextOp = rv.getOp(idx + 1);
              addOp(key, nextOp);
            }
          }

          // Per-element failure status, keyed by the element's global index
          // across all chunks.
          public void gotStatus(Integer index, OperationStatus status) {
            if (status instanceof CollectionOperationStatus) {
              rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
                  (CollectionOperationStatus) status);
            } else {
              rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
                  new CollectionOperationStatus(status));
            }
          }
        });
    rv.addOperation(op);
  }
  // Kick off the chain with the first chunk; the rest are submitted from
  // complete() above.
  addOp(key, rv.getOp(0));
  return rv;
}

@Override
public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
List<String> keyList, long bkey, byte[] eFlag, Object value,
Expand Down
90 changes: 45 additions & 45 deletions src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package net.spy.memcached.internal;

import java.util.Collection;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import net.spy.memcached.MemcachedConnection;
Expand All @@ -18,9 +18,10 @@

public class PipedCollectionFuture<K, V>
extends CollectionFuture<Map<K, V>> {
private final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<>();
private final List<Operation> ops = new ArrayList<>();
private final AtomicReference<CollectionOperationStatus> operationStatus
= new AtomicReference<>(null);
private final AtomicInteger currentOpIdx = new AtomicInteger(0);

private final Map<K, V> failedResult =
new ConcurrentHashMap<>();
Expand All @@ -31,67 +32,58 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
rv |= op.cancel("by application.");
}
return rv;
return ops.get(currentOpIdx.get()).cancel("by application.");
}

/**
 * If a previous operation was cancelled, subsequent operations are never
 * added to the operation queue, so only the current operation needs checking.
 *
 * @return true if the current operation has been cancelled.
 */
@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
return ops.get(currentOpIdx.get()).isCancelled();
}

/**
 * If a previous operation threw an exception, subsequent operations are never
 * added to the operation queue, so inspecting the current operation is enough.
 *
 * @return true if the current operation failed with an exception.
 */
public boolean hasErrored() {
  final int current = currentOpIdx.get();
  return ops.get(current).hasErrored();
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
return latch.getCount() == 0;
}

@Override
public Map<K, V> get(long duration, TimeUnit unit)
throws InterruptedException, TimeoutException, ExecutionException {

long beforeAwait = System.currentTimeMillis();
if (!latch.await(duration, unit)) {
Collection<Operation> timedOutOps = new HashSet<>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedOutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (!timedOutOps.isEmpty()) {
// set timeout only once for piped ops requested to single node.
MemcachedConnection.opTimedOut(timedOutOps.iterator().next());

long elapsed = System.currentTimeMillis() - beforeAwait;
throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
}
Operation lastOp = ops.get(ops.size() - 1);
Operation currentOp = ops.get(currentOpIdx.get());
if (!latch.await(duration, unit) && lastOp.getState() != OperationState.COMPLETE) {
MemcachedConnection.opTimedOut(lastOp);

long elapsed = System.currentTimeMillis() - beforeAwait;
throw new CheckedOperationTimeoutException(duration, unit, elapsed, currentOp);
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
MemcachedConnection.opSucceeded(lastOp);
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}
if (currentOp != null && currentOp.hasErrored()) {
throw new ExecutionException(currentOp.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
if (currentOp != null && currentOp.isCancelled()) {
throw new ExecutionException(new RuntimeException(currentOp.getCancelCause()));
}

return failedResult;
Expand Down Expand Up @@ -120,4 +112,12 @@ public void addEachResult(K index, V status) {
/**
 * Registers an operation so it can be scheduled and tracked by this future.
 *
 * @param op the pipelined operation to append
 */
public void addOperation(Operation op) {
  this.ops.add(op);
}

/**
 * Returns the operation registered at the given position.
 *
 * @param index zero-based position within the registered operations
 * @return the operation stored at {@code index}
 */
public Operation getOp(int index) {
  return ops.get(index);
}

/**
 * Advances the index of the operation currently being processed by one.
 */
public void incrCurrentOpIdx() {
  currentOpIdx.incrementAndGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package net.spy.memcached.bulkoperation;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand All @@ -25,13 +26,14 @@

import net.spy.memcached.collection.BaseIntegrationTest;
import net.spy.memcached.collection.CollectionAttributes;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.ops.CollectionOperationStatus;

import org.junit.Assert;

public class LopInsertBulkMultipleValueTest extends BaseIntegrationTest {
public class LopPipedInsertBulkMultipleValueTest extends BaseIntegrationTest {

private String key = "LopInsertBulkMultipleValueTest";
private String key = "LopPipedInsertBulkMultipleValueTest";

@Override
protected void tearDown() throws Exception {
Expand All @@ -42,10 +44,10 @@ protected void tearDown() throws Exception {
public void testInsertAndGet() {
String value = "MyValue";

int valueCount = 500;
Object[] valueList = new Object[valueCount];
for (int i = 0; i < valueList.length; i++) {
valueList[i] = "MyValue";
int valueCount = 510;
List<Object> valueList = new ArrayList<>(valueCount);
for (int i = 0; i < valueCount; i++) {
valueList.add("MyValue" + i);
}

try {
Expand All @@ -54,8 +56,7 @@ public void testInsertAndGet() {

// SET
Future<Map<Integer, CollectionOperationStatus>> future = mc
.asyncLopPipedInsertBulk(key, 0, Arrays.asList(valueList),
new CollectionAttributes());
.asyncLopPipedInsertBulk(key, 0, valueList, new CollectionAttributes());
try {
Map<Integer, CollectionOperationStatus> errorList = future.get(
20000L, TimeUnit.MILLISECONDS);
Expand All @@ -68,27 +69,27 @@ public void testInsertAndGet() {

// GET
int errorCount = 0;
List<Object> list = null;
List<Object> resultList = null;
Future<List<Object>> f = mc.asyncLopGet(key, 0, valueCount, false,
false);
try {
list = f.get();
resultList = f.get();
} catch (Exception e) {
f.cancel(true);
e.printStackTrace();
Assert.fail(e.getMessage());
}

Assert.assertNotNull("List is null.", list);
Assert.assertTrue("Cached list is empty.", !list.isEmpty());
Assert.assertEquals(valueCount, list.size());
Assert.assertNotNull("List is null.", resultList);
Assert.assertTrue("Cached resultList is empty.", !resultList.isEmpty());
Assert.assertEquals(valueCount, resultList.size());

for (Object o : list) {
if (!value.equals(o)) {
for (int i = 0; i < resultList.size(); i++) {
if (!resultList.get(i).equals(valueList.get(i))) {
errorCount++;
}
}
Assert.assertEquals(valueCount, list.size());
Assert.assertEquals(valueCount, resultList.size());
Assert.assertEquals(0, errorCount);

// REMOVE
Expand All @@ -102,9 +103,7 @@ public void testInsertAndGet() {
public void testErrorCount() {
int valueCount = 1200;
Object[] valueList = new Object[valueCount];
for (int i = 0; i < valueList.length; i++) {
valueList[i] = "MyValue";
}
Arrays.fill(valueList, "MyValue");

try {
// SET
Expand All @@ -115,7 +114,10 @@ public void testErrorCount() {
Map<Integer, CollectionOperationStatus> map = future.get(1000L,
TimeUnit.MILLISECONDS);
assertEquals(valueCount, map.size());

assertEquals(map.get(mc.getMaxPipedItemCount() - 1).getResponse(),
CollectionResponse.NOT_FOUND);
assertEquals(map.get(mc.getMaxPipedItemCount()).getResponse(),
CollectionResponse.CANCELED);
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
Expand Down

0 comments on commit 9eb5496

Please sign in to comment.