Skip to content

Commit

Permalink
INTERNAL: lop pipe version2
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviarla committed Aug 16, 2024
1 parent e99869f commit 2f92cfc
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 40 deletions.
77 changes: 76 additions & 1 deletion src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1867,7 +1868,8 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc));
}
}
return asyncCollectionPipedInsert(key, insertList);

return syncCollectionPipedInsert(key, Collections.unmodifiableList(insertList));
}

@Override
Expand Down Expand Up @@ -3170,6 +3172,79 @@ public void gotStatus(Integer index, OperationStatus status) {
return rv;
}

/**
* Pipe insert method for collection items.
*
* @param key arcus cache key
* @param insertList must not be empty.
* @return future holding the map of element index and the reason why insert operation failed
*/
private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
final String key, final List<CollectionPipedInsert<T>> insertList) {
final CountDownLatch latch = new CountDownLatch(1);
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<>(latch, operationTimeout);

for (int i = 0; i < insertList.size(); i++) {
final CollectionPipedInsert<T> insert = insertList.get(i);
final int idx = i;
Operation op = opFact.collectionPipedInsert(key, insert,
new CollectionPipedInsertOperation.Callback() {
// each result status
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;

if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}

// complete
public void complete() {
if (idx == insertList.size() - 1) {
// countdown if this is last op
latch.countDown();
} else {
// if error or cancel occurred by this operation,
// do not add all remaining operations and mark as cancelled
if (rv.hasErrored() || !rv.getOperationStatus().isSuccess()) {

This comment has been minimized.

Copy link
@jhpark816

jhpark816 Aug 19, 2024

rv.hasErrored() 경우와 rv.isCancelled() 경우는 latch.countDown()만 수행하면 됩니다. 이 경우, rv.failedResult는 조회되지 않기 때문입니다.

!rv.getOperationStatus().isSuccess() 경우에 CANCELLED로 실패를 등록하는 것은 어색합니다.

그리고, 기존에서 1~500개 연산을 pipelining 명령으로 요청할 경우, 100개 연산까지 처리하다가 101번째 연산에서 치명적인 오류가 발생하면, 101번째 연산의 오류는 rv.failedResult에 등록하지만, 102~500범위의 연산들은 rv.failedResult에 등록하지 않았습니다. 이 부분을 검토하기 바랍니다.

This comment has been minimized.

Copy link
@uhm0311

uhm0311 Aug 19, 2024

@jhpark816

치명적인 오류가 CLIENT_ERROR 혹은 SERVER_ERROR를 말하는 것인가요?

This comment has been minimized.

Copy link
@jhpark816

jhpark816 Aug 19, 2024

치명적인 오류란
PIPE_ERROR 유발시키는 오류를 표현하려는 의도로 사용했습니다.

This comment has been minimized.

Copy link
@uhm0311

uhm0311 Aug 19, 2024

PIPE_ERROR는 CLIENT_ERROR, SERVER_ERROR, ERROR인 경우에만 발생하지 않나요?
각 경우에 100번째 연산에 대한 오류 응답 이후로 캐시 서버로부터 어떤 응답을 기대해야 하나요?

  • CLIENT_ERROR인 경우
  • SERVER_ERROR인 경우
  • ERROR인 경우

This comment has been minimized.

Copy link
@oliviarla

oliviarla Aug 19, 2024

Author Owner

@jhpark816
ERROR, CLIENT_ERROR, SERVER_ERROR 로 시작하는 라인이 반환되면 아예 해당 라인의 결과를 failedResult로 쓰지 않고 complete 시킵니다. 가지고 있던 커넥션도 재연결하므로 hasErrored 인 경우에는 말씀하신대로 failedResult에 결과를 저장하지 않는 것이 맞는 것 같습니다. cancelled인 경우도 마찬가지로 failedResult에 결과를 저장할 필요가 없고요.

다만 OVERFLOWED, OUT_OF_RANGE 등의 오류가 앞 Operation에서 발생했다면 나머지 Operation에서 cancelled 오류로 처리해주는 작업이 필요하다고 생각합니다. 기존 async 방식대로라면 300개의 max count를 가진 list에 1000개 아이템(0~999 숫자)을 입력하려 하면 301~499 까지는 out of range 오류가 반환되고 500~999까지는 overflow 오류가 반환됩니다.
따라서 sync 방식으로 동작할 때에도 301~499에서 out of range가 발생했다면 500~999에서도 cancel이라는 오류를 반환해 정상 수행되지 않았음을 표현하는 것이 나을 것 같습니다.

그리고 한 가지 의문인 점은 0~999까지의 데이터를 list에 넣고자 할 때 순서가 지켜지지 않는다는 것입니다. 0~499가 0번 인덱스를 기준으로 삽입되고, 500~999가 또다시 0번 인덱스를 기준으로 삽입되기 때문에 list의 맨 처음에는 500이라는 값이 존재하게 됩니다.
이러한 동작이 정상적인건가요? 만약 제가 내부 동작을 모르는 채로 파이프를 사용한다면 0~999 순서대로 list에 저장되어 있다고 생각할 것 같습니다.

This comment has been minimized.

Copy link
@jhpark816

jhpark816 Aug 19, 2024

그리고 한 가지 의문인 점은 0~999까지의 데이터를 list에 넣고자 할 때 순서가 지켜지지 않는다는 것입니다. 0~499가 0번 인덱스를 기준으로 삽입되고, 500~999가 또다시 0번 인덱스를 기준으로 삽입되기 때문에 list의 맨 처음에는 500이라는 값이 존재하게 됩니다.
이러한 동작이 정상적인건가요? 만약 제가 내부 동작을 모르는 채로 파이프를 사용한다면 0~999 순서대로 list에 저장되어 있다고 생각할 것 같습니다.

정상이 아닙니다. 이는 버그이므로, 별도 PR로 처리하는 것이 좋겠습니다.

This comment has been minimized.

Copy link
@jhpark816

jhpark816 Aug 19, 2024

다만 OVERFLOWED, OUT_OF_RANGE 등의 오류가 앞 Operation에서 발생했다면 나머지 Operation에서 cancelled 오류로 처리해주는 작업이 필요하다고 생각합니다.

이 경우는 PIPE_ERROR가 아닌 경우이죠?
의미적으로는 필요한 사항이라고 생각됩니다.
단, 원소(element)에 대한 수행 결과까지도 확인하고 그에 따라 처리 로직을 다르게 가져가는 부분이 어색해 보입니다.
이러한 오류를 PIPE_ERROR로 분류하는 것이 나은가요?

This comment has been minimized.

Copy link
@jhpark816

jhpark816 Aug 19, 2024

정상이 아닙니다. 이는 버그이므로, 별도 PR로 처리하는 것이 좋겠습니다.

pipe 연산을 N개의 단위 연산으로 나누고,
N개의 단위 연산을 모두 비동기로 처리하는 방식 자체가 문제이네요.
버그를 해결하는 방법으로,
현재 PR 처럼 동기로 처리하는 방식 외에는 다른 방법이 없습니다.

This comment has been minimized.

Copy link
@oliviarla

oliviarla Aug 19, 2024

Author Owner

커밋 관리가 어려워 naver#795 PR로 올렸습니다.

이 경우는 PIPE_ERROR가 아닌 경우이죠?

네 맞습니다.

단, 원소(element)에 대한 수행 결과까지도 확인하고 그에 따라 처리 로직을 다르게 가져가는 부분이 어색해 보입니다.
이러한 오류를 PIPE_ERROR로 분류하는 것이 나은가요?

이부분은 무엇을 의미하는지 잘 이해가 안갑니다. 일단 현재까지 피드백을 반영해두었으니 나머지 부분은 PR에서 코멘트 달아주시면 감사하겠습니다.

for (int chunkIdx = idx + 1; chunkIdx < insertList.size(); chunkIdx++) {
for (int itemIdx = 0; itemIdx < insertList.get(chunkIdx).getItemCount(); itemIdx++) {
rv.addEachResult(itemIdx + (chunkIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(new CollectionOperationStatus(
false, "CANCELED", CollectionResponse.CANCELED)));
}
}
latch.countDown();
} else {
// add next operation if this is not last op
Operation nextOp = rv.getOp(idx + 1);
addOp(key, nextOp);
}
}
}

// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
});
rv.addOperation(op);
}
addOp(key, rv.getOp(0));
return rv;
}

