001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
021import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
022import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
023import static org.junit.jupiter.api.Assertions.assertEquals;
024import static org.junit.jupiter.api.Assertions.assertFalse;
025import static org.junit.jupiter.api.Assertions.assertNotEquals;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027import static org.junit.jupiter.api.Assertions.fail;
028
029import java.io.ByteArrayOutputStream;
030import java.io.DataOutputStream;
031import java.io.IOException;
032import java.io.OutputStream;
033import java.nio.ByteBuffer;
034import java.util.ArrayList;
035import java.util.Collections;
036import java.util.HashMap;
037import java.util.List;
038import java.util.Map;
039import java.util.Optional;
040import java.util.Random;
041import java.util.concurrent.Callable;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.Executor;
044import java.util.concurrent.ExecutorCompletionService;
045import java.util.concurrent.Executors;
046import java.util.concurrent.Future;
047import java.util.stream.Stream;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.fs.FSDataInputStream;
050import org.apache.hadoop.fs.FSDataOutputStream;
051import org.apache.hadoop.fs.FileSystem;
052import org.apache.hadoop.fs.Path;
053import org.apache.hadoop.hbase.ArrayBackedTag;
054import org.apache.hadoop.hbase.CellComparatorImpl;
055import org.apache.hadoop.hbase.CellUtil;
056import org.apache.hadoop.hbase.HBaseConfiguration;
057import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
058import org.apache.hadoop.hbase.HBaseTestingUtil;
059import org.apache.hadoop.hbase.HConstants;
060import org.apache.hadoop.hbase.KeyValue;
061import org.apache.hadoop.hbase.Tag;
062import org.apache.hadoop.hbase.fs.HFileSystem;
063import org.apache.hadoop.hbase.io.ByteBuffAllocator;
064import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
065import org.apache.hadoop.hbase.io.compress.Compression;
066import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
067import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
068import org.apache.hadoop.hbase.nio.ByteBuff;
069import org.apache.hadoop.hbase.nio.MultiByteBuff;
070import org.apache.hadoop.hbase.nio.SingleByteBuff;
071import org.apache.hadoop.hbase.testclassification.IOTests;
072import org.apache.hadoop.hbase.testclassification.LargeTests;
073import org.apache.hadoop.hbase.util.Bytes;
074import org.apache.hadoop.hbase.util.ChecksumType;
075import org.apache.hadoop.hbase.util.ClassSize;
076import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
077import org.apache.hadoop.io.WritableUtils;
078import org.apache.hadoop.io.compress.Compressor;
079import org.junit.jupiter.api.AfterEach;
080import org.junit.jupiter.api.BeforeEach;
081import org.junit.jupiter.api.TestTemplate;
082import org.junit.jupiter.params.provider.Arguments;
083import org.mockito.Mockito;
084import org.slf4j.Logger;
085import org.slf4j.LoggerFactory;
086
087@org.junit.jupiter.api.Tag(IOTests.TAG)
088@org.junit.jupiter.api.Tag(LargeTests.TAG)
089@HBaseParameterizedTestTemplate(
090    name = "{index}: includesMemstoreTS={0}, includesTag={1}, useHeapAllocator={2}")
091public class TestHFileBlock {
092
093  // change this value to activate more logs
094  private static final boolean detailedLogging = false;
095  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };
096
097  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);
098
099  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
100
101  private static final int NUM_TEST_BLOCKS = 1000;
102  private static final int NUM_READER_THREADS = 26;
103  private static final int MAX_BUFFER_COUNT = 2048;
104
105  // Used to generate KeyValues
106  private static int NUM_KEYVALUES = 50;
107  private static int FIELD_LENGTH = 10;
108  private static float CHANCE_TO_REPEAT = 0.6f;
109
110  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
111  private static final Random RNG = new Random(); // This test depends on Random#setSeed
112  private FileSystem fs;
113
114  private final boolean includesMemstoreTS;
115  private final boolean includesTag;
116  private final boolean useHeapAllocator;
117  private final ByteBuffAllocator alloc;
118
119  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag, boolean useHeapAllocator) {
120    this.includesMemstoreTS = includesMemstoreTS;
121    this.includesTag = includesTag;
122    this.useHeapAllocator = useHeapAllocator;
123    this.alloc = useHeapAllocator ? HEAP : createOffHeapAlloc();
124    assertAllocator();
125  }
126
127  public static Stream<Arguments> parameters() {
128    List<Arguments> params = new ArrayList<>();
129    for (int i = 0; i < (1 << 3); i++) {
130      boolean v0 = (i & (1 << 0)) != 0;
131      boolean v1 = (i & (1 << 1)) != 0;
132      boolean v2 = (i & (1 << 2)) != 0;
133      params.add(Arguments.of(v0, v1, v2));
134    }
135    return params.stream();
136  }
137
138  private ByteBuffAllocator createOffHeapAlloc() {
139    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
140    conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT);
141    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
142    ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true);
143    // Fill the allocator
144    List<ByteBuff> bufs = new ArrayList<>();
145    for (int i = 0; i < MAX_BUFFER_COUNT; i++) {
146      ByteBuff bb = alloc.allocateOneBuffer();
147      assertTrue(!bb.hasArray());
148      bufs.add(bb);
149    }
150    bufs.forEach(ByteBuff::release);
151    return alloc;
152  }
153
154  private void assertAllocator() {
155    if (!useHeapAllocator) {
156      assertEquals(MAX_BUFFER_COUNT, alloc.getFreeBufferCount());
157    }
158  }
159
160  @BeforeEach
161  public void setUp() throws IOException {
162    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
163  }
164
165  @AfterEach
166  public void tearDown() throws IOException {
167    assertAllocator();
168    alloc.clean();
169  }
170
171  static void writeTestBlockContents(DataOutputStream dos) throws IOException {
172    // This compresses really well.
173    for (int i = 0; i < 1000; ++i)
174      dos.writeInt(i / 100);
175  }
176
177  static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS,
178    boolean useTag) throws IOException {
179    List<KeyValue> keyValues = new ArrayList<>();
180
181    // generate keyValues
182    RNG.setSeed(42); // just any fixed number
183    for (int i = 0; i < NUM_KEYVALUES; ++i) {
184      byte[] row;
185      long timestamp;
186      byte[] family;
187      byte[] qualifier;
188      byte[] value;
189
190      // generate it or repeat, it should compress well
191      if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) {
192        row = CellUtil.cloneRow(keyValues.get(RNG.nextInt(keyValues.size())));
193      } else {
194        row = new byte[FIELD_LENGTH];
195        RNG.nextBytes(row);
196      }
197      if (0 == i) {
198        family = new byte[FIELD_LENGTH];
199        RNG.nextBytes(family);
200      } else {
201        family = CellUtil.cloneFamily(keyValues.get(0));
202      }
203      if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) {
204        qualifier = CellUtil.cloneQualifier(keyValues.get(RNG.nextInt(keyValues.size())));
205      } else {
206        qualifier = new byte[FIELD_LENGTH];
207        RNG.nextBytes(qualifier);
208      }
209      if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) {
210        value = CellUtil.cloneValue(keyValues.get(RNG.nextInt(keyValues.size())));
211      } else {
212        value = new byte[FIELD_LENGTH];
213        RNG.nextBytes(value);
214      }
215      if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) {
216        timestamp = keyValues.get(RNG.nextInt(keyValues.size())).getTimestamp();
217      } else {
218        timestamp = RNG.nextLong();
219      }
220      if (!useTag) {
221        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
222      } else {
223        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value,
224          new Tag[] { new ArrayBackedTag((byte) 1, Bytes.toBytes("myTagVal")) }));
225      }
226    }
227
228    // sort it and write to stream
229    int totalSize = 0;
230    Collections.sort(keyValues, CellComparatorImpl.COMPARATOR);
231
232    for (KeyValue kv : keyValues) {
233      totalSize += kv.getLength();
234      if (includesMemstoreTS) {
235        long memstoreTS = RNG.nextLong();
236        kv.setSequenceId(memstoreTS);
237        totalSize += WritableUtils.getVIntSize(memstoreTS);
238      }
239      hbw.write(kv);
240    }
241    return totalSize;
242  }
243
244  public byte[] createTestV1Block(Compression.Algorithm algo) throws IOException {
245    Compressor compressor = algo.getCompressor();
246    ByteArrayOutputStream baos = new ByteArrayOutputStream();
247    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
248    DataOutputStream dos = new DataOutputStream(os);
249    BlockType.META.write(dos); // Let's make this a meta block.
250    writeTestBlockContents(dos);
251    dos.flush();
252    algo.returnCompressor(compressor);
253    return baos.toByteArray();
254  }
255
256  static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo, boolean includesMemstoreTS,
257    boolean includesTag) throws IOException {
258    final BlockType blockType = BlockType.DATA;
259    HFileContext meta = new HFileContextBuilder().withCompression(algo)
260      .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
261      .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
262    HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
263    DataOutputStream dos = hbw.startWriting(blockType);
264    writeTestBlockContents(dos);
265    dos.flush();
266    hbw.ensureBlockReady();
267    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
268    hbw.release();
269    return hbw;
270  }
271
272  public String createTestBlockStr(Compression.Algorithm algo, int correctLength, boolean useTag)
273    throws IOException {
274    HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
275    byte[] testV2Block = hbw.getHeaderAndDataForTest();
276    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
277    if (testV2Block.length == correctLength) {
278      // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
279      // variations across operating systems.
280      // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
281      // We only make this change when the compressed block length matches.
282      // Otherwise, there are obviously other inconsistencies.
283      testV2Block[osOffset] = 3;
284    }
285    return Bytes.toStringBinary(testV2Block);
286  }
287
288  @TestTemplate
289  public void testNoCompression() throws IOException {
290    CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
291    Mockito.when(cacheConf.getBlockCache()).thenReturn(Optional.empty());
292
293    HFileBlock block =
294      createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
295    assertEquals(4000, block.getUncompressedSizeWithoutHeader());
296    assertEquals(4004, block.getOnDiskSizeWithoutHeader());
297    assertTrue(block.isUnpacked());
298  }
299
300  @TestTemplate
301  public void testGzipCompression() throws IOException {
302    // @formatter:off
303    String correctTestBlockStr = "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
304      + "\\xFF\\xFF\\xFF\\xFF"
305      + "\\x0" + ChecksumType.getDefaultChecksumType().getCode()
306      + "\\x00\\x00@\\x00\\x00\\x00\\x00["
307      // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
308      + "\\x1F\\x8B"  // gzip magic signature
309      + "\\x08"  // Compression method: 8 = "deflate"
310      + "\\x00"  // Flags
311      + "\\x00\\x00\\x00\\x00"  // mtime
312      + "\\x00"  // XFL (extra flags)
313      // OS (0 = FAT filesystems, 3 = Unix). However, this field
314      // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
315      // This appears to be a difference caused by the availability
316      // (and use) of the native GZ codec.
317      + "\\x03"
318      + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
319      + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
320      + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
321      + "\\x00\\x00\\x00\\x00"; //  4 byte checksum (ignored)
322    // @formatter:on
323    int correctGzipBlockLength = 95;
324    String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
325    // We ignore the block checksum because createTestBlockStr can change the
326    // gzip header after the block is produced
327    assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
328      testBlockStr.substring(0, correctGzipBlockLength - 4));
329  }
330
331  @TestTemplate
332  public void testReaderV2() throws IOException {
333    testReaderV2Internals();
334  }
335
336  private void assertRelease(HFileBlock blk) {
337    if (blk instanceof ExclusiveMemHFileBlock) {
338      assertFalse(blk.release());
339    } else {
340      assertTrue(blk.release());
341    }
342  }
343
344  protected void testReaderV2Internals() throws IOException {
345    final Configuration conf = TEST_UTIL.getConfiguration();
346    if (includesTag) {
347      conf.setInt("hfile.format.version", 3);
348    }
349    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
350      for (boolean pread : new boolean[] { false, true }) {
351        LOG.info("testReaderV2: Compression algorithm: " + algo + ", pread=" + pread);
352        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
353        FSDataOutputStream os = fs.create(path);
354        HFileContext meta = new HFileContextBuilder().withCompression(algo)
355          .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
356          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
357        HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta);
358        long totalSize = 0;
359        for (int blockId = 0; blockId < 2; ++blockId) {
360          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
361          for (int i = 0; i < 1234; ++i)
362            dos.writeInt(i);
363          hbw.writeHeaderAndData(os);
364          totalSize += hbw.getOnDiskSizeWithHeader();
365        }
366        os.close();
367
368        FSDataInputStream is = fs.open(path);
369        meta =
370          new HFileContextBuilder().withHBaseCheckSum(true).withIncludesMvcc(includesMemstoreTS)
371            .withIncludesTags(includesTag).withCompression(algo).build();
372        ReaderContext context =
373          new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
374            .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
375        HFileBlock.FSReader hbr =
376          new HFileBlock.FSReaderImpl(context, meta, alloc, TEST_UTIL.getConfiguration());
377        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
378        is.close();
379        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
380
381        b.sanityCheck();
382        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
383        assertEquals(algo == GZ ? 2173 : 4936,
384          b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
385        HFileBlock expected = b;
386
387        if (algo == GZ) {
388          is = fs.open(path);
389          ReaderContext readerContext =
390            new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
391              .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
392          hbr =
393            new HFileBlock.FSReaderImpl(readerContext, meta, alloc, TEST_UTIL.getConfiguration());
394          b = hbr.readBlockData(0,
395            2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false, true);
396          assertEquals(expected, b);
397          int wrongCompressedSize = 2172;
398          try {
399            hbr.readBlockData(0, wrongCompressedSize + HConstants.HFILEBLOCK_HEADER_SIZE, pread,
400              false, true);
401            fail("Exception expected");
402          } catch (IOException ex) {
403            String expectedPrefix = "Passed in onDiskSizeWithHeader=";
404            assertTrue(ex.getMessage().startsWith(expectedPrefix),
405              "Invalid exception message: '" + ex.getMessage()
406                + "'.\nMessage is expected to start with: '" + expectedPrefix + "'");
407          }
408          assertRelease(b);
409          is.close();
410        }
411        assertRelease(expected);
412      }
413    }
414  }
415
416  /**
417   * Test encoding/decoding data blocks.
418   * @throws IOException a bug or a problem with temporary files.
419   */
420  @TestTemplate
421  public void testDataBlockEncoding() throws IOException {
422    testInternals();
423  }
424
425  private void testInternals() throws IOException {
426    final int numBlocks = 5;
427    final Configuration conf = TEST_UTIL.getConfiguration();
428    if (includesTag) {
429      conf.setInt("hfile.format.version", 3);
430    }
431    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
432      for (boolean pread : new boolean[] { false, true }) {
433        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
434          LOG.info("testDataBlockEncoding: Compression algorithm={}, pread={}, dataBlockEncoder={}",
435            algo.toString(), pread, encoding);
436          Path path =
437            new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo + "_" + encoding.toString());
438          FSDataOutputStream os = fs.create(path);
439          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE)
440            ? new HFileDataBlockEncoderImpl(encoding)
441            : NoOpDataBlockEncoder.INSTANCE;
442          HFileContext meta = new HFileContextBuilder().withCompression(algo)
443            .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
444            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
445          HFileBlock.Writer hbw = new HFileBlock.Writer(conf, dataBlockEncoder, meta);
446          long totalSize = 0;
447          final List<Integer> encodedSizes = new ArrayList<>();
448          final List<ByteBuff> encodedBlocks = new ArrayList<>();
449          for (int blockId = 0; blockId < numBlocks; ++blockId) {
450            hbw.startWriting(BlockType.DATA);
451            writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
452            hbw.writeHeaderAndData(os);
453            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
454            ByteBuff encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader();
455            final int encodedSize = encodedResultWithHeader.limit() - headerLen;
456            if (encoding != DataBlockEncoding.NONE) {
457              // We need to account for the two-byte encoding algorithm ID that
458              // comes after the 24-byte block header but before encoded KVs.
459              headerLen += DataBlockEncoding.ID_SIZE;
460            }
461            encodedSizes.add(encodedSize);
462            ByteBuff encodedBuf = encodedResultWithHeader.position(headerLen).slice();
463            encodedBlocks.add(encodedBuf);
464            totalSize += hbw.getOnDiskSizeWithHeader();
465          }
466          os.close();
467
468          FSDataInputStream is = fs.open(path);
469          meta = new HFileContextBuilder().withHBaseCheckSum(true).withCompression(algo)
470            .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag).build();
471          ReaderContext context =
472            new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
473              .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
474          HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf);
475          hbr.setDataBlockEncoder(dataBlockEncoder, conf);
476          hbr.setIncludesMemStoreTS(includesMemstoreTS);
477          HFileBlock blockFromHFile, blockUnpacked;
478          int pos = 0;
479          for (int blockId = 0; blockId < numBlocks; ++blockId) {
480            blockFromHFile = hbr.readBlockData(pos, -1, pread, false, true);
481            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
482            blockFromHFile.sanityCheck();
483            pos += blockFromHFile.getOnDiskSizeWithHeader();
484            assertEquals((int) encodedSizes.get(blockId),
485              blockFromHFile.getUncompressedSizeWithoutHeader());
486            assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
487            long packedHeapsize = blockFromHFile.heapSize();
488            blockUnpacked = blockFromHFile.unpack(meta, hbr);
489            assertTrue(blockUnpacked.isUnpacked());
490            if (meta.isCompressedOrEncrypted()) {
491              LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeadsize="
492                + blockUnpacked.heapSize());
493              assertFalse(packedHeapsize == blockUnpacked.heapSize());
494              assertTrue(packedHeapsize < blockUnpacked.heapSize(),
495                "Packed heapSize should be < unpacked heapSize");
496            }
497            ByteBuff actualBuffer = blockUnpacked.getBufferWithoutHeader();
498            if (encoding != DataBlockEncoding.NONE) {
499              // We expect a two-byte big-endian encoding id.
500              assertEquals(Long.toHexString(0), Long.toHexString(actualBuffer.get(0)),
501                "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread));
502              assertEquals(Long.toHexString(encoding.getId()),
503                Long.toHexString(actualBuffer.get(1)),
504                "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread));
505              actualBuffer.position(2);
506              actualBuffer = actualBuffer.slice();
507            }
508
509            ByteBuff expectedBuff = encodedBlocks.get(blockId);
510            expectedBuff.rewind();
511
512            // test if content matches, produce nice message
513            assertBuffersEqual(expectedBuff, actualBuffer, algo, encoding, pread);
514
515            // test serialized blocks
516            for (boolean reuseBuffer : new boolean[] { false, true }) {
517              ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
518              blockFromHFile.serialize(serialized, true);
519              HFileBlock deserialized = (HFileBlock) blockFromHFile.getDeserializer()
520                .deserialize(new SingleByteBuff(serialized), HEAP);
521              assertEquals(blockFromHFile, deserialized,
522                "Serialization did not preserve block state. reuseBuffer=" + reuseBuffer);
523              // intentional reference comparison
524              if (blockFromHFile != blockUnpacked) {
525                assertEquals(blockUnpacked, deserialized.unpack(meta, hbr),
526                  "Deserialized block cannot be unpacked correctly.");
527              }
528            }
529            assertRelease(blockUnpacked);
530            if (blockFromHFile != blockUnpacked) {
531              blockFromHFile.release();
532            }
533          }
534          is.close();
535        }
536      }
537    }
538  }
539
540  static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
541    boolean pread) {
542    return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
543  }
544
545  static void assertBuffersEqual(ByteBuff expectedBuffer, ByteBuff actualBuffer,
546    Compression.Algorithm compression, DataBlockEncoding encoding, boolean pread) {
547    if (!actualBuffer.equals(expectedBuffer)) {
548      int prefix = 0;
549      int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
550      while (prefix < minLimit && expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
551        prefix++;
552      }
553
554      fail(String.format("Content mismatch for %s, commonPrefix %d, expected %s, got %s",
555        buildMessageDetails(compression, encoding, pread), prefix,
556        nextBytesToStr(expectedBuffer, prefix), nextBytesToStr(actualBuffer, prefix)));
557    }
558  }
559
560  /**
561   * Convert a few next bytes in the given buffer at the given position to string. Used for error
562   * messages.
563   */
564  private static String nextBytesToStr(ByteBuff buf, int pos) {
565    int maxBytes = buf.limit() - pos;
566    int numBytes = Math.min(16, maxBytes);
567    return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos, numBytes)
568      + (numBytes < maxBytes ? "..." : "");
569  }
570
571  @TestTemplate
572  public void testPreviousOffset() throws IOException {
573    testPreviousOffsetInternals();
574  }
575
576  protected void testPreviousOffsetInternals() throws IOException {
577    // TODO: parameterize these nested loops.
578    Configuration conf = TEST_UTIL.getConfiguration();
579    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
580      for (boolean pread : BOOLEAN_VALUES) {
581        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
582          Random rand = defaultRandom();
583          LOG.info("testPreviousOffset: Compression algorithm={}, pread={}, cacheOnWrite={}",
584            algo.toString(), pread, cacheOnWrite);
585          Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
586          List<Long> expectedOffsets = new ArrayList<>();
587          List<Long> expectedPrevOffsets = new ArrayList<>();
588          List<BlockType> expectedTypes = new ArrayList<>();
589          List<ByteBuffer> expectedContents = cacheOnWrite ? new ArrayList<>() : null;
590          long totalSize = writeBlocks(TEST_UTIL.getConfiguration(), rand, algo, path,
591            expectedOffsets, expectedPrevOffsets, expectedTypes, expectedContents);
592
593          FSDataInputStream is = fs.open(path);
594          HFileContext meta =
595            new HFileContextBuilder().withHBaseCheckSum(true).withIncludesMvcc(includesMemstoreTS)
596              .withIncludesTags(includesTag).withCompression(algo).build();
597          ReaderContext context =
598            new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
599              .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
600          HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf);
601          long curOffset = 0;
602          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
603            if (!pread) {
604              assertEquals(is.getPos(),
605                curOffset + (i == 0 ? 0 : HConstants.HFILEBLOCK_HEADER_SIZE));
606            }
607
608            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
609            if (detailedLogging) {
610              LOG.info("Reading block #" + i + " at offset " + curOffset);
611            }
612            HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, false);
613            if (detailedLogging) {
614              LOG.info("Block #" + i + ": " + b);
615            }
616            assertEquals(expectedTypes.get(i), b.getBlockType(),
617              "Invalid block #" + i + "'s type:");
618            assertEquals((long) expectedPrevOffsets.get(i), b.getPrevBlockOffset(),
619              "Invalid previous block offset for block " + i + " of " + "type " + b.getBlockType()
620                + ":");
621            b.sanityCheck();
622            assertEquals(curOffset, b.getOffset());
623
624            // Now re-load this block knowing the on-disk size. This tests a
625            // different branch in the loader.
626            HFileBlock b2 =
627              hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, false);
628            b2.sanityCheck();
629
630            assertEquals(b.getBlockType(), b2.getBlockType());
631            assertEquals(b.getOnDiskSizeWithoutHeader(), b2.getOnDiskSizeWithoutHeader());
632            assertEquals(b.getOnDiskSizeWithHeader(), b2.getOnDiskSizeWithHeader());
633            assertEquals(b.getUncompressedSizeWithoutHeader(),
634              b2.getUncompressedSizeWithoutHeader());
635            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
636            assertEquals(curOffset, b2.getOffset());
637            assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
638            assertEquals(b.getOnDiskDataSizeWithHeader(), b2.getOnDiskDataSizeWithHeader());
639            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
640            assertRelease(b2);
641
642            curOffset += b.getOnDiskSizeWithHeader();
643
644            if (cacheOnWrite) {
645              // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
646              // verifies that the unpacked value read back off disk matches the unpacked value
647              // generated before writing to disk.
648              HFileBlock newBlock = b.unpack(meta, hbr);
649              // neither b's unpacked nor the expectedContents have checksum.
650              // they should be identical
651              ByteBuff bufRead = newBlock.getBufferReadOnly();
652              ByteBuffer bufExpected = expectedContents.get(i);
653              byte[] bytesRead = bufRead.toBytes();
654              boolean bytesAreCorrect = Bytes.compareTo(bytesRead, 0, bytesRead.length,
655                bufExpected.array(), bufExpected.arrayOffset(), bufExpected.limit()) == 0;
656              String wrongBytesMsg = "";
657
658              if (!bytesAreCorrect) {
659                // Optimization: only construct an error message in case we
660                // will need it.
661                wrongBytesMsg = "Expected bytes in block #" + i + " (algo=" + algo + ", pread="
662                  + pread + ", cacheOnWrite=" + cacheOnWrite + "):\n";
663                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
664                  bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit())) + ", actual:\n"
665                  + Bytes.toStringBinary(bytesRead, 0, Math.min(32 + 10, bytesRead.length));
666                if (detailedLogging) {
667                  LOG.warn(
668                    "expected header" + HFileBlock.toStringHeader(new SingleByteBuff(bufExpected))
669                      + "\nfound    header" + HFileBlock.toStringHeader(bufRead));
670                  LOG.warn("bufread offset " + bufRead.arrayOffset() + " limit " + bufRead.limit()
671                    + " expected offset " + bufExpected.arrayOffset() + " limit "
672                    + bufExpected.limit());
673                  LOG.warn(wrongBytesMsg);
674                }
675              }
676              assertTrue(bytesAreCorrect, wrongBytesMsg);
677              assertRelease(newBlock);
678              if (newBlock != b) {
679                assertRelease(b);
680              }
681            } else {
682              assertRelease(b);
683            }
684          }
685          assertEquals(curOffset, fs.getFileStatus(path).getLen());
686          is.close();
687        }
688      }
689    }
690  }
691
692  private Random defaultRandom() {
693    return new Random(189237);
694  }
695
696  private class BlockReaderThread implements Callable<Boolean> {
697    private final String clientId;
698    private final HFileBlock.FSReader hbr;
699    private final List<Long> offsets;
700    private final List<BlockType> types;
701    private final long fileSize;
702
703    public BlockReaderThread(String clientId, HFileBlock.FSReader hbr, List<Long> offsets,
704      List<BlockType> types, long fileSize) {
705      this.clientId = clientId;
706      this.offsets = offsets;
707      this.hbr = hbr;
708      this.types = types;
709      this.fileSize = fileSize;
710    }
711
712    @Override
713    public Boolean call() throws Exception {
714      Random rand = new Random(clientId.hashCode());
715      long endTime = EnvironmentEdgeManager.currentTime() + 10000;
716      int numBlocksRead = 0;
717      int numPositionalRead = 0;
718      int numWithOnDiskSize = 0;
719      while (EnvironmentEdgeManager.currentTime() < endTime) {
720        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
721        long offset = offsets.get(blockId);
722        // now we only support concurrent read with pread = true
723        boolean pread = true;
724        boolean withOnDiskSize = rand.nextBoolean();
725        long expectedSize =
726          (blockId == NUM_TEST_BLOCKS - 1 ? fileSize : offsets.get(blockId + 1)) - offset;
727        HFileBlock b = null;
728        try {
729          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
730          b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false);
731          if (useHeapAllocator) {
732            assertTrue(!b.isSharedMem());
733          } else {
734            assertTrue(!b.getBlockType().isData() || b.isSharedMem());
735          }
736          assertEquals(types.get(blockId), b.getBlockType());
737          assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
738          assertEquals(offset, b.getOffset());
739        } catch (IOException ex) {
740          LOG.error("Error in client " + clientId + " trying to read block at " + offset
741            + ", pread=" + pread + ", withOnDiskSize=" + withOnDiskSize, ex);
742          return false;
743        } finally {
744          if (b != null) {
745            b.release();
746          }
747        }
748        ++numBlocksRead;
749        if (pread) {
750          ++numPositionalRead;
751        }
752
753        if (withOnDiskSize) {
754          ++numWithOnDiskSize;
755        }
756      }
757      LOG
758        .info("Client " + clientId + " successfully read " + numBlocksRead + " blocks (with pread: "
759          + numPositionalRead + ", with onDiskSize " + "specified: " + numWithOnDiskSize + ")");
760
761      return true;
762    }
763  }
764
765  @TestTemplate
766  public void testConcurrentReading() throws Exception {
767    testConcurrentReadingInternals();
768  }
769
770  protected void testConcurrentReadingInternals()
771    throws IOException, InterruptedException, ExecutionException {
772    Configuration conf = TEST_UTIL.getConfiguration();
773    for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
774      Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
775      Random rand = defaultRandom();
776      List<Long> offsets = new ArrayList<>();
777      List<BlockType> types = new ArrayList<>();
778      writeBlocks(TEST_UTIL.getConfiguration(), rand, compressAlgo, path, offsets, null, types,
779        null);
780      FSDataInputStream is = fs.open(path);
781      long fileSize = fs.getFileStatus(path).getLen();
782      HFileContext meta =
783        new HFileContextBuilder().withHBaseCheckSum(true).withIncludesMvcc(includesMemstoreTS)
784          .withIncludesTags(includesTag).withCompression(compressAlgo).build();
785      ReaderContext context =
786        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
787          .withFileSize(fileSize).withFilePath(path).withFileSystem(fs).build();
788      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf);
789
790      Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
791      ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);
792
793      for (int i = 0; i < NUM_READER_THREADS; ++i) {
794        ecs.submit(
795          new BlockReaderThread("reader_" + (char) ('A' + i), hbr, offsets, types, fileSize));
796      }
797
798      for (int i = 0; i < NUM_READER_THREADS; ++i) {
799        Future<Boolean> result = ecs.take();
800        assertTrue(result.get());
801        if (detailedLogging) {
802          LOG.info(String.valueOf(i + 1) + " reader threads finished successfully (algo="
803            + compressAlgo + ")");
804        }
805      }
806      is.close();
807    }
808  }
809
810  private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo,
811    Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
812    List<BlockType> expectedTypes, List<ByteBuffer> expectedContents) throws IOException {
813    boolean cacheOnWrite = expectedContents != null;
814    FSDataOutputStream os = fs.create(path);
815    HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(true)
816      .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
817      .withCompression(compressAlgo).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
818    HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta);
819    Map<BlockType, Long> prevOffsetByType = new HashMap<>();
820    long totalSize = 0;
821    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
822      long pos = os.getPos();
823      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
824      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
825        blockTypeOrdinal = BlockType.DATA.ordinal();
826      }
827      BlockType bt = BlockType.values()[blockTypeOrdinal];
828      DataOutputStream dos = hbw.startWriting(bt);
829      int size = rand.nextInt(500);
830      for (int j = 0; j < size; ++j) {
831        // This might compress well.
832        dos.writeShort(i + 1);
833        dos.writeInt(j + 1);
834      }
835
836      if (expectedOffsets != null) expectedOffsets.add(os.getPos());
837
838      if (expectedPrevOffsets != null) {
839        Long prevOffset = prevOffsetByType.get(bt);
840        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
841        prevOffsetByType.put(bt, os.getPos());
842      }
843
844      expectedTypes.add(bt);
845
846      hbw.writeHeaderAndData(os);
847      totalSize += hbw.getOnDiskSizeWithHeader();
848
849      if (cacheOnWrite) {
850        ByteBuff buff = hbw.cloneUncompressedBufferWithHeader();
851        expectedContents.add(buff.asSubByteBuffer(buff.capacity()));
852      }
853
854      if (detailedLogging) {
855        LOG.info("Written block #" + i + " of type " + bt + ", uncompressed size "
856          + hbw.getUncompressedSizeWithoutHeader() + ", packed size "
857          + hbw.getOnDiskSizeWithoutHeader() + " at offset " + pos);
858      }
859    }
860    os.close();
861    LOG.info("Created a temporary file at " + path + ", " + fs.getFileStatus(path).getLen()
862      + " byte, compression=" + compressAlgo);
863    return totalSize;
864  }
865
866  @TestTemplate
867  public void testBlockHeapSize() {
868    testBlockHeapSizeInternals();
869  }
870
871  protected void testBlockHeapSizeInternals() {
872    if (ClassSize.is32BitJVM()) {
873      assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
874    } else {
875      assertEquals(80, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
876    }
877
878    for (int size : new int[] { 100, 256, 12345 }) {
879      byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
880      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
881      HFileContext meta = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS)
882        .withIncludesTags(includesTag).withHBaseCheckSum(false).withCompression(Algorithm.NONE)
883        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withChecksumType(ChecksumType.NULL)
884        .build();
885      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
886        HFileBlock.FILL_HEADER, -1, 0, -1, meta, HEAP);
887      long byteBufferExpectedSize =
888        ClassSize.align(ClassSize.estimateBase(new MultiByteBuff(buf).getClass(), true)
889          + HConstants.HFILEBLOCK_HEADER_SIZE + size);
890      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
891      long hfileBlockExpectedSize = ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
892      long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
893      assertEquals(expected, block.heapSize(),
894        "Block data size: " + size + ", byte buffer expected " + "size: " + byteBufferExpectedSize
895          + ", HFileBlock class expected " + "size: " + hfileBlockExpectedSize
896          + " HFileContext class expected size: " + hfileMetaSize + "; ");
897    }
898  }
899
900  @TestTemplate
901  public void testSerializeWithoutNextBlockMetadata() {
902    int size = 100;
903    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
904    byte[] byteArr = new byte[length];
905    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
906    HFileContext meta = new HFileContextBuilder().build();
907    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
908      ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
909    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
910      ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
911    ByteBuffer buff1 = ByteBuffer.allocate(length);
912    ByteBuffer buff2 = ByteBuffer.allocate(length);
913    blockWithNextBlockMetadata.serialize(buff1, true);
914    blockWithoutNextBlockMetadata.serialize(buff2, true);
915    assertNotEquals(buff1, buff2);
916    buff1.clear();
917    buff2.clear();
918    blockWithNextBlockMetadata.serialize(buff1, false);
919    blockWithoutNextBlockMetadata.serialize(buff2, false);
920    assertEquals(buff1, buff2);
921  }
922}