/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IOTests.class, LargeTests.class})
@RunWith(Parameterized.class)
public class TestHFileBlock {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHFileBlock.class);

  // change this value to activate more logs
  private static final boolean detailedLogging = false;
  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  private static final int NUM_TEST_BLOCKS = 1000;
  private static final int NUM_READER_THREADS = 26;
  private static final int MAX_BUFFER_COUNT = 2048;

  // Used to generate KeyValues
  private static int NUM_KEYVALUES = 50;
  private static int FIELD_LENGTH = 10;
  private static float CHANCE_TO_REPEAT = 0.6f;

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private FileSystem fs;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;
  private final boolean useHeapAllocator;
  private final ByteBuffAllocator alloc;

  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag, boolean useHeapAllocator) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
    this.useHeapAllocator = useHeapAllocator;
    this.alloc = useHeapAllocator ? HEAP : createOffHeapAlloc();
    assertAllocator();
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    List<Object[]> params = new ArrayList<>();
    // Generate boolean triples from 000 to 111
    for (int i = 0; i < (1 << 3); i++) {
      Object[] flags = new Boolean[3];
      for (int k = 0; k < 3; k++) {
        flags[k] = (i & (1 << k)) != 0;
      }
      params.add(flags);
    }
    return params;
  }

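  /**
   * Builds an off-heap, pool-backed allocator for the test. Every pooled buffer is allocated
   * once and released again so the pool is fully populated before any block is read.
   */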
  private ByteBuffAllocator createOffHeapAlloc() {
    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
    conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT);
    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
    ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true);
    // Fill the allocator
    List<ByteBuff> bufs = new ArrayList<>();
    for (int i = 0; i < MAX_BUFFER_COUNT; i++) {
      ByteBuff bb = alloc.allocateOneBuffer();
      assertFalse(bb.hasArray());
      bufs.add(bb);
    }
    bufs.forEach(ByteBuff::release);
    return alloc;
  }

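  /**
   * With the off-heap allocator, every buffer that was handed out must be back in the pool;
   * a smaller free-buffer count would indicate a leaked block.
   */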
  private void assertAllocator() {
    if (!useHeapAllocator) {
      assertEquals(MAX_BUFFER_COUNT, alloc.getFreeBufferCount());
    }
  }

  @Before
  public void setUp() throws IOException {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
  }

  @After
  public void tearDown() throws IOException {
    assertAllocator();
    alloc.clean();
  }

  static void writeTestBlockContents(DataOutputStream dos) throws IOException {
    // This compresses really well.
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i / 100);
    }
  }

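  /**
   * Generates NUM_KEYVALUES pseudo-random KeyValues (fields repeat with probability
   * CHANCE_TO_REPEAT so the block compresses well), sorts them, writes them through the given
   * block writer, and returns the total serialized size, including memstore timestamp vints
   * when requested.
   */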
  static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS,
      boolean useTag) throws IOException {
    List<KeyValue> keyValues = new ArrayList<>();
    Random randomizer = new Random(42L + seed); // just any fixed number

    // generate keyValues
    for (int i = 0; i < NUM_KEYVALUES; ++i) {
      byte[] row;
      long timestamp;
      byte[] family;
      byte[] qualifier;
      byte[] value;

      // generate it or repeat, it should compress well
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        row = CellUtil.cloneRow(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        row = new byte[FIELD_LENGTH];
        randomizer.nextBytes(row);
      }
      if (0 == i) {
        family = new byte[FIELD_LENGTH];
        randomizer.nextBytes(family);
      } else {
        family = CellUtil.cloneFamily(keyValues.get(0));
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        qualifier = CellUtil.cloneQualifier(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        qualifier = new byte[FIELD_LENGTH];
        randomizer.nextBytes(qualifier);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        value = CellUtil.cloneValue(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        value = new byte[FIELD_LENGTH];
        randomizer.nextBytes(value);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        timestamp = keyValues.get(
            randomizer.nextInt(keyValues.size())).getTimestamp();
      } else {
        timestamp = randomizer.nextLong();
      }
      if (!useTag) {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
      } else {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value,
            new Tag[] { new ArrayBackedTag((byte) 1, Bytes.toBytes("myTagVal")) }));
      }
    }

    // sort it and write to stream
    int totalSize = 0;
    Collections.sort(keyValues, CellComparatorImpl.COMPARATOR);

    for (KeyValue kv : keyValues) {
      totalSize += kv.getLength();
      if (includesMemstoreTS) {
        long memstoreTS = randomizer.nextLong();
        kv.setSequenceId(memstoreTS);
        totalSize += WritableUtils.getVIntSize(memstoreTS);
      }
      hbw.write(kv);
    }
    return totalSize;
  }

  public byte[] createTestV1Block(Compression.Algorithm algo)
      throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos); // Let's make this a meta block.
    writeTestBlockContents(dos);
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

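  /**
   * Writes the standard 4000-byte test payload into a single DATA block using the given
   * compression and finalizes it; the writer is returned so callers can inspect the
   * serialized block.
   */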
  static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo,
      boolean includesMemstoreTS, boolean includesTag) throws IOException {
    final BlockType blockType = BlockType.DATA;
    HFileContext meta = new HFileContextBuilder()
                        .withCompression(algo)
                        .withIncludesMvcc(includesMemstoreTS)
                        .withIncludesTags(includesTag)
                        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(blockType);
    writeTestBlockContents(dos);
    dos.flush();
    hbw.ensureBlockReady();
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.release();
    return hbw;
  }

  public String createTestBlockStr(Compression.Algorithm algo,
      int correctLength, boolean useTag) throws IOException {
    HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
    byte[] testV2Block = hbw.getHeaderAndDataForTest();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
      // variations across operating systems.
      // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
      // We only make this change when the compressed block length matches.
      // Otherwise, there are obviously other inconsistencies.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }

  @Test
  public void testNoCompression() throws IOException {
    CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
    Mockito.when(cacheConf.getBlockCache()).thenReturn(Optional.empty());

    HFileBlock block =
      createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
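    // 1000 ints (4000 bytes) were written; the 4 extra on-disk bytes are the single checksum
    // chunk appended to the block.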
    assertEquals(4000, block.getUncompressedSizeWithoutHeader());
    assertEquals(4004, block.getOnDiskSizeWithoutHeader());
    assertTrue(block.isUnpacked());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
            + "\\xFF\\xFF\\xFF\\xFF"
            + "\\x0" + ChecksumType.getDefaultChecksumType().getCode()
            + "\\x00\\x00@\\x00\\x00\\x00\\x00["
            // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
            + "\\x1F\\x8B"  // gzip magic signature
            + "\\x08"  // Compression method: 8 = "deflate"
            + "\\x00"  // Flags
            + "\\x00\\x00\\x00\\x00"  // mtime
            + "\\x00"  // XFL (extra flags)
            // OS (0 = FAT filesystems, 3 = Unix). However, this field
            // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
            // This appears to be a difference caused by the availability
            // (and use) of the native GZ codec.
            + "\\x03"
            + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
            + "\\x00\\x00\\x00\\x00"; //  4 byte checksum (ignored)
    final int correctGzipBlockLength = 95;
    final String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
    // We ignore the block checksum because createTestBlockStr can change the
    // gzip header after the block is produced
    assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
      testBlockStr.substring(0, correctGzipBlockLength - 4));
  }

  @Test
  public void testReaderV2() throws IOException {
    testReaderV2Internals();
  }

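  /**
   * Blocks backed by exclusive (unshared) memory are not reference-counted, so release() is
   * expected to report false; shared, pool-backed blocks must release successfully.
   */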
  private void assertRelease(HFileBlock blk) {
    if (blk instanceof ExclusiveMemHFileBlock) {
      assertFalse(blk.release());
    } else {
      assertTrue(blk.release());
    }
  }

  protected void testReaderV2Internals() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo +
                 ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
            + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                           .withCompression(algo)
                           .withIncludesMvcc(includesMemstoreTS)
                           .withIncludesTags(includesTag)
                           .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                           .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        meta = new HFileContextBuilder()
            .withHBaseCheckSum(true)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withCompression(algo).build();
        ReaderContext context = new ReaderContextBuilder()
            .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
            .withFileSize(totalSize)
            .withFilePath(path)
            .withFileSystem(fs)
            .build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
        is.close();
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());

        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        HFileBlock expected = b;

        if (algo == GZ) {
          is = fs.open(path);
          ReaderContext readerContext = new ReaderContextBuilder()
              .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
              .withFileSize(totalSize)
              .withFilePath(path)
              .withFileSystem(fs)
              .build();
          hbr = new HFileBlock.FSReaderImpl(readerContext, meta, alloc);
          b = hbr.readBlockData(0,
            2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false, true);
          assertEquals(expected, b);
          int wrongCompressedSize = 2172;
          try {
            hbr.readBlockData(0, wrongCompressedSize + HConstants.HFILEBLOCK_HEADER_SIZE, pread,
              false, true);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "Passed in onDiskSizeWithHeader=";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix
                + "'", ex.getMessage().startsWith(expectedPrefix));
          }
          assertRelease(b);
          is.close();
        }
        assertRelease(expected);
      }
    }
  }

  /**
   * Test encoding/decoding data blocks.
   * @throws IOException in case of a bug or a problem with temporary files.
   */
  @Test
  public void testDataBlockEncoding() throws IOException {
    testInternals();
  }

  private void testInternals() throws IOException {
    final int numBlocks = 5;
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          LOG.info("testDataBlockEncoding: Compression algorithm={}, pread={}, dataBlockEncoder={}",
              algo.toString(), pread, encoding);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
              + algo + "_" + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
              new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
          HFileContext meta = new HFileContextBuilder()
                              .withCompression(algo)
                              .withIncludesMvcc(includesMemstoreTS)
                              .withIncludesTags(includesTag)
                              .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                              .build();
          HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<>();
          final List<ByteBuff> encodedBlocks = new ArrayList<>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            hbw.startWriting(BlockType.DATA);
            writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
            hbw.writeHeaderAndData(os);
            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
            ByteBuff encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader();
            final int encodedSize = encodedResultWithHeader.limit() - headerLen;
            if (encoding != DataBlockEncoding.NONE) {
              // We need to account for the two-byte encoding algorithm ID that
              // comes after the block header but before the encoded KVs.
              headerLen += DataBlockEncoding.ID_SIZE;
            }
            encodedSizes.add(encodedSize);
            ByteBuff encodedBuf = encodedResultWithHeader.position(headerLen).slice();
            encodedBlocks.add(encodedBuf);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          meta = new HFileContextBuilder()
                .withHBaseCheckSum(true)
                .withCompression(algo)
                .withIncludesMvcc(includesMemstoreTS)
                .withIncludesTags(includesTag)
                .build();
          ReaderContext context = new ReaderContextBuilder()
              .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
              .withFileSize(totalSize)
              .withFilePath(path)
              .withFileSystem(fs)
              .build();
          HFileBlock.FSReaderImpl hbr =
              new HFileBlock.FSReaderImpl(context, meta, alloc);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemStoreTS(includesMemstoreTS);
          HFileBlock blockFromHFile, blockUnpacked;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            blockFromHFile = hbr.readBlockData(pos, -1, pread, false, true);
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
            blockFromHFile.sanityCheck();
            pos += blockFromHFile.getOnDiskSizeWithHeader();
            assertEquals((int) encodedSizes.get(blockId),
              blockFromHFile.getUncompressedSizeWithoutHeader());
            assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
            long packedHeapsize = blockFromHFile.heapSize();
            blockUnpacked = blockFromHFile.unpack(meta, hbr);
            assertTrue(blockUnpacked.isUnpacked());
            if (meta.isCompressedOrEncrypted()) {
519              LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeadsize=" + blockUnpacked
520                .heapSize());
521              assertFalse(packedHeapsize == blockUnpacked.heapSize());
522              assertTrue("Packed heapSize should be < unpacked heapSize",
523                packedHeapsize < blockUnpacked.heapSize());
524            }
            ByteBuff actualBuffer = blockUnpacked.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // We expect a two-byte big-endian encoding id.
              assertEquals(
                "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread),
                Long.toHexString(0), Long.toHexString(actualBuffer.get(0)));
              assertEquals(
                "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread),
                Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1)));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuff expectedBuff = encodedBlocks.get(blockId);
            expectedBuff.rewind();

            // test if content matches, produce nice message
            assertBuffersEqual(expectedBuff, actualBuffer, algo, encoding, pread);

            // test serialized blocks
            for (boolean reuseBuffer : new boolean[] { false, true }) {
              ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
              blockFromHFile.serialize(serialized, true);
              HFileBlock deserialized = (HFileBlock) blockFromHFile.getDeserializer()
                  .deserialize(new SingleByteBuff(serialized), HEAP);
              assertEquals("Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
                blockFromHFile, deserialized);
              // intentional reference comparison
              if (blockFromHFile != blockUnpacked) {
                assertEquals("Deserialized block cannot be unpacked correctly.", blockUnpacked,
                  deserialized.unpack(meta, hbr));
              }
            }
            assertRelease(blockUnpacked);
            if (blockFromHFile != blockUnpacked) {
              blockFromHFile.release();
            }
          }
          is.close();
        }
      }
    }
  }

  static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
      boolean pread) {
    return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
  }

  static void assertBuffersEqual(ByteBuff expectedBuffer,
      ByteBuff actualBuffer, Compression.Algorithm compression,
      DataBlockEncoding encoding, boolean pread) {
    if (!actualBuffer.equals(expectedBuffer)) {
      int prefix = 0;
      int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
      while (prefix < minLimit &&
          expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
        prefix++;
      }

      fail(String.format(
          "Content mismatch for %s, commonPrefix %d, expected %s, got %s",
          buildMessageDetails(compression, encoding, pread), prefix,
          nextBytesToStr(expectedBuffer, prefix),
          nextBytesToStr(actualBuffer, prefix)));
    }
  }

  /**
   * Converts the next few bytes in the given buffer, starting at the given position, to a
   * string. Used for error messages.
   */
  private static String nextBytesToStr(ByteBuff buf, int pos) {
    int maxBytes = buf.limit() - pos;
    int numBytes = Math.min(16, maxBytes);
    return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos,
        numBytes) + (numBytes < maxBytes ? "..." : "");
  }

  @Test
  public void testPreviousOffset() throws IOException {
    testPreviousOffsetInternals();
  }

  protected void testPreviousOffsetInternals() throws IOException {
    // TODO: parameterize these nested loops.
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : BOOLEAN_VALUES) {
        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
          Random rand = defaultRandom();
          LOG.info("testPreviousOffset: Compression algorithm={}, pread={}, cacheOnWrite={}",
              algo.toString(), pread, cacheOnWrite);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
          List<Long> expectedOffsets = new ArrayList<>();
          List<Long> expectedPrevOffsets = new ArrayList<>();
          List<BlockType> expectedTypes = new ArrayList<>();
          List<ByteBuffer> expectedContents = cacheOnWrite ? new ArrayList<>() : null;
          long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
              expectedPrevOffsets, expectedTypes, expectedContents);

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
                              .withHBaseCheckSum(true)
                              .withIncludesMvcc(includesMemstoreTS)
                              .withIncludesTags(includesTag)
                              .withCompression(algo).build();
          ReaderContext context = new ReaderContextBuilder()
              .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
              .withFileSize(totalSize)
              .withFilePath(path)
              .withFileSystem(fs)
              .build();
          HFileBlock.FSReader hbr =
              new HFileBlock.FSReaderImpl(context, meta, alloc);
          long curOffset = 0;
          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
            if (!pread) {
              assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
                HConstants.HFILEBLOCK_HEADER_SIZE));
            }

            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
            if (detailedLogging) {
              LOG.info("Reading block #" + i + " at offset " + curOffset);
            }
            HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, false);
            if (detailedLogging) {
              LOG.info("Block #" + i + ": " + b);
            }
            assertEquals("Invalid block #" + i + "'s type:",
                expectedTypes.get(i), b.getBlockType());
            assertEquals("Invalid previous block offset for block " + i
                + " of type " + b.getBlockType() + ":",
                (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
            b.sanityCheck();
            assertEquals(curOffset, b.getOffset());

            // Now re-load this block knowing the on-disk size. This tests a
            // different branch in the loader.
            HFileBlock b2 =
                hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, false);
            b2.sanityCheck();

            assertEquals(b.getBlockType(), b2.getBlockType());
            assertEquals(b.getOnDiskSizeWithoutHeader(),
                b2.getOnDiskSizeWithoutHeader());
            assertEquals(b.getOnDiskSizeWithHeader(),
                b2.getOnDiskSizeWithHeader());
            assertEquals(b.getUncompressedSizeWithoutHeader(),
                b2.getUncompressedSizeWithoutHeader());
            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
            assertEquals(curOffset, b2.getOffset());
            assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
            assertEquals(b.getOnDiskDataSizeWithHeader(),
                         b2.getOnDiskDataSizeWithHeader());
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
            assertRelease(b2);

            curOffset += b.getOnDiskSizeWithHeader();

            if (cacheOnWrite) {
              // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
              // verifies that the unpacked value read back off disk matches the unpacked value
              // generated before writing to disk.
              HFileBlock newBlock = b.unpack(meta, hbr);
              // b's buffer has header + data + checksum while
              // expectedContents have header + data only
              ByteBuff bufRead = newBlock.getBufferReadOnly();
              ByteBuffer bufExpected = expectedContents.get(i);
              byte[] tmp = new byte[bufRead.limit() - newBlock.totalChecksumBytes()];
              bufRead.get(tmp, 0, tmp.length);
              boolean bytesAreCorrect = Bytes.compareTo(tmp, 0, tmp.length, bufExpected.array(),
                bufExpected.arrayOffset(), bufExpected.limit()) == 0;
              String wrongBytesMsg = "";

              if (!bytesAreCorrect) {
                // Optimization: only construct an error message in case we
                // will need it.
                wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
                    + algo + ", pread=" + pread
                    + ", cacheOnWrite=" + cacheOnWrite + "):\n";
                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
                  bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit()))
                    + ", actual:\n"
                    + Bytes.toStringBinary(bufRead.array(),
                  bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
                if (detailedLogging) {
                  LOG.warn("expected header" +
                           HFileBlock.toStringHeader(new SingleByteBuff(bufExpected)) +
                           "\nfound    header" +
                           HFileBlock.toStringHeader(bufRead));
                  LOG.warn("bufread offset " + bufRead.arrayOffset() +
                           " limit " + bufRead.limit() +
                           " expected offset " + bufExpected.arrayOffset() +
                           " limit " + bufExpected.limit());
                  LOG.warn(wrongBytesMsg);
                }
              }
              assertTrue(wrongBytesMsg, bytesAreCorrect);
              assertRelease(newBlock);
              if (newBlock != b) {
                assertRelease(b);
              }
            } else {
              assertRelease(b);
            }
          }
          assertEquals(curOffset, fs.getFileStatus(path).getLen());
          is.close();
        }
      }
    }
  }

  private Random defaultRandom() {
    return new Random(189237);
  }

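  /**
   * Worker for testConcurrentReading(): for about ten seconds it reads randomly chosen blocks
   * (always positionally), sometimes passing the known on-disk size, and verifies the type,
   * size and offset of every block it gets back.
   */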
  private class BlockReaderThread implements Callable<Boolean> {
    private final String clientId;
    private final HFileBlock.FSReader hbr;
    private final List<Long> offsets;
    private final List<BlockType> types;
    private final long fileSize;

    public BlockReaderThread(String clientId,
        HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
        long fileSize) {
      this.clientId = clientId;
      this.offsets = offsets;
      this.hbr = hbr;
      this.types = types;
      this.fileSize = fileSize;
    }

    @Override
    public Boolean call() throws Exception {
      Random rand = new Random(clientId.hashCode());
      long endTime = System.currentTimeMillis() + 10000;
      int numBlocksRead = 0;
      int numPositionalRead = 0;
      int numWithOnDiskSize = 0;
      while (System.currentTimeMillis() < endTime) {
        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
        long offset = offsets.get(blockId);
        // now we only support concurrent read with pread = true
        boolean pread = true;
        boolean withOnDiskSize = rand.nextBoolean();
        long expectedSize =
            (blockId == NUM_TEST_BLOCKS - 1 ? fileSize : offsets.get(blockId + 1)) - offset;
        HFileBlock b = null;
        try {
          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
          b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false);
          if (useHeapAllocator) {
            assertFalse(b.isSharedMem());
          } else {
            assertTrue(!b.getBlockType().isData() || b.isSharedMem());
          }
          assertEquals(types.get(blockId), b.getBlockType());
          assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
          assertEquals(offset, b.getOffset());
        } catch (IOException ex) {
          LOG.error("Error in client " + clientId + " trying to read block at " + offset
              + ", pread=" + pread + ", withOnDiskSize=" + withOnDiskSize,
            ex);
          return false;
        } finally {
          if (b != null) {
            b.release();
          }
        }
        ++numBlocksRead;
        if (pread) {
          ++numPositionalRead;
        }

        if (withOnDiskSize) {
          ++numWithOnDiskSize;
        }
      }
      LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
        " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
        "specified: " + numWithOnDiskSize + ")");

      return true;
    }
  }

  @Test
  public void testConcurrentReading() throws Exception {
    testConcurrentReadingInternals();
  }

  protected void testConcurrentReadingInternals() throws IOException,
      InterruptedException, ExecutionException {
    for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
      Random rand = defaultRandom();
      List<Long> offsets = new ArrayList<>();
      List<BlockType> types = new ArrayList<>();
      writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
      FSDataInputStream is = fs.open(path);
      long fileSize = fs.getFileStatus(path).getLen();
      HFileContext meta = new HFileContextBuilder()
                          .withHBaseCheckSum(true)
                          .withIncludesMvcc(includesMemstoreTS)
                          .withIncludesTags(includesTag)
                          .withCompression(compressAlgo)
                          .build();
      ReaderContext context = new ReaderContextBuilder()
          .withInputStreamWrapper(new FSDataInputStreamWrapper(is))
          .withFileSize(fileSize)
          .withFilePath(path)
          .withFileSystem(fs)
          .build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc);

      Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
      ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
            offsets, types, fileSize));
      }

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        Future<Boolean> result = ecs.take();
        assertTrue(result.get());
        if (detailedLogging) {
          LOG.info(String.valueOf(i + 1)
            + " reader threads finished successfully (algo=" + compressAlgo
            + ")");
        }
      }
      is.close();
    }
  }

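  /**
   * Writes NUM_TEST_BLOCKS blocks of random type and size to the given path, recording the
   * expected block offsets, per-type previous offsets, block types and (when cache-on-write is
   * simulated) uncompressed contents, and returns the total on-disk size.
   */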
  private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
      Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
      List<BlockType> expectedTypes, List<ByteBuffer> expectedContents
  ) throws IOException {
    boolean cacheOnWrite = expectedContents != null;
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder()
                        .withHBaseCheckSum(true)
                        .withIncludesMvcc(includesMemstoreTS)
                        .withIncludesTags(includesTag)
                        .withCompression(compressAlgo)
                        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    Map<BlockType, Long> prevOffsetByType = new HashMap<>();
    long totalSize = 0;
    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
      long pos = os.getPos();
      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
        blockTypeOrdinal = BlockType.DATA.ordinal();
      }
      BlockType bt = BlockType.values()[blockTypeOrdinal];
      DataOutputStream dos = hbw.startWriting(bt);
      int size = rand.nextInt(500);
      for (int j = 0; j < size; ++j) {
        // This might compress well.
        dos.writeShort(i + 1);
        dos.writeInt(j + 1);
      }

      if (expectedOffsets != null) {
        expectedOffsets.add(os.getPos());
      }

      if (expectedPrevOffsets != null) {
        Long prevOffset = prevOffsetByType.get(bt);
        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
        prevOffsetByType.put(bt, os.getPos());
      }

      expectedTypes.add(bt);

      hbw.writeHeaderAndData(os);
      totalSize += hbw.getOnDiskSizeWithHeader();

      if (cacheOnWrite) {
        ByteBuff buff = hbw.cloneUncompressedBufferWithHeader();
        expectedContents.add(buff.asSubByteBuffer(buff.capacity()));
      }

      if (detailedLogging) {
        LOG.info("Written block #" + i + " of type " + bt
            + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
            + ", packed size " + hbw.getOnDiskSizeWithoutHeader()
            + " at offset " + pos);
      }
    }
    os.close();
922    LOG.info("Created a temporary file at " + path + ", "
923        + fs.getFileStatus(path).getLen() + " byte, compression=" +
924        compressAlgo);
    return totalSize;
  }

  @Test
  public void testBlockHeapSize() {
    testBlockHeapSizeInternals();
  }

  protected void testBlockHeapSizeInternals() {
    if (ClassSize.is32BitJVM()) {
      assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
    } else {
      assertEquals(80, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
    }

    for (int size : new int[] { 100, 256, 12345 }) {
      byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
      HFileContext meta = new HFileContextBuilder()
                          .withIncludesMvcc(includesMemstoreTS)
                          .withIncludesTags(includesTag)
                          .withHBaseCheckSum(false)
                          .withCompression(Algorithm.NONE)
                          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                          .withChecksumType(ChecksumType.NULL).build();
      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
          HFileBlock.FILL_HEADER, -1, 0, -1, meta, HEAP);
      long byteBufferExpectedSize =
          ClassSize.align(ClassSize.estimateBase(new MultiByteBuff(buf).getClass(), true)
              + HConstants.HFILEBLOCK_HEADER_SIZE + size);
      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
      long hfileBlockExpectedSize = ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
      long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
      assertEquals("Block data size: " + size + ", byte buffer expected " +
          "size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
          "size: " + hfileBlockExpectedSize + " HFileContext class expected size: "
              + hfileMetaSize + "; ", expected,
          block.heapSize());
    }
  }

  @Test
  public void testSerializeWithoutNextBlockMetadata() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
        ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
        ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
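    // With next-block metadata included, the two serialized forms must differ (only one carries
    // a next-block on-disk size); without it they must be identical.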
    ByteBuffer buff1 = ByteBuffer.allocate(length);
    ByteBuffer buff2 = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(buff1, true);
    blockWithoutNextBlockMetadata.serialize(buff2, true);
    assertNotEquals(buff1, buff2);
    buff1.clear();
    buff2.clear();
    blockWithNextBlockMetadata.serialize(buff1, false);
    blockWithoutNextBlockMetadata.serialize(buff2, false);
    assertEquals(buff1, buff2);
  }
}