@Override
public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
List<String> keyList, long bkey, byte[] eFlag, Object value,
Expand Down
67 changes: 35 additions & 32 deletions src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package net.spy.memcached.internal;

import java.util.Collection;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -18,7 +17,7 @@

public class PipedCollectionFuture<K, V>
extends CollectionFuture<Map<K, V>> {
private final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<>();
private final List<Operation> ops = new ArrayList<>();
private final AtomicReference<CollectionOperationStatus> operationStatus
= new AtomicReference<>(null);

Expand All @@ -31,11 +30,12 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
rv |= op.cancel("by application.");
if (!op.getState().equals(OperationState.COMPLETE)) {

This comment has been minimized.

Copy link
@jhpark816

jhpark816 Aug 19, 2024

아래 조건과 equals() 사용하는 조건 중 어떤 방식이 나은가요?

(op.getState() != OperationState.COMPLETE)

This comment has been minimized.

Copy link
@oliviarla

oliviarla Aug 19, 2024

Author Owner

enum 타입이라서 != 를 사용해도 되겠네요. 수정하겠습니다.

return op.cancel("by application.");
}
}
return rv;
return false;
}

@Override
Expand All @@ -48,50 +48,49 @@ public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
public boolean hasErrored() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
if (op.hasErrored()) {
return true;
}
}
return true;
return false;
}

