Skip to content

Commit

Permalink
improve of bookie-shell ReadLedgerCommand
Browse files Browse the repository at this point in the history
  • Loading branch information
fanjianye committed May 14, 2024
1 parent 7c0e16b commit 99aaf0d
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.LongStream;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -189,33 +188,39 @@ private boolean readledger(ServerConfiguration serverConf, ReadLedgerFlags flags
executor, scheduler, NullStatsLogger.INSTANCE,
bk.getBookieAddressResolver());

LongStream.range(flags.firstEntryId, lastEntry).forEach(entryId -> {
long nextEntryId = flags.firstEntryId;
while (lastEntry == -1 || nextEntryId <= lastEntry) {
CompletableFuture<Void> future = new CompletableFuture<>();

bookieClient.readEntry(bookie, flags.ledgerId, entryId,
(rc, ledgerId1, entryId1, buffer, ctx) -> {
if (rc != BKException.Code.OK) {
LOG.error("Failed to read entry {} -- {}", entryId1,
BKException.getMessage(rc));
future.completeExceptionally(BKException.create(rc));
return;
}

LOG.info("--------- Lid={}, Eid={} ---------",
ledgerIdFormatter.formatLedgerId(flags.ledgerId), entryId);
if (flags.msg) {
LOG.info("Data: " + ByteBufUtil.prettyHexDump(buffer));
}

future.complete(null);
}, null, BookieProtocol.FLAG_NONE);
long entryId = nextEntryId;
bookieClient.readEntry(bookie, flags.ledgerId, nextEntryId,
(rc, ledgerId1, entryId1, buffer, ctx) -> {
if (rc != BKException.Code.OK) {
LOG.error("Failed to read entry {} -- {}", entryId1,
BKException.getMessage(rc));
future.completeExceptionally(BKException.create(rc));
return;
}

LOG.info("--------- Lid={}, Eid={} ---------",
ledgerIdFormatter.formatLedgerId(flags.ledgerId), entryId);
if (flags.msg) {
LOG.info("Data: " + ByteBufUtil.prettyHexDump(buffer));
}

future.complete(null);
}, null, BookieProtocol.FLAG_NONE);

try {
future.get();
} catch (Exception e) {
LOG.error("Error future.get while reading entries from ledger {}", flags.ledgerId, e);
if (e.getCause() instanceof BKException.BKNoSuchEntryException) {
break;
}
}
});
++nextEntryId;
}

eventLoopGroup.shutdownGracefully();
executor.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -35,16 +36,19 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import lombok.SneakyThrows;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.stubbing.Stubber;

/**
* Unit test for {@link ReadLedgerCommand}.
Expand All @@ -63,6 +67,30 @@ public ReadLedgerCommandTest() {
super(3, 0);
}

protected void mockBookieClientImplConstruction(LedgerEntry entry) {
mockConstruction(BookieClientImpl.class, (bookieClient, context) -> {
Stubber stub = doAnswer(invokation -> {
Object[] args = invokation.getArguments();
long ledgerId = (Long) args[1];
long entryId = (Long) args[2];
BookkeeperInternalCallbacks.ReadEntryCallback callback =
(BookkeeperInternalCallbacks.ReadEntryCallback) args[3];

if (entryId <= 10) {
callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, entry.getEntryBuffer(),
args[4]);
} else {
callback.readEntryComplete(BKException.Code.NoSuchEntryException, ledgerId, entryId, null, args[4]);
}
return null;
});

stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(),
any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
any(), anyInt());
});
}

@Override
public void setup() throws Exception {
super.setup();
Expand Down Expand Up @@ -93,6 +121,8 @@ public void accept(BookKeeperAdmin bookKeeperAdmin) {
}
});

mockBookieClientImplConstruction(entry);

mockConstruction(NioEventLoopGroup.class);


Expand All @@ -110,9 +140,6 @@ public void accept(BookKeeperAdmin bookKeeperAdmin) {
.newSingleThreadScheduledExecutor(any(DefaultThreadFactory.class)))
.thenReturn(scheduledExecutorService);

mockConstruction(BookieClientImpl.class);


}

@Test
Expand All @@ -128,7 +155,7 @@ public void testWithoutBookieAddress() throws Exception {
}

@Test
public void testWithBookieAddress() throws Exception {
public void testWithBookieAddressWithoutEntryRange() throws Exception {
ReadLedgerCommand cmd = new ReadLedgerCommand();
Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-b", bookieSocketAddress.getId() }));
Assert.assertEquals(1, getMockedConstruction(NioEventLoopGroup.class).constructed().size());
Expand All @@ -137,6 +164,28 @@ public void testWithBookieAddress() throws Exception {
verify(getMockedConstruction(NioEventLoopGroup.class).constructed().get(0), times(1)).shutdownGracefully();
verify(orderedExecutor, times(1)).shutdown();
verify(getMockedConstruction(BookieClientImpl.class).constructed().get(0), times(1)).close();
// read from default entry -1 to entry 11. entry 11 is not found
verify(getMockedConstruction(BookieClientImpl.class).constructed().get(0), times(13))
.readEntry(any(), anyLong(), anyLong(),
any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
any(), anyInt());
}

@Test
public void testWithBookieAddressWithEntryRange() throws Exception {
ReadLedgerCommand cmd = new ReadLedgerCommand();
Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-b",
bookieSocketAddress.getId(), "-fe", "5", "-le", "100" }));
Assert.assertEquals(1, getMockedConstruction(NioEventLoopGroup.class).constructed().size());
Assert.assertEquals(1, getMockedConstruction(DefaultThreadFactory.class).constructed().size());
Assert.assertEquals(1, getMockedConstruction(BookieClientImpl.class).constructed().size());
verify(getMockedConstruction(NioEventLoopGroup.class).constructed().get(0), times(1)).shutdownGracefully();
verify(orderedExecutor, times(1)).shutdown();
verify(getMockedConstruction(BookieClientImpl.class).constructed().get(0), times(1)).close();
// read from entry 5 to entry 11. entry 11 is not found
verify(getMockedConstruction(BookieClientImpl.class).constructed().get(0), times(7))
.readEntry(any(), anyLong(), anyLong(),
any(BookkeeperInternalCallbacks.ReadEntryCallback.class),
any(), anyInt());
}
}

0 comments on commit 99aaf0d

Please sign in to comment.