001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
021import static org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers.hasAttributes;
022import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
023import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEvents;
024import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
025import static org.hamcrest.MatcherAssert.assertThat;
026import static org.hamcrest.Matchers.allOf;
027import static org.hamcrest.Matchers.hasItem;
028import static org.hamcrest.Matchers.hasItems;
029import static org.junit.Assert.assertArrayEquals;
030import static org.junit.Assert.assertEquals;
031import static org.junit.Assert.assertFalse;
032import static org.junit.Assert.assertTrue;
033import static org.junit.Assert.fail;
034import static org.junit.Assume.assumeTrue;
035import static org.mockito.ArgumentMatchers.anyString;
036import static org.mockito.Mockito.mock;
037import static org.mockito.Mockito.verify;
038import static org.mockito.Mockito.verifyNoMoreInteractions;
039import static org.mockito.Mockito.when;
040
041import io.opentelemetry.api.trace.Span;
042import io.opentelemetry.api.trace.StatusCode;
043import io.opentelemetry.context.Scope;
044import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
045import io.opentelemetry.sdk.trace.data.SpanData;
046import java.io.DataOutputStream;
047import java.io.IOException;
048import java.io.InputStream;
049import java.nio.ByteBuffer;
050import java.util.Random;
051import java.util.concurrent.TimeUnit;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.fs.FSDataInputStream;
054import org.apache.hadoop.fs.FSDataOutputStream;
055import org.apache.hadoop.fs.FileSystem;
056import org.apache.hadoop.fs.Path;
057import org.apache.hadoop.hbase.HBaseClassTestRule;
058import org.apache.hadoop.hbase.HBaseTestingUtil;
059import org.apache.hadoop.hbase.HConstants;
060import org.apache.hadoop.hbase.MatcherPredicate;
061import org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers;
062import org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers;
063import org.apache.hadoop.hbase.fs.HFileSystem;
064import org.apache.hadoop.hbase.io.ByteBuffAllocator;
065import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
066import org.apache.hadoop.hbase.io.compress.Compression;
067import org.apache.hadoop.hbase.io.util.BlockIOUtils;
068import org.apache.hadoop.hbase.nio.ByteBuff;
069import org.apache.hadoop.hbase.nio.MultiByteBuff;
070import org.apache.hadoop.hbase.nio.SingleByteBuff;
071import org.apache.hadoop.hbase.testclassification.IOTests;
072import org.apache.hadoop.hbase.testclassification.SmallTests;
073import org.apache.hadoop.hbase.trace.TraceUtil;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
076import org.junit.ClassRule;
077import org.junit.Rule;
078import org.junit.Test;
079import org.junit.experimental.categories.Category;
080import org.junit.rules.ExpectedException;
081import org.junit.rules.TestName;
082
083@Category({ IOTests.class, SmallTests.class })
084public class TestBlockIOUtils {
085
086  @ClassRule
087  public static final HBaseClassTestRule CLASS_RULE =
088    HBaseClassTestRule.forClass(TestBlockIOUtils.class);
089
090  @Rule
091  public TestName testName = new TestName();
092
093  @Rule
094  public ExpectedException exception = ExpectedException.none();
095
096  @Rule
097  public OpenTelemetryRule otelRule = OpenTelemetryRule.create();
098
099  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
100
101  private static final int NUM_TEST_BLOCKS = 2;
102  private static final Compression.Algorithm COMPRESSION_ALGO = Compression.Algorithm.GZ;
103
104  @Test
105  public void testIsByteBufferReadable() throws IOException {
106    FileSystem fs = TEST_UTIL.getTestFileSystem();
107    Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testIsByteBufferReadable");
108    try (FSDataOutputStream out = fs.create(p)) {
109      out.writeInt(23);
110    }
111    try (FSDataInputStream is = fs.open(p)) {
112      assertFalse(BlockIOUtils.isByteBufferReadable(is));
113    }
114  }
115
116  @Test
117  public void testReadFully() throws IOException {
118    TraceUtil.trace(() -> {
119      FileSystem fs = TEST_UTIL.getTestFileSystem();
120      Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully");
121      String s = "hello world";
122      try (FSDataOutputStream out = fs.create(p)) {
123        out.writeBytes(s);
124      }
125      ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11));
126      try (FSDataInputStream in = fs.open(p)) {
127        BlockIOUtils.readFully(buf, in, 11);
128      }
129      buf.rewind();
130      byte[] heapBuf = new byte[s.length()];
131      buf.get(heapBuf, 0, heapBuf.length);
132      assertArrayEquals(Bytes.toBytes(s), heapBuf);
133    }, testName.getMethodName());
134
135    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
136      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
137    assertThat(otelRule.getSpans(),
138      hasItems(allOf(hasName(testName.getMethodName()),
139        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readFully"),
140          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11))))))));
141  }
142
143  @Test
144  public void testPreadWithReadFullBytes() throws IOException {
145    testPreadReadFullBytesInternal(true, EnvironmentEdgeManager.currentTime());
146  }
147
148  @Test
149  public void testPreadWithoutReadFullBytes() throws IOException {
150    testPreadReadFullBytesInternal(false, EnvironmentEdgeManager.currentTime());
151  }
152
153  private void testPreadReadFullBytesInternal(boolean readAllBytes, long randomSeed)
154    throws IOException {
155    Configuration conf = TEST_UTIL.getConfiguration();
156    conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, readAllBytes);
157    FileSystem fs = TEST_UTIL.getTestFileSystem();
158    Path path = new Path(TEST_UTIL.getDataTestDirOnTestFS(), testName.getMethodName());
159    // give a fixed seed such we can see failure easily.
160    Random rand = new Random(randomSeed);
161    long totalDataBlockBytes =
162      writeBlocks(TEST_UTIL.getConfiguration(), rand, COMPRESSION_ALGO, path);
163    readDataBlocksAndVerify(fs, path, COMPRESSION_ALGO, totalDataBlockBytes);
164  }
165
166  private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo,
167    Path path) throws IOException {
168    FileSystem fs = HFileSystem.get(conf);
169    FSDataOutputStream os = fs.create(path);
170    HFileContext meta =
171      new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
172    HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta);
173    long totalDataBlockBytes = 0;
174    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
175      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
176      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
177        blockTypeOrdinal = BlockType.DATA.ordinal();
178      }
179      BlockType bt = BlockType.values()[blockTypeOrdinal];
180      DataOutputStream dos = hbw.startWriting(bt);
181      int size = rand.nextInt(500);
182      for (int j = 0; j < size; ++j) {
183        dos.writeShort(i + 1);
184        dos.writeInt(j + 1);
185      }
186
187      hbw.writeHeaderAndData(os);
188      totalDataBlockBytes += hbw.getOnDiskSizeWithHeader();
189    }
190    // append a dummy trailer and in a actual HFile it should have more data.
191    FixedFileTrailer trailer = new FixedFileTrailer(3, 3);
192    trailer.setFirstDataBlockOffset(0);
193    trailer.setLastDataBlockOffset(totalDataBlockBytes);
194    trailer.setComparatorClass(meta.getCellComparator().getClass());
195    trailer.setDataIndexCount(NUM_TEST_BLOCKS);
196    trailer.setCompressionCodec(compressAlgo);
197    trailer.serialize(os);
198    // close the stream
199    os.close();
200    return totalDataBlockBytes;
201  }
202
203  private void readDataBlocksAndVerify(FileSystem fs, Path path, Compression.Algorithm compressAlgo,
204    long totalDataBlockBytes) throws IOException {
205    FSDataInputStream is = fs.open(path);
206    HFileContext fileContext =
207      new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
208    ReaderContext context =
209      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
210        .withReaderType(ReaderContext.ReaderType.PREAD).withFileSize(totalDataBlockBytes)
211        .withFilePath(path).withFileSystem(fs).build();
212    HFileBlock.FSReader hbr =
213      new HFileBlock.FSReaderImpl(context, fileContext, ByteBuffAllocator.HEAP, fs.getConf());
214
215    long onDiskSizeOfNextBlock = -1;
216    long offset = 0;
217    int numOfReadBlock = 0;
218    // offset and totalBytes shares the same logic in the HFilePreadReader
219    while (offset < totalDataBlockBytes) {
220      HFileBlock block = hbr.readBlockData(offset, onDiskSizeOfNextBlock, true, false, false);
221      numOfReadBlock++;
222      try {
223        onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
224        offset += block.getOnDiskSizeWithHeader();
225      } finally {
226        block.release();
227      }
228    }
229    assertEquals(totalDataBlockBytes, offset);
230    assertEquals(NUM_TEST_BLOCKS, numOfReadBlock);
231    deleteFile(fs, path);
232  }
233
234  private void deleteFile(FileSystem fs, Path path) throws IOException {
235    if (fs.exists(path)) {
236      fs.delete(path, true);
237    }
238  }
239
240  @Test
241  public void testReadWithExtra() throws IOException {
242    FileSystem fs = TEST_UTIL.getTestFileSystem();
243    Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadWithExtra");
244    String s = "hello world";
245    try (FSDataOutputStream out = fs.create(p)) {
246      out.writeBytes(s);
247    }
248
249    Span span = TraceUtil.createSpan(testName.getMethodName());
250    try (Scope ignored = span.makeCurrent()) {
251      ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8));
252      try (FSDataInputStream in = fs.open(p)) {
253        assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2));
254      }
255      buf.rewind();
256      byte[] heapBuf = new byte[buf.capacity()];
257      buf.get(heapBuf, 0, heapBuf.length);
258      assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf);
259    } finally {
260      span.end();
261    }
262    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
263      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
264    assertThat(otelRule.getSpans(),
265      hasItems(allOf(hasName(testName.getMethodName()),
266        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
267          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 8L))))))));
268
269    otelRule.clearSpans();
270    span = TraceUtil.createSpan(testName.getMethodName());
271    try (Scope ignored = span.makeCurrent()) {
272      ByteBuff buf =
273        new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4));
274      try (FSDataInputStream in = fs.open(p)) {
275        assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3));
276      }
277      buf.rewind();
278      byte[] heapBuf = new byte[11];
279      buf.get(heapBuf, 0, heapBuf.length);
280      assertArrayEquals(Bytes.toBytes("hello world"), heapBuf);
281    } finally {
282      span.end();
283    }
284    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
285      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
286    assertThat(otelRule.getSpans(),
287      hasItems(allOf(hasName(testName.getMethodName()),
288        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
289          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L))))))));
290
291    otelRule.clearSpans();
292    span = TraceUtil.createSpan(testName.getMethodName());
293    try (Scope ignored = span.makeCurrent()) {
294      ByteBuff buf =
295        new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4));
296      buf.position(0).limit(12);
297      exception.expect(IOException.class);
298      try (FSDataInputStream in = fs.open(p)) {
299        BlockIOUtils.readWithExtra(buf, in, 12, 0);
300        fail("Should only read 11 bytes");
301      }
302    } finally {
303      span.end();
304    }
305    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
306      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
307    assertThat(otelRule.getSpans(),
308      hasItems(allOf(hasName(testName.getMethodName()),
309        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
310          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", 11L))))))));
311  }
312
313  @Test
314  public void testPositionalReadNoExtra() throws IOException {
315    long position = 0;
316    int bufOffset = 0;
317    int necessaryLen = 10;
318    int extraLen = 0;
319    int totalLen = necessaryLen + extraLen;
320    byte[] buf = new byte[totalLen];
321    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
322    FSDataInputStream in = mock(FSDataInputStream.class);
323    when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
324    when(in.hasCapability(anyString())).thenReturn(false);
325    boolean ret =
326      TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
327        testName.getMethodName());
328    assertFalse("Expect false return when no extra bytes requested", ret);
329    verify(in).read(position, buf, bufOffset, totalLen);
330    verify(in).hasCapability(anyString());
331    verifyNoMoreInteractions(in);
332
333    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
334      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
335    assertThat(otelRule.getSpans(),
336      hasItems(allOf(hasName(testName.getMethodName()),
337        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
338          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
339  }
340
341  @Test
342  public void testPositionalReadShortReadOfNecessaryBytes() throws IOException {
343    long position = 0;
344    int bufOffset = 0;
345    int necessaryLen = 10;
346    int extraLen = 0;
347    int totalLen = necessaryLen + extraLen;
348    byte[] buf = new byte[totalLen];
349    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
350    FSDataInputStream in = mock(FSDataInputStream.class);
351    when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
352    when(in.read(5, buf, 5, 5)).thenReturn(5);
353    when(in.hasCapability(anyString())).thenReturn(false);
354    boolean ret =
355      TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
356        testName.getMethodName());
357    assertFalse("Expect false return when no extra bytes requested", ret);
358    verify(in).read(position, buf, bufOffset, totalLen);
359    verify(in).read(5, buf, 5, 5);
360    verify(in).hasCapability(anyString());
361    verifyNoMoreInteractions(in);
362
363    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
364      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
365    assertThat(otelRule.getSpans(),
366      hasItems(allOf(hasName(testName.getMethodName()),
367        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
368          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
369  }
370
371  @Test
372  public void testPositionalReadExtraSucceeded() throws IOException {
373    long position = 0;
374    int bufOffset = 0;
375    int necessaryLen = 10;
376    int extraLen = 5;
377    int totalLen = necessaryLen + extraLen;
378    byte[] buf = new byte[totalLen];
379    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
380    FSDataInputStream in = mock(FSDataInputStream.class);
381    when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
382    when(in.hasCapability(anyString())).thenReturn(false);
383    boolean ret =
384      TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
385        testName.getMethodName());
386    assertTrue("Expect true return when reading extra bytes succeeds", ret);
387    verify(in).read(position, buf, bufOffset, totalLen);
388    verify(in).hasCapability(anyString());
389    verifyNoMoreInteractions(in);
390
391    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
392      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
393    assertThat(otelRule.getSpans(),
394      hasItems(allOf(hasName(testName.getMethodName()),
395        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
396          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
397  }
398
399  @Test
400  public void testPositionalReadExtraFailed() throws IOException {
401    long position = 0;
402    int bufOffset = 0;
403    int necessaryLen = 10;
404    int extraLen = 5;
405    int totalLen = necessaryLen + extraLen;
406    byte[] buf = new byte[totalLen];
407    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
408    FSDataInputStream in = mock(FSDataInputStream.class);
409    when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen);
410    when(in.hasCapability(anyString())).thenReturn(false);
411    boolean ret =
412      TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
413        testName.getMethodName());
414    assertFalse("Expect false return when reading extra bytes fails", ret);
415    verify(in).read(position, buf, bufOffset, totalLen);
416    verify(in).hasCapability(anyString());
417    verifyNoMoreInteractions(in);
418
419    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
420      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
421    assertThat(otelRule.getSpans(),
422      hasItems(allOf(hasName(testName.getMethodName()),
423        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
424          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", necessaryLen))))))));
425  }
426
427  @Test
428  public void testPositionalReadShortReadCompletesNecessaryAndExtraBytes() throws IOException {
429    long position = 0;
430    int bufOffset = 0;
431    int necessaryLen = 10;
432    int extraLen = 5;
433    int totalLen = necessaryLen + extraLen;
434    byte[] buf = new byte[totalLen];
435    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
436    FSDataInputStream in = mock(FSDataInputStream.class);
437    when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
438    when(in.read(5, buf, 5, 10)).thenReturn(10);
439    when(in.hasCapability(anyString())).thenReturn(false);
440    boolean ret =
441      TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen),
442        testName.getMethodName());
443    assertTrue("Expect true return when reading extra bytes succeeds", ret);
444    verify(in).read(position, buf, bufOffset, totalLen);
445    verify(in).read(5, buf, 5, 10);
446    verify(in).hasCapability(anyString());
447    verifyNoMoreInteractions(in);
448
449    TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
450      otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
451    assertThat(otelRule.getSpans(),
452      hasItems(allOf(hasName(testName.getMethodName()),
453        hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
454          hasAttributes(containsEntry("db.hbase.io.heap_bytes_read", totalLen))))))));
455  }
456
457  @Test
458  public void testPositionalReadPrematureEOF() throws IOException {
459    long position = 0;
460    int bufOffset = 0;
461    int necessaryLen = 10;
462    int extraLen = 0;
463    int totalLen = necessaryLen + extraLen;
464    byte[] buf = new byte[totalLen];
465    ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen));
466    FSDataInputStream in = mock(FSDataInputStream.class);
467    when(in.read(position, buf, bufOffset, totalLen)).thenReturn(9);
468    when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1);
469    when(in.hasCapability(anyString())).thenReturn(false);
470    exception.expect(IOException.class);
471    exception.expectMessage("EOF");
472    Span span = TraceUtil.createSpan(testName.getMethodName());
473    try (Scope ignored = span.makeCurrent()) {
474      BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
475      span.setStatus(StatusCode.OK);
476    } catch (IOException e) {
477      TraceUtil.setError(span, e);
478      throw e;
479    } finally {
480      span.end();
481
482      TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new MatcherPredicate<Iterable<SpanData>>(
483        otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()), hasEnded()))));
484      assertThat(otelRule.getSpans(),
485        hasItems(allOf(hasName(testName.getMethodName()),
486          hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
487            hasAttributes(AttributesMatchers.isEmpty())))))));
488    }
489  }
490
491  /**
492   * Determine if ByteBufferPositionedReadable API is available .
493   * @return true if FSDataInputStream implements ByteBufferPositionedReadable API.
494   */
495  private boolean isByteBufferPositionedReadable() {
496    try {
497      // long position, ByteBuffer buf
498      FSDataInputStream.class.getMethod("read", long.class, ByteBuffer.class);
499    } catch (NoSuchMethodException e) {
500      return false;
501    }
502    return true;
503  }
504
505  public static class MyFSDataInputStream extends FSDataInputStream {
506    public MyFSDataInputStream(InputStream in) {
507      super(in);
508    }
509
510    // This is the ByteBufferPositionReadable API we want to test.
511    // Because the API is only available in Hadoop 3.3, FSDataInputStream in older Hadoop
512    // does not implement the interface, and it wouldn't compile trying to mock the method.
513    // So explicitly declare the method here to make mocking possible.
514    public int read(long position, ByteBuffer buf) throws IOException {
515      return 0;
516    }
517  }
518
519  @Test
520  public void testByteBufferPositionedReadable() throws IOException {
521    assumeTrue("Skip the test because ByteBufferPositionedReadable is not available",
522      isByteBufferPositionedReadable());
523    long position = 0;
524    int necessaryLen = 10;
525    int extraLen = 1;
526    int totalLen = necessaryLen + extraLen;
527    int firstReadLen = 6;
528    int secondReadLen = totalLen - firstReadLen;
529    ByteBuffer buf = ByteBuffer.allocate(totalLen);
530    ByteBuff bb = new SingleByteBuff(buf);
531    MyFSDataInputStream in = mock(MyFSDataInputStream.class);
532
533    when(in.read(position, buf)).thenReturn(firstReadLen);
534    when(in.read(firstReadLen, buf)).thenReturn(secondReadLen);
535    when(in.hasCapability(anyString())).thenReturn(true);
536    boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
537    assertTrue("Expect true return when reading extra bytes succeeds", ret);
538    verify(in).read(position, buf);
539    verify(in).read(firstReadLen, buf);
540    verify(in).hasCapability(anyString());
541    verifyNoMoreInteractions(in);
542  }
543
544  @Test
545  public void testByteBufferPositionedReadableEOF() throws IOException {
546    assumeTrue("Skip the test because ByteBufferPositionedReadable is not available",
547      isByteBufferPositionedReadable());
548    long position = 0;
549    int necessaryLen = 10;
550    int extraLen = 0;
551    int totalLen = necessaryLen + extraLen;
552    int firstReadLen = 9;
553    ByteBuffer buf = ByteBuffer.allocate(totalLen);
554    ByteBuff bb = new SingleByteBuff(buf);
555    MyFSDataInputStream in = mock(MyFSDataInputStream.class);
556
557    when(in.read(position, buf)).thenReturn(firstReadLen);
558    when(in.read(position, buf)).thenReturn(-1);
559    when(in.hasCapability(anyString())).thenReturn(true);
560    exception.expect(IOException.class);
561    exception.expectMessage("EOF");
562    BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
563
564    verify(in).read(position, buf);
565    verify(in).read(firstReadLen, buf);
566    verify(in).hasCapability(anyString());
567    verifyNoMoreInteractions(in);
568  }
569}