@Override
public boolean isDone() {
return latch.getCount() == 0;
}

@Override
public Map<K, V> get(long duration, TimeUnit unit)
throws InterruptedException, TimeoutException, ExecutionException {

System.out.println("ops size:" + ops.size());
long beforeAwait = System.currentTimeMillis();
Operation lastOp = ops.get(ops.size() - 1);
if (!latch.await(duration, unit)) {
Collection<Operation> timedOutOps = new HashSet<>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedOutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (!timedOutOps.isEmpty()) {
// set timeout only once for piped ops requested to single node.
MemcachedConnection.opTimedOut(timedOutOps.iterator().next());
if (lastOp.getState() != OperationState.COMPLETE) {
MemcachedConnection.opTimedOut(lastOp);

long elapsed = System.currentTimeMillis() - beforeAwait;
throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastOp);
} else {
for (Operation op : ops) {
MemcachedConnection.opSucceeded(op);
}
}
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
MemcachedConnection.opSucceeded(lastOp);
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}
if (lastOp != null && lastOp.hasErrored()) {
throw new ExecutionException(lastOp.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
if (lastOp != null && lastOp.isCancelled()) {
throw new ExecutionException(new RuntimeException(lastOp.getCancelCause()));
}

return failedResult;
Expand Down Expand Up @@ -120,4 +119,8 @@ public void addEachResult(K index, V status) {
public void addOperation(Operation op) {
ops.add(op);
}

public Operation getOp(int index) {
return this.ops.get(index);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import net.spy.memcached.collection.BaseIntegrationTest;
import net.spy.memcached.collection.CollectionAttributes;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.ops.CollectionOperationStatus;

import org.junit.Assert;
Expand All @@ -44,9 +45,7 @@ public void testInsertAndGet() {

int valueCount = 500;
Object[] valueList = new Object[valueCount];
for (int i = 0; i < valueList.length; i++) {
valueList[i] = "MyValue";
}
Arrays.fill(valueList, "MyValue");

try {
// REMOVE
Expand Down Expand Up @@ -102,9 +101,7 @@ public void testInsertAndGet() {
public void testErrorCount() {
int valueCount = 1200;
Object[] valueList = new Object[valueCount];
for (int i = 0; i < valueList.length; i++) {
valueList[i] = "MyValue";
}
Arrays.fill(valueList, "MyValue");

try {
// SET
Expand All @@ -115,7 +112,10 @@ public void testErrorCount() {
Map<Integer, CollectionOperationStatus> map = future.get(1000L,
TimeUnit.MILLISECONDS);
assertEquals(valueCount, map.size());

assertEquals(map.get(mc.getMaxPipedItemCount() - 1).getResponse(),
CollectionResponse.NOT_FOUND);
assertEquals(map.get(mc.getMaxPipedItemCount()).getResponse(),
CollectionResponse.CANCELED);
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
Expand Down

0 comments on commit 2f92cfc

Please sign in to comment.