Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weโ€™ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INTERNAL: make lop piped operations process synchronously #795

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/user_guide/04-list-API.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ asyncLopPipedInsertBulk(String key, int index, List<Object> valueList, Collectio
- null: element ์‚ฝ์ž…ํ•˜์ง€ ์•Š๋Š”๋‹ค.
- attributes: ์ฃผ์–ด์ง„ attributes๋ฅผ ๊ฐ€์ง„ empty list item ์ƒ์„ฑ ํ›„์— element ์‚ฝ์ž…ํ•œ๋‹ค.

๋งŒ์•ฝ ์‚ฝ์ž… ์š”์ฒญ ์ค‘ ์ผ๋ถ€๊ฐ€ ์‹คํŒจํ•˜๋ฉด filled map์„ ๋ฐ˜ํ™˜ํ•œ๋‹ค. ๋งˆ์ง€๋ง‰ entry์˜ value์—
`CollectionResponse๊ฐ€ CANCELED์ธ CollectionOperationStatus`๊ฐ€ ์กด์žฌํ•˜๋Š” ๊ฒฝ์šฐ, ์ด์ „ ์˜ค๋ฅ˜๋กœ ์ธํ•ด
ํ•ด๋‹น ์š”์ฒญ ์ดํ›„์˜ ๋ชจ๋“  ์š”์ฒญ์ด ์ทจ์†Œ๋˜์—ˆ๋‹ค๋Š” ๊ฒƒ์„ ์˜๋ฏธํ•œ๋‹ค.

๋‘˜์งธ, ์—ฌ๋Ÿฌ key๋“ค์ด ๊ฐ€๋ฆฌํ‚ค๋Š” list๋“ค์— ๊ฐ๊ฐ ๋™์ผํ•œ ํ•˜๋‚˜์˜ element๋ฅผ ์‚ฝ์ž…ํ•˜๋Š” ํ•จ์ˆ˜์ด๋‹ค.

```java
Expand Down
77 changes: 76 additions & 1 deletion src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -38,6 +39,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

Expand Down Expand Up @@ -1852,11 +1854,16 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
insertList.add(new ListPipedInsert<>(key, index, valueList, attributesForCreate, tc));
} else {
PartitionedList<T> list = new PartitionedList<>(valueList, MAX_PIPED_ITEM_COUNT);

for (List<T> elementList : list) {
insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc));
if (index >= 0) {
index += elementList.size();
}
}
}
return asyncCollectionPipedInsert(key, insertList);

return syncCollectionPipedInsert(key, Collections.unmodifiableList(insertList));
}

@Override
Expand Down Expand Up @@ -3114,6 +3121,74 @@ public void gotStatus(Integer index, OperationStatus status) {
return rv;
}

/**
* Pipe insert method for collection items.
*
* @param key arcus cache key
* @param insertList must not be empty.
* @return future holding the map of element index and the reason why insert operation failed
*/
private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
final String key, final List<CollectionPipedInsert<T>> insertList) {
final CountDownLatch latch = new CountDownLatch(1);
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<>(latch, operationTimeout);

final List<Operation> ops = new ArrayList<>(insertList.size());
IntFunction<OperationCallback> makeCallback = opIdx -> new CollectionPipedInsertOperation.Callback() {
// each result status
public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;

if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}

// complete
public void complete() {
if (opIdx == insertList.size() - 1 || rv.hasErrored() || rv.isCancelled()) {
latch.countDown();
} else if (!rv.getOperationStatus().isSuccess()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rv.getOperationStatus().isSuccess() ์กฐ๊ฑด์œผ๋กœ next ์—ฐ์‚ฐ ์ˆ˜ํ–‰ ์—ฌ๋ถ€๋ฅผ ํŒ๋‹จํ•˜๊ธฐ ์–ด๋ ต์Šต๋‹ˆ๋‹ค.
์ด ๋ถ€๋ถ„์— ๋Œ€ํ•ด offline ๋…ผ์˜ํ•ด์•ผ ํ•  ๊ฒƒ ๊ฐ™์Šต๋‹ˆ๋‹ค.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

๊ด€๋ จ ์‚ฌํ•ญ์„ ์ข€ ๋” ์ž์„ธํ•˜๊ฒŒ ์ ์œผ๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

  • PIPE_ERROR ์‘๋‹ต์ด ์˜จ ๊ฒฝ์šฐ
    • ํ˜„์žฌ์˜ piped ์—ฐ์‚ฐ์—์„œ PIPE_ERROR๋กœ ์ธํ•ด ์•„์˜ˆ ์ˆ˜ํ–‰๋˜์ง€ item ์—ฐ์‚ฐ์ด ์žˆ์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
    • ์ด ๊ฒฝ์šฐ, ํ˜„์žฌ failed Result์— ๊ฒฐ๊ณผ๋ฅผ ๋‹ด์ง€ ์•Š๊ณ  ์žˆ์œผ๋ฏ€๋กœ, ์ด๋ฅผ ์ถ”๊ฐ€ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.
  • PIPE_END ์‘๋‹ต์ด ์˜จ ๊ฒฝ์šฐ
    • ๊ฐœ๋ณ„ ์—ฐ์‚ฐ์— ๋Œ€ํ•ด NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, TYPE_MISMATCH, BKEY_MISMATCH ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
    • B+Tree collection์— ๋Œ€ํ•ด BKEY_MISMATCH ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•œ ๊ฒฝ์šฐ, ์ฃผ์–ด์ง„ element๋งŒ insert ์‹คํŒจํ•˜๊ณ  ๋‚˜๋จธ์ง„ ์„ฑ๊ณตํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด ๊ฒฝ์šฐ, next ์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰ํ•˜๋Š” ๊ฒƒ์ด ๋งž์ง€ ์•Š๋Š” ์ง€ ? ELEMENT_EXISTS ์˜ค๋ฅ˜๋„ ๋น„์Šทํ•ด ๋ณด์ž…๋‹ˆ๋‹ค.
    • ๊ทธ ์™ธ์˜ ์˜ค๋ฅ˜์ธ ๊ฒฝ์šฐ๋Š” ์บ์‹œ ์„œ๋ฒ„์—์„œ ์ˆ˜ํ–‰์„ ์ค‘์ง€ํ•˜๊ณ , PIPE_ERROR ๋ฆฌํ„ดํ•ด์•ผ ํ•˜์ง€ ์•Š๋‚˜ ์ƒ๊ฐํ•ฉ๋‹ˆ๋‹ค.
    • ๊ฒฐ๊ตญ, ์บ์‹œ ์„œ๋ฒ„ ๋™์ž‘ ๋ถ€๋ถ„๋„ ํ•จ๊ป˜ ๊ณ ๋ ค๋˜์–ด์•ผ ํ•˜์ง€ ์•Š๋‚˜ ์ƒ๊ฐํ•ฉ๋‹ˆ๋‹ค.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

END ์‘๋‹ต์ด ์™”๊ณ  btree์ผ ๋•Œ BKEY_MISMATCH, map, set์ผ ๋•Œ ELEMENT_EXISTS๋งŒ ๋ฐœ์ƒํ–ˆ๋‹ค๋ฉด ๋‹ค์Œ Operation ๋ณด๋‚ด๋Š” ๊ฒƒ์—๋Š” ๋™์˜ํ•ฉ๋‹ˆ๋‹ค.

PIPE_ERROR๊ฐ€ CLIENT_ERROR, SERVER_ERROR ๋กœ ์ธํ•ด ๋ฐœ์ƒํ–ˆ๋‹ค๋ฉด ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•ด failed result๋ฅผ ์กฐํšŒํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. PIPE_ERROR๊ฐ€ command/memory overflow ๋กœ ์ธํ•ด ๋ฐœ์ƒํ–ˆ๋‹ค๋ฉด ์—๋Ÿฌ๋ฅผ ๋ฌด์‹œํ•˜๊ณ  ๋ชจ๋“  ์‘๋‹ต์„ ๊ธฐ์ค€์œผ๋กœ ์ „์ฒด ์„ฑ๊ณต/์‹คํŒจ ์—ฌ๋ถ€๋งŒ ํŒ๋ณ„ํ•ฉ๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ "PIPE_ERROR" ๋ผ๋Š” ์‘๋‹ต์ด ์™”๋Š”์ง€ future์—์„œ๋Š” ํ™•์ธํ•  ์ˆ˜ ์—†์œผ๋ฉฐ, failed result์— ๊ฒฐ๊ณผ๋ฅผ ๋„ฃ์„ ํ•„์š”๋„ ์—†์Šต๋‹ˆ๋‹ค.

if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
/* ENABLE_MIGRATION if */
if (needRedirect()) {
transitionState(OperationState.REDIRECT);
return;
}
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oliviarla

๊ธฐ์กด pipe ์ฒ˜๋ฆฌ์—๋„ ์ผ๋ถ€ ๋ฌธ์ œ๊ฐ€ ์žˆ๋Š” ๊ฒƒ ๊ฐ™์Šต๋‹ˆ๋‹ค.

  • CLIENT_ERROR, SERVER_ERROR ๋“ฑ์˜ ์˜ค๋ฅ˜๊ฐ€ ์žˆ๋Š” ๊ฒฝ์šฐ, java client๋Š” PIPE_ERROR ์‘๋‹ต๊นŒ์ง€ ์ฝ์ง€ ์•Š๋Š”๋‹ค.
    ๊ทธ๋Ÿฌ๋ฉด, ๋‹ค์Œ ์—ฐ์‚ฐ์—์„œ ์‘๋‹ต์„ ์ฝ์„ ์‹œ์— PIPE_ERROR ์‘๋‹ต์„ ๋ณด๊ฒŒ ๋˜๋Š” ๋ฌธ์ œ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.
    ๋Œ€๋žต ํ™•์ธํ•œ ์‚ฌํ•ญ์ด๋ผ, ์ด ๋ถ€๋ถ„์— ๊ด€ํ•œ ์ฝ”๋“œ๋Š” ์ง์ ‘ ํ™•์ธํ•ด ๋ณด๊ธฐ ๋ฐ”๋ž๋‹ˆ๋‹ค.
  • ์•„๋ž˜ 2๊ฐ€์ง€ PIPE_ERROR์— ๋Œ€ํ•ด "END" ์‘๋‹ต์„ ๋ฐ›์€ ๊ฒฝ์šฐ์™€ ๋™์ผํ•˜๊ฒŒ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค.
    ์ฆ‰, ์ฒ˜๋ฆฌํ•˜์ง€ ๋ชปํ•œ ์—ฐ์‚ฐ์ด ์กด์žฌํ•˜๋”๋ผ๋„ ์ฒ˜๋ฆฌ๊ฐ€ ์™„๋ฃŒ๋œ ๊ฒƒ์œผ๋กœ ํŒ๋‹จํ•˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.
    • PIPE_ERROR command overflow
    • PIPE_ERROR memory overflow

์ด๋Ÿฌํ•œ pipe ์ฒ˜๋ฆฌ ์˜ค๋ฅ˜๋Š” ๋ณธ PR๊ณผ๋Š” ๋ณ„๋„๋กœ ์ฒ˜๋ฆฌํ•ด์•ผ ํ•˜์ฃ ?
๊ฒ€ํ†  ๋ฐ”๋ž๋‹ˆ๋‹ค.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CLIENT_ERROR, SERVER_ERROR ๋“ฑ์˜ ์˜ค๋ฅ˜๊ฐ€ ์žˆ๋Š” ๊ฒฝ์šฐ, ๋ฐ”๋กœ OperationException์ด throw ๋ฉ๋‹ˆ๋‹ค.
์ด ์˜ˆ์™ธ๋ฅผ MemcachedConnection์—์„œ catchํ•˜์—ฌ ์—ฐ๊ฒฐ์„ ๋Š๊ธฐ ๋•Œ๋ฌธ์— ๋‹ค์Œ ์—ฐ์‚ฐ์ด ์‘๋‹ต์„ ์ฝ์„ ์ˆ˜ ์—†์„ ๊ฒƒ์œผ๋กœ ๋ณด์ž…๋‹ˆ๋‹ค.

} catch (OperationException e) {
qa.setupForAuth("operation exception"); // noop if !shouldAuth
getLogger().warn("Reconnection due to exception " +
"handling a memcached exception on %s.", qa, e);
lostConnection(qa, ReconnDelay.IMMEDIATE, "operation exception");

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PIPE_ERROR command/memory overflow ์˜ค๋ฅ˜ ์‹œ, ์ฒ˜๋ฆฌ๋˜์ง€ ๋ชปํ•œ ์—ฐ์‚ฐ์— ๋Œ€ํ•ด
๋ณธ PR ์ฒ˜๋Ÿผ CANCELED CollectionOperationStatus ์„ค์ •ํ•˜๋Š” ๊ฒƒ์ด ์ข‹๊ฒ ์Šต๋‹ˆ๋‹ค.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PIPE_ERROR command/memory overflow ์˜ค๋ฅ˜ ์‹œ, ์ฒ˜๋ฆฌ๋˜์ง€ ๋ชปํ•œ ์—ฐ์‚ฐ์— ๋Œ€ํ•ด
๋ณธ PR ์ฒ˜๋Ÿผ CANCELED CollectionOperationStatus ์„ค์ •ํ•˜๋Š” ๊ฒƒ์ด ์ข‹๊ฒ ์Šต๋‹ˆ๋‹ค.

์œ„ ์‚ฌํ•ญ์„ ๊ตฌํ˜„ํ•˜๊ธฐ ์œ„ํ•ด
์•„๋ž˜ ๋กœ์ง์—์„œ cb.receivedStatus() ํ˜ธ์ถœ ์‹œ์˜ ์ธ์ž ๊ฐ’์ด ์ˆ˜์ •๋˜์–ด์•ผ ํ•  ๊ฒƒ ๊ฐ™์Šต๋‹ˆ๋‹ค.

         if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { 
	   /* ENABLE_MIGRATION if */ 
	   if (needRedirect()) { 
	     transitionState(OperationState.REDIRECT); 
	     return; 
	   } 
	   /* ENABLE_MIGRATION end */ 
	   cb.receivedStatus((successAll) ? END : FAILED_END); 
	   transitionState(OperationState.COMPLETE); 
	 } else if (line.startsWith("RESPONSE ")) { 

์•„๋ž˜์™€ ๊ฐ™์ด ํ˜ธ์ถœํ•ด์•ผ ํ•  ๊ฒƒ ๊ฐ™์Šต๋‹ˆ๋‹ค.

	   cb.receivedStatus((index == count && successAll) ? END : FAILED_END); 

๋˜ํ•œ, ์•„๋ž˜ ๊ฒฝ์šฐ์— CENCEL ์ƒํƒœ๋ฅผ ์ถ”๊ฐ€ํ•˜๋Š” ๋กœ์ง๋„ ์ˆ˜์ •๋˜์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

        if (!rv.getOperationStatus().isSuccess()) {
          rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
                  new CollectionOperationStatus(false, "CANCELED", CollectionResponse.CANCELED));
          latch.countDown();
        } 

์œ„์™€ ๊ฐ™์ด ๋ณ€๊ฒฝ๋˜๋ฉด,
complete() ๋กœ์ง์—์„œ์˜ ๊ฒ€์‚ฌ ์ˆœ์„œ๋„ ๋ณ€๊ฒฝ๋˜์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.
์•„๋ž˜ ์กฐ๊ฑด์ด ๊ฐ€์žฅ ๋จผ์ € ๊ฒ€์‚ฌ๋˜์–ด์•ผ ํ•  ๊ฒƒ์ž…๋‹ˆ๋‹ค.

        if (!rv.getOperationStatus().isSuccess()) 

// If this operation failed, remaining subsequent operation
// should not be added and should be marked as cancelled.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complete() ๋กœ์ง์„ ์™„๋ฒฝํ•˜๊ฒŒ ๋งŒ๋“ค๋ ค๋ฉด,
PIPE_ERROR์ด๋”๋ผ๋„ ๋ชจ๋“  ์‘๋‹ต์„ ์ฝ์–ด๋‚ด๋Š” ๊ธฐ๋Šฅ์ด ๋จผ์ € ๊ตฌํ˜„๋˜์–ด์•ผ ํ•  ๊ฒƒ ๊ฐ™์Šต๋‹ˆ๋‹ค.
์ด๋ฅผ ์ด์Šˆ๋กœ ์˜ฌ๋ฆฌ๊ณ  ์ž‘์—…์„ ์‹œ์ž‘ํ•˜๋ฉด ์ข‹๊ฒ ์Šต๋‹ˆ๋‹ค.

rv.addEachResult((opIdx + 1) * MAX_PIPED_ITEM_COUNT,
new CollectionOperationStatus(false, "CANCELED", CollectionResponse.CANCELED));
latch.countDown();
} else {
// add next operation if this is not last op
Operation nextOp = ops.get(opIdx + 1);
rv.addOperation(nextOp);
addOp(key, nextOp);
}
}

// got status
public void gotStatus(Integer index, OperationStatus status) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
};

for (int i = 0; i < insertList.size(); i++) {
final CollectionPipedInsert<T> insert = insertList.get(i);
Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i));
ops.add(op);
}
rv.addOperation(ops.get(0));
addOp(key, ops.get(0));
return rv;
}

@Override
public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
List<String> keyList, long bkey, byte[] eFlag, Object value,
Expand Down
75 changes: 35 additions & 40 deletions src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package net.spy.memcached.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand All @@ -11,13 +11,15 @@
import java.util.concurrent.atomic.AtomicReference;

import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;

public class PipedCollectionFuture<K, V>
extends CollectionFuture<Map<K, V>> {
private final Collection<Operation> ops = new ArrayList<>();
// operations that are completed or in progress
private final List<Operation> ops = new ArrayList<>();
private final AtomicReference<CollectionOperationStatus> operationStatus
= new AtomicReference<>(null);

Expand All @@ -30,67 +32,60 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
rv |= op.cancel("by application.");
}
return rv;
return ops.get(ops.size() - 1).cancel("by application.");
}

/**
* if previous op is cancelled, then next ops are not added to the opQueue.
* So we only need to check current op.
*
* @return true if operation is cancelled.
*/
@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
return operationStatus.get().getResponse() == CollectionResponse.CANCELED;
oliviarla marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* if previous op threw exception, then next ops are not added to the opQueue.
* So we only need to check current op.
*
* @return true if operation has errored by exception.
*/
public boolean hasErrored() {
return ops.get(ops.size() - 1).hasErrored();
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
return latch.getCount() == 0;
}

@Override
public Map<K, V> get(long duration, TimeUnit unit)
throws InterruptedException, TimeoutException, ExecutionException {

long beforeAwait = System.currentTimeMillis();
Operation lastOp;
if (!latch.await(duration, unit)) {
Collection<Operation> timedOutOps = new ArrayList<>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedOutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (!timedOutOps.isEmpty()) {
// set timeout only once for piped ops requested to single node.
MemcachedConnection.opTimedOut(timedOutOps.iterator().next());
lastOp = ops.get(ops.size() - 1);
if (lastOp.getState() != OperationState.COMPLETE) {
MemcachedConnection.opTimedOut(lastOp);

long elapsed = System.currentTimeMillis() - beforeAwait;
throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastOp);
oliviarla marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
lastOp = ops.get(ops.size() - 1);
MemcachedConnection.opSucceeded(lastOp);
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}
if (lastOp != null && lastOp.hasErrored()) {
throw new ExecutionException(lastOp.getException());
}
oliviarla marked this conversation as resolved.
Show resolved Hide resolved

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
if (lastOp != null && lastOp.isCancelled()) {
throw new ExecutionException(new RuntimeException(lastOp.getCancelCause()));
}

return failedResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ assert getState() == OperationState.READING
<status of the last pipelined command>\r\n
END|PIPE_ERROR <error_string>\r\n
*/
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
if (line.startsWith("END")) {
/* ENABLE_MIGRATION if */
if (needRedirect()) {
transitionState(OperationState.REDIRECT);
Expand All @@ -150,6 +150,10 @@ assert getState() == OperationState.READING
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("PIPE_ERROR ")) {
// command flow / memory flow
jhpark816 marked this conversation as resolved.
Show resolved Hide resolved
cb.receivedStatus(FAILED_END);
transitionState(OperationState.COMPLETE);
oliviarla marked this conversation as resolved.
Show resolved Hide resolved
} else if (line.startsWith("RESPONSE ")) {
getLogger().debug("Got line %s", line);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,32 @@
*/
package net.spy.memcached.bulkoperation;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import net.spy.memcached.ArcusClient;
import net.spy.memcached.collection.BaseIntegrationTest;
import net.spy.memcached.collection.CollectionAttributes;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.ops.CollectionOperationStatus;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

class LopInsertBulkMultipleValueTest extends BaseIntegrationTest {
class LopPipedInsertBulkMultipleValueTest extends BaseIntegrationTest {

private String key = "LopInsertBulkMultipleValueTest";
private String key = "LopPipedInsertBulkMultipleValueTest";

@AfterEach
@Override
Expand All @@ -50,10 +54,10 @@ protected void tearDown() throws Exception {
void testInsertAndGet() {
String value = "MyValue";

int valueCount = 500;
Object[] valueList = new Object[valueCount];
for (int i = 0; i < valueList.length; i++) {
valueList[i] = "MyValue";
int valueCount = 510;
List<Object> valueList = new ArrayList<>(valueCount);
for (int i = 0; i < valueCount; i++) {
valueList.add("MyValue" + i);
}

try {
Expand All @@ -62,8 +66,7 @@ void testInsertAndGet() {

// SET
Future<Map<Integer, CollectionOperationStatus>> future = mc
.asyncLopPipedInsertBulk(key, 0, Arrays.asList(valueList),
new CollectionAttributes());
.asyncLopPipedInsertBulk(key, 0, valueList, new CollectionAttributes());
try {
Map<Integer, CollectionOperationStatus> errorList = future.get(
20000L, TimeUnit.MILLISECONDS);
Expand All @@ -76,27 +79,27 @@ void testInsertAndGet() {

// GET
int errorCount = 0;
List<Object> list = null;
List<Object> resultList = null;
Future<List<Object>> f = mc.asyncLopGet(key, 0, valueCount, false,
false);
try {
list = f.get();
resultList = f.get();
} catch (Exception e) {
f.cancel(true);
e.printStackTrace();
fail(e.getMessage());
}

assertNotNull(list, "List is null.");
assertTrue(!list.isEmpty(), "Cached list is empty.");
assertEquals(valueCount, list.size());
assertNotNull(resultList);
assertFalse(resultList.isEmpty(), "Cached list is empty.");
assertEquals(valueCount, resultList.size());

for (Object o : list) {
if (!value.equals(o)) {
for (int i = 0; i < resultList.size(); i++) {
if (!resultList.get(i).equals(valueList.get(i))) {
errorCount++;
}
}
assertEquals(valueCount, list.size());
assertEquals(valueCount, resultList.size());
assertEquals(0, errorCount);

// REMOVE
Expand All @@ -111,9 +114,7 @@ void testInsertAndGet() {
void testErrorCount() {
int valueCount = 1200;
Object[] valueList = new Object[valueCount];
for (int i = 0; i < valueList.length; i++) {
valueList[i] = "MyValue";
}
Arrays.fill(valueList, "MyValue");

try {
// SET
Expand All @@ -123,8 +124,11 @@ void testErrorCount() {

Map<Integer, CollectionOperationStatus> map = future.get(1000L,
TimeUnit.MILLISECONDS);
assertEquals(valueCount, map.size());

assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size());
assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT - 1).getResponse(),
CollectionResponse.NOT_FOUND);
assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT).getResponse(),
CollectionResponse.CANCELED);
} catch (Exception e) {
e.printStackTrace();
fail();
Expand Down