diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java index 6db3e143519..fb7102d08a2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessor.java @@ -56,18 +56,23 @@ protected ReferenceCounted readData() throws Exception { } long maxSize = Math.min(batchRequest.getMaxSize(), maxBatchReadSize); //See BookieProtoEncoding.ResponseEnDeCoderPreV3#encode on BatchedReadResponse case. - long frameSize = 24 + 8 + 4; + long frameSize = 24 + 8 + Integer.BYTES; for (int i = 0; i < maxCount; i++) { try { ByteBuf entry = requestProcessor.getBookie().readEntry(request.getLedgerId(), request.getEntryId() + i); - frameSize += entry.readableBytes() + 4; if (data == null) { + frameSize += entry.readableBytes() + Integer.BYTES; data = ByteBufList.get(entry); + long perEntrySize = entry.readableBytes() + Integer.BYTES; + long remainingBudget = maxSize - frameSize; + long remainingEntries = remainingBudget > 0 ? remainingBudget / Math.max(perEntrySize, 1L) : 0L; + maxCount = (int) Math.min(maxCount, 1L + remainingEntries); } else { - if (frameSize > maxSize) { + if (frameSize + entry.readableBytes() + Integer.BYTES > maxSize) { entry.release(); break; } + frameSize += entry.readableBytes() + Integer.BYTES; data.add(entry); } } catch (Throwable e) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java index 93ace7eacb1..75c9565f260 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java @@ -816,6 +816,16 @@ public void testBatchReadWithV2Protocol() throws Exception { entries++; } assertEquals(expectEntriesNum, entries); + + // The first entry is still returned even when maxSize is smaller than a single entry frame. + entries = 0; + for (Enumeration readEntries = lh.batchReadEntries(0, 20, headerSize); + readEntries.hasMoreElements();) { + LedgerEntry entry = readEntries.nextElement(); + assertArrayEquals(data, entry.getEntry()); + entries++; + } + assertEquals(1, entries); } } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java index 3f897558384..ed2e90bfefd 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BatchedReadEntryProcessorTest.java @@ -19,11 +19,14 @@ package org.apache.bookkeeper.proto; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -46,6 +49,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.proto.BookieProtocol.Response; import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.util.ByteBufList; import org.junit.Before; import org.junit.Test; @@ -221,4 +225,171 @@ public void testNonFenceRequest() throws Exception { assertEquals(BookieProtocol.BATCH_READ_ENTRY, response.getOpCode()); assertEquals(BookieProtocol.EOK, response.getErrorCode()); } -} \ No newline at end of file + + @Test + public void testReadDataPredictsMaxCountFromUniformFirstEntrySize() throws Exception { + long ledgerId = 1234L; + long firstEntryId = 1L; + int entrySize = 20; + long maxSize = 24 + 8 + Integer.BYTES + (entrySize + Integer.BYTES) * 2L; + + ByteBuf firstEntry = entryBuffer(entrySize); + ByteBuf secondEntry = entryBuffer(entrySize); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId + 1))).thenReturn(secondEntry); + + BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, maxSize); + ByteBufList data = (ByteBufList) processor.readData(); + assertNotNull(data); + try { + assertEquals(2, data.size()); + } finally { + data.release(); + } + + verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId)); + verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId + 1)); + verify(bookie, never()).readEntry(eq(ledgerId), eq(firstEntryId + 2)); + } + + @Test + public void testReadDataReturnsFirstEntryEvenIfItAloneExceedsMaxSize() throws Exception { + long ledgerId = 1235L; + long firstEntryId = 1L; + int firstEntrySize = 20; + long maxSize = 50; + + ByteBuf firstEntry = entryBuffer(firstEntrySize); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry); + + BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, maxSize); + ByteBufList data = (ByteBufList) processor.readData(); + assertNotNull(data); + try { + assertEquals(1, data.size()); + } finally { + data.release(); + } + + verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId)); + verify(bookie, never()).readEntry(eq(ledgerId), eq(firstEntryId + 1)); + } + + @Test + public void testReadDataReleasesOneOverReadEntryWhenSizesGrow() throws Exception { + long ledgerId = 1236L; + long firstEntryId = 1L; + int firstEntrySize = 10; + int secondEntrySize = 40; + long maxSize = 80; + + ByteBuf firstEntry = entryBuffer(firstEntrySize); + ByteBuf secondEntry = entryBuffer(secondEntrySize); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId + 1))).thenReturn(secondEntry); + + BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, maxSize); + ByteBufList data = (ByteBufList) processor.readData(); + assertNotNull(data); + try { + assertEquals(1, data.size()); + assertEquals(0, secondEntry.refCnt()); + } finally { + data.release(); + } + + verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId)); + verify(bookie, times(1)).readEntry(eq(ledgerId), eq(firstEntryId + 1)); + verify(bookie, never()).readEntry(eq(ledgerId), eq(firstEntryId + 2)); + } + + @Test + public void testReadDataStopsOnMissingSubsequentEntry() throws Exception { + long ledgerId = 1237L; + long firstEntryId = 1L; + int firstEntrySize = 20; + + ByteBuf firstEntry = entryBuffer(firstEntrySize); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId + 1))) + .thenThrow(new Bookie.NoEntryException(ledgerId, firstEntryId + 1)); + + BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, 1024); + ByteBufList data = (ByteBufList) processor.readData(); + assertNotNull(data); + try { + assertEquals(1, data.size()); + } finally { + data.release(); + } + } + + @Test + public void testReadDataStopsOnIOExceptionAfterFirstEntry() throws Exception { + long ledgerId = 1238L; + long firstEntryId = 1L; + + ByteBuf firstEntry = entryBuffer(20); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId + 1))) + .thenThrow(new IOException("disk error")); + + BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, 1024); + ByteBufList data = (ByteBufList) processor.readData(); + assertNotNull(data); + try { + assertEquals(1, data.size()); + } finally { + data.release(); + } + } + + @Test + public void testProcessPacketReturnsPrefixWhenSubsequentReadFails() throws Exception { + ChannelPromise promise = new DefaultChannelPromise(channel); + AtomicReference writtenObject = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + doAnswer(invocationOnMock -> { + writtenObject.set(invocationOnMock.getArgument(0)); + promise.setSuccess(); + latch.countDown(); + return promise; + }).when(channel).writeAndFlush(any(Response.class)); + + long ledgerId = 1239L; + long firstEntryId = 1L; + ByteBuf firstEntry = entryBuffer(20); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId))).thenReturn(firstEntry); + when(bookie.readEntry(eq(ledgerId), eq(firstEntryId + 1))) + .thenThrow(new IOException("disk error")); + + BatchedReadEntryProcessor processor = createProcessor(ledgerId, firstEntryId, 5, 1024); + processor.run(); + + latch.await(); + assertTrue(writtenObject.get() instanceof Response); + BookieProtocol.BatchedReadResponse response = (BookieProtocol.BatchedReadResponse) writtenObject.get(); + try { + assertEquals(BookieProtocol.EOK, response.getErrorCode()); + assertEquals(1, response.getData().size()); + } finally { + response.release(); + } + assertEquals(0, firstEntry.refCnt()); + } + + private BatchedReadEntryProcessor createProcessor(long ledgerId, long entryId, int maxCount, long maxSize) { + ExecutorService service = mock(ExecutorService.class); + BookieProtocol.BatchedReadRequest request = BookieProtocol.BatchedReadRequest.create( + BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, BookieProtocol.FLAG_NONE, new byte[] {}, + 0L, maxCount, maxSize); + return BatchedReadEntryProcessor.create(request, requestHandler, requestProcessor, service, true, + 5 * 1024 * 1024); + } + + private static ByteBuf entryBuffer(int size) { + ByteBuf entry = ByteBufAllocator.DEFAULT.buffer(size); + entry.writeZero(size); + return entry; + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java index 0cb3ad954fb..9fb0bc9c69e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java @@ -140,19 +140,35 @@ public ByteBufList batchReadEntries(BookieId bookieId, int flags, long ledgerId, if (maxCount <= 0) { maxCount = Integer.MAX_VALUE; } - long frameSize = 24 + 8 + 4; + long frameSize = 24 + 8 + Integer.BYTES; for (long i = startEntryId; i < startEntryId + maxCount; i++) { - ByteBuf entry = ledger.getEntry(i); - frameSize += entry.readableBytes() + 4; - if (data == null) { - data = ByteBufList.get(entry); - } else { - if (frameSize > maxSize) { - entry.release(); - break; - } - data.add(entry); - } + ByteBuf entry = ledger.getEntry(i); + if (data == null) { + if (entry == null) { + LOG.warn("[{};L{}] entry({}) not found", bookieId, ledgerId, i); + throw new BKException.BKNoSuchEntryException(); + } + frameSize += entry.readableBytes() + Integer.BYTES; + data = ByteBufList.get(entry); + long perEntrySize = entry.readableBytes() + Integer.BYTES; + long remainingBudget = maxSize - frameSize; + long remainingEntries = remainingBudget > 0 ? remainingBudget / Math.max(perEntrySize, 1L) : 0L; + maxCount = (int) Math.min(maxCount, 1L + remainingEntries); + continue; + } + + if (entry == null) { + LOG.warn("[{};L{}] entry({}) not found", bookieId, ledgerId, i); + break; + } + + if (frameSize + entry.readableBytes() + Integer.BYTES > maxSize) { + // MockLedgerData returns the stored shared buffer, so this path does not own the skipped entry. + break; + } + + frameSize += entry.readableBytes() + Integer.BYTES; + data.add(entry); } return data; } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookiesTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookiesTest.java new file mode 100644 index 00000000000..4c0421f8f13 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookiesTest.java @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.proto; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.util.ByteBufList; +import org.junit.Test; + +public class MockBookiesTest { + + private static final BookieId BOOKIE_ID = BookieId.parse("127.0.0.1:3181"); + private static final long LEDGER_ID = 1L; + private static final long BATCH_RESPONSE_HEADER_SIZE = 24 + 8 + 4; + + @Test + public void testBatchReadStopsOnMissingSubsequentEntry() throws Exception { + MockBookies mockBookies = new MockBookies(); + mockBookies.addEntry(BOOKIE_ID, LEDGER_ID, 0L, newEntry(8)); + + ByteBufList data = mockBookies.batchReadEntries(BOOKIE_ID, 0, LEDGER_ID, 0L, 2, Long.MAX_VALUE); + + assertNotNull(data); + assertEquals(1, data.size()); + assertEquals(8, data.getBuffer(0).readableBytes()); + } + + @Test + public void testBatchReadDoesNotReleaseOversizedSkippedEntry() throws Exception { + MockBookies mockBookies = new MockBookies(); + mockBookies.addEntry(BOOKIE_ID, LEDGER_ID, 0L, newEntry(8)); + mockBookies.addEntry(BOOKIE_ID, LEDGER_ID, 1L, newEntry(16)); + + long maxSize = BATCH_RESPONSE_HEADER_SIZE + 8 + Integer.BYTES + 16 + Integer.BYTES - 1; + ByteBufList data = mockBookies.batchReadEntries(BOOKIE_ID, 0, LEDGER_ID, 0L, 2, maxSize); + + assertNotNull(data); + assertEquals(1, data.size()); + assertEquals(8, data.getBuffer(0).readableBytes()); + assertEquals(16, mockBookies.readEntry(BOOKIE_ID, 0, LEDGER_ID, 1L).readableBytes()); + } + + private static ByteBuf newEntry(int size) { + return Unpooled.buffer(size).writeZero(size); + } +}