From 99aaf0d9dbd08d1c925c0a188964f944fe7805c0 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Tue, 14 May 2024 10:55:29 +0800 Subject: [PATCH] improve of bookie-shell ReadLedgerCommand --- .../commands/bookie/ReadLedgerCommand.java | 45 ++++++++------- .../bookie/ReadLedgerCommandTest.java | 57 +++++++++++++++++-- 2 files changed, 78 insertions(+), 24 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLedgerCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLedgerCommand.java index 20b4232e314..4cc1791a784 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLedgerCommand.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLedgerCommand.java @@ -31,7 +31,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.stream.LongStream; import lombok.Setter; import lombok.experimental.Accessors; import org.apache.bookkeeper.client.BKException; @@ -189,33 +188,39 @@ private boolean readledger(ServerConfiguration serverConf, ReadLedgerFlags flags executor, scheduler, NullStatsLogger.INSTANCE, bk.getBookieAddressResolver()); - LongStream.range(flags.firstEntryId, lastEntry).forEach(entryId -> { + long nextEntryId = flags.firstEntryId; + while (lastEntry == -1 || nextEntryId <= lastEntry) { CompletableFuture future = new CompletableFuture<>(); - bookieClient.readEntry(bookie, flags.ledgerId, entryId, - (rc, ledgerId1, entryId1, buffer, ctx) -> { - if (rc != BKException.Code.OK) { - LOG.error("Failed to read entry {} -- {}", entryId1, - BKException.getMessage(rc)); - future.completeExceptionally(BKException.create(rc)); - return; - } - - LOG.info("--------- Lid={}, Eid={} ---------", - ledgerIdFormatter.formatLedgerId(flags.ledgerId), entryId); - if (flags.msg) { - LOG.info("Data: " + ByteBufUtil.prettyHexDump(buffer)); - } - - future.complete(null); - }, null, BookieProtocol.FLAG_NONE); + long entryId = nextEntryId; + bookieClient.readEntry(bookie, flags.ledgerId, nextEntryId, + (rc, ledgerId1, entryId1, buffer, ctx) -> { + if (rc != BKException.Code.OK) { + LOG.error("Failed to read entry {} -- {}", entryId1, + BKException.getMessage(rc)); + future.completeExceptionally(BKException.create(rc)); + return; + } + + LOG.info("--------- Lid={}, Eid={} ---------", + ledgerIdFormatter.formatLedgerId(flags.ledgerId), entryId); + if (flags.msg) { + LOG.info("Data: " + ByteBufUtil.prettyHexDump(buffer)); + } + + future.complete(null); + }, null, BookieProtocol.FLAG_NONE); try { future.get(); } catch (Exception e) { LOG.error("Error future.get while reading entries from ledger {}", flags.ledgerId, e); + if (e.getCause() instanceof BKException.BKNoSuchEntryException) { + break; + } } - }); + ++nextEntryId; + } eventLoopGroup.shutdownGracefully(); executor.shutdown(); diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLedgerCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLedgerCommandTest.java index 3f80d31dba1..e67c3282f9d 100644 --- a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLedgerCommandTest.java +++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadLedgerCommandTest.java @@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -35,6 +36,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; import lombok.SneakyThrows; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeperAdmin; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; @@ -42,9 +44,11 @@ import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClientImpl; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase; import org.junit.Assert; import org.junit.Test; +import org.mockito.stubbing.Stubber; /** * Unit test for {@link ReadLedgerCommand}. @@ -63,6 +67,30 @@ public ReadLedgerCommandTest() { super(3, 0); } + protected void mockBookieClientImplConstruction(LedgerEntry entry) { + mockConstruction(BookieClientImpl.class, (bookieClient, context) -> { + Stubber stub = doAnswer(invokation -> { + Object[] args = invokation.getArguments(); + long ledgerId = (Long) args[1]; + long entryId = (Long) args[2]; + BookkeeperInternalCallbacks.ReadEntryCallback callback = + (BookkeeperInternalCallbacks.ReadEntryCallback) args[3]; + + if (entryId <= 10) { + callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, entry.getEntryBuffer(), + args[4]); + } else { + callback.readEntryComplete(BKException.Code.NoSuchEntryException, ledgerId, entryId, null, args[4]); + } + return null; + }); + + stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(), + any(BookkeeperInternalCallbacks.ReadEntryCallback.class), + any(), anyInt()); + }); + } + @Override public void setup() throws Exception { super.setup(); @@ -93,6 +121,8 @@ public void accept(BookKeeperAdmin bookKeeperAdmin) { } }); + mockBookieClientImplConstruction(entry); + mockConstruction(NioEventLoopGroup.class); @@ -110,9 +140,6 @@ public void accept(BookKeeperAdmin bookKeeperAdmin) { .newSingleThreadScheduledExecutor(any(DefaultThreadFactory.class))) .thenReturn(scheduledExecutorService); - mockConstruction(BookieClientImpl.class); - - } @Test @@ -128,7 +155,7 @@ public void testWithoutBookieAddress() throws Exception { } @Test - public void testWithBookieAddress() throws Exception { + public void testWithBookieAddressWithoutEntryRange() throws Exception { ReadLedgerCommand cmd = new ReadLedgerCommand(); Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-b", bookieSocketAddress.getId() })); Assert.assertEquals(1, getMockedConstruction(NioEventLoopGroup.class).constructed().size()); @@ -137,6 +164,28 @@ public void testWithBookieAddress() throws Exception { verify(getMockedConstruction(NioEventLoopGroup.class).constructed().get(0), times(1)).shutdownGracefully(); verify(orderedExecutor, times(1)).shutdown(); verify(getMockedConstruction(BookieClientImpl.class).constructed().get(0), times(1)).close(); + // read from default entry -1 to entry 11. entry 11 is not found + verify(getMockedConstruction(BookieClientImpl.class).constructed().get(0), times(13)) + .readEntry(any(), anyLong(), anyLong(), + any(BookkeeperInternalCallbacks.ReadEntryCallback.class), + any(), anyInt()); } + @Test + public void testWithBookieAddressWithEntryRange() throws Exception { + ReadLedgerCommand cmd = new ReadLedgerCommand(); + Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-b", + bookieSocketAddress.getId(), "-fe", "5", "-le", "100" })); + Assert.assertEquals(1, getMockedConstruction(NioEventLoopGroup.class).constructed().size()); + Assert.assertEquals(1, getMockedConstruction(DefaultThreadFactory.class).constructed().size()); + Assert.assertEquals(1, getMockedConstruction(BookieClientImpl.class).constructed().size()); + verify(getMockedConstruction(NioEventLoopGroup.class).constructed().get(0), times(1)).shutdownGracefully(); + verify(orderedExecutor, times(1)).shutdown(); + verify(getMockedConstruction(BookieClientImpl.class).constructed().get(0), times(1)).close(); + // read from entry 5 to entry 11. entry 11 is not found + verify(getMockedConstruction(BookieClientImpl.class).constructed().get(0), times(7)) + .readEntry(any(), anyLong(), anyLong(), + any(BookkeeperInternalCallbacks.ReadEntryCallback.class), + any(), anyInt()); + } }