001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers.hasAttributes;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEvents;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
025import static org.hamcrest.MatcherAssert.assertThat;
026import static org.hamcrest.Matchers.allOf;
027import static org.hamcrest.Matchers.containsString;
028import static org.hamcrest.Matchers.hasItem;
029import static org.hamcrest.Matchers.hasItems;
030import static org.junit.jupiter.api.Assertions.assertArrayEquals;
031import static org.junit.jupiter.api.Assertions.assertEquals;
032import static org.junit.jupiter.api.Assertions.assertFalse;
033import static org.junit.jupiter.api.Assertions.assertThrows;
034import static org.junit.jupiter.api.Assertions.assertTrue;
035import static org.junit.jupiter.api.Assumptions.assumeTrue;
036import static org.mockito.ArgumentMatchers.anyString;
037import static org.mockito.Mockito.mock;
038import static org.mockito.Mockito.verify;
039import static org.mockito.Mockito.verifyNoMoreInteractions;
040import static org.mockito.Mockito.when;
041
042import io.opentelemetry.api.trace.Span;
043import io.opentelemetry.context.Scope;
044import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
045import io.opentelemetry.sdk.trace.data.SpanData;
046import java.io.DataOutputStream;
047import java.io.IOException;
048import java.io.InputStream;
049import java.nio.ByteBuffer;
050import java.util.Random;
051import java.util.concurrent.TimeUnit;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.FSDataInputStream;
054import org.apache.hadoop.fs.FSDataOutputStream;
055import org.apache.hadoop.fs.FileSystem;
056import org.apache.hadoop.fs.Path;
057import org.apache.hadoop.hbase.HBaseTestingUtil;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.MatcherPredicate;
060import org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers;
061import org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers;
062import org.apache.hadoop.hbase.fs.HFileSystem;
063import org.apache.hadoop.hbase.io.ByteBuffAllocator;
064import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
065import org.apache.hadoop.hbase.io.compress.Compression;
066import org.apache.hadoop.hbase.io.util.BlockIOUtils;
067import org.apache.hadoop.hbase.nio.ByteBuff;
068import org.apache.hadoop.hbase.nio.MultiByteBuff;
069import org.apache.hadoop.hbase.nio.SingleByteBuff;
070import org.apache.hadoop.hbase.testclassification.IOTests;
071import org.apache.hadoop.hbase.testclassification.SmallTests;
072import org.apache.hadoop.hbase.trace.TraceUtil;
073import org.apache.hadoop.hbase.util.Bytes;
074import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
075import org.junit.jupiter.api.BeforeEach;
076import org.junit.jupiter.api.Tag;
077import org.junit.jupiter.api.Test;
078import org.junit.jupiter.api.TestInfo;
079import org.junit.jupiter.api.extension.RegisterExtension;
080
081@Tag(IOTests.TAG)
082@Tag(SmallTests.TAG)
083public class TestBlockIOUtils {
084
085  private String methodName;
086
087  @RegisterExtension
088  private static OpenTelemetryExtension OTEL_EXT = OpenTelemetryExtension.create();
089
090  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
091
092  private static final int NUM_TEST_BLOCKS = 2;
093  private static final Compression.Algorithm COMPRESSION_ALGO = Compression.Algorithm.GZ;
094
095  @BeforeEach
096  public void setUp(TestInfo testInfo) {
097    methodName = testInfo.getTestMethod().get().getName();
098  }
099
100  @Test
101  public void testIsByteBufferReadable() throws IOException {
102    FileSystem fs = TEST_UTIL.getTestFileSystem();
103    Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testIsByteBufferReadable");
104    try (FSDataOutputStream out = fs.create(p)) {
105      out.writeInt(23);
106    }
107    try (FSDataInputStream is = fs.open(p)) {
108      assertFalse(BlockIOUtils.isByteBufferReadable(is));
109    }
110  }
111
112  @Test
113  public void testReadFully() throws IOException {
114    TraceUtil.trace(() -> {
115      FileSystem fs = TEST_UTIL.getTestFileSystem();
116      Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully");
117      String s = "hello world";
118      try (FSDataOutputStream out = fs.create(p)) {
119        out.writeBytes(s);
120      }
121      ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11));
122      try (FSDataInputStream in = fs.open(p)) {
123        BlockIOUtils.readFully(buf, in, 11);
124      }
125      buf.rewind();
126      byte[] heapBuf = new byte[s.length()];
127      buf.get(heapBuf, 0, heapBuf.length);
128      assertArrayEquals(Bytes.toBytes(s), heapBuf);
129    }, methodName);
130
131    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
132      OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded()))));
133    assertThat(OTEL_EXT.getSpans(),
134      hasItems(allOf(hasName(methodName),
135        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readFully"),
136          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11))))))));
137  }
138
139  @Test
140  public void testPreadWithReadFullBytes() throws IOException {
141    testPreadReadFullBytesInternal(true, EnvironmentEdgeManager.currentTime());
142  }
143
144  @Test
145  public void testPreadWithoutReadFullBytes() throws IOException {
146    testPreadReadFullBytesInternal(false, EnvironmentEdgeManager.currentTime());
147  }
148
149  private void testPreadReadFullBytesInternal(boolean readAllBytes, long randomSeed)
150    throws IOException {
151    Configuration conf = TEST_UTIL.getConfiguration();
152    conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, readAllBytes);
153    FileSystem fs = TEST_UTIL.getTestFileSystem();
154    Path path = new Path(TEST_UTIL.getDataTestDirOnTestFS(), methodName);
155    // give a fixed seed such we can see failure easily.
156    Random rand = new Random(randomSeed);
157    long totalDataBlockBytes =
158      writeBlocks(TEST_UTIL.getConfiguration(), rand, COMPRESSION_ALGO, path);
159    readDataBlocksAndVerify(fs, path, COMPRESSION_ALGO, totalDataBlockBytes);
160  }
161
162  private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo,
163    Path path) throws IOException {
164    FileSystem fs = HFileSystem.get(conf);
165    FSDataOutputStream os = fs.create(path);
166    HFileContext meta =
167      new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
168    HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta);
169    long totalDataBlockBytes = 0;
170    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
171      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
172      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
173        blockTypeOrdinal = BlockType.DATA.ordinal();
174      }
175      BlockType bt = BlockType.values()[blockTypeOrdinal];
176      DataOutputStream dos = hbw.startWriting(bt);
177      int size = rand.nextInt(500);
178      for (int j = 0; j < size; ++j) {
179        dos.writeShort(i + 1);
180        dos.writeInt(j + 1);
181      }
182
183      hbw.writeHeaderAndData(os);
184      totalDataBlockBytes += hbw.getOnDiskSizeWithHeader();
185    }
186    // append a dummy trailer and in a actual HFile it should have more data.
187    FixedFileTrailer trailer = new FixedFileTrailer(3, 3);
188    trailer.setFirstDataBlockOffset(0);
189    trailer.setLastDataBlockOffset(totalDataBlockBytes);
190    trailer.setComparatorClass(meta.getCellComparator().getClass());
191    trailer.setDataIndexCount(NUM_TEST_BLOCKS);
192    trailer.setCompressionCodec(compressAlgo);
193    trailer.serialize(os);
194    // close the stream
195    os.close();
196    return totalDataBlockBytes;
197  }
198
199  private void readDataBlocksAndVerify(FileSystem fs, Path path, Compression.Algorithm compressAlgo,
200    long totalDataBlockBytes) throws IOException {
201    FSDataInputStream is = fs.open(path);
202    HFileContext fileContext =
203      new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
204    ReaderContext context =
205      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
206        .withReaderType(ReaderContext.ReaderType.PREAD).withFileSize(totalDataBlockBytes)
207        .withFilePath(path).withFileSystem(fs).build();
208    HFileBlock.FSReader hbr =
209      new HFileBlock.FSReaderImpl(context, fileContext, ByteBuffAllocator.HEAP, fs.getConf());
210
211    long onDiskSizeOfNextBlock = -1;
212    long offset = 0;
213    int numOfReadBlock = 0;
214    // offset and totalBytes shares the same logic in the HFilePreadReader
215    while (offset < totalDataBlockBytes) {
216      HFileBlock block = hbr.readBlockData(offset, onDiskSizeOfNextBlock, true, false, false);
217      numOfReadBlock++;
218      try {
219        onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
220        offset += block.getOnDiskSizeWithHeader();
221      } finally {
222        block.release();
223      }
224    }
225    assertEquals(totalDataBlockBytes, offset);
226    assertEquals(NUM_TEST_BLOCKS, numOfReadBlock);
227    deleteFile(fs, path);
228  }
229
230  private void deleteFile(FileSystem fs, Path path) throws IOException {
231    if (fs.exists(path)) {
232      fs.delete(path, true);
233    }
234  }
235
236  @Test
237  public void testReadWithExtra() throws IOException {
238    FileSystem fs = TEST_UTIL.getTestFileSystem();
239    Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadWithExtra");
240    String s = "hello world";
241    try (FSDataOutputStream out = fs.create(p)) {
242      out.writeBytes(s);
243    }
244
245    Span span = TraceUtil.createSpan(methodName);
246    try (Scope ignored = span.makeCurrent()) {
247      ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8));
248      try (FSDataInputStream in = fs.open(p)) {
249        assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2));
250      }
251      buf.rewind();
252      byte[] heapBuf = new byte[buf.capacity()];
253      buf.get(heapBuf, 0, heapBuf.length);
254      assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf);
255    } finally {
256      span.end();
257    }
258    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
259      OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded()))));
260    assertThat(OTEL_EXT.getSpans(),
261      hasItems(allOf(hasName(methodName),
262        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
263          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 8L))))))));
264
265    OTEL_EXT.clearSpans();
266    span = TraceUtil.createSpan(methodName);
267    try (Scope ignored = span.makeCurrent()) {
268      ByteBuff buf =
269        new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4));
270      try (FSDataInputStream in = fs.open(p)) {
271        assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3));
272      }
273      buf.rewind();
274      byte[] heapBuf = new byte[11];
275      buf.get(heapBuf, 0, heapBuf.length);
276      assertArrayEquals(Bytes.toBytes("hello world"), heapBuf);
277    } finally {
278      span.end();
279    }
280    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
281      OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded()))));
282    assertThat(OTEL_EXT.getSpans(),
283      hasItems(allOf(hasName(methodName),
284        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
285          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L))))))));
286
287    OTEL_EXT.clearSpans();
288    span = TraceUtil.createSpan(methodName);
289    try (Scope ignored = span.makeCurrent()) {
290      ByteBuff buf =
291        new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4));
292      buf.position(0).limit(12);
293      try (FSDataInputStream in = fs.open(p)) {
294        assertThrows(IOException.class, () -> BlockIOUtils.readWithExtra(buf, in, 12, 0),
295          "Should only read 11 bytes");
296      }
297    } finally {
298      span.end();
299    }
300    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
301      OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded()))));
302    assertThat(OTEL_EXT.getSpans(),
303      hasItems(allOf(hasName(methodName),
304        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
305          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L))))))));
306  }
307
308  @Test
309  public void testPositionalReadNoExtra() throws IOException {
310    long position = 0;
311    int bufOffset = 0;
312    int necessaryLen = 10;
313    int extraLen = 0;
314    int totalLen = necessaryLen + extraLen;
315    byte[] buf = new byte[totalLen];
316    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
317    FSDataInputStream in = mock(FSDataInputStream.class);
318    when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
319    when(in.hasCapability(anyString())).thenReturn(false);
320    boolean ret = TraceUtil.trace(
321      () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), methodName);
322    assertFalse(ret, "Expect false return when no extra bytes requested");
323    verify(in).read(position, buf, bufOffset, totalLen);
324    verify(in).hasCapability(anyString());
325    verifyNoMoreInteractions(in);
326
327    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
328      OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded()))));
329    assertThat(OTEL_EXT.getSpans(),
330      hasItems(allOf(hasName(methodName),
331        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
332          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
333  }
334
335  @Test
336  public void testPositionalReadShortReadOfNecessaryBytes() throws IOException {
337    long position = 0;
338    int bufOffset = 0;
339    int necessaryLen = 10;
340    int extraLen = 0;
341    int totalLen = necessaryLen + extraLen;
342    byte[] buf = new byte[totalLen];
343    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
344    FSDataInputStream in = mock(FSDataInputStream.class);
345    when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
346    when(in.read(5, buf, 5, 5)).thenReturn(5);
347    when(in.hasCapability(anyString())).thenReturn(false);
348    boolean ret = TraceUtil.trace(
349      () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), methodName);
350    assertFalse(ret, "Expect false return when no extra bytes requested");
351    verify(in).read(position, buf, bufOffset, totalLen);
352    verify(in).read(5, buf, 5, 5);
353    verify(in).hasCapability(anyString());
354    verifyNoMoreInteractions(in);
355
356    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
357      OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded()))));
358    assertThat(OTEL_EXT.getSpans(),
359      hasItems(allOf(hasName(methodName),
360        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
361          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
362  }
363
364  @Test
365  public void testPositionalReadExtraSucceeded() throws IOException {
366    long position = 0;
367    int bufOffset = 0;
368    int necessaryLen = 10;
369    int extraLen = 5;
370    int totalLen = necessaryLen + extraLen;
371    byte[] buf = new byte[totalLen];
372    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
373    FSDataInputStream in = mock(FSDataInputStream.class);
374    when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
375    when(in.hasCapability(anyString())).thenReturn(false);
376    boolean ret = TraceUtil.trace(
377      () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), methodName);
378    assertTrue(ret, "Expect true return when reading extra bytes succeeds");
379    verify(in).read(position, buf, bufOffset, totalLen);
380    verify(in).hasCapability(anyString());
381    verifyNoMoreInteractions(in);
382
383    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
384      OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded()))));
385    assertThat(OTEL_EXT.getSpans(),
386      hasItems(allOf(hasName(methodName),
387        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
388          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
389  }
390
391  @Test
392  public void testPositionalReadExtraFailed() throws IOException {
393    long position = 0;
394    int bufOffset = 0;
395    int necessaryLen = 10;
396    int extraLen = 5;
397    int totalLen = necessaryLen + extraLen;
398    byte[] buf = new byte[totalLen];
399    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
400    FSDataInputStream in = mock(FSDataInputStream.class);
401    when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen);
402    when(in.hasCapability(anyString())).thenReturn(false);
403    boolean ret = TraceUtil.trace(
404      () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), methodName);
405    assertFalse(ret, "Expect false return when reading extra bytes fails");
406    verify(in).read(position, buf, bufOffset, totalLen);
407    verify(in).hasCapability(anyString());
408    verifyNoMoreInteractions(in);
409
410    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
411      OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded()))));
412    assertThat(OTEL_EXT.getSpans(),
413      hasItems(allOf(hasName(methodName),
414        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
415          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", necessaryLen))))))));
416  }
417
418  @Test
419  public void testPositionalReadShortReadCompletesNecessaryAndExtraBytes() throws IOException {
420    long position = 0;
421    int bufOffset = 0;
422    int necessaryLen = 10;
423    int extraLen = 5;
424    int totalLen = necessaryLen + extraLen;
425    byte[] buf = new byte[totalLen];
426    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
427    FSDataInputStream in = mock(FSDataInputStream.class);
428    when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
429    when(in.read(5, buf, 5, 10)).thenReturn(10);
430    when(in.hasCapability(anyString())).thenReturn(false);
431    boolean ret = TraceUtil.trace(
432      () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen), methodName);
433    assertTrue(ret, "Expect true return when reading extra bytes succeeds");
434    verify(in).read(position, buf, bufOffset, totalLen);
435    verify(in).read(5, buf, 5, 10);
436    verify(in).hasCapability(anyString());
437    verifyNoMoreInteractions(in);
438
439    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
440      OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded()))));
441    assertThat(OTEL_EXT.getSpans(),
442      hasItems(allOf(hasName(methodName),
443        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
444          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
445  }
446
447  @Test
448  public void testPositionalReadPrematureEOF() throws IOException {
449    long position = 0;
450    int bufOffset = 0;
451    int necessaryLen = 10;
452    int extraLen = 0;
453    int totalLen = necessaryLen + extraLen;
454    byte[] buf = new byte[totalLen];
455    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
456    FSDataInputStream in = mock(FSDataInputStream.class);
457    when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1);
458    when(in.hasCapability(anyString())).thenReturn(false);
459    Span span = TraceUtil.createSpan(methodName);
460    try (Scope ignored = span.makeCurrent()) {
461      IOException e = assertThrows(IOException.class,
462        () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen));
463      assertThat(e.getMessage(), containsString("EOF"));
464      TraceUtil.setError(span, e);
465    } finally {
466      span.end();
467    }
468    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
469      OTEL_EXT::getSpans, hasItem(allOf(hasName(methodName), hasEnded()))));
470    assertThat(OTEL_EXT.getSpans(),
471      hasItems(allOf(hasName(methodName),
472        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
473          hasAttributes(AttributesMatchers.isEmpty())))))));
474
475    verify(in).read(position, buf, bufOffset, totalLen);
476    verify(in).hasCapability(anyString());
477    verifyNoMoreInteractions(in);
478  }
479
480  /**
481   * Determine if ByteBufferPositionedReadable API is available .
482   * @return true if FSDataInputStream implements ByteBufferPositionedReadable API.
483   */
484  private boolean isByteBufferPositionedReadable() {
485    try {
486      // long position, ByteBuffer buf
487      FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class);
488    } catch (NoSuchMethodException e) {
489      return false;
490    }
491    return true;
492  }
493
494  public static class MyFSDataInputStream extends FSDataInputStream {
495    public MyFSDataInputStream(InputStream in) {
496      super(in);
497    }
498
499    // This is the ByteBufferPositionReadable API we want to test.
500    // Because the API is only available in Hadoop 3.3, FSDataInputStream in older Hadoop
501    // does not implement the interface, and it wouldn't compile trying to mock the method.
502    // So explicitly declare the method here to make mocking possible.
503    public int read(long position, ByteBuffer buf) throws IOException {
504      return 0;
505    }
506  }
507
508  @Test
509  public void testByteBufferPositionedReadable() throws IOException {
510    assumeTrue(isByteBufferPositionedReadable(),
511      "Skip the test because ByteBufferPositionedReadable is not available");
512    long position = 0;
513    int necessaryLen = 10;
514    int extraLen = 1;
515    int totalLen = necessaryLen + extraLen;
516    int firstReadLen = 6;
517    int secondReadLen = totalLen - firstReadLen;
518    ByteBuffer buf = ByteBuffer.allocate(totalLen);
519    ByteBuff bb = new SingleByteBuff(buf);
520    MyFSDataInputStream in = mock(MyFSDataInputStream.class);
521
522    when(in.read(position, buf)).thenReturn(firstReadLen);
523    when(in.read(firstReadLen, buf)).thenReturn(secondReadLen);
524    when(in.hasCapability(anyString())).thenReturn(true);
525    boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
526    assertTrue(ret, "Expect true return when reading extra bytes succeeds");
527    verify(in).read(position, buf);
528    verify(in).read(firstReadLen, buf);
529    verify(in).hasCapability(anyString());
530    verifyNoMoreInteractions(in);
531  }
532
533  @Test
534  public void testByteBufferPositionedReadableEOF() throws IOException {
535    assumeTrue(isByteBufferPositionedReadable(),
536      "Skip the test because ByteBufferPositionedReadable is not available");
537    long position = 0;
538    int necessaryLen = 10;
539    int extraLen = 0;
540    int totalLen = necessaryLen + extraLen;
541    int firstReadLen = 9;
542    ByteBuffer buf = ByteBuffer.allocate(totalLen);
543    ByteBuff bb = new SingleByteBuff(buf);
544    MyFSDataInputStream in = mock(MyFSDataInputStream.class);
545
546    when(in.read(position, buf)).thenReturn(firstReadLen);
547    when(in.read(firstReadLen, buf)).thenReturn(-1);
548    when(in.hasCapability(anyString())).thenReturn(true);
549    IOException e = assertThrows(IOException.class,
550      () -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen));
551    assertThat(e.getMessage(), containsString("EOF"));
552
553    verify(in).read(position, buf);
554    verify(in).read(firstReadLen, buf);
555    verify(in).hasCapability(anyString());
556    verifyNoMoreInteractions(in);
557  }
558}