From e1f39289c89567fa2f353a982a19a09be6ad495b Mon Sep 17 00:00:00 2001 From: oliviarla Date: Tue, 24 Dec 2024 15:03:28 +0900 Subject: [PATCH] INTERNAL: read while PIPE_ERROR received in the pipe operation --- .../internal/BulkOperationFuture.java | 10 +- .../memcached/protocol/BaseOperationImpl.java | 38 +++--- .../CollectionPipedInsertOperationImpl.java | 123 +++++++++++------- .../protocol/ascii/OperationImpl.java | 7 +- .../memcached/protocol/ascii/BaseOpTest.java | 55 ++++++++ 5 files changed, 162 insertions(+), 71 deletions(-) diff --git a/src/main/java/net/spy/memcached/internal/BulkOperationFuture.java b/src/main/java/net/spy/memcached/internal/BulkOperationFuture.java index 28aaa997e..c4350b257 100644 --- a/src/main/java/net/spy/memcached/internal/BulkOperationFuture.java +++ b/src/main/java/net/spy/memcached/internal/BulkOperationFuture.java @@ -3,6 +3,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -90,16 +91,21 @@ public Map get(long duration, MemcachedConnection.opsSucceeded(ops); } + List exceptions = new ArrayList<>(); for (Operation op : ops) { if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); + exceptions.add(op.getException()); } if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); + exceptions.add(new RuntimeException(op.getCancelCause())); } } + if (!exceptions.isEmpty()) { + throw new CompositeException(exceptions); + } + return failedResult; } diff --git a/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java b/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java index 52da18328..6865851e6 100644 --- a/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java @@ -36,6 +36,8 @@ import net.spy.memcached.ops.OperationType; import net.spy.memcached.ops.StatusCode; +import static net.spy.memcached.ops.OperationErrorType.CLIENT; + /** * Base class for protocol-specific operation implementations. */ @@ -51,7 +53,7 @@ public abstract class BaseOperationImpl extends SpyObject { private boolean cancelled = false; private final AtomicBoolean callbacked = new AtomicBoolean(false); private String cancelCause = null; - private OperationException exception = null; + protected OperationException exception = null; private OperationCallback callback = null; private volatile MemcachedNode handlingNode = null; @@ -239,31 +241,25 @@ public final void writeComplete() { public abstract void readFromBuffer(ByteBuffer data) throws IOException; protected void handleError(OperationErrorType eType, String line) - throws IOException { + throws OperationException { getLogger().error("Error: %s by %s", line, this); - switch (eType) { - case GENERAL: - case SERVER: - exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName()); - break; - case CLIENT: - if (line.contains("bad command line format")) { - initialize(); - byte[] bytes = new byte[cmd.remaining()]; - cmd.get(bytes); - - String[] cmdLines = new String(bytes).split("\r\n"); - getLogger().error("Bad command: %s", cmdLines[0]); - } - exception = new OperationException(eType, line + " @ " + handlingNode.getNodeName()); - break; - default: - assert false; - } + exception = createException(eType, line); transitionState(OperationState.COMPLETE); throw exception; } + protected final OperationException createException(OperationErrorType eType, String line) { + if (eType == CLIENT && line.contains("bad command line format")) { + initialize(); + byte[] bytes = new byte[cmd.remaining()]; + cmd.get(bytes); + + String[] cmdLines = new String(bytes).split("\r\n"); + getLogger().error("Bad command: %s", cmdLines[0]); + } + return new OperationException(eType, line + " @ " + handlingNode.getNodeName()); + } + public void handleRead(ByteBuffer data) { assert false; } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java index e472f7e8a..7d068b2fa 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java @@ -27,6 +27,7 @@ import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.CollectionPipedInsertOperation; import net.spy.memcached.ops.OperationCallback; +import net.spy.memcached.ops.OperationErrorType; import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.OperationType; @@ -89,33 +90,29 @@ public CollectionPipedInsertOperationImpl(String key, } setOperationType(OperationType.WRITE); } - @Override public void handleLine(String line) { assert getState() == OperationState.READING : "Read ``" + line + "'' when in " + getState() + " state"; - /* ENABLE_REPLICATION if */ - if (hasSwitchedOver(line)) { - this.insert.setNextOpIndex(index); - prepareSwitchover(line); - return; - } - /* ENABLE_REPLICATION end */ - /* ENABLE_MIGRATION if */ - if (hasNotMyKey(line)) { - // Only one NOT_MY_KEY is provided in response of single key piped operation when redirection. - addRedirectSingleKeyOperation(line, key); - if (insert.isNotPiped()) { + if (insert.isNotPiped()) { + // insert object contains only one command. + + /* ENABLE_REPLICATION if */ + if (hasSwitchedOver(line)) { + prepareSwitchover(line); + return; + } + /* ENABLE_REPLICATION end */ + + /* ENABLE_MIGRATION if */ + if (hasNotMyKey(line)) { + addRedirectSingleKeyOperation(line, key); transitionState(OperationState.REDIRECT); - } else { - insert.setNextOpIndex(index); + return; } - return; - } - /* ENABLE_MIGRATION end */ + /* ENABLE_MIGRATION end */ - if (insert.isNotPiped()) { OperationStatus status = matchStatus(line, STORED, CREATED_STORED, NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, TYPE_MISMATCH, BKEY_MISMATCH); @@ -126,26 +123,8 @@ assert getState() == OperationState.READING cb.receivedStatus(FAILED_END); } transitionState(OperationState.COMPLETE); - return; - } - - /* - RESPONSE \r\n - \r\n - [ ... ] - \r\n - END|PIPE_ERROR \r\n - */ - if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { - /* ENABLE_MIGRATION if */ - if (needRedirect()) { - transitionState(OperationState.REDIRECT); - return; - } - /* ENABLE_MIGRATION end */ - cb.receivedStatus((successAll) ? END : FAILED_END); - transitionState(OperationState.COMPLETE); } else if (line.startsWith("RESPONSE ")) { + // insert object contains multiple commands getLogger().debug("Got line %s", line); // TODO server should be fixed @@ -155,16 +134,68 @@ assert getState() == OperationState.READING String[] stuff = line.split(" "); assert "RESPONSE".equals(stuff[0]); count = Integer.parseInt(stuff[1]); - } else { - OperationStatus status = matchStatus(line, STORED, CREATED_STORED, - NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, - TYPE_MISMATCH, BKEY_MISMATCH); + setReadType(OperationReadType.DATA); + } + } - if (!status.isSuccess()) { - cb.gotStatus(index, status); - successAll = false; + @Override + public void handleRead(ByteBuffer bb) { + while (bb.remaining() > 0) { + try { + String line = getLineFromBuffer(bb); + if (line == null) { + break; + } + OperationErrorType eType = classifyError(line); + if (eType != null) { + this.exception = createException(eType, line); + continue; + } + + /* ENABLE_REPLICATION if */ + if (hasSwitchedOver(line)) { + this.insert.setNextOpIndex(index); + prepareSwitchover(line); + return; + } + /* ENABLE_REPLICATION end */ + + /* ENABLE_MIGRATION if */ + if (hasNotMyKey(line)) { + // Only one NOT_MY_KEY is provided in + // response of single key piped operation when redirection. + addRedirectSingleKeyOperation(line, key); + insert.setNextOpIndex(index); + return; + } + /* ENABLE_MIGRATION end */ + + if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { + /* ENABLE_MIGRATION if */ + if (needRedirect()) { + transitionState(OperationState.REDIRECT); + return; + } + /* ENABLE_MIGRATION end */ + cb.receivedStatus((successAll) ? END : FAILED_END); + transitionState(OperationState.COMPLETE); + return; + } else { + OperationStatus status = matchStatus(line, STORED, CREATED_STORED, + NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, + TYPE_MISMATCH, BKEY_MISMATCH); + + if (!status.isSuccess()) { + cb.gotStatus(index, status); + successAll = false; + } + index++; + } + } catch (Exception e) { + getLogger().error("Failed to parse line: %s", e.getMessage()); + transitionState(OperationState.COMPLETE); + return; } - index++; } } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/OperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/OperationImpl.java index 37a8d7723..7c7f115d6 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/OperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/OperationImpl.java @@ -104,7 +104,7 @@ protected final void setArguments(ByteBuffer bb, Object... args) { bb.put(CRLF); } - private String getLineFromBuffer(ByteBuffer data) throws UnsupportedEncodingException { + protected final String getLineFromBuffer(ByteBuffer data) throws UnsupportedEncodingException { boolean lineFound = false; while (data.remaining() > 0) { byte b = data.get(); @@ -128,7 +128,7 @@ private String getLineFromBuffer(ByteBuffer data) throws UnsupportedEncodingExce return null; } - private OperationErrorType classifyError(String line) { + protected final OperationErrorType classifyError(String line) { OperationErrorType rv = null; if (line.startsWith("ERROR")) { rv = OperationErrorType.GENERAL; @@ -163,6 +163,9 @@ public void readFromBuffer(ByteBuffer data) throws IOException { } else { // OperationReadType.DATA handleRead(data); } + if (hasErrored() && isPipeOperation() && getState() == OperationState.COMPLETE) { + throw getException(); + } } } diff --git a/src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java b/src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java index 65cd8b8cc..5d0ead20a 100644 --- a/src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java +++ b/src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java @@ -19,14 +19,24 @@ package net.spy.memcached.protocol.ascii; +import java.net.InetSocketAddress; import java.nio.Buffer; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +import net.spy.memcached.collection.CollectionPipedInsert; +import net.spy.memcached.ops.CollectionPipedInsertOperation; +import net.spy.memcached.ops.Operation; +import net.spy.memcached.ops.OperationCallback; +import net.spy.memcached.ops.OperationException; +import net.spy.memcached.ops.OperationStatus; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; @@ -99,6 +109,51 @@ void testPartialLine() throws Exception { assertEquals("this is a test", op.getCurrentLine()); } + @Test + void throwExceptionAfterReadingEndOrPipeError() throws Exception { + String key = "testPipeLine"; + CollectionPipedInsert.ListPipedInsert insert = + new CollectionPipedInsert.ListPipedInsert<>(key, 0, + Arrays.asList("a", "b"), null, null); + OperationCallback cb = new CollectionPipedInsertOperation.Callback() { + @Override + public void receivedStatus(OperationStatus status) { + } + + @Override + public void complete() { + } + + @Override + public void gotStatus(Integer index, OperationStatus status) { + } + }; + CollectionPipedInsertOperationImpl op = + new CollectionPipedInsertOperationImpl("test", insert, cb); + LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); + op.setHandlingNode(new AsciiMemcachedNodeImpl("testnode", new InetSocketAddress(11211), + 60, queue, queue, queue, 0L)); + + ByteBuffer b = ByteBuffer.allocate(40); + String line1 = "RESPONSE 2\r\n"; + op.writeComplete(); + b.put(line1.getBytes()); + b.flip(); + assertDoesNotThrow(() -> op.readFromBuffer(b)); + b.clear(); + + String line2 = "SERVER_ERROR out of memory\r\n"; + b.put(line2.getBytes()); + b.flip(); + assertDoesNotThrow(() -> op.readFromBuffer(b)); + b.clear(); + + String line4 = "PIPE_ERROR failed\r\n"; + b.put(line4.getBytes()); + b.flip(); + assertThrows(OperationException.class, () -> op.readFromBuffer(b)); + } + private static class SimpleOp extends OperationImpl { private final LinkedList lines = new LinkedList<>();