001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile;
019
020import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
021import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
022import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
023import static org.junit.Assert.*;
024
025import java.io.ByteArrayOutputStream;
026import java.io.DataOutputStream;
027import java.io.IOException;
028import java.io.OutputStream;
029import java.nio.ByteBuffer;
030import java.util.ArrayList;
031import java.util.Collection;
032import java.util.Collections;
033import java.util.HashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.Optional;
037import java.util.Random;
038import java.util.concurrent.Callable;
039import java.util.concurrent.ExecutionException;
040import java.util.concurrent.Executor;
041import java.util.concurrent.ExecutorCompletionService;
042import java.util.concurrent.Executors;
043import java.util.concurrent.Future;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.FSDataInputStream;
046import org.apache.hadoop.fs.FSDataOutputStream;
047import org.apache.hadoop.fs.FileSystem;
048import org.apache.hadoop.fs.Path;
049import org.apache.hadoop.hbase.ArrayBackedTag;
050import org.apache.hadoop.hbase.CellComparatorImpl;
051import org.apache.hadoop.hbase.CellUtil;
052import org.apache.hadoop.hbase.HBaseClassTestRule;
053import org.apache.hadoop.hbase.HBaseConfiguration;
054import org.apache.hadoop.hbase.HBaseTestingUtil;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.KeyValue;
057import org.apache.hadoop.hbase.Tag;
058import org.apache.hadoop.hbase.fs.HFileSystem;
059import org.apache.hadoop.hbase.io.ByteBuffAllocator;
060import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
061import org.apache.hadoop.hbase.io.compress.Compression;
062import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
063import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
064import org.apache.hadoop.hbase.nio.ByteBuff;
065import org.apache.hadoop.hbase.nio.MultiByteBuff;
066import org.apache.hadoop.hbase.nio.SingleByteBuff;
067import org.apache.hadoop.hbase.testclassification.IOTests;
068import org.apache.hadoop.hbase.testclassification.LargeTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.ChecksumType;
071import org.apache.hadoop.hbase.util.ClassSize;
072import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
073import org.apache.hadoop.io.WritableUtils;
074import org.apache.hadoop.io.compress.Compressor;
075import org.junit.After;
076import org.junit.Before;
077import org.junit.ClassRule;
078import org.junit.Test;
079import org.junit.experimental.categories.Category;
080import org.junit.runner.RunWith;
081import org.junit.runners.Parameterized;
082import org.junit.runners.Parameterized.Parameters;
083import org.mockito.Mockito;
084import org.slf4j.Logger;
085import org.slf4j.LoggerFactory;
086
087@Category({ IOTests.class, LargeTests.class })
088@RunWith(Parameterized.class)
089public class TestHFileBlock {
090
091  @ClassRule
092  public static final HBaseClassTestRule CLASS_RULE =
093    HBaseClassTestRule.forClass(TestHFileBlock.class);
094
095  // change this value to activate more logs
096  private static final boolean detailedLogging = false;
097  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };
098
099  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);
100
101  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
102
103  private static final int NUM_TEST_BLOCKS = 1000;
104  private static final int NUM_READER_THREADS = 26;
105  private static final int MAX_BUFFER_COUNT = 2048;
106
107  // Used to generate KeyValues
108  private static int NUM_KEYVALUES = 50;
109  private static int FIELD_LENGTH = 10;
110  private static float CHANCE_TO_REPEAT = 0.6f;
111
112  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
113  private static final Random RNG = new Random(); // This test depends on Random#setSeed
114  private FileSystem fs;
115
116  private final boolean includesMemstoreTS;
117  private final boolean includesTag;
118  private final boolean useHeapAllocator;
119  private final ByteBuffAllocator alloc;
120
121  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag, boolean useHeapAllocator) {
122    this.includesMemstoreTS = includesMemstoreTS;
123    this.includesTag = includesTag;
124    this.useHeapAllocator = useHeapAllocator;
125    this.alloc = useHeapAllocator ? HEAP : createOffHeapAlloc();
126    assertAllocator();
127  }
128
129  @Parameters
130  public static Collection<Object[]> parameters() {
131    List<Object[]> params = new ArrayList<>();
132    // Generate boolean triples from 000 to 111
133    for (int i = 0; i < (1 << 3); i++) {
134      Object[] flags = new Boolean[3];
135      for (int k = 0; k < 3; k++) {
136        flags[k] = (i & (1 << k)) != 0;
137      }
138      params.add(flags);
139    }
140    return params;
141  }
142
143  private ByteBuffAllocator createOffHeapAlloc() {
144    Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
145    conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT);
146    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
147    ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true);
148    // Fill the allocator
149    List<ByteBuff> bufs = new ArrayList<>();
150    for (int i = 0; i < MAX_BUFFER_COUNT; i++) {
151      ByteBuff bb = alloc.allocateOneBuffer();
152      assertTrue(!bb.hasArray());
153      bufs.add(bb);
154    }
155    bufs.forEach(ByteBuff::release);
156    return alloc;
157  }
158
159  private void assertAllocator() {
160    if (!useHeapAllocator) {
161      assertEquals(MAX_BUFFER_COUNT, alloc.getFreeBufferCount());
162    }
163  }
164
165  @Before
166  public void setUp() throws IOException {
167    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
168  }
169
170  @After
171  public void tearDown() throws IOException {
172    assertAllocator();
173    alloc.clean();
174  }
175
176  static void writeTestBlockContents(DataOutputStream dos) throws IOException {
177    // This compresses really well.
178    for (int i = 0; i < 1000; ++i)
179      dos.writeInt(i / 100);
180  }
181
182  static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS,
183    boolean useTag) throws IOException {
184    List<KeyValue> keyValues = new ArrayList<>();
185
186    // generate keyValues
187    RNG.setSeed(42); // just any fixed number
188    for (int i = 0; i < NUM_KEYVALUES; ++i) {
189      byte[] row;
190      long timestamp;
191      byte[] family;
192      byte[] qualifier;
193      byte[] value;
194
195      // generate it or repeat, it should compress well
196      if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) {
197        row = CellUtil.cloneRow(keyValues.get(RNG.nextInt(keyValues.size())));
198      } else {
199        row = new byte[FIELD_LENGTH];
200        RNG.nextBytes(row);
201      }
202      if (0 == i) {
203        family = new byte[FIELD_LENGTH];
204        RNG.nextBytes(family);
205      } else {
206        family = CellUtil.cloneFamily(keyValues.get(0));
207      }
208      if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) {
209        qualifier = CellUtil.cloneQualifier(keyValues.get(RNG.nextInt(keyValues.size())));
210      } else {
211        qualifier = new byte[FIELD_LENGTH];
212        RNG.nextBytes(qualifier);
213      }
214      if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) {
215        value = CellUtil.cloneValue(keyValues.get(RNG.nextInt(keyValues.size())));
216      } else {
217        value = new byte[FIELD_LENGTH];
218        RNG.nextBytes(value);
219      }
220      if (0 < i && RNG.nextFloat() < CHANCE_TO_REPEAT) {
221        timestamp = keyValues.get(RNG.nextInt(keyValues.size())).getTimestamp();
222      } else {
223        timestamp = RNG.nextLong();
224      }
225      if (!useTag) {
226        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
227      } else {
228        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value,
229          new Tag[] { new ArrayBackedTag((byte) 1, Bytes.toBytes("myTagVal")) }));
230      }
231    }
232
233    // sort it and write to stream
234    int totalSize = 0;
235    Collections.sort(keyValues, CellComparatorImpl.COMPARATOR);
236
237    for (KeyValue kv : keyValues) {
238      totalSize += kv.getLength();
239      if (includesMemstoreTS) {
240        long memstoreTS = RNG.nextLong();
241        kv.setSequenceId(memstoreTS);
242        totalSize += WritableUtils.getVIntSize(memstoreTS);
243      }
244      hbw.write(kv);
245    }
246    return totalSize;
247  }
248
249  public byte[] createTestV1Block(Compression.Algorithm algo) throws IOException {
250    Compressor compressor = algo.getCompressor();
251    ByteArrayOutputStream baos = new ByteArrayOutputStream();
252    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
253    DataOutputStream dos = new DataOutputStream(os);
254    BlockType.META.write(dos); // Let's make this a meta block.
255    writeTestBlockContents(dos);
256    dos.flush();
257    algo.returnCompressor(compressor);
258    return baos.toByteArray();
259  }
260
261  static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo, boolean includesMemstoreTS,
262    boolean includesTag) throws IOException {
263    final BlockType blockType = BlockType.DATA;
264    HFileContext meta = new HFileContextBuilder().withCompression(algo)
265      .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
266      .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
267    HFileBlock.Writer hbw = new HFileBlock.Writer(TEST_UTIL.getConfiguration(), null, meta);
268    DataOutputStream dos = hbw.startWriting(blockType);
269    writeTestBlockContents(dos);
270    dos.flush();
271    hbw.ensureBlockReady();
272    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
273    hbw.release();
274    return hbw;
275  }
276
277  public String createTestBlockStr(Compression.Algorithm algo, int correctLength, boolean useTag)
278    throws IOException {
279    HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
280    byte[] testV2Block = hbw.getHeaderAndDataForTest();
281    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
282    if (testV2Block.length == correctLength) {
283      // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
284      // variations across operating systems.
285      // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
286      // We only make this change when the compressed block length matches.
287      // Otherwise, there are obviously other inconsistencies.
288      testV2Block[osOffset] = 3;
289    }
290    return Bytes.toStringBinary(testV2Block);
291  }
292
293  @Test
294  public void testNoCompression() throws IOException {
295    CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
296    Mockito.when(cacheConf.getBlockCache()).thenReturn(Optional.empty());
297
298    HFileBlock block =
299      createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
300    assertEquals(4000, block.getUncompressedSizeWithoutHeader());
301    assertEquals(4004, block.getOnDiskSizeWithoutHeader());
302    assertTrue(block.isUnpacked());
303  }
304
305  @Test
306  public void testGzipCompression() throws IOException {
307    // @formatter:off
308    String correctTestBlockStr = "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
309      + "\\xFF\\xFF\\xFF\\xFF"
310      + "\\x0" + ChecksumType.getDefaultChecksumType().getCode()
311      + "\\x00\\x00@\\x00\\x00\\x00\\x00["
312      // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
313      + "\\x1F\\x8B"  // gzip magic signature
314      + "\\x08"  // Compression method: 8 = "deflate"
315      + "\\x00"  // Flags
316      + "\\x00\\x00\\x00\\x00"  // mtime
317      + "\\x00"  // XFL (extra flags)
318      // OS (0 = FAT filesystems, 3 = Unix). However, this field
319      // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
320      // This appears to be a difference caused by the availability
321      // (and use) of the native GZ codec.
322      + "\\x03"
323      + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
324      + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
325      + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
326      + "\\x00\\x00\\x00\\x00"; //  4 byte checksum (ignored)
327    // @formatter:on
328    int correctGzipBlockLength = 95;
329    String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
330    // We ignore the block checksum because createTestBlockStr can change the
331    // gzip header after the block is produced
332    assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
333      testBlockStr.substring(0, correctGzipBlockLength - 4));
334  }
335
336  @Test
337  public void testReaderV2() throws IOException {
338    testReaderV2Internals();
339  }
340
341  private void assertRelease(HFileBlock blk) {
342    if (blk instanceof ExclusiveMemHFileBlock) {
343      assertFalse(blk.release());
344    } else {
345      assertTrue(blk.release());
346    }
347  }
348
349  protected void testReaderV2Internals() throws IOException {
350    final Configuration conf = TEST_UTIL.getConfiguration();
351    if (includesTag) {
352      conf.setInt("hfile.format.version", 3);
353    }
354    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
355      for (boolean pread : new boolean[] { false, true }) {
356        LOG.info("testReaderV2: Compression algorithm: " + algo + ", pread=" + pread);
357        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
358        FSDataOutputStream os = fs.create(path);
359        HFileContext meta = new HFileContextBuilder().withCompression(algo)
360          .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
361          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
362        HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta);
363        long totalSize = 0;
364        for (int blockId = 0; blockId < 2; ++blockId) {
365          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
366          for (int i = 0; i < 1234; ++i)
367            dos.writeInt(i);
368          hbw.writeHeaderAndData(os);
369          totalSize += hbw.getOnDiskSizeWithHeader();
370        }
371        os.close();
372
373        FSDataInputStream is = fs.open(path);
374        meta =
375          new HFileContextBuilder().withHBaseCheckSum(true).withIncludesMvcc(includesMemstoreTS)
376            .withIncludesTags(includesTag).withCompression(algo).build();
377        ReaderContext context =
378          new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
379            .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
380        HFileBlock.FSReader hbr =
381          new HFileBlock.FSReaderImpl(context, meta, alloc, TEST_UTIL.getConfiguration());
382        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
383        is.close();
384        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
385
386        b.sanityCheck();
387        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
388        assertEquals(algo == GZ ? 2173 : 4936,
389          b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
390        HFileBlock expected = b;
391
392        if (algo == GZ) {
393          is = fs.open(path);
394          ReaderContext readerContext =
395            new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
396              .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
397          hbr =
398            new HFileBlock.FSReaderImpl(readerContext, meta, alloc, TEST_UTIL.getConfiguration());
399          b = hbr.readBlockData(0,
400            2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false, true);
401          assertEquals(expected, b);
402          int wrongCompressedSize = 2172;
403          try {
404            hbr.readBlockData(0, wrongCompressedSize + HConstants.HFILEBLOCK_HEADER_SIZE, pread,
405              false, true);
406            fail("Exception expected");
407          } catch (IOException ex) {
408            String expectedPrefix = "Passed in onDiskSizeWithHeader=";
409            assertTrue(
410              "Invalid exception message: '" + ex.getMessage()
411                + "'.\nMessage is expected to start with: '" + expectedPrefix + "'",
412              ex.getMessage().startsWith(expectedPrefix));
413          }
414          assertRelease(b);
415          is.close();
416        }
417        assertRelease(expected);
418      }
419    }
420  }
421
422  /**
423   * Test encoding/decoding data blocks.
424   * @throws IOException a bug or a problem with temporary files.
425   */
426  @Test
427  public void testDataBlockEncoding() throws IOException {
428    testInternals();
429  }
430
431  private void testInternals() throws IOException {
432    final int numBlocks = 5;
433    final Configuration conf = TEST_UTIL.getConfiguration();
434    if (includesTag) {
435      conf.setInt("hfile.format.version", 3);
436    }
437    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
438      for (boolean pread : new boolean[] { false, true }) {
439        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
440          LOG.info("testDataBlockEncoding: Compression algorithm={}, pread={}, dataBlockEncoder={}",
441            algo.toString(), pread, encoding);
442          Path path =
443            new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo + "_" + encoding.toString());
444          FSDataOutputStream os = fs.create(path);
445          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE)
446            ? new HFileDataBlockEncoderImpl(encoding)
447            : NoOpDataBlockEncoder.INSTANCE;
448          HFileContext meta = new HFileContextBuilder().withCompression(algo)
449            .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
450            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
451          HFileBlock.Writer hbw = new HFileBlock.Writer(conf, dataBlockEncoder, meta);
452          long totalSize = 0;
453          final List<Integer> encodedSizes = new ArrayList<>();
454          final List<ByteBuff> encodedBlocks = new ArrayList<>();
455          for (int blockId = 0; blockId < numBlocks; ++blockId) {
456            hbw.startWriting(BlockType.DATA);
457            writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
458            hbw.writeHeaderAndData(os);
459            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
460            ByteBuff encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader();
461            final int encodedSize = encodedResultWithHeader.limit() - headerLen;
462            if (encoding != DataBlockEncoding.NONE) {
463              // We need to account for the two-byte encoding algorithm ID that
464              // comes after the 24-byte block header but before encoded KVs.
465              headerLen += DataBlockEncoding.ID_SIZE;
466            }
467            encodedSizes.add(encodedSize);
468            ByteBuff encodedBuf = encodedResultWithHeader.position(headerLen).slice();
469            encodedBlocks.add(encodedBuf);
470            totalSize += hbw.getOnDiskSizeWithHeader();
471          }
472          os.close();
473
474          FSDataInputStream is = fs.open(path);
475          meta = new HFileContextBuilder().withHBaseCheckSum(true).withCompression(algo)
476            .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag).build();
477          ReaderContext context =
478            new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
479              .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
480          HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf);
481          hbr.setDataBlockEncoder(dataBlockEncoder, conf);
482          hbr.setIncludesMemStoreTS(includesMemstoreTS);
483          HFileBlock blockFromHFile, blockUnpacked;
484          int pos = 0;
485          for (int blockId = 0; blockId < numBlocks; ++blockId) {
486            blockFromHFile = hbr.readBlockData(pos, -1, pread, false, true);
487            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
488            blockFromHFile.sanityCheck();
489            pos += blockFromHFile.getOnDiskSizeWithHeader();
490            assertEquals((int) encodedSizes.get(blockId),
491              blockFromHFile.getUncompressedSizeWithoutHeader());
492            assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
493            long packedHeapsize = blockFromHFile.heapSize();
494            blockUnpacked = blockFromHFile.unpack(meta, hbr);
495            assertTrue(blockUnpacked.isUnpacked());
496            if (meta.isCompressedOrEncrypted()) {
497              LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeadsize="
498                + blockUnpacked.heapSize());
499              assertFalse(packedHeapsize == blockUnpacked.heapSize());
500              assertTrue("Packed heapSize should be < unpacked heapSize",
501                packedHeapsize < blockUnpacked.heapSize());
502            }
503            ByteBuff actualBuffer = blockUnpacked.getBufferWithoutHeader();
504            if (encoding != DataBlockEncoding.NONE) {
505              // We expect a two-byte big-endian encoding id.
506              assertEquals(
507                "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread),
508                Long.toHexString(0), Long.toHexString(actualBuffer.get(0)));
509              assertEquals(
510                "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread),
511                Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1)));
512              actualBuffer.position(2);
513              actualBuffer = actualBuffer.slice();
514            }
515
516            ByteBuff expectedBuff = encodedBlocks.get(blockId);
517            expectedBuff.rewind();
518
519            // test if content matches, produce nice message
520            assertBuffersEqual(expectedBuff, actualBuffer, algo, encoding, pread);
521
522            // test serialized blocks
523            for (boolean reuseBuffer : new boolean[] { false, true }) {
524              ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
525              blockFromHFile.serialize(serialized, true);
526              HFileBlock deserialized = (HFileBlock) blockFromHFile.getDeserializer()
527                .deserialize(new SingleByteBuff(serialized), HEAP);
528              assertEquals("Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
529                blockFromHFile, deserialized);
530              // intentional reference comparison
531              if (blockFromHFile != blockUnpacked) {
532                assertEquals("Deserialized block cannot be unpacked correctly.", blockUnpacked,
533                  deserialized.unpack(meta, hbr));
534              }
535            }
536            assertRelease(blockUnpacked);
537            if (blockFromHFile != blockUnpacked) {
538              blockFromHFile.release();
539            }
540          }
541          is.close();
542        }
543      }
544    }
545  }
546
547  static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
548    boolean pread) {
549    return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
550  }
551
552  static void assertBuffersEqual(ByteBuff expectedBuffer, ByteBuff actualBuffer,
553    Compression.Algorithm compression, DataBlockEncoding encoding, boolean pread) {
554    if (!actualBuffer.equals(expectedBuffer)) {
555      int prefix = 0;
556      int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
557      while (prefix < minLimit && expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
558        prefix++;
559      }
560
561      fail(String.format("Content mismatch for %s, commonPrefix %d, expected %s, got %s",
562        buildMessageDetails(compression, encoding, pread), prefix,
563        nextBytesToStr(expectedBuffer, prefix), nextBytesToStr(actualBuffer, prefix)));
564    }
565  }
566
567  /**
568   * Convert a few next bytes in the given buffer at the given position to string. Used for error
569   * messages.
570   */
571  private static String nextBytesToStr(ByteBuff buf, int pos) {
572    int maxBytes = buf.limit() - pos;
573    int numBytes = Math.min(16, maxBytes);
574    return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos, numBytes)
575      + (numBytes < maxBytes ? "..." : "");
576  }
577
578  @Test
579  public void testPreviousOffset() throws IOException {
580    testPreviousOffsetInternals();
581  }
582
583  protected void testPreviousOffsetInternals() throws IOException {
584    // TODO: parameterize these nested loops.
585    Configuration conf = TEST_UTIL.getConfiguration();
586    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
587      for (boolean pread : BOOLEAN_VALUES) {
588        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
589          Random rand = defaultRandom();
590          LOG.info("testPreviousOffset: Compression algorithm={}, pread={}, cacheOnWrite={}",
591            algo.toString(), pread, cacheOnWrite);
592          Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
593          List<Long> expectedOffsets = new ArrayList<>();
594          List<Long> expectedPrevOffsets = new ArrayList<>();
595          List<BlockType> expectedTypes = new ArrayList<>();
596          List<ByteBuffer> expectedContents = cacheOnWrite ? new ArrayList<>() : null;
597          long totalSize = writeBlocks(TEST_UTIL.getConfiguration(), rand, algo, path,
598            expectedOffsets, expectedPrevOffsets, expectedTypes, expectedContents);
599
600          FSDataInputStream is = fs.open(path);
601          HFileContext meta =
602            new HFileContextBuilder().withHBaseCheckSum(true).withIncludesMvcc(includesMemstoreTS)
603              .withIncludesTags(includesTag).withCompression(algo).build();
604          ReaderContext context =
605            new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
606              .withFileSize(totalSize).withFilePath(path).withFileSystem(fs).build();
607          HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf);
608          long curOffset = 0;
609          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
610            if (!pread) {
611              assertEquals(is.getPos(),
612                curOffset + (i == 0 ? 0 : HConstants.HFILEBLOCK_HEADER_SIZE));
613            }
614
615            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
616            if (detailedLogging) {
617              LOG.info("Reading block #" + i + " at offset " + curOffset);
618            }
619            HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, false);
620            if (detailedLogging) {
621              LOG.info("Block #" + i + ": " + b);
622            }
623            assertEquals("Invalid block #" + i + "'s type:", expectedTypes.get(i),
624              b.getBlockType());
625            assertEquals("Invalid previous block offset for block " + i + " of " + "type "
626              + b.getBlockType() + ":", (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
627            b.sanityCheck();
628            assertEquals(curOffset, b.getOffset());
629
630            // Now re-load this block knowing the on-disk size. This tests a
631            // different branch in the loader.
632            HFileBlock b2 =
633              hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, false);
634            b2.sanityCheck();
635
636            assertEquals(b.getBlockType(), b2.getBlockType());
637            assertEquals(b.getOnDiskSizeWithoutHeader(), b2.getOnDiskSizeWithoutHeader());
638            assertEquals(b.getOnDiskSizeWithHeader(), b2.getOnDiskSizeWithHeader());
639            assertEquals(b.getUncompressedSizeWithoutHeader(),
640              b2.getUncompressedSizeWithoutHeader());
641            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
642            assertEquals(curOffset, b2.getOffset());
643            assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
644            assertEquals(b.getOnDiskDataSizeWithHeader(), b2.getOnDiskDataSizeWithHeader());
645            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
646            assertRelease(b2);
647
648            curOffset += b.getOnDiskSizeWithHeader();
649
650            if (cacheOnWrite) {
651              // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
652              // verifies that the unpacked value read back off disk matches the unpacked value
653              // generated before writing to disk.
654              HFileBlock newBlock = b.unpack(meta, hbr);
655              // neither b's unpacked nor the expectedContents have checksum.
656              // they should be identical
657              ByteBuff bufRead = newBlock.getBufferReadOnly();
658              ByteBuffer bufExpected = expectedContents.get(i);
659              byte[] bytesRead = bufRead.toBytes();
660              boolean bytesAreCorrect = Bytes.compareTo(bytesRead, 0, bytesRead.length,
661                bufExpected.array(), bufExpected.arrayOffset(), bufExpected.limit()) == 0;
662              String wrongBytesMsg = "";
663
664              if (!bytesAreCorrect) {
665                // Optimization: only construct an error message in case we
666                // will need it.
667                wrongBytesMsg = "Expected bytes in block #" + i + " (algo=" + algo + ", pread="
668                  + pread + ", cacheOnWrite=" + cacheOnWrite + "):\n";
669                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
670                  bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit())) + ", actual:\n"
671                  + Bytes.toStringBinary(bytesRead, 0, Math.min(32 + 10, bytesRead.length));
672                if (detailedLogging) {
673                  LOG.warn(
674                    "expected header" + HFileBlock.toStringHeader(new SingleByteBuff(bufExpected))
675                      + "\nfound    header" + HFileBlock.toStringHeader(bufRead));
676                  LOG.warn("bufread offset " + bufRead.arrayOffset() + " limit " + bufRead.limit()
677                    + " expected offset " + bufExpected.arrayOffset() + " limit "
678                    + bufExpected.limit());
679                  LOG.warn(wrongBytesMsg);
680                }
681              }
682              assertTrue(wrongBytesMsg, bytesAreCorrect);
683              assertRelease(newBlock);
684              if (newBlock != b) {
685                assertRelease(b);
686              }
687            } else {
688              assertRelease(b);
689            }
690          }
691          assertEquals(curOffset, fs.getFileStatus(path).getLen());
692          is.close();
693        }
694      }
695    }
696  }
697
698  private Random defaultRandom() {
699    return new Random(189237);
700  }
701
702  private class BlockReaderThread implements Callable<Boolean> {
703    private final String clientId;
704    private final HFileBlock.FSReader hbr;
705    private final List<Long> offsets;
706    private final List<BlockType> types;
707    private final long fileSize;
708
709    public BlockReaderThread(String clientId, HFileBlock.FSReader hbr, List<Long> offsets,
710      List<BlockType> types, long fileSize) {
711      this.clientId = clientId;
712      this.offsets = offsets;
713      this.hbr = hbr;
714      this.types = types;
715      this.fileSize = fileSize;
716    }
717
718    @Override
719    public Boolean call() throws Exception {
720      Random rand = new Random(clientId.hashCode());
721      long endTime = EnvironmentEdgeManager.currentTime() + 10000;
722      int numBlocksRead = 0;
723      int numPositionalRead = 0;
724      int numWithOnDiskSize = 0;
725      while (EnvironmentEdgeManager.currentTime() < endTime) {
726        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
727        long offset = offsets.get(blockId);
728        // now we only support concurrent read with pread = true
729        boolean pread = true;
730        boolean withOnDiskSize = rand.nextBoolean();
731        long expectedSize =
732          (blockId == NUM_TEST_BLOCKS - 1 ? fileSize : offsets.get(blockId + 1)) - offset;
733        HFileBlock b = null;
734        try {
735          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
736          b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false);
737          if (useHeapAllocator) {
738            assertTrue(!b.isSharedMem());
739          } else {
740            assertTrue(!b.getBlockType().isData() || b.isSharedMem());
741          }
742          assertEquals(types.get(blockId), b.getBlockType());
743          assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
744          assertEquals(offset, b.getOffset());
745        } catch (IOException ex) {
746          LOG.error("Error in client " + clientId + " trying to read block at " + offset
747            + ", pread=" + pread + ", withOnDiskSize=" + withOnDiskSize, ex);
748          return false;
749        } finally {
750          if (b != null) {
751            b.release();
752          }
753        }
754        ++numBlocksRead;
755        if (pread) {
756          ++numPositionalRead;
757        }
758
759        if (withOnDiskSize) {
760          ++numWithOnDiskSize;
761        }
762      }
763      LOG
764        .info("Client " + clientId + " successfully read " + numBlocksRead + " blocks (with pread: "
765          + numPositionalRead + ", with onDiskSize " + "specified: " + numWithOnDiskSize + ")");
766
767      return true;
768    }
769  }
770
771  @Test
772  public void testConcurrentReading() throws Exception {
773    testConcurrentReadingInternals();
774  }
775
776  protected void testConcurrentReadingInternals()
777    throws IOException, InterruptedException, ExecutionException {
778    Configuration conf = TEST_UTIL.getConfiguration();
779    for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
780      Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
781      Random rand = defaultRandom();
782      List<Long> offsets = new ArrayList<>();
783      List<BlockType> types = new ArrayList<>();
784      writeBlocks(TEST_UTIL.getConfiguration(), rand, compressAlgo, path, offsets, null, types,
785        null);
786      FSDataInputStream is = fs.open(path);
787      long fileSize = fs.getFileStatus(path).getLen();
788      HFileContext meta =
789        new HFileContextBuilder().withHBaseCheckSum(true).withIncludesMvcc(includesMemstoreTS)
790          .withIncludesTags(includesTag).withCompression(compressAlgo).build();
791      ReaderContext context =
792        new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
793          .withFileSize(fileSize).withFilePath(path).withFileSystem(fs).build();
794      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc, conf);
795
796      Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
797      ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);
798
799      for (int i = 0; i < NUM_READER_THREADS; ++i) {
800        ecs.submit(
801          new BlockReaderThread("reader_" + (char) ('A' + i), hbr, offsets, types, fileSize));
802      }
803
804      for (int i = 0; i < NUM_READER_THREADS; ++i) {
805        Future<Boolean> result = ecs.take();
806        assertTrue(result.get());
807        if (detailedLogging) {
808          LOG.info(String.valueOf(i + 1) + " reader threads finished successfully (algo="
809            + compressAlgo + ")");
810        }
811      }
812      is.close();
813    }
814  }
815
816  private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo,
817    Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
818    List<BlockType> expectedTypes, List<ByteBuffer> expectedContents) throws IOException {
819    boolean cacheOnWrite = expectedContents != null;
820    FSDataOutputStream os = fs.create(path);
821    HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(true)
822      .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
823      .withCompression(compressAlgo).withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).build();
824    HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta);
825    Map<BlockType, Long> prevOffsetByType = new HashMap<>();
826    long totalSize = 0;
827    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
828      long pos = os.getPos();
829      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
830      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
831        blockTypeOrdinal = BlockType.DATA.ordinal();
832      }
833      BlockType bt = BlockType.values()[blockTypeOrdinal];
834      DataOutputStream dos = hbw.startWriting(bt);
835      int size = rand.nextInt(500);
836      for (int j = 0; j < size; ++j) {
837        // This might compress well.
838        dos.writeShort(i + 1);
839        dos.writeInt(j + 1);
840      }
841
842      if (expectedOffsets != null) expectedOffsets.add(os.getPos());
843
844      if (expectedPrevOffsets != null) {
845        Long prevOffset = prevOffsetByType.get(bt);
846        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
847        prevOffsetByType.put(bt, os.getPos());
848      }
849
850      expectedTypes.add(bt);
851
852      hbw.writeHeaderAndData(os);
853      totalSize += hbw.getOnDiskSizeWithHeader();
854
855      if (cacheOnWrite) {
856        ByteBuff buff = hbw.cloneUncompressedBufferWithHeader();
857        expectedContents.add(buff.asSubByteBuffer(buff.capacity()));
858      }
859
860      if (detailedLogging) {
861        LOG.info("Written block #" + i + " of type " + bt + ", uncompressed size "
862          + hbw.getUncompressedSizeWithoutHeader() + ", packed size "
863          + hbw.getOnDiskSizeWithoutHeader() + " at offset " + pos);
864      }
865    }
866    os.close();
867    LOG.info("Created a temporary file at " + path + ", " + fs.getFileStatus(path).getLen()
868      + " byte, compression=" + compressAlgo);
869    return totalSize;
870  }
871
872  @Test
873  public void testBlockHeapSize() {
874    testBlockHeapSizeInternals();
875  }
876
877  protected void testBlockHeapSizeInternals() {
878    if (ClassSize.is32BitJVM()) {
879      assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
880    } else {
881      assertEquals(80, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
882    }
883
884    for (int size : new int[] { 100, 256, 12345 }) {
885      byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
886      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
887      HFileContext meta = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS)
888        .withIncludesTags(includesTag).withHBaseCheckSum(false).withCompression(Algorithm.NONE)
889        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withChecksumType(ChecksumType.NULL)
890        .build();
891      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
892        HFileBlock.FILL_HEADER, -1, 0, -1, meta, HEAP);
893      long byteBufferExpectedSize =
894        ClassSize.align(ClassSize.estimateBase(new MultiByteBuff(buf).getClass(), true)
895          + HConstants.HFILEBLOCK_HEADER_SIZE + size);
896      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
897      long hfileBlockExpectedSize = ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
898      long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
899      assertEquals(
900        "Block data size: " + size + ", byte buffer expected " + "size: " + byteBufferExpectedSize
901          + ", HFileBlock class expected " + "size: " + hfileBlockExpectedSize
902          + " HFileContext class expected size: " + hfileMetaSize + "; ",
903        expected, block.heapSize());
904    }
905  }
906
907  @Test
908  public void testSerializeWithoutNextBlockMetadata() {
909    int size = 100;
910    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
911    byte[] byteArr = new byte[length];
912    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
913    HFileContext meta = new HFileContextBuilder().build();
914    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
915      ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
916    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1,
917      ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
918    ByteBuffer buff1 = ByteBuffer.allocate(length);
919    ByteBuffer buff2 = ByteBuffer.allocate(length);
920    blockWithNextBlockMetadata.serialize(buff1, true);
921    blockWithoutNextBlockMetadata.serialize(buff2, true);
922    assertNotEquals(buff1, buff2);
923    buff1.clear();
924    buff2.clear();
925    blockWithNextBlockMetadata.serialize(buff1, false);
926    blockWithoutNextBlockMetadata.serialize(buff2, false);
927    assertEquals(buff1, buff2);
928  }
929}