001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; 021import static org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers.hasAttributes; 022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; 023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEvents; 024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; 025import static org.hamcrest.MatcherAssert.assertThat; 026import static org.hamcrest.Matchers.allOf; 027import static org.hamcrest.Matchers.containsString; 028import static org.hamcrest.Matchers.hasItem; 029import static org.hamcrest.Matchers.hasItems; 030import static org.junit.jupiter.api.Assertions.assertArrayEquals; 031import static org.junit.jupiter.api.Assertions.assertEquals; 032import static org.junit.jupiter.api.Assertions.assertFalse; 033import static org.junit.jupiter.api.Assertions.assertThrows; 034import static org.junit.jupiter.api.Assertions.assertTrue; 035import static org.junit.jupiter.api.Assumptions.assumeTrue; 036import static org.mockito.ArgumentMatchers.anyString; 037import static org.mockito.Mockito.mock; 038import static org.mockito.Mockito.verify; 039import static org.mockito.Mockito.verifyNoMoreInteractions; 040import static org.mockito.Mockito.when; 041 042import io.opentelemetry.api.trace.Span; 043import io.opentelemetry.context.Scope; 044import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; 045import io.opentelemetry.sdk.trace.data.SpanData; 046import java.io.DataOutputStream; 047import java.io.IOException; 048import java.io.InputStream; 049import java.nio.ByteBuffer; 050import java.util.Random; 051import java.util.concurrent.TimeUnit; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.fs.FSDataInputStream; 054import org.apache.hadoop.fs.FSDataOutputStream; 055import org.apache.hadoop.fs.FileSystem; 056import org.apache.hadoop.fs.Path; 057import org.apache.hadoop.hbase.HBaseTestingUtil; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.MatcherPredicate; 060import org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers; 061import org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers; 062import org.apache.hadoop.hbase.fs.HFileSystem; 063import org.apache.hadoop.hbase.io.ByteBuffAllocator; 064import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; 065import org.apache.hadoop.hbase.io.compress.Compression; 066import org.apache.hadoop.hbase.io.util.BlockIOUtils; 067import org.apache.hadoop.hbase.nio.ByteBuff; 068import org.apache.hadoop.hbase.nio.MultiByteBuff; 069import org.apache.hadoop.hbase.nio.SingleByteBuff; 070import org.apache.hadoop.hbase.testclassification.IOTests; 071import org.apache.hadoop.hbase.testclassification.SmallTests; 072import org.apache.hadoop.hbase.trace.TraceUtil; 073import org.apache.hadoop.hbase.util.Bytes; 074import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 075import org.junit.jupiter.api.BeforeEach; 076import org.junit.jupiter.api.Tag; 077import org.junit.jupiter.api.Test; 078import org.junit.jupiter.api.TestInfo; 079import org.junit.jupiter.api.extension.RegisterExtension; 080 081@Tag(IOTests.TAG) 082@Tag(SmallTests.TAG) 083public class TestBlockIOUtils { 084 085 private String methodName; 086 087 @RegisterExtension 088 private static OpenTelemetryExtension OTEL_EXT = OpenTelemetryExtension.create(); 089 090 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 091 092 private static final int NUM_TEST_BLOCKS = 2; 093 private static final Compression.Algorithm COMPRESSION_ALGO = Compression.Algorithm.GZ; 094 095 @BeforeEach 096 public void setUp(TestInfo testInfo) { 097 methodName = testInfo.getTestMethod().get().getName(); 098 } 099 100 @Test 101 public void testIsByteBufferReadable() throws IOException { 102 FileSystem fs = TEST_UTIL.getTestFileSystem(); 103 Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testIsByteBufferReadable"); 104 try (FSDataOutputStream out = fs.create(p)) { 105 out.writeInt(23); 106 } 107 try (FSDataInputStream is = fs.open(p)) { 108 assertFalse(BlockIOUtils.isByteBufferReadable(is)); 109 } 110 } 111 112 @Test 113 public void testReadFully() throws IOException { 114 TraceUtil.trace(() -> { 115 FileSystem fs = TEST_UTIL.getTestFileSystem(); 116 Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully"); 117 String s = "hello world"; 118 try (FSDataOutputStream out = fs.create(p)) { 119 out.writeBytes(s); 120 } 121 ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11)); 122 try (FSDataInputStream in = fs.open(p)) { 123 BlockIOUtils.readFully(buf, in, 11); 124 } 125 buf.rewind(); 126 byte[] heapBuf = new byte[s.length()]; 127 buf.get(heapBuf, 0, heapBuf.length); 128 assertArrayEquals(Bytes.toBytes(s), heapBuf); 129 }, methodName); 130 131 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 132 OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded())))); 133 assertThat(OTEL_EXT.getSpans(), 134 hasItems(allOf(hasName(methodName), 135 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readFully"), 136 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11)))))))); 137 } 138 139 @Test 140 public void testPreadWithReadFullBytes() throws IOException { 141 testPreadReadFullBytesInternal(true, EnvironmentEdgeManager.currentTime()); 142 } 143 144 @Test 145 public void testPreadWithoutReadFullBytes() throws IOException { 146 testPreadReadFullBytesInternal(false, EnvironmentEdgeManager.currentTime()); 147 } 148 149 private void testPreadReadFullBytesInternal(boolean readAllBytes, long randomSeed) 150 throws IOException { 151 Configuration conf = TEST_UTIL.getConfiguration(); 152 conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, readAllBytes); 153 FileSystem fs = TEST_UTIL.getTestFileSystem(); 154 Path path = new Path(TEST_UTIL.getDataTestDirOnTestFS(), methodName); 155 // give a fixed seed such we can see failure easily. 156 Random rand = new Random(randomSeed); 157 long totalDataBlockBytes = 158 writeBlocks(TEST_UTIL.getConfiguration(), rand, COMPRESSION_ALGO, path); 159 readDataBlocksAndVerify(fs, path, COMPRESSION_ALGO, totalDataBlockBytes); 160 } 161 162 private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo, 163 Path path) throws IOException { 164 FileSystem fs = HFileSystem.get(conf); 165 FSDataOutputStream os = fs.create(path); 166 HFileContext meta = 167 new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build(); 168 HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta); 169 long totalDataBlockBytes = 0; 170 for (int i = 0; i < NUM_TEST_BLOCKS; ++i) { 171 int blockTypeOrdinal = rand.nextInt(BlockType.values().length); 172 if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) { 173 blockTypeOrdinal = BlockType.DATA.ordinal(); 174 } 175 BlockType bt = BlockType.values()[blockTypeOrdinal]; 176 DataOutputStream dos = hbw.startWriting(bt); 177 int size = rand.nextInt(500); 178 for (int j = 0; j < size; ++j) { 179 dos.writeShort(i + 1); 180 dos.writeInt(j + 1); 181 } 182 183 hbw.writeHeaderAndData(os); 184 totalDataBlockBytes += hbw.getOnDiskSizeWithHeader(); 185 } 186 // append a dummy trailer and in a actual HFile it should have more data. 187 FixedFileTrailer trailer = new FixedFileTrailer(3, 3); 188 trailer.setFirstDataBlockOffset(0); 189 trailer.setLastDataBlockOffset(totalDataBlockBytes); 190 trailer.setComparatorClass(meta.getCellComparator().getClass()); 191 trailer.setDataIndexCount(NUM_TEST_BLOCKS); 192 trailer.setCompressionCodec(compressAlgo); 193 trailer.serialize(os); 194 // close the stream 195 os.close(); 196 return totalDataBlockBytes; 197 } 198 199 private void readDataBlocksAndVerify(FileSystem fs, Path path, Compression.Algorithm compressAlgo, 200 long totalDataBlockBytes) throws IOException { 201 FSDataInputStream is = fs.open(path); 202 HFileContext fileContext = 203 new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build(); 204 ReaderContext context = 205 new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is)) 206 .withReaderType(ReaderContext.ReaderType.PREAD).withFileSize(totalDataBlockBytes) 207 .withFilePath(path).withFileSystem(fs).build(); 208 HFileBlock.FSReader hbr = 209 new HFileBlock.FSReaderImpl(context, fileContext, ByteBuffAllocator.HEAP, fs.getConf()); 210 211 long onDiskSizeOfNextBlock = -1; 212 long offset = 0; 213 int numOfReadBlock = 0; 214 // offset and totalBytes shares the same logic in the HFilePreadReader 215 while (offset < totalDataBlockBytes) { 216 HFileBlock block = hbr.readBlockData(offset, onDiskSizeOfNextBlock, true, false, false); 217 numOfReadBlock++; 218 try { 219 onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize(); 220 offset += block.getOnDiskSizeWithHeader(); 221 } finally { 222 block.release(); 223 } 224 } 225 assertEquals(totalDataBlockBytes, offset); 226 assertEquals(NUM_TEST_BLOCKS, numOfReadBlock); 227 deleteFile(fs, path); 228 } 229 230 private void deleteFile(FileSystem fs, Path path) throws IOException { 231 if (fs.exists(path)) { 232 fs.delete(path, true); 233 } 234 } 235 236 @Test 237 public void testReadWithExtra() throws IOException { 238 FileSystem fs = TEST_UTIL.getTestFileSystem(); 239 Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadWithExtra"); 240 String s = "hello world"; 241 try (FSDataOutputStream out = fs.create(p)) { 242 out.writeBytes(s); 243 } 244 245 Span span = TraceUtil.createSpan(methodName); 246 try (Scope ignored = span.makeCurrent()) { 247 ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8)); 248 try (FSDataInputStream in = fs.open(p)) { 249 assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2)); 250 } 251 buf.rewind(); 252 byte[] heapBuf = new byte[buf.capacity()]; 253 buf.get(heapBuf, 0, heapBuf.length); 254 assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf); 255 } finally { 256 span.end(); 257 } 258 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 259 OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded())))); 260 assertThat(OTEL_EXT.getSpans(), 261 hasItems(allOf(hasName(methodName), 262 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"), 263 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 8L)))))))); 264 265 OTEL_EXT.clearSpans(); 266 span = TraceUtil.createSpan(methodName); 267 try (Scope ignored = span.makeCurrent()) { 268 ByteBuff buf = 269 new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4)); 270 try (FSDataInputStream in = fs.open(p)) { 271 assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3)); 272 } 273 buf.rewind(); 274 byte[] heapBuf = new byte[11]; 275 buf.get(heapBuf, 0, heapBuf.length); 276 assertArrayEquals(Bytes.toBytes("hello world"), heapBuf); 277 } finally { 278 span.end(); 279 } 280 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 281 OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded())))); 282 assertThat(OTEL_EXT.getSpans(), 283 hasItems(allOf(hasName(methodName), 284 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"), 285 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L)))))))); 286 287 OTEL_EXT.clearSpans(); 288 span = TraceUtil.createSpan(methodName); 289 try (Scope ignored = span.makeCurrent()) { 290 ByteBuff buf = 291 new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4)); 292 buf.position(0).limit(12); 293 try (FSDataInputStream in = fs.open(p)) { 294 assertThrows(IOException.class, () -> BlockIOUtils.readWithExtra(buf, in, 12, 0), 295 "Should only read 11 bytes"); 296 } 297 } finally { 298 span.end(); 299 } 300 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 301 OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded())))); 302 assertThat(OTEL_EXT.getSpans(), 303 hasItems(allOf(hasName(methodName), 304 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"), 305 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L)))))))); 306 } 307 308 @Test 309 public void testPositionalReadNoExtra() throws IOException { 310 long position = 0; 311 int bufOffset = 0; 312 int necessaryLen = 10; 313 int extraLen = 0; 314 int totalLen = necessaryLen + extraLen; 315 byte[] buf = new byte[totalLen]; 316 ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); 317 FSDataInputStream in = mock(FSDataInputStream.class); 318 when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen); 319 when(in.hasCapability(anyString())).thenReturn(false); 320 boolean ret = TraceUtil.trace( 321 () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), methodName); 322 assertFalse(ret, "Expect false return when no extra bytes requested"); 323 verify(in).read(position, buf, bufOffset, totalLen); 324 verify(in).hasCapability(anyString()); 325 verifyNoMoreInteractions(in); 326 327 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 328 OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded())))); 329 assertThat(OTEL_EXT.getSpans(), 330 hasItems(allOf(hasName(methodName), 331 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), 332 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen)))))))); 333 } 334 335 @Test 336 public void testPositionalReadShortReadOfNecessaryBytes() throws IOException { 337 long position = 0; 338 int bufOffset = 0; 339 int necessaryLen = 10; 340 int extraLen = 0; 341 int totalLen = necessaryLen + extraLen; 342 byte[] buf = new byte[totalLen]; 343 ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); 344 FSDataInputStream in = mock(FSDataInputStream.class); 345 when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5); 346 when(in.read(5, buf, 5, 5)).thenReturn(5); 347 when(in.hasCapability(anyString())).thenReturn(false); 348 boolean ret = TraceUtil.trace( 349 () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), methodName); 350 assertFalse(ret, "Expect false return when no extra bytes requested"); 351 verify(in).read(position, buf, bufOffset, totalLen); 352 verify(in).read(5, buf, 5, 5); 353 verify(in).hasCapability(anyString()); 354 verifyNoMoreInteractions(in); 355 356 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 357 OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded())))); 358 assertThat(OTEL_EXT.getSpans(), 359 hasItems(allOf(hasName(methodName), 360 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), 361 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen)))))))); 362 } 363 364 @Test 365 public void testPositionalReadExtraSucceeded() throws IOException { 366 long position = 0; 367 int bufOffset = 0; 368 int necessaryLen = 10; 369 int extraLen = 5; 370 int totalLen = necessaryLen + extraLen; 371 byte[] buf = new byte[totalLen]; 372 ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); 373 FSDataInputStream in = mock(FSDataInputStream.class); 374 when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen); 375 when(in.hasCapability(anyString())).thenReturn(false); 376 boolean ret = TraceUtil.trace( 377 () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), methodName); 378 assertTrue(ret, "Expect true return when reading extra bytes succeeds"); 379 verify(in).read(position, buf, bufOffset, totalLen); 380 verify(in).hasCapability(anyString()); 381 verifyNoMoreInteractions(in); 382 383 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 384 OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded())))); 385 assertThat(OTEL_EXT.getSpans(), 386 hasItems(allOf(hasName(methodName), 387 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), 388 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen)))))))); 389 } 390 391 @Test 392 public void testPositionalReadExtraFailed() throws IOException { 393 long position = 0; 394 int bufOffset = 0; 395 int necessaryLen = 10; 396 int extraLen = 5; 397 int totalLen = necessaryLen + extraLen; 398 byte[] buf = new byte[totalLen]; 399 ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); 400 FSDataInputStream in = mock(FSDataInputStream.class); 401 when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen); 402 when(in.hasCapability(anyString())).thenReturn(false); 403 boolean ret = TraceUtil.trace( 404 () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), methodName); 405 assertFalse(ret, "Expect false return when reading extra bytes fails"); 406 verify(in).read(position, buf, bufOffset, totalLen); 407 verify(in).hasCapability(anyString()); 408 verifyNoMoreInteractions(in); 409 410 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 411 OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded())))); 412 assertThat(OTEL_EXT.getSpans(), 413 hasItems(allOf(hasName(methodName), 414 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), 415 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", necessaryLen)))))))); 416 } 417 418 @Test 419 public void testPositionalReadShortReadCompletesNecessaryAndExtraBytes() throws IOException { 420 long position = 0; 421 int bufOffset = 0; 422 int necessaryLen = 10; 423 int extraLen = 5; 424 int totalLen = necessaryLen + extraLen; 425 byte[] buf = new byte[totalLen]; 426 ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); 427 FSDataInputStream in = mock(FSDataInputStream.class); 428 when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5); 429 when(in.read(5, buf, 5, 10)).thenReturn(10); 430 when(in.hasCapability(anyString())).thenReturn(false); 431 boolean ret = TraceUtil.trace( 432 () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), methodName); 433 assertTrue(ret, "Expect true return when reading extra bytes succeeds"); 434 verify(in).read(position, buf, bufOffset, totalLen); 435 verify(in).read(5, buf, 5, 10); 436 verify(in).hasCapability(anyString()); 437 verifyNoMoreInteractions(in); 438 439 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 440 OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded())))); 441 assertThat(OTEL_EXT.getSpans(), 442 hasItems(allOf(hasName(methodName), 443 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), 444 hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen)))))))); 445 } 446 447 @Test 448 public void testPositionalReadPrematureEOF() throws IOException { 449 long position = 0; 450 int bufOffset = 0; 451 int necessaryLen = 10; 452 int extraLen = 0; 453 int totalLen = necessaryLen + extraLen; 454 byte[] buf = new byte[totalLen]; 455 ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); 456 FSDataInputStream in = mock(FSDataInputStream.class); 457 when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1); 458 when(in.hasCapability(anyString())).thenReturn(false); 459 Span span = TraceUtil.createSpan(methodName); 460 try (Scope ignored = span.makeCurrent()) { 461 IOException e = assertThrows(IOException.class, 462 () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen)); 463 assertThat(e.getMessage(), containsString("EOF")); 464 TraceUtil.setError(span, e); 465 } finally { 466 span.end(); 467 } 468 TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>( 469 OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded())))); 470 assertThat(OTEL_EXT.getSpans(), 471 hasItems(allOf(hasName(methodName), 472 hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"), 473 hasAttributes(AttributesMatchers.isEmpty()))))))); 474 475 verify(in).read(position, buf, bufOffset, totalLen); 476 verify(in).hasCapability(anyString()); 477 verifyNoMoreInteractions(in); 478 } 479 480 /** 481 * Determine if ByteBufferPositionedReadable API is available . 482 * @return true if FSDataInputStream implements ByteBufferPositionedReadable API. 483 */ 484 private boolean isByteBufferPositionedReadable() { 485 try { 486 // long position, ByteBuffer buf 487 FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class); 488 } catch (NoSuchMethodException e) { 489 return false; 490 } 491 return true; 492 } 493 494 public static class MyFSDataInputStream extends FSDataInputStream { 495 public MyFSDataInputStream(InputStream in) { 496 super(in); 497 } 498 499 // This is the ByteBufferPositionReadable API we want to test. 500 // Because the API is only available in Hadoop 3.3, FSDataInputStream in older Hadoop 501 // does not implement the interface, and it wouldn't compile trying to mock the method. 502 // So explicitly declare the method here to make mocking possible. 503 public int read(long position, ByteBuffer buf) throws IOException { 504 return 0; 505 } 506 } 507 508 @Test 509 public void testByteBufferPositionedReadable() throws IOException { 510 assumeTrue(isByteBufferPositionedReadable(), 511 "Skip the test because ByteBufferPositionedReadable is not available"); 512 long position = 0; 513 int necessaryLen = 10; 514 int extraLen = 1; 515 int totalLen = necessaryLen + extraLen; 516 int firstReadLen = 6; 517 int secondReadLen = totalLen - firstReadLen; 518 ByteBuffer buf = ByteBuffer.allocate(totalLen); 519 ByteBuff bb = new SingleByteBuff(buf); 520 MyFSDataInputStream in = mock(MyFSDataInputStream.class); 521 522 when(in.read(position, buf)).thenReturn(firstReadLen); 523 when(in.read(firstReadLen, buf)).thenReturn(secondReadLen); 524 when(in.hasCapability(anyString())).thenReturn(true); 525 boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); 526 assertTrue(ret, "Expect true return when reading extra bytes succeeds"); 527 verify(in).read(position, buf); 528 verify(in).read(firstReadLen, buf); 529 verify(in).hasCapability(anyString()); 530 verifyNoMoreInteractions(in); 531 } 532 533 @Test 534 public void testByteBufferPositionedReadableEOF() throws IOException { 535 assumeTrue(isByteBufferPositionedReadable(), 536 "Skip the test because ByteBufferPositionedReadable is not available"); 537 long position = 0; 538 int necessaryLen = 10; 539 int extraLen = 0; 540 int totalLen = necessaryLen + extraLen; 541 int firstReadLen = 9; 542 ByteBuffer buf = ByteBuffer.allocate(totalLen); 543 ByteBuff bb = new SingleByteBuff(buf); 544 MyFSDataInputStream in = mock(MyFSDataInputStream.class); 545 546 when(in.read(position, buf)).thenReturn(firstReadLen); 547 when(in.read(firstReadLen, buf)).thenReturn(-1); 548 when(in.hasCapability(anyString())).thenReturn(true); 549 IOException e = assertThrows(IOException.class, 550 () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen)); 551 assertThat(e.getMessage(), containsString("EOF")); 552 553 verify(in).read(position, buf); 554 verify(in).read(firstReadLen, buf); 555 verify(in).hasCapability(anyString()); 556 verifyNoMoreInteractions(in); 557 } 558}