/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.*;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({IOTests.class, MediumTests.class})
@RunWith(Parameterized.class)
public class TestHFileBlock {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHFileBlock.class);

  // Set this to true to enable more detailed logging.
  private static final boolean detailedLogging = false;
  private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  private static final int NUM_TEST_BLOCKS = 1000;
  private static final int NUM_READER_THREADS = 26;

  // Used to generate KeyValues
  private static int NUM_KEYVALUES = 50;
  private static int FIELD_LENGTH = 10;
  private static float CHANCE_TO_REPEAT = 0.6f;

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private FileSystem fs;

  private final boolean includesMemstoreTS;
  private final boolean includesTag;

  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
    this.includesMemstoreTS = includesMemstoreTS;
    this.includesTag = includesTag;
  }

  @Parameters
  public static Collection<Object[]> parameters() {
    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
  }

  @Before
  public void setUp() throws IOException {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
  }

  static void writeTestBlockContents(DataOutputStream dos) throws IOException {
    // This compresses really well.
    for (int i = 0; i < 1000; ++i)
      dos.writeInt(i / 100);
  }

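  /**
   * Writes a pseudo-random sequence of KeyValues to the given block writer. Rows, families,
   * qualifiers, values, and timestamps are either freshly generated or repeated from earlier
   * cells so that the resulting block compresses well.
   * @return the total size of the KeyValues written, including memstore timestamps if enabled
   */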
  static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS,
      boolean useTag) throws IOException {
    List<KeyValue> keyValues = new ArrayList<>();
    Random randomizer = new Random(42L + seed); // just any fixed number

    // generate keyValues
    for (int i = 0; i < NUM_KEYVALUES; ++i) {
      byte[] row;
      long timestamp;
      byte[] family;
      byte[] qualifier;
      byte[] value;

      // Either generate a new field or repeat an earlier one, so the data compresses well.
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        row = CellUtil.cloneRow(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        row = new byte[FIELD_LENGTH];
        randomizer.nextBytes(row);
      }
      if (0 == i) {
        family = new byte[FIELD_LENGTH];
        randomizer.nextBytes(family);
      } else {
        family = CellUtil.cloneFamily(keyValues.get(0));
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        qualifier = CellUtil.cloneQualifier(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        qualifier = new byte[FIELD_LENGTH];
        randomizer.nextBytes(qualifier);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        value = CellUtil.cloneValue(keyValues.get(randomizer.nextInt(keyValues.size())));
      } else {
        value = new byte[FIELD_LENGTH];
        randomizer.nextBytes(value);
      }
      if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
        timestamp = keyValues.get(
            randomizer.nextInt(keyValues.size())).getTimestamp();
      } else {
        timestamp = randomizer.nextLong();
      }
      if (!useTag) {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
      } else {
        keyValues.add(new KeyValue(row, family, qualifier, timestamp, value,
            new Tag[] { new ArrayBackedTag((byte) 1, Bytes.toBytes("myTagVal")) }));
      }
    }

    // Sort the KeyValues and write them to the block writer.
    int totalSize = 0;
    Collections.sort(keyValues, CellComparatorImpl.COMPARATOR);

    for (KeyValue kv : keyValues) {
      totalSize += kv.getLength();
      if (includesMemstoreTS) {
        long memstoreTS = randomizer.nextLong();
        kv.setSequenceId(memstoreTS);
        totalSize += WritableUtils.getVIntSize(memstoreTS);
      }
      hbw.write(kv);
    }
    return totalSize;
  }

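  /**
   * Creates a compressed byte array resembling a version 1 "meta" block: the META block type
   * magic followed by the test block contents, run through the given compression algorithm.
   */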
  public byte[] createTestV1Block(Compression.Algorithm algo)
      throws IOException {
    Compressor compressor = algo.getCompressor();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream os = algo.createCompressionStream(baos, compressor, 0);
    DataOutputStream dos = new DataOutputStream(os);
    BlockType.META.write(dos); // Let's make this a meta block.
    writeTestBlockContents(dos);
    dos.flush();
    algo.returnCompressor(compressor);
    return baos.toByteArray();
  }

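  /**
   * Writes a single version 2 DATA block with the test contents and returns the writer, so
   * callers can inspect the serialized bytes or turn the block into one suitable for caching.
   */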
  static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo,
      boolean includesMemstoreTS, boolean includesTag) throws IOException {
    final BlockType blockType = BlockType.DATA;
    HFileContext meta = new HFileContextBuilder()
                        .withCompression(algo)
                        .withIncludesMvcc(includesMemstoreTS)
                        .withIncludesTags(includesTag)
                        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    DataOutputStream dos = hbw.startWriting(blockType);
    writeTestBlockContents(dos);
    dos.flush();
    hbw.ensureBlockReady();
    assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
    hbw.release();
    return hbw;
  }

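  /**
   * Serializes a test block with the given compression algorithm and returns its
   * header-plus-data bytes as a printable string. If the block has the expected length, the
   * gzip "OS" header byte is normalized to 3 (Unix) so the output is stable across platforms.
   */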
  public String createTestBlockStr(Compression.Algorithm algo,
      int correctLength, boolean useTag) throws IOException {
    HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
    byte[] testV2Block = hbw.getHeaderAndDataForTest();
    int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
    if (testV2Block.length == correctLength) {
      // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
      // variations across operating systems.
      // See http://www.gzip.org/zlib/rfc-gzip.html for the gzip format.
      // We only make this change when the compressed block length matches.
      // Otherwise, there are obviously other inconsistencies.
      testV2Block[osOffset] = 3;
    }
    return Bytes.toStringBinary(testV2Block);
  }

  @Test
  public void testNoCompression() throws IOException {
    CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
    Mockito.when(cacheConf.getBlockCache()).thenReturn(Optional.empty());

    HFileBlock block =
      createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
    assertEquals(4000, block.getUncompressedSizeWithoutHeader());
    assertEquals(4004, block.getOnDiskSizeWithoutHeader());
    assertTrue(block.isUnpacked());
  }

  @Test
  public void testGzipCompression() throws IOException {
    final String correctTestBlockStr =
        "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
            + "\\xFF\\xFF\\xFF\\xFF"
            + "\\x0" + ChecksumType.getDefaultChecksumType().getCode()
            + "\\x00\\x00@\\x00\\x00\\x00\\x00["
            // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
            + "\\x1F\\x8B"  // gzip magic signature
            + "\\x08"  // Compression method: 8 = "deflate"
            + "\\x00"  // Flags
            + "\\x00\\x00\\x00\\x00"  // mtime
            + "\\x00"  // XFL (extra flags)
            // OS (0 = FAT filesystems, 3 = Unix). However, this field
            // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
            // This appears to be a difference caused by the availability
            // (and use) of the native GZ codec.
            + "\\x03"
            + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
            + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
            + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
            + "\\x00\\x00\\x00\\x00"; //  4 byte checksum (ignored)
    final int correctGzipBlockLength = 95;
    final String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
    // We ignore the block checksum because createTestBlockStr can change the
    // gzip header after the block is produced.
    assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
      testBlockStr.substring(0, correctGzipBlockLength - 4));
  }

  @Test
  public void testReaderV2() throws IOException {
    testReaderV2Internals();
  }

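  /**
   * Writes two data blocks of sequential ints per compression algorithm, reads them back with
   * {@link HFileBlock.FSReaderImpl} in both seek-and-read and pread modes, and verifies the
   * block sizes, sanity checks, and that an incorrect on-disk size passed to the reader is
   * rejected with an exception.
   */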
  protected void testReaderV2Internals() throws IOException {
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testReaderV2: Compression algorithm: " + algo +
                 ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
            + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(includesMemstoreTS)
                            .withIncludesTags(includesTag)
                            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i)
            dos.writeInt(i);
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        FSDataInputStream is = fs.open(path);
        meta = new HFileContextBuilder()
            .withHBaseCheckSum(true)
            .withIncludesMvcc(includesMemstoreTS)
            .withIncludesTags(includesTag)
            .withCompression(algo).build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
        HFileBlock b = hbr.readBlockData(0, -1, pread, false);
        is.close();
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());

        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        HFileBlock expected = b;

        if (algo == GZ) {
          is = fs.open(path);
          hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
                                b.totalChecksumBytes(), pread, false);
          assertEquals(expected, b);
          int wrongCompressedSize = 2172;
          try {
            b = hbr.readBlockData(0, wrongCompressedSize
                + HConstants.HFILEBLOCK_HEADER_SIZE, pread, false);
            fail("Exception expected");
          } catch (IOException ex) {
            String expectedPrefix = "Passed in onDiskSizeWithHeader=";
            assertTrue("Invalid exception message: '" + ex.getMessage()
                + "'.\nMessage is expected to start with: '" + expectedPrefix
                + "'", ex.getMessage().startsWith(expectedPrefix));
          }
          is.close();
        }
      }
    }
  }

  /**
   * Test encoding/decoding data blocks.
   * @throws IOException if there is a bug or a problem with temporary files
   */
  @Test
  public void testDataBlockEncoding() throws IOException {
    testInternals();
  }

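  /**
   * For every compression algorithm, read mode, and data block encoding, writes encoded data
   * blocks, reads them back, and verifies that the on-disk content, unpacking behavior, and
   * serialization/deserialization round trip all match what was written.
   */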
  private void testInternals() throws IOException {
    final int numBlocks = 5;
    if (includesTag) {
      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
    }
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
          LOG.info("testDataBlockEncoding: Compression algorithm={}, pread={}, dataBlockEncoder={}",
              algo.toString(), pread, encoding);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
              + algo + "_" + encoding.toString());
          FSDataOutputStream os = fs.create(path);
          HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
              new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
          HFileContext meta = new HFileContextBuilder()
                              .withCompression(algo)
                              .withIncludesMvcc(includesMemstoreTS)
                              .withIncludesTags(includesTag)
                              .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                              .build();
          HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
          long totalSize = 0;
          final List<Integer> encodedSizes = new ArrayList<>();
          final List<ByteBuffer> encodedBlocks = new ArrayList<>();
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            hbw.startWriting(BlockType.DATA);
            writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
            hbw.writeHeaderAndData(os);
            int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
            byte[] encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader().array();
            final int encodedSize = encodedResultWithHeader.length - headerLen;
            if (encoding != DataBlockEncoding.NONE) {
              // We need to account for the two-byte encoding algorithm ID that
              // comes after the 24-byte block header but before encoded KVs.
              headerLen += DataBlockEncoding.ID_SIZE;
            }
            byte[] encodedDataSection =
                new byte[encodedResultWithHeader.length - headerLen];
            System.arraycopy(encodedResultWithHeader, headerLen,
                encodedDataSection, 0, encodedDataSection.length);
            final ByteBuffer encodedBuf =
                ByteBuffer.wrap(encodedDataSection);
            encodedSizes.add(encodedSize);
            encodedBlocks.add(encodedBuf);
            totalSize += hbw.getOnDiskSizeWithHeader();
          }
          os.close();

          FSDataInputStream is = fs.open(path);
          meta = new HFileContextBuilder()
                .withHBaseCheckSum(true)
                .withCompression(algo)
                .withIncludesMvcc(includesMemstoreTS)
                .withIncludesTags(includesTag)
                .build();
          HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          hbr.setDataBlockEncoder(dataBlockEncoder);
          hbr.setIncludesMemStoreTS(includesMemstoreTS);
          HFileBlock blockFromHFile, blockUnpacked;
          int pos = 0;
          for (int blockId = 0; blockId < numBlocks; ++blockId) {
            blockFromHFile = hbr.readBlockData(pos, -1, pread, false);
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());
            blockFromHFile.sanityCheck();
            pos += blockFromHFile.getOnDiskSizeWithHeader();
            assertEquals((int) encodedSizes.get(blockId),
              blockFromHFile.getUncompressedSizeWithoutHeader());
            assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
            long packedHeapsize = blockFromHFile.heapSize();
            blockUnpacked = blockFromHFile.unpack(meta, hbr);
            assertTrue(blockUnpacked.isUnpacked());
            if (meta.isCompressedOrEncrypted()) {
              LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeapsize=" + blockUnpacked
                .heapSize());
              assertFalse(packedHeapsize == blockUnpacked.heapSize());
              assertTrue("Packed heapSize should be < unpacked heapSize",
                packedHeapsize < blockUnpacked.heapSize());
            }
            ByteBuff actualBuffer = blockUnpacked.getBufferWithoutHeader();
            if (encoding != DataBlockEncoding.NONE) {
              // We expect a two-byte big-endian encoding id.
              assertEquals(
                "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread),
                Long.toHexString(0), Long.toHexString(actualBuffer.get(0)));
              assertEquals(
                "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread),
                Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1)));
              actualBuffer.position(2);
              actualBuffer = actualBuffer.slice();
            }

            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
            expectedBuffer.rewind();

            // Check that the content matches, producing a detailed message on mismatch.
            assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer, algo, encoding,
                pread);

            // Test serialized blocks.
            for (boolean reuseBuffer : new boolean[] { false, true }) {
              ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
              blockFromHFile.serialize(serialized, true);
              HFileBlock deserialized =
                  (HFileBlock) blockFromHFile.getDeserializer().deserialize(
                    new SingleByteBuff(serialized), reuseBuffer, MemoryType.EXCLUSIVE);
              assertEquals(
                "Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
                blockFromHFile, deserialized);
              // Intentional reference comparison.
              if (blockFromHFile != blockUnpacked) {
                assertEquals("Deserialized block cannot be unpacked correctly.",
                  blockUnpacked, deserialized.unpack(meta, hbr));
              }
            }
          }
          is.close();
        }
      }
    }
  }

  static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
      boolean pread) {
    return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
  }

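  /**
   * Asserts that the two buffers hold identical content; on mismatch, fails with a message
   * that includes the length of the common prefix and the bytes that follow it in each buffer.
   */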
  static void assertBuffersEqual(ByteBuff expectedBuffer,
      ByteBuff actualBuffer, Compression.Algorithm compression,
      DataBlockEncoding encoding, boolean pread) {
    if (!actualBuffer.equals(expectedBuffer)) {
      int prefix = 0;
      int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
      while (prefix < minLimit &&
          expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
        prefix++;
      }

      fail(String.format(
          "Content mismatch for %s, commonPrefix %d, expected %s, got %s",
          buildMessageDetails(compression, encoding, pread), prefix,
          nextBytesToStr(expectedBuffer, prefix),
          nextBytesToStr(actualBuffer, prefix)));
    }
  }

  /**
   * Converts the next few bytes in the given buffer, starting at the given position, to a
   * string. Used for error messages.
   */
  private static String nextBytesToStr(ByteBuff buf, int pos) {
    int maxBytes = buf.limit() - pos;
    int numBytes = Math.min(16, maxBytes);
    return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos,
        numBytes) + (numBytes < maxBytes ? "..." : "");
  }

  @Test
  public void testPreviousOffset() throws IOException {
    testPreviousOffsetInternals();
  }

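  /**
   * Writes a file of {@link #NUM_TEST_BLOCKS} random blocks and reads them back sequentially,
   * verifying block offsets, previous-block offsets per block type, block metadata when
   * re-read with a known on-disk size, and (when cache-on-write is simulated) the unpacked
   * block contents.
   */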
  protected void testPreviousOffsetInternals() throws IOException {
    // TODO: parameterize these nested loops.
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : BOOLEAN_VALUES) {
        for (boolean cacheOnWrite : BOOLEAN_VALUES) {
          Random rand = defaultRandom();
          LOG.info("testPreviousOffset: Compression algorithm={}, pread={}, cacheOnWrite={}",
              algo.toString(), pread, cacheOnWrite);
          Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
          List<Long> expectedOffsets = new ArrayList<>();
          List<Long> expectedPrevOffsets = new ArrayList<>();
          List<BlockType> expectedTypes = new ArrayList<>();
          List<ByteBuffer> expectedContents = cacheOnWrite ? new ArrayList<>() : null;
          long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
              expectedPrevOffsets, expectedTypes, expectedContents);

          FSDataInputStream is = fs.open(path);
          HFileContext meta = new HFileContextBuilder()
                              .withHBaseCheckSum(true)
                              .withIncludesMvcc(includesMemstoreTS)
                              .withIncludesTags(includesTag)
                              .withCompression(algo).build();
          HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
          long curOffset = 0;
          for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
            if (!pread) {
              assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
                HConstants.HFILEBLOCK_HEADER_SIZE));
            }

            assertEquals(expectedOffsets.get(i).longValue(), curOffset);
            if (detailedLogging) {
              LOG.info("Reading block #" + i + " at offset " + curOffset);
            }
            HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false);
            if (detailedLogging) {
              LOG.info("Block #" + i + ": " + b);
            }
            assertEquals("Invalid block #" + i + "'s type:",
                expectedTypes.get(i), b.getBlockType());
            assertEquals("Invalid previous block offset for block " + i
                + " of " + "type " + b.getBlockType() + ":",
                (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
            b.sanityCheck();
            assertEquals(curOffset, b.getOffset());

            // Now re-load this block knowing the on-disk size. This tests a
            // different branch in the loader.
            HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false);
            b2.sanityCheck();

            assertEquals(b.getBlockType(), b2.getBlockType());
            assertEquals(b.getOnDiskSizeWithoutHeader(),
                b2.getOnDiskSizeWithoutHeader());
            assertEquals(b.getOnDiskSizeWithHeader(),
                b2.getOnDiskSizeWithHeader());
            assertEquals(b.getUncompressedSizeWithoutHeader(),
                b2.getUncompressedSizeWithoutHeader());
            assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
            assertEquals(curOffset, b2.getOffset());
            assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
            assertEquals(b.getOnDiskDataSizeWithHeader(),
                         b2.getOnDiskDataSizeWithHeader());
            assertEquals(0, HFile.getAndResetChecksumFailuresCount());

            curOffset += b.getOnDiskSizeWithHeader();

            if (cacheOnWrite) {
              // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
              // verifies that the unpacked value read back off disk matches the unpacked value
              // generated before writing to disk.
              b = b.unpack(meta, hbr);
              // b's buffer has header + data + checksum, while
              // expectedContents has header + data only.
              ByteBuff bufRead = b.getBufferReadOnly();
              ByteBuffer bufExpected = expectedContents.get(i);
              boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
                  bufRead.arrayOffset(),
                  bufRead.limit() - b.totalChecksumBytes(),
                  bufExpected.array(), bufExpected.arrayOffset(),
                  bufExpected.limit()) == 0;
              String wrongBytesMsg = "";

              if (!bytesAreCorrect) {
                // Optimization: only construct an error message if we are going to need it.
                wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
                    + algo + ", pread=" + pread
                    + ", cacheOnWrite=" + cacheOnWrite + "):\n";
                wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
                  bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit()))
                    + ", actual:\n"
                    + Bytes.toStringBinary(bufRead.array(),
                  bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
                if (detailedLogging) {
                  LOG.warn("expected header" +
                           HFileBlock.toStringHeader(new SingleByteBuff(bufExpected)) +
                           "\nfound    header" +
                           HFileBlock.toStringHeader(bufRead));
                  LOG.warn("bufread offset " + bufRead.arrayOffset() +
                           " limit " + bufRead.limit() +
                           " expected offset " + bufExpected.arrayOffset() +
                           " limit " + bufExpected.limit());
                  LOG.warn(wrongBytesMsg);
                }
              }
              assertTrue(wrongBytesMsg, bytesAreCorrect);
            }
          }

          assertEquals(curOffset, fs.getFileStatus(path).getLen());
          is.close();
        }
      }
    }
  }

  private Random defaultRandom() {
    return new Random(189237);
  }

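  /**
   * A worker used by the concurrent reading test: for ten seconds it repeatedly reads random
   * blocks from the shared reader using positional reads, verifying the block type, on-disk
   * size, and offset of each block, and returns false if any read fails.
   */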
  private class BlockReaderThread implements Callable<Boolean> {
    private final String clientId;
    private final HFileBlock.FSReader hbr;
    private final List<Long> offsets;
    private final List<BlockType> types;
    private final long fileSize;

    public BlockReaderThread(String clientId,
        HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
        long fileSize) {
      this.clientId = clientId;
      this.offsets = offsets;
      this.hbr = hbr;
      this.types = types;
      this.fileSize = fileSize;
    }

    @Override
    public Boolean call() throws Exception {
      Random rand = new Random(clientId.hashCode());
      long endTime = System.currentTimeMillis() + 10000;
      int numBlocksRead = 0;
      int numPositionalRead = 0;
      int numWithOnDiskSize = 0;
      while (System.currentTimeMillis() < endTime) {
        int blockId = rand.nextInt(NUM_TEST_BLOCKS);
        long offset = offsets.get(blockId);
        // Currently we only support concurrent reads with pread = true.
        boolean pread = true;
        boolean withOnDiskSize = rand.nextBoolean();
        long expectedSize =
          (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
              : offsets.get(blockId + 1)) - offset;

        HFileBlock b;
        try {
          long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
          b = hbr.readBlockData(offset, onDiskSizeArg, pread, false);
        } catch (IOException ex) {
          LOG.error("Error in client " + clientId + " trying to read block at "
              + offset + ", pread=" + pread + ", withOnDiskSize=" +
              withOnDiskSize, ex);
          return false;
        }

        assertEquals(types.get(blockId), b.getBlockType());
        assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
        assertEquals(offset, b.getOffset());

        ++numBlocksRead;
        if (pread)
          ++numPositionalRead;
        if (withOnDiskSize)
          ++numWithOnDiskSize;
      }
      LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
        " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
        "specified: " + numWithOnDiskSize + ")");

      return true;
    }

  }

  @Test
  public void testConcurrentReading() throws Exception {
    testConcurrentReadingInternals();
  }

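  /**
   * Writes a file of random blocks and has {@link #NUM_READER_THREADS} readers hammer it
   * concurrently through a single shared {@link HFileBlock.FSReader}, failing if any reader
   * reports an error.
   */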
  protected void testConcurrentReadingInternals() throws IOException,
      InterruptedException, ExecutionException {
    for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
      Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
      Random rand = defaultRandom();
      List<Long> offsets = new ArrayList<>();
      List<BlockType> types = new ArrayList<>();
      writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
      FSDataInputStream is = fs.open(path);
      long fileSize = fs.getFileStatus(path).getLen();
      HFileContext meta = new HFileContextBuilder()
                          .withHBaseCheckSum(true)
                          .withIncludesMvcc(includesMemstoreTS)
                          .withIncludesTags(includesTag)
                          .withCompression(compressAlgo)
                          .build();
      HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, fileSize, meta);

      Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
      ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
            offsets, types, fileSize));
      }

      for (int i = 0; i < NUM_READER_THREADS; ++i) {
        Future<Boolean> result = ecs.take();
        assertTrue(result.get());
        if (detailedLogging) {
          LOG.info(String.valueOf(i + 1)
            + " reader threads finished successfully (algo=" + compressAlgo
            + ")");
        }
      }

      is.close();
    }
  }

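  /**
   * Writes {@link #NUM_TEST_BLOCKS} blocks of random type and size to the given path,
   * recording the expected offsets, previous offsets per block type, block types, and (when
   * requested) uncompressed contents for later verification.
   * @return the total on-disk size of all blocks written, including headers
   */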
  private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
      Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
      List<BlockType> expectedTypes, List<ByteBuffer> expectedContents
  ) throws IOException {
    boolean cacheOnWrite = expectedContents != null;
    FSDataOutputStream os = fs.create(path);
    HFileContext meta = new HFileContextBuilder()
                        .withHBaseCheckSum(true)
                        .withIncludesMvcc(includesMemstoreTS)
                        .withIncludesTags(includesTag)
                        .withCompression(compressAlgo)
                        .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                        .build();
    HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
    Map<BlockType, Long> prevOffsetByType = new HashMap<>();
    long totalSize = 0;
    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
      long pos = os.getPos();
      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
        blockTypeOrdinal = BlockType.DATA.ordinal();
      }
      BlockType bt = BlockType.values()[blockTypeOrdinal];
      DataOutputStream dos = hbw.startWriting(bt);
      int size = rand.nextInt(500);
      for (int j = 0; j < size; ++j) {
        // This might compress well.
        dos.writeShort(i + 1);
        dos.writeInt(j + 1);
      }

      if (expectedOffsets != null)
        expectedOffsets.add(os.getPos());

      if (expectedPrevOffsets != null) {
        Long prevOffset = prevOffsetByType.get(bt);
        expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
        prevOffsetByType.put(bt, os.getPos());
      }

      expectedTypes.add(bt);

      hbw.writeHeaderAndData(os);
      totalSize += hbw.getOnDiskSizeWithHeader();

      if (cacheOnWrite)
        expectedContents.add(hbw.cloneUncompressedBufferWithHeader());

      if (detailedLogging) {
        LOG.info("Written block #" + i + " of type " + bt
            + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
            + ", packed size " + hbw.getOnDiskSizeWithoutHeader()
            + " at offset " + pos);
      }
    }
    os.close();
    LOG.info("Created a temporary file at " + path + ", "
        + fs.getFileStatus(path).getLen() + " bytes, compression=" +
        compressAlgo);
    return totalSize;
  }

  @Test
  public void testBlockHeapSize() {
    testBlockHeapSizeInternals();
  }

  protected void testBlockHeapSizeInternals() {
    if (ClassSize.is32BitJVM()) {
      assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
    } else {
      assertEquals(72, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE);
    }

    for (int size : new int[] { 100, 256, 12345 }) {
      byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
      ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
      HFileContext meta = new HFileContextBuilder()
                          .withIncludesMvcc(includesMemstoreTS)
                          .withIncludesTags(includesTag)
                          .withHBaseCheckSum(false)
                          .withCompression(Algorithm.NONE)
                          .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                          .withChecksumType(ChecksumType.NULL).build();
      HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
          HFileBlock.FILL_HEADER, -1, 0, -1, meta);
      long byteBufferExpectedSize = ClassSize.align(ClassSize.estimateBase(
          new MultiByteBuff(buf).getClass(), true)
          + HConstants.HFILEBLOCK_HEADER_SIZE + size);
      long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
      long hfileBlockExpectedSize =
          ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
      long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
      assertEquals("Block data size: " + size + ", byte buffer expected " +
          "size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
          "size: " + hfileBlockExpectedSize + ";", expected,
          block.heapSize());
    }
  }

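  /**
   * Verifies that a block created with next-block metadata and an otherwise identical block
   * created without it serialize differently when metadata is included, and identically when
   * it is excluded.
   */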
  @Test
  public void testSerializeWithoutNextBlockMetadata() {
    int size = 100;
    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
    byte[] byteArr = new byte[length];
    ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
    HFileContext meta = new HFileContextBuilder().build();
    HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
        HFileBlock.FILL_HEADER, -1, 52, -1, meta);
    HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
        HFileBlock.FILL_HEADER, -1, -1, -1, meta);
    ByteBuffer buff1 = ByteBuffer.allocate(length);
    ByteBuffer buff2 = ByteBuffer.allocate(length);
    blockWithNextBlockMetadata.serialize(buff1, true);
    blockWithoutNextBlockMetadata.serialize(buff2, true);
    assertNotEquals(buff1, buff2);
    buff1.clear();
    buff2.clear();
    blockWithNextBlockMetadata.serialize(buff1, false);
    blockWithoutNextBlockMetadata.serialize(buff2, false);
    assertEquals(buff1, buff2);
  }
}