/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

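/**
 * Unit tests for {@link HFileBlock}: writing and reading blocks with and without compression,
 * data block encoding, previous-block offset tracking, concurrent reads, heap size estimation,
 * and serialization. Parameterized over whether memstore timestamps (MVCC) and tags are included.
 */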
@Category({IOTests.class, MediumTests.class})
@RunWith(Parameterized.class)
public class TestHFileBlock {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHFileBlock.class);

  // Set this to true to enable more detailed logging in these tests.
  private static final boolean detailedLogging = false;
  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  private static final int NUM_TEST_BLOCKS = 1000;
  private static final int NUM_READER_THREADS = 26;

  // Used to generate KeyValues
  private static final int NUM_KEYVALUES = 50;
  private static final int FIELD_LENGTH = 10;
  private static final float CHANCE_TO_REPEAT = 0.6f;

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private FileSystem fs;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;

  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
  }

  @Before
  public void setUp() throws IOException {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
  }

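  /**
   * Writes 1000 ints to the stream; each value repeats 100 consecutive times, so the block
   * contents compress very well.
   */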
  static void writeTestBlockContents(DataOutputStream dos) throws IOException {
    // This compresses really well.
    for (int i = 0; i < 1000; ++i) {
      dos.writeInt(i / 100);
    }
  }

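  /**
   * Generates {@link #NUM_KEYVALUES} pseudo-random KeyValues, repeating earlier rows, qualifiers,
   * values and timestamps with probability {@link #CHANCE_TO_REPEAT} so the block compresses
   * well, then sorts them and writes them to the given block writer. Returns the total serialized
   * size, including the vint-encoded memstore timestamps when {@code includesMemstoreTS} is set.
   */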
  static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS,
      boolean useTag) throws IOException {
    List<KeyValue> keyValues = new ArrayList<>();
    Random randomizer = new Random(42L + seed); // fixed seed for reproducibility

    // generate keyValues
    for (int i = 0; i < NUM_KEYVALUES; ++i) {
      byte[] row;
      long timestamp;
      byte[] family;
      byte[] qualifier;
      byte[] value;

      // Either generate a fresh field or repeat an earlier one, so the data compresses well.
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        row = CellUtil.cloneRow(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        row = new byte[FIELD_LENGTH];
        randomizer.nextBytes(row);
      }
      if (0 == i) {
        family = new byte[FIELD_LENGTH];
        randomizer.nextBytes(family);
      } else {
        family = CellUtil.cloneFamily(keyValues.get(0));
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        qualifier = CellUtil.cloneQualifier(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        qualifier = new byte[FIELD_LENGTH];
        randomizer.nextBytes(qualifier);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        value = CellUtil.cloneValue(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        value = new byte[FIELD_LENGTH];
        randomizer.nextBytes(value);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        timestamp = keyValues.get(
            randomizer.nextInt(keyValues.size())).getTimestamp();
      } else {
        timestamp = randomizer.nextLong();
      }
      if (!useTag) {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
      } else {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value,
            new Tag[] { new ArrayBackedTag((byte) 1, Bytes.toBytes("myTagVal")) }));
      }
    }

    // Sort the KeyValues and write them to the block writer.
    int totalSize = 0;
    Collections.sort(keyValues, CellComparatorImpl.COMPARATOR);

    for (KeyValue kv : keyValues) {
      totalSize += kv.getLength();
      if (includesMemstoreTS) {
        long memstoreTS = randomizer.nextLong();
        kv.setSequenceId(memstoreTS);
        totalSize += WritableUtils.getVIntSize(memstoreTS);
      }
      hbw.write(kv);
    }
    return totalSize;
  }

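  /**
   * Creates test block contents, preceded by the META block magic, written through the given
   * compression algorithm's stream and returned as a raw byte array with no v2 header or
   * checksums (hence "V1" in the name).
   */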
  public byte[] createTestV1Block(Compression.Algorithm algo)
      throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos); // Let's make this a meta block.
    writeTestBlockContents(dos);
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

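  /**
   * Writes the standard test block contents into a fresh {@link HFileBlock.Writer} as a DATA
   * block, finalizes the block, sanity-checks the uncompressed size, and returns the writer so
   * callers can inspect the serialized bytes or obtain a block for caching.
   */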
  static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo,
      boolean includesMemstoreTS, boolean includesTag) throws IOException {
    final BlockType blockType = BlockType.DATA;
    HFileContext meta = new HFileContextBuilder()
                        .withCompression(algo)
                        .withIncludesMvcc(includesMemstoreTS)
                        .withIncludesTags(includesTag)
                        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(blockType);
    writeTestBlockContents(dos);
    dos.flush();
    hbw.ensureBlockReady();
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.release();
    return hbw;
  }

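  /**
   * Serializes a test v2 block with the given compression and returns a printable (escaped)
   * string of its header and data. If the serialized length matches {@code correctLength}, the
   * gzip "OS" header byte is forced to 3 (Unix) so the expected string is stable across
   * platforms.
   */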
  public String createTestBlockStr(Compression.Algorithm algo,
      int correctLength, boolean useTag) throws IOException {
    HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
    byte[] testV2Block = hbw.getHeaderAndDataForTest();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
      // variations across operating systems.
      // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
      // We only make this change when the compressed block length matches.
      // Otherwise, there are obviously other inconsistencies.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }

  @Test
  public void testNoCompression() throws IOException {
    CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
    Mockito.when(cacheConf.isBlockCacheEnabled()).thenReturn(false);

    HFileBlock block =
      createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
    assertEquals(4000, block.getUncompressedSizeWithoutHeader());
    assertEquals(4004, block.getOnDiskSizeWithoutHeader());
    assertTrue(block.isUnpacked());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
            + "\\xFF\\xFF\\xFF\\xFF"
            + "\\x0" + ChecksumType.getDefaultChecksumType().getCode()
            + "\\x00\\x00@\\x00\\x00\\x00\\x00["
            // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
            + "\\x1F\\x8B"  // gzip magic signature
            + "\\x08"  // Compression method: 8 = "deflate"
            + "\\x00"  // Flags
            + "\\x00\\x00\\x00\\x00"  // mtime
            + "\\x00"  // XFL (extra flags)
            // OS (0 = FAT filesystems, 3 = Unix). However, this field
            // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
            // This appears to be a difference caused by the availability
            // (and use) of the native GZ codec.
            + "\\x03"
            + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
            + "\\x00\\x00\\x00\\x00"; //  4 byte checksum (ignored)
    final int correctGzipBlockLength = 95;
    final String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
    // We ignore the block checksum because createTestBlockStr can change the
    // gzip header after the block is produced
    assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
      testBlockStr.substring(0, correctGzipBlockLength - 4));
  }

  @Test
  public void testReaderV2() throws IOException {
    testReaderV2Internals();
  }

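  /**
   * For each compression algorithm and pread setting, writes two identical DATA blocks to a
   * file, reads the first one back, and verifies its uncompressed and on-disk sizes. For GZ it
   * also re-reads the block with the exact on-disk size and checks that a wrong on-disk size is
   * rejected with a descriptive exception.
   */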
  protected void testReaderV2Internals() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo +
                 ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
            + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                           .withCompression(algo)
                           .withIncludesMvcc(includesMemstoreTS)
                           .withIncludesTags(includesTag)
                           .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                           .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        meta = new HFileContextBuilder()
               .withHBaseCheckSum(true)
               .withIncludesMvcc(includesMemstoreTS)
               .withIncludesTags(includesTag)
               .withCompression(algo).build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false);
        is.close();
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());

        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        HFileBlock expected = b;

        if (algo == GZ) {
          is = fs.open(path);
          hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
                                b.totalChecksumBytes(), pread, false);
          assertEquals(expected, b);
          int wrongCompressedSize = 2172;
          try {
            b = hbr.readBlockData(0, wrongCompressedSize
                + HConstants.HFILEBLOCK_HEADER_SIZE, pread, false);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "Passed in onDiskSizeWithHeader=";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix
                + "'", ex.getMessage().startsWith(expectedPrefix));
          }
          is.close();
        }
      }
    }
  }

  /**
   * Test encoding and decoding of data blocks.
   * @throws IOException in case of a bug or a problem with temporary files.
   */
  @Test
  public void testDataBlockEncoding() throws IOException {
    testInternals();
  }

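  /**
   * For every combination of compression algorithm, pread setting, and data block encoding:
   * writes encoded DATA blocks, reads them back, strips the two-byte encoding id, and compares
   * the bytes against the encoded contents captured at write time. Also verifies that compressed
   * blocks unpack to a larger heap size and that serialize/deserialize round-trips preserve
   * block state.
   */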
  private void testInternals() throws IOException {
    final int numBlocks = 5;
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          LOG.info("testDataBlockEncoding: Compression algorithm={}, pread={}, dataBlockEncoder={}",
              algo.toString(), pread, encoding);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
              + algo + "_" + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
              new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
          HFileContext meta = new HFileContextBuilder()
                              .withCompression(algo)
                              .withIncludesMvcc(includesMemstoreTS)
                              .withIncludesTags(includesTag)
                              .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                              .build();
          HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<>();
          final List<ByteBuffer> encodedBlocks = new ArrayList<>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            hbw.startWriting(BlockType.DATA);
            writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
            hbw.writeHeaderAndData(os);
            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
            byte[] encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader().array();
            final int encodedSize = encodedResultWithHeader.length - headerLen;
            if (encoding != DataBlockEncoding.NONE) {
              // We need to account for the two-byte encoding algorithm ID that
              // comes after the block header but before the encoded KVs.
              headerLen += DataBlockEncoding.ID_SIZE;
            }
            byte[] encodedDataSection =
                new byte[encodedResultWithHeader.length - headerLen];
            System.arraycopy(encodedResultWithHeader, headerLen,
                encodedDataSection, 0, encodedDataSection.length);
            final ByteBuffer encodedBuf =
                ByteBuffer.wrap(encodedDataSection);
            encodedSizes.add(encodedSize);
            encodedBlocks.add(encodedBuf);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          meta = new HFileContextBuilder()
                .withHBaseCheckSum(true)
                .withCompression(algo)
                .withIncludesMvcc(includesMemstoreTS)
                .withIncludesTags(includesTag)
                .build();
          HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemStoreTS(includesMemstoreTS);
          HFileBlock blockFromHFile, blockUnpacked;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            blockFromHFile = hbr.readBlockData(pos, -1, pread, false);
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
            blockFromHFile.sanityCheck();
            pos += blockFromHFile.getOnDiskSizeWithHeader();
            assertEquals((int) encodedSizes.get(blockId),
              blockFromHFile.getUncompressedSizeWithoutHeader());
            assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
            long packedHeapsize = blockFromHFile.heapSize();
            blockUnpacked = blockFromHFile.unpack(meta, hbr);
            assertTrue(blockUnpacked.isUnpacked());
            if (meta.isCompressedOrEncrypted()) {
              LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeapsize=" + blockUnpacked
                .heapSize());
              assertFalse(packedHeapsize == blockUnpacked.heapSize());
              assertTrue("Packed heapSize should be < unpacked heapSize",
                packedHeapsize < blockUnpacked.heapSize());
            }
            ByteBuff actualBuffer = blockUnpacked.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // We expect a two-byte big-endian encoding id.
              assertEquals(
                "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread),
                Long.toHexString(0), Long.toHexString(actualBuffer.get(0)));
              assertEquals(
                "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread),
                Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1)));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
            expectedBuffer.rewind();

            // test if content matches, produce nice message
            assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer, algo, encoding,
                pread);

            // test serialized blocks
            for (boolean reuseBuffer : new boolean[] { false, true }) {
              ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
              blockFromHFile.serialize(serialized, true);
              HFileBlock deserialized =
                  (HFileBlock) blockFromHFile.getDeserializer().deserialize(
                    new SingleByteBuff(serialized), reuseBuffer, MemoryType.EXCLUSIVE);
              assertEquals(
                "Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
                blockFromHFile, deserialized);
              // intentional reference comparison
              if (blockFromHFile != blockUnpacked) {
                assertEquals("Deserialized block cannot be unpacked correctly.",
                  blockUnpacked, deserialized.unpack(meta, hbr));
              }
            }
          }
          is.close();
        }
      }
    }
  }

  static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
      boolean pread) {
    return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
  }

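  /**
   * Asserts that the two buffers have identical contents; on mismatch, fails with a message
   * reporting the length of the common prefix and the bytes that follow it in each buffer.
   */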
  static void assertBuffersEqual(ByteBuff expectedBuffer,
      ByteBuff actualBuffer, Compression.Algorithm compression,
      DataBlockEncoding encoding, boolean pread) {
    if (!actualBuffer.equals(expectedBuffer)) {
      int prefix = 0;
      int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
      while (prefix < minLimit &&
          expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
        prefix++;
      }

      fail(String.format(
          "Content mismatch for %s, commonPrefix %d, expected %s, got %s",
          buildMessageDetails(compression, encoding, pread), prefix,
          nextBytesToStr(expectedBuffer, prefix),
          nextBytesToStr(actualBuffer, prefix)));
    }
  }

  /**
   * Convert the next few bytes in the given buffer, starting at the given position, to a
   * string. Used for error messages.
   */
  private static String nextBytesToStr(ByteBuff buf, int pos) {
    int maxBytes = buf.limit() - pos;
    int numBytes = Math.min(16, maxBytes);
    return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos,
        numBytes) + (numBytes < maxBytes ? "..." : "");
  }

  @Test
  public void testPreviousOffset() throws IOException {
    testPreviousOffsetInternals();
  }

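  /**
   * Writes a file of {@link #NUM_TEST_BLOCKS} randomly typed blocks, then reads each block back
   * and verifies its offset, block type and previous-block offset (tracked per block type). Each
   * block is also re-read with its known on-disk size and compared field by field against the
   * first read. When cacheOnWrite is enabled, the unpacked bytes are compared against the
   * uncompressed contents captured at write time.
   */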
  protected void testPreviousOffsetInternals() throws IOException {
    // TODO: parameterize these nested loops.
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : BOOLEAN_VALUES) {
        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
          Random rand = defaultRandom();
          LOG.info("testPreviousOffset: Compression algorithm={}, pread={}, cacheOnWrite={}",
              algo.toString(), pread, cacheOnWrite);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
          List<Long> expectedOffsets = new ArrayList<>();
          List<Long> expectedPrevOffsets = new ArrayList<>();
          List<BlockType> expectedTypes = new ArrayList<>();
          List<ByteBuffer> expectedContents = cacheOnWrite ? new ArrayList<>() : null;
          long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
              expectedPrevOffsets, expectedTypes, expectedContents);

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
                              .withHBaseCheckSum(true)
                              .withIncludesMvcc(includesMemstoreTS)
                              .withIncludesTags(includesTag)
                              .withCompression(algo).build();
          HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          long curOffset = 0;
          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
            if (!pread) {
              assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
                HConstants.HFILEBLOCK_HEADER_SIZE));
            }

            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
            if (detailedLogging) {
              LOG.info("Reading block #" + i + " at offset " + curOffset);
            }
            HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false);
            if (detailedLogging) {
              LOG.info("Block #" + i + ": " + b);
            }
            assertEquals("Invalid block #" + i + "'s type:",
                expectedTypes.get(i), b.getBlockType());
            assertEquals("Invalid previous block offset for block " + i
                + " of " + "type " + b.getBlockType() + ":",
                (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
            b.sanityCheck();
            assertEquals(curOffset, b.getOffset());

            // Now re-load this block knowing the on-disk size. This tests a
            // different branch in the loader.
            HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false);
            b2.sanityCheck();

            assertEquals(b.getBlockType(), b2.getBlockType());
            assertEquals(b.getOnDiskSizeWithoutHeader(),
                b2.getOnDiskSizeWithoutHeader());
            assertEquals(b.getOnDiskSizeWithHeader(),
                b2.getOnDiskSizeWithHeader());
            assertEquals(b.getUncompressedSizeWithoutHeader(),
                b2.getUncompressedSizeWithoutHeader());
            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
            assertEquals(curOffset, b2.getOffset());
            assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
            assertEquals(b.getOnDiskDataSizeWithHeader(),
                         b2.getOnDiskDataSizeWithHeader());
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());

            curOffset += b.getOnDiskSizeWithHeader();

            if (cacheOnWrite) {
              // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
              // verifies that the unpacked value read back off disk matches the unpacked value
              // generated before writing to disk.
              b = b.unpack(meta, hbr);
              // b's buffer has header + data + checksum, while
              // expectedContents has header + data only.
              ByteBuff bufRead = b.getBufferReadOnly();
              ByteBuffer bufExpected = expectedContents.get(i);
              boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
                  bufRead.arrayOffset(),
                  bufRead.limit() - b.totalChecksumBytes(),
                  bufExpected.array(), bufExpected.arrayOffset(),
                  bufExpected.limit()) == 0;
              String wrongBytesMsg = "";

              if (!bytesAreCorrect) {
                // Optimization: only construct the error message if we actually need it.
                wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
                    + algo + ", pread=" + pread
                    + ", cacheOnWrite=" + cacheOnWrite + "):\n";
                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
                  bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit()))
                    + ", actual:\n"
                    + Bytes.toStringBinary(bufRead.array(),
                  bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
                if (detailedLogging) {
                  LOG.warn("expected header" +
                           HFileBlock.toStringHeader(new SingleByteBuff(bufExpected)) +
                           "\nfound    header" +
                           HFileBlock.toStringHeader(bufRead));
                  LOG.warn("bufread offset " + bufRead.arrayOffset() +
                           " limit " + bufRead.limit() +
                           " expected offset " + bufExpected.arrayOffset() +
                           " limit " + bufExpected.limit());
                  LOG.warn(wrongBytesMsg);
                }
              }
              assertTrue(wrongBytesMsg, bytesAreCorrect);
            }
          }

          assertEquals(curOffset, fs.getFileStatus(path).getLen());
          is.close();
        }
      }
    }
  }

  private Random defaultRandom() {
    return new Random(189237);
  }

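  /**
   * Reads random blocks from a shared {@link HFileBlock.FSReader} for ten seconds using
   * positional reads, sometimes passing the known on-disk size, and verifies the type, size and
   * offset of every block read. Returns {@code false} if any read fails.
   */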
  private class BlockReaderThread implements Callable<Boolean> {
    private final String clientId;
    private final HFileBlock.FSReader hbr;
    private final List<Long> offsets;
    private final List<BlockType> types;
    private final long fileSize;

    public BlockReaderThread(String clientId,
        HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
        long fileSize) {
      this.clientId = clientId;
      this.offsets = offsets;
      this.hbr = hbr;
      this.types = types;
      this.fileSize = fileSize;
    }

    @Override
    public Boolean call() throws Exception {
      Random rand = new Random(clientId.hashCode());
      long endTime = System.currentTimeMillis() + 10000;
      int numBlocksRead = 0;
      int numPositionalRead = 0;
      int numWithOnDiskSize = 0;
      while (System.currentTimeMillis() < endTime) {
        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
        long offset = offsets.get(blockId);
        // Currently we only support concurrent reads with pread = true.
        boolean pread = true;
        boolean withOnDiskSize = rand.nextBoolean();
        long expectedSize =
          (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
              : offsets.get(blockId + 1)) - offset;

        HFileBlock b;
        try {
          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
          b = hbr.readBlockData(offset, onDiskSizeArg, pread, false);
        } catch (IOException ex) {
          LOG.error("Error in client " + clientId + " trying to read block at "
              + offset + ", pread=" + pread + ", withOnDiskSize=" +
              withOnDiskSize, ex);
          return false;
        }

        assertEquals(types.get(blockId), b.getBlockType());
        assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
        assertEquals(offset, b.getOffset());

        ++numBlocksRead;
        if (pread) {
          ++numPositionalRead;
        }
        if (withOnDiskSize) {
          ++numWithOnDiskSize;
        }
      }
      LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
        " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
        "specified: " + numWithOnDiskSize + ")");

      return true;
    }

  }

  @Test
  public void testConcurrentReading() throws Exception {
    testConcurrentReadingInternals();
  }

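  /**
   * For each compression algorithm, writes a file of test blocks and has
   * {@link #NUM_READER_THREADS} {@link BlockReaderThread}s read it concurrently, asserting that
   * every reader completes successfully.
   */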
  protected void testConcurrentReadingInternals() throws IOException,
      InterruptedException, ExecutionException {
    for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
      Random rand = defaultRandom();
      List<Long> offsets = new ArrayList<>();
      List<BlockType> types = new ArrayList<>();
      writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
      FSDataInputStream is = fs.open(path);
      long fileSize = fs.getFileStatus(path).getLen();
      HFileContext meta = new HFileContextBuilder()
                          .withHBaseCheckSum(true)
                          .withIncludesMvcc(includesMemstoreTS)
                          .withIncludesTags(includesTag)
                          .withCompression(compressAlgo)
                          .build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, fileSize, meta);

      Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
      ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
            offsets, types, fileSize));
      }

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        Future<Boolean> result = ecs.take();
        assertTrue(result.get());
        if (detailedLogging) {
          LOG.info(String.valueOf(i + 1)
            + " reader threads finished successfully (algo=" + compressAlgo
            + ")");
        }
      }

      is.close();
    }
  }

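  /**
   * Writes {@link #NUM_TEST_BLOCKS} blocks of random types and sizes to {@code path}, recording
   * each block's expected offset, previous offset (per block type) and block type, and optionally
   * the uncompressed header-plus-data contents for cache-on-write checks. Returns the total
   * on-disk size written.
   */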
  private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
      Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
      List<BlockType> expectedTypes, List<ByteBuffer> expectedContents
  ) throws IOException {
    boolean cacheOnWrite = expectedContents != null;
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder()
                        .withHBaseCheckSum(true)
                        .withIncludesMvcc(includesMemstoreTS)
                        .withIncludesTags(includesTag)
                        .withCompression(compressAlgo)
                        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    Map<BlockType, Long> prevOffsetByType = new HashMap<>();
    long totalSize = 0;
    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
      long pos = os.getPos();
      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
        blockTypeOrdinal = BlockType.DATA.ordinal();
      }
      BlockType bt = BlockType.values()[blockTypeOrdinal];
      DataOutputStream dos = hbw.startWriting(bt);
      int size = rand.nextInt(500);
      for (int j = 0; j < size; ++j) {
        // This might compress well.
        dos.writeShort(i + 1);
        dos.writeInt(j + 1);
      }

      if (expectedOffsets != null) {
        expectedOffsets.add(os.getPos());
      }

      if (expectedPrevOffsets != null) {
        Long prevOffset = prevOffsetByType.get(bt);
        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
        prevOffsetByType.put(bt, os.getPos());
      }

      expectedTypes.add(bt);

      hbw.writeHeaderAndData(os);
      totalSize += hbw.getOnDiskSizeWithHeader();

      if (cacheOnWrite) {
        expectedContents.add(hbw.cloneUncompressedBufferWithHeader());
      }

      if (detailedLogging) {
        LOG.info("Wrote block #" + i + " of type " + bt
            + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
            + ", packed size " + hbw.getOnDiskSizeWithoutHeader()
            + " at offset " + pos);
      }
    }
    os.close();
    LOG.info("Created a temporary file at " + path + ", "
        + fs.getFileStatus(path).getLen() + " bytes, compression=" +
        compressAlgo);
    return totalSize;
  }

  @Test
  public void testBlockHeapSize() {
    testBlockHeapSizeInternals();
  }

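  /**
   * Checks the expected value of {@link HFileBlock#MULTI_BYTE_BUFFER_HEAP_SIZE} on 32- and
   * 64-bit JVMs, then verifies that {@link HFileBlock#heapSize()} equals the combined estimated
   * sizes of the block object, its backing buffer (including header), and its
   * {@link HFileContext}.
   */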
  protected void testBlockHeapSizeInternals() {
    if (ClassSize.is32BitJVM()) {
      assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
    } else {
      assertEquals(72, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
    }

    for (int size : new int[] { 100, 256, 12345 }) {
      byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
      HFileContext meta = new HFileContextBuilder()
                          .withIncludesMvcc(includesMemstoreTS)
                          .withIncludesTags(includesTag)
                          .withHBaseCheckSum(false)
                          .withCompression(Algorithm.NONE)
                          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                          .withChecksumType(ChecksumType.NULL).build();
      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
          HFileBlock.FILL_HEADER, -1, 0, -1, meta);
      long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
          new MultiByteBuff(buf).getClass(), true)
          + HConstants.HFILEBLOCK_HEADER_SIZE + size);
      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
      long hfileBlockExpectedSize =
          ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
      long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
      assertEquals("Block data size: " + size + ", byte buffer expected " +
          "size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
          "size: " + hfileBlockExpectedSize + ";", expected,
          block.heapSize());
    }
  }

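  /**
   * Two blocks that differ only in their next-block metadata must serialize differently when the
   * metadata is included and identically when it is excluded.
   */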
  @Test
  public void testSerializeWithoutNextBlockMetadata() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
        HFileBlock.FILL_HEADER, -1, 52, -1, meta);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
        HFileBlock.FILL_HEADER, -1, -1, -1, meta);
    ByteBuffer buff1 = ByteBuffer.allocate(length);
    ByteBuffer buff2 = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(buff1, true);
    blockWithoutNextBlockMetadata.serialize(buff2, true);
    assertNotEquals(buff1, buff2);
    buff1.clear();
    buff2.clear();
    blockWithNextBlockMetadata.serialize(buff1, false);
    blockWithoutNextBlockMetadata.serialize(buff2, false);
    assertEquals(buff1, buff2);
  }
}