/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Provides functionality to write ({@link BlockIndexWriter}) and read ({@link BlockIndexReader})
 * single-level and multi-level block indexes. Examples of how to use the block index writer can be
 * found in {@link org.apache.hadoop.hbase.io.hfile.CompoundBloomFilterWriter} and
 * {@link HFileWriterImpl}. Examples of how to use the reader can be found in
 * {@link HFileReaderImpl} and org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex.
 */
@InterfaceAudience.Private
public class HFileBlockIndex {

  private static final Logger LOG = LoggerFactory.getLogger(HFileBlockIndex.class);

  static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;

  /**
   * The maximum size guideline for index blocks (leaf, intermediate, and root). If not specified,
   * <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
   */
  public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";

  /**
   * Minimum number of entries in a single index block. Even if we are above the
   * hfile.index.block.max.size we will keep writing to the same block unless we have that many
   * entries. We should have at least a few entries so that we don't have too many levels in the
   * multi-level index. This should be at least 2 to make sure there is no infinite recursion.
   */
  public static final String MIN_INDEX_NUM_ENTRIES_KEY = "hfile.index.block.min.entries";

  static final int DEFAULT_MIN_INDEX_NUM_ENTRIES = 16;

  /**
   * The number of bytes stored in each "secondary index" entry in addition to key bytes in the
   * non-root index block format. The first long is the file offset of the deeper-level block the
   * entry points to, and the int that follows is that block's on-disk size without including
   * header.
   */
  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG;

  /**
   * Error message when trying to use inline block API in single-level mode.
   */
  private static final String INLINE_BLOCKS_NOT_ALLOWED =
    "Inline blocks are not allowed in the single-level-only mode";

  /**
   * The size of a meta-data record used for finding the mid-key in a multi-level index. Consists of
   * the middle leaf-level index block offset (long), its on-disk size without header included
   * (int), and the mid-key entry's zero-based index in that leaf index block.
   */
  protected static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG + 2 * Bytes.SIZEOF_INT;
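
  /*
   * For reference, a sketch of the non-root (leaf/intermediate) index block layout implied by the
   * constants above and by the arithmetic in getNonRootIndexedKey/binarySearchNonRootIndex below
   * (a reading aid, not a normative format specification):
   *
   *   int                 numEntries
   *   int[numEntries + 1] secondary index: byte offsets of each entry relative to the end of the
   *                       secondary index; the extra last element marks the end of the entry data
   *   entries, each of the form:
   *     long              file offset of the deeper-level block
   *     int               on-disk size of that block, header not included
   *     byte[]            the key (variable length)
   *
   * Hence entry i starts at Bytes.SIZEOF_INT * (numEntries + 2) + secondaryIndex[i], and its key
   * begins SECONDARY_INDEX_ENTRY_OVERHEAD bytes after that.
   */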

  /**
   * An implementation of the BlockIndexReader that deals with block keys which are plain byte[],
   * like the MetaBlock or the Bloom Block for ROW blooms. Does not need a comparator; it can work
   * with Bytes.BYTES_RAWCOMPARATOR.
   */
  static class ByteArrayKeyBlockIndexReader extends BlockIndexReader {

    private byte[][] blockKeys;

    public ByteArrayKeyBlockIndexReader(final int treeLevel) {
      // Can be null for METAINDEX block
      searchTreeLevel = treeLevel;
    }

    @Override
    protected long calculateHeapSizeForBlockKeys(long heapSize) {
      // Calculating the size of blockKeys
      if (blockKeys != null) {
        heapSize += ClassSize.REFERENCE;
        // Adding array + references overhead
        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);

        // Adding bytes
        for (byte[] key : blockKeys) {
          heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
        }
      }
      return heapSize;
    }

    @Override
    public boolean isEmpty() {
      return blockKeys.length == 0;
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public byte[] getRootBlockKey(int i) {
      return blockKeys[i];
    }

    @Override
    public BlockWithScanInfo loadDataBlockWithScanInfo(ExtendedCell key, HFileBlock currentBlock,
      boolean cacheBlocks, boolean pread, boolean isCompaction,
      DataBlockEncoding expectedDataBlockEncoding, CachingBlockReader cachingBlockReader)
      throws IOException {
      // Not needed for this byte[]-keyed implementation
      return null;
    }

    @Override
    public Cell midkey(CachingBlockReader cachingBlockReader) throws IOException {
      // Not needed here
      return null;
    }

    @Override
    protected void initialize(int numEntries) {
      blockKeys = new byte[numEntries][];
    }

    @Override
    protected void add(final byte[] key, final long offset, final int dataSize) {
      blockOffsets[rootCount] = offset;
      blockKeys[rootCount] = key;
      blockDataSizes[rootCount] = dataSize;
      rootCount++;
    }

    @Override
    public int rootBlockContainingKey(byte[] key, int offset, int length, CellComparator comp) {
      int pos = Bytes.binarySearch(blockKeys, key, offset, length);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }
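
    /*
     * An illustrative example of the contract above, assuming blockKeys = {"b", "d", "f"}:
     * searching for "d" returns 1 (exact match); searching for "e" returns 1, since
     * blockKeys[1] <= "e" < blockKeys[2]; searching for "a" returns -1, meaning the key is before
     * the first block and therefore absent from the file.
     */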

    @Override
    public int rootBlockContainingKey(Cell key) {
      // Should not be called on this because here it deals only with byte[]
      throw new UnsupportedOperationException(
        "Cannot search for a key that is of Cell type. Only plain byte array keys "
          + "can be searched for");
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size=" + rootCount).append("\n");
      for (int i = 0; i < rootCount; i++) {
        sb.append("key=").append(KeyValue.keyToString(blockKeys[i])).append("\n  offset=")
          .append(blockOffsets[i]).append(", dataSize=" + blockDataSizes[i]).append("\n");
      }
      return sb.toString();
    }
  }

  /**
   * An implementation of the BlockIndexReader that deals with block keys which are the key part of
   * a cell, like the data block index or the ROW_COL bloom blocks. This needs a comparator to work
   * with the Cells.
   */
  static class CellBasedKeyBlockIndexReader extends BlockIndexReader {

    private ExtendedCell[] blockKeys;
    /** Pre-computed mid-key */
    private AtomicReference<ExtendedCell> midKey = new AtomicReference<>();
    /** Needed when doing lookups on blocks. */
    protected CellComparator comparator;

    public CellBasedKeyBlockIndexReader(final CellComparator c, final int treeLevel) {
      // Can be null for METAINDEX block
      comparator = c;
      searchTreeLevel = treeLevel;
    }

    @Override
    protected long calculateHeapSizeForBlockKeys(long heapSize) {
      if (blockKeys != null) {
        heapSize += ClassSize.REFERENCE;
        // Adding array + references overhead
        heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length * ClassSize.REFERENCE);

        // Adding blockKeys
        for (Cell key : blockKeys) {
          heapSize += ClassSize.align(key.heapSize());
        }
      }
      // Add comparator and the midkey atomicreference
      heapSize += 2 * ClassSize.REFERENCE;
      return heapSize;
    }

    @Override
    public boolean isEmpty() {
      return blockKeys.length == 0;
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public ExtendedCell getRootBlockKey(int i) {
      return blockKeys[i];
    }

    @Override
    public BlockWithScanInfo loadDataBlockWithScanInfo(ExtendedCell key, HFileBlock currentBlock,
      boolean cacheBlocks, boolean pread, boolean isCompaction,
      DataBlockEncoding expectedDataBlockEncoding, CachingBlockReader cachingBlockReader)
      throws IOException {
      int rootLevelIndex = rootBlockContainingKey(key);
      if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
        return null;
      }

      // the next indexed key
      ExtendedCell nextIndexedKey = null;

      // Read the next-level (intermediate or leaf) index block.
      long currentOffset = blockOffsets[rootLevelIndex];
      int currentOnDiskSize = blockDataSizes[rootLevelIndex];

      if (rootLevelIndex < blockKeys.length - 1) {
        nextIndexedKey = blockKeys[rootLevelIndex + 1];
      } else {
        nextIndexedKey = KeyValueScanner.NO_NEXT_INDEXED_KEY;
      }

      int lookupLevel = 1; // How many levels deep we are in our lookup.
      int index = -1;

      HFileBlock block = null;
      KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
      while (true) {
        try {
          // Must initialize it with null here, because if we don't, and an exception happens in
          // readBlock, we would release the previously assigned block twice in the finally block.
          // (See HBASE-22422)
          block = null;
          if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
            // Avoid reading the same block again, even with caching turned off.
            // This is crucial for compaction-type workload which might have
            // caching turned off. This is like a one-block cache inside the
            // scanner.
            block = currentBlock;
          } else {
            // Call HFile's caching block reader API. We always cache index
            // blocks, otherwise we might get terrible performance.
            boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
            BlockType expectedBlockType;
            if (lookupLevel < searchTreeLevel - 1) {
              expectedBlockType = BlockType.INTERMEDIATE_INDEX;
            } else if (lookupLevel == searchTreeLevel - 1) {
              expectedBlockType = BlockType.LEAF_INDEX;
            } else {
              // this also accounts for ENCODED_DATA
              expectedBlockType = BlockType.DATA;
            }
            block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache,
              pread, isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
          }

          if (block == null) {
            throw new IOException("Failed to read block at offset " + currentOffset
              + ", onDiskSize=" + currentOnDiskSize);
          }

          // Found a data block, break the loop and check our level in the tree.
          if (block.getBlockType().isData()) {
            break;
          }

          // Not a data block. This must be a leaf-level or intermediate-level
          // index block. We don't allow going deeper than searchTreeLevel.
          if (++lookupLevel > searchTreeLevel) {
            throw new IOException("Search Tree Level overflow: lookupLevel=" + lookupLevel
              + ", searchTreeLevel=" + searchTreeLevel);
          }

          // Locate the entry corresponding to the given key in the non-root
          // (leaf or intermediate-level) index block.
          ByteBuff buffer = block.getBufferWithoutHeader();
          index = locateNonRootIndexEntry(buffer, key, comparator);
          if (index == -1) {
            // This has to be changed
            // For now change this to key value
            throw new IOException("The key " + CellUtil.getCellKeyAsString(key) + " is before the"
              + " first key of the non-root index block " + block);
          }

          currentOffset = buffer.getLong();
          currentOnDiskSize = buffer.getInt();

          // Only update next indexed key if there is a next indexed key in the current level
          byte[] nonRootIndexedKey = getNonRootIndexedKey(buffer, index + 1);
          if (nonRootIndexedKey != null) {
            tmpNextIndexKV.setKey(nonRootIndexedKey, 0, nonRootIndexedKey.length);
            nextIndexedKey = tmpNextIndexKV;
          }
        } finally {
          if (block != null && !block.getBlockType().isData()) {
            // Release the block immediately if it is not the data block
            block.release();
          }
        }
      }

      if (lookupLevel != searchTreeLevel) {
        assert block.getBlockType().isData();
        // Though we have retrieved a data block, we have found an issue with it, so release the
        // block here to let its ref count be decremented before raising the error.
        if (block != null) {
          block.release();
        }
        throw new IOException("Reached a data block at level " + lookupLevel
          + " but the number of levels is " + searchTreeLevel);
      }

      // set the next indexed key for the current block.
      return new BlockWithScanInfo(block, nextIndexedKey);
    }

    @Override
    public ExtendedCell midkey(CachingBlockReader cachingBlockReader) throws IOException {
      if (rootCount == 0) {
        throw new IOException("HFile empty");
      }

      ExtendedCell targetMidKey = this.midKey.get();
      if (targetMidKey != null) {
        return targetMidKey;
      }

      if (midLeafBlockOffset >= 0) {
        if (cachingBlockReader == null) {
          throw new IOException(
            "Have to read the middle leaf block but " + "no block reader available");
        }

        // Caching, using pread, assuming this is not a compaction.
        HFileBlock midLeafBlock = cachingBlockReader.readBlock(midLeafBlockOffset,
          midLeafBlockOnDiskSize, true, true, false, true, BlockType.LEAF_INDEX, null);
        try {
          byte[] bytes = getNonRootIndexedKey(midLeafBlock.getBufferWithoutHeader(), midKeyEntry);
          assert bytes != null;
          targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
        } finally {
          midLeafBlock.release();
        }
      } else {
        // The middle of the root-level index.
        targetMidKey = blockKeys[rootCount / 2];
      }

      this.midKey.set(targetMidKey);
      return targetMidKey;
    }

    @Override
    protected void initialize(int numEntries) {
      blockKeys = new ExtendedCell[numEntries];
    }

    /**
     * Adds a new entry in the root block index. Only used when reading.
     * @param key      Last key in the block
     * @param offset   file offset where the block is stored
     * @param dataSize the uncompressed data size
     */
    @Override
    protected void add(final byte[] key, final long offset, final int dataSize) {
      blockOffsets[rootCount] = offset;
      // Create the blockKeys as Cells once when the reader is opened
      blockKeys[rootCount] = new KeyValue.KeyOnlyKeyValue(key, 0, key.length);
      blockDataSizes[rootCount] = dataSize;
      rootCount++;
    }

    @Override
    public int rootBlockContainingKey(final byte[] key, int offset, int length,
      CellComparator comp) {
      // This should always be called with a Cell, not with a byte[] key
      throw new UnsupportedOperationException("Cannot search for a plain byte array key. "
        + "Only Cell-based keys can be searched for");
    }

    @Override
    public int rootBlockContainingKey(Cell key) {
      // Here the comparator should not be null as this happens for the root-level block
      int pos = Bytes.binarySearch(blockKeys, key, comparator);
      // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
      // binarySearch's javadoc.

      if (pos >= 0) {
        // This means this is an exact match with an element of blockKeys.
        assert pos < blockKeys.length;
        return pos;
      }

      // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
      // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
      // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
      // key < blockKeys[0], meaning the file does not contain the given key.

      int i = -pos - 1;
      assert 0 <= i && i <= blockKeys.length;
      return i - 1;
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size=" + rootCount).append("\n");
      for (int i = 0; i < rootCount; i++) {
        sb.append("key=").append((blockKeys[i])).append("\n  offset=").append(blockOffsets[i])
          .append(", dataSize=" + blockDataSizes[i]).append("\n");
      }
      return sb.toString();
    }
  }

  static class CellBasedKeyBlockIndexReaderV2 extends CellBasedKeyBlockIndexReader {

    private HFileIndexBlockEncoder indexBlockEncoder;

    private HFileIndexBlockEncoder.EncodedSeeker seeker;

    public CellBasedKeyBlockIndexReaderV2(final CellComparator c, final int treeLevel) {
      this(c, treeLevel, null);
    }

    public CellBasedKeyBlockIndexReaderV2(final CellComparator c, final int treeLevel,
      HFileIndexBlockEncoder indexBlockEncoder) {
      super(c, treeLevel);
      // Can be null for METAINDEX block
      this.indexBlockEncoder =
        indexBlockEncoder != null ? indexBlockEncoder : NoOpIndexBlockEncoder.INSTANCE;
    }

    @Override
    public boolean isEmpty() {
      return seeker.isEmpty();
    }

    @Override
    public BlockWithScanInfo loadDataBlockWithScanInfo(ExtendedCell key, HFileBlock currentBlock,
      boolean cacheBlocks, boolean pread, boolean isCompaction,
      DataBlockEncoding expectedDataBlockEncoding, CachingBlockReader cachingBlockReader)
      throws IOException {
      return seeker.loadDataBlockWithScanInfo(key, currentBlock, cacheBlocks, pread, isCompaction,
        expectedDataBlockEncoding, cachingBlockReader);
    }

    @Override
    public ExtendedCell midkey(CachingBlockReader cachingBlockReader) throws IOException {
      return seeker.midkey(cachingBlockReader);
    }

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public ExtendedCell getRootBlockKey(int i) {
      return seeker.getRootBlockKey(i);
    }

    @Override
    public int getRootBlockCount() {
      return seeker.getRootBlockCount();
    }

    @Override
    public int rootBlockContainingKey(Cell key) {
      return seeker.rootBlockContainingKey(key);
    }

    @Override
    protected long calculateHeapSizeForBlockKeys(long heapSize) {
      heapSize = super.calculateHeapSizeForBlockKeys(heapSize);
      if (seeker != null) {
        heapSize += ClassSize.REFERENCE;
        heapSize += ClassSize.align(seeker.heapSize());
      }
      return heapSize;
    }

    @Override
    public void readMultiLevelIndexRoot(HFileBlock blk, final int numEntries) throws IOException {
      seeker = indexBlockEncoder.createSeeker();
      seeker.initRootIndex(blk, numEntries, comparator, searchTreeLevel);
    }

    @Override
    public String toString() {
      return seeker.toString();
    }
  }

  /**
   * The reader will always hold the root level index in the memory. Index blocks at all other
   * levels will be cached in the LRU cache in practice, although this API does not enforce that.
   * <p>
   * All non-root (leaf and intermediate) index blocks contain what we call a "secondary index": an
   * array of offsets to the entries within the block. This allows us to do binary search for the
   * entry corresponding to the given key without having to deserialize the block.
   */
  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
  public static abstract class BlockIndexReader implements HeapSize {

    protected long[] blockOffsets;
    protected int[] blockDataSizes;
    protected int rootCount = 0;

    // Mid-key metadata.
    protected long midLeafBlockOffset = -1;
    protected int midLeafBlockOnDiskSize = -1;
    protected int midKeyEntry = -1;

    /**
     * The number of levels in the block index tree. One if there is only root level, two for root
     * and leaf levels, etc.
     */
    protected int searchTreeLevel;

    /** Returns true if the block index is empty. */
    public abstract boolean isEmpty();

    /**
     * Verifies that the block index is non-empty and throws an {@link IllegalStateException}
     * otherwise.
     */
    public void ensureNonEmpty() {
      if (isEmpty()) {
        throw new IllegalStateException("Block index is empty or not loaded");
      }
    }


    /**
     * Return the data block which contains this key. This function will only be called when the
     * HFile version is larger than 1.
     * @param key                       the key we are looking for
     * @param currentBlock              the current block, to avoid re-reading the same block
     * @param expectedDataBlockEncoding the data block encoding the caller is expecting the data
     *                                  block to be in, or null to not perform this check and return
     *                                  the block irrespective of the encoding
     * @param cachingBlockReader        a basic way to load blocks
     * @return the data block containing the given key, or null if no such block was found
     */
    public HFileBlock seekToDataBlock(final ExtendedCell key, HFileBlock currentBlock,
      boolean cacheBlocks, boolean pread, boolean isCompaction,
      DataBlockEncoding expectedDataBlockEncoding, CachingBlockReader cachingBlockReader)
      throws IOException {
      BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
        cacheBlocks, pread, isCompaction, expectedDataBlockEncoding, cachingBlockReader);
      if (blockWithScanInfo == null) {
        return null;
      } else {
        return blockWithScanInfo.getHFileBlock();
      }
    }

    /**
     * Return the BlockWithScanInfo, a data structure which contains the Data HFileBlock with other
     * scan info such as the key that starts the next HFileBlock. This function will only be called
     * when the HFile version is larger than 1.
     * @param key                       the key we are looking for
     * @param currentBlock              the current block, to avoid re-reading the same block
     * @param expectedDataBlockEncoding the data block encoding the caller is expecting the data
     *                                  block to be in, or null to not perform this check and return
     *                                  the block irrespective of the encoding.
     * @return the BlockWithScanInfo which contains the DataBlock with other scan info such as
     *         nextIndexedKey.
     */
    public abstract BlockWithScanInfo loadDataBlockWithScanInfo(ExtendedCell key,
      HFileBlock currentBlock, boolean cacheBlocks, boolean pread, boolean isCompaction,
      DataBlockEncoding expectedDataBlockEncoding, CachingBlockReader cachingBlockReader)
      throws IOException;

    /**
     * An approximation to the {@link HFile}'s mid-key. Operates on block boundaries, and does not
     * go inside blocks. In other words, returns the first key of the middle block of the file.
     * @return the first key of the middle block
     */
    public abstract Cell midkey(CachingBlockReader cachingBlockReader) throws IOException;

    /**
     * @param i from 0 to {@link #getRootBlockCount()} - 1
     */
    public long getRootBlockOffset(int i) {
      return blockOffsets[i];
    }

    /**
     * @param i zero-based index of a root-level block
     * @return the on-disk size of the root-level block for version 2, or the uncompressed size for
     *         version 1
     */
    public int getRootBlockDataSize(int i) {
      return blockDataSizes[i];
    }

    /** Returns the number of root-level blocks in this block index */
    public int getRootBlockCount() {
      return rootCount;
    }

    /**
     * Finds the root-level index block containing the given key.
     * @param key  the key to find
     * @param comp the comparator to be used
     * @return Offset of block containing <code>key</code> (between 0 and the number of blocks - 1)
     *         or -1 if this file does not contain the request.
     */
    // When we want to find the meta index block or bloom block for ROW bloom
    // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we need the
    // CellComparator.
    public abstract int rootBlockContainingKey(final byte[] key, int offset, int length,
      CellComparator comp);

    /**
     * Finds the root-level index block containing the given key.
     * @param key the key to find
     * @return Offset of block containing <code>key</code> (between 0 and the number of blocks - 1)
     *         or -1 if this file does not contain the request.
     */
    // When we want to find the meta index block or bloom block for ROW bloom
    // type Bytes.BYTES_RAWCOMPARATOR would be enough. For the ROW_COL bloom case we
    // need the CellComparator.
    public int rootBlockContainingKey(final byte[] key, int offset, int length) {
      return rootBlockContainingKey(key, offset, length, null);
    }

    /**
     * Finds the root-level index block containing the given key.
     * @param key the key to find
     */
    public abstract int rootBlockContainingKey(final Cell key);

    /**
     * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
     * @param i the ith position
     * @return The indexed key at the ith position in the nonRootIndex.
     */
    static byte[] getNonRootIndexedKey(ByteBuff nonRootIndex, int i) {
      int numEntries = nonRootIndex.getInt(0);
      if (i < 0 || i >= numEntries) {
        return null;
      }

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
      // Targetkey's offset relative to the end of secondary index
      int targetKeyRelOffset = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 1));

      // The offset of the target key in the blockIndex buffer
      int targetKeyOffset = entriesOffset // Skip secondary index
        + targetKeyRelOffset // Skip all entries until mid
        + SECONDARY_INDEX_ENTRY_OVERHEAD; // Skip offset and on-disk-size

      // We subtract the two consecutive secondary index elements, which
      // gives us the size of the whole (offset, onDiskSize, key) tuple. We
      // then need to subtract the overhead of offset and onDiskSize.
      int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) - targetKeyRelOffset
        - SECONDARY_INDEX_ENTRY_OVERHEAD;

      // TODO check whether we can make BB backed Cell here? So can avoid bytes copy.
      return nonRootIndex.toBytes(targetKeyOffset, targetKeyLength);
    }
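
    /*
     * A worked example of the arithmetic above, assuming a leaf block with numEntries = 2 and a
     * secondary index of {0, 20, 45}: entries begin at 4 * (2 + 2) = 16 bytes into the buffer, so
     * the key of entry 1 starts at 16 + 20 + SECONDARY_INDEX_ENTRY_OVERHEAD = 48, and its length
     * is 45 - 20 - SECONDARY_INDEX_ENTRY_OVERHEAD = 13 bytes.
     */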

    /**
     * Performs a binary search over a non-root level index block. Utilizes the secondary index,
     * which records the offsets of (offset, onDiskSize, firstKey) tuples of all entries.
     * @param key          the key we are searching for
     * @param nonRootIndex the non-root index block buffer (offsets to individual entries in the
     *                     blockIndex buffer), starting with the secondary index. The position is
     *                     ignored.
     * @return the index i in [0, numEntries - 1] such that keys[i] <= key < keys[i + 1], if keys is
     *         the array of all keys being searched, or -1 otherwise
     */
    static int binarySearchNonRootIndex(Cell key, ByteBuff nonRootIndex,
      CellComparator comparator) {

      int numEntries = nonRootIndex.getIntAfterPosition(0);
      int low = 0;
      int high = numEntries - 1;
      int mid = 0;

      // Entries start after the number of entries and the secondary index.
      // The secondary index takes numEntries + 1 ints.
      int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

      // If we imagine that keys[-1] = -Infinity and
      // keys[numEntries] = Infinity, then we are maintaining an invariant that
      // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
      ByteBufferKeyOnlyKeyValue nonRootIndexkeyOnlyKV = new ByteBufferKeyOnlyKeyValue();
      ObjectIntPair<ByteBuffer> pair = new ObjectIntPair<>();
      while (low <= high) {
        mid = low + ((high - low) >> 1);

        // Midkey's offset relative to the end of secondary index
        int midKeyRelOffset = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 1));

        // The offset of the middle key in the blockIndex buffer
        int midKeyOffset = entriesOffset // Skip secondary index
          + midKeyRelOffset // Skip all entries until mid
          + SECONDARY_INDEX_ENTRY_OVERHEAD; // Skip offset and on-disk-size

        // We subtract the two consecutive secondary index elements, which
        // gives us the size of the whole (offset, onDiskSize, key) tuple. We
        // then need to subtract the overhead of offset and onDiskSize.
        int midLength = nonRootIndex.getIntAfterPosition(Bytes.SIZEOF_INT * (mid + 2))
          - midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;

        // we have to compare in this order, because the comparator order
        // has special logic when the 'left side' is a special key.
        // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
        // done after HBASE-12224 & HBASE-12282
        // TODO avoid array call.
        nonRootIndex.asSubByteBuffer(midKeyOffset, midLength, pair);
        nonRootIndexkeyOnlyKV.setKey(pair.getFirst(), pair.getSecond(), midLength);
        int cmp = PrivateCellUtil.compareKeyIgnoresMvcc(comparator, key, nonRootIndexkeyOnlyKV);

        // key lives above the midpoint
        if (cmp > 0) low = mid + 1; // Maintain the invariant that keys[low - 1] < key
        // key lives below the midpoint
        else if (cmp < 0) high = mid - 1; // Maintain the invariant that key < keys[high + 1]
        else return mid; // exact match
      }

      // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
      // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
      // condition, low >= high + 1. Therefore, low = high + 1.

      if (low != high + 1) {
        throw new IllegalStateException(
          "Binary search broken: low=" + low + " " + "instead of " + (high + 1));
      }

      // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
      // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
      int i = low - 1;

      // Some extra validation on the result.
      if (i < -1 || i >= numEntries) {
        throw new IllegalStateException("Binary search broken: result is " + i
          + " but expected to be between -1 and (numEntries - 1) = " + (numEntries - 1));
      }

      return i;
    }

    /**
     * Search for one key using the secondary index in a non-root block. In case of success,
     * positions the provided buffer at the entry of interest, where the file offset and the
     * on-disk-size can be read.
     * @param nonRootBlock a non-root block without header. Initial position does not matter.
     * @param key          the key to search for
     * @return the index position where the given key was found, otherwise return -1 in the case the
     *         given key is before the first key.
     */
    public static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key,
      CellComparator comparator) {
      int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);

      if (entryIndex != -1) {
        int numEntries = nonRootBlock.getIntAfterPosition(0);

        // The end of secondary index and the beginning of entries themselves.
        int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);

        // The offset of the entry we are interested in relative to the end of
        // the secondary index.
        int entryRelOffset = nonRootBlock.getIntAfterPosition(Bytes.SIZEOF_INT * (1 + entryIndex));

        nonRootBlock.position(entriesOffset + entryRelOffset);
      }

      return entryIndex;
    }
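
    /*
     * A minimal usage sketch, mirroring loadDataBlockWithScanInfo above: after a successful call,
     * the buffer is positioned at the matched entry, so the deeper block's coordinates can be
     * read directly.
     *
     *   int index = locateNonRootIndexEntry(buffer, key, comparator);
     *   if (index != -1) {
     *     long offset = buffer.getLong();   // file offset of the deeper-level block
     *     int onDiskSize = buffer.getInt(); // its on-disk size, header not included
     *   }
     */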

    /**
     * Read in the root-level index from the given input stream. Must match what was written into
     * the root level by {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the offset
     * that function returned.
     * @param in         the buffered input stream or wrapped byte input stream
     * @param numEntries the number of root-level index entries
     */
    public void readRootIndex(DataInput in, final int numEntries) throws IOException {
      blockOffsets = new long[numEntries];
      initialize(numEntries);
      blockDataSizes = new int[numEntries];

      // If index size is zero, no index was written.
      if (numEntries > 0) {
        for (int i = 0; i < numEntries; ++i) {
          long offset = in.readLong();
          int dataSize = in.readInt();
          byte[] key = Bytes.readByteArray(in);
          add(key, offset, dataSize);
        }
      }
    }

    protected abstract void initialize(int numEntries);

    protected abstract void add(final byte[] key, final long offset, final int dataSize);

    /**
     * Read in the root-level index from the given input stream. Must match what was written into
     * the root level by {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the offset
     * that function returned.
     * @param blk        the HFile block
     * @param numEntries the number of root-level index entries
     * @return the buffered input stream or wrapped byte input stream
     */
    public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
      DataInputStream in = blk.getByteStream();
      readRootIndex(in, numEntries);
      return in;
    }

    /**
     * Read the root-level metadata of a multi-level block index. Based on
     * {@link #readRootIndex(DataInput, int)}, but also reads metadata necessary to compute the
     * mid-key in a multi-level index.
     * @param blk        the HFile block
     * @param numEntries the number of root-level index entries
     */
    public void readMultiLevelIndexRoot(HFileBlock blk, final int numEntries) throws IOException {
      DataInputStream in = readRootIndex(blk, numEntries);
      // HFileBlock.getByteStream() returns a byte stream for reading the data (excluding checksum)
      // of the root index block, so after reading the root index there is no need to subtract the
      // checksum bytes.
      if (in.available() < MID_KEY_METADATA_SIZE) {
        // No mid-key metadata available.
        return;
      }
      midLeafBlockOffset = in.readLong();
      midLeafBlockOnDiskSize = in.readInt();
      midKeyEntry = in.readInt();
    }

    @Override
    public long heapSize() {
      // The BlockIndexReader does not have the blockKey, comparator and the midkey atomic reference
      long heapSize =
        ClassSize.align(3 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);

      // Mid-key metadata.
      heapSize += MID_KEY_METADATA_SIZE;

      heapSize = calculateHeapSizeForBlockKeys(heapSize);

      if (blockOffsets != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length * Bytes.SIZEOF_LONG);
      }

      if (blockDataSizes != null) {
        heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length * Bytes.SIZEOF_INT);
      }

      return ClassSize.align(heapSize);
    }

    protected abstract long calculateHeapSizeForBlockKeys(long heapSize);
  }

  /**
   * Writes the block index into the output stream. Generate the tree from bottom up. The leaf level
   * is written to disk as a sequence of inline blocks, if it is larger than a certain number of
   * bytes. If the leaf level is not large enough, we write all entries to the root level instead.
   * After all leaf blocks have been written, we end up with an index referencing the resulting leaf
   * index blocks. If that index is larger than the allowed root index size, the writer will break
   * it up into reasonable-size intermediate-level index block chunks, write those chunks out, and
   * create another index referencing those chunks. This will be repeated until the remaining index
   * is small enough to become the root index. However, in most practical cases we will only have
   * leaf-level blocks and the root index, or just the root index.
   */
  public static class BlockIndexWriter implements InlineBlockWriter {
    /**
     * While the index is being written, this represents the current block index referencing all
     * leaf blocks, with one exception. If the file is being closed and there are not enough blocks
     * to complete even a single leaf block, no leaf blocks get written and this contains the entire
     * block index. After all levels of the index were written by
     * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final root-level index.
     */
    private BlockIndexChunk rootChunk = new BlockIndexChunkImpl();

    /**
     * Current leaf-level chunk. New entries referencing data blocks get added to this chunk until
     * it grows large enough to be written to disk.
     */
    private BlockIndexChunk curInlineChunk = new BlockIndexChunkImpl();

    /**
     * The number of block index levels. This is one if there is only a root level (even empty),
     * two if there are a leaf level and a root level, and is higher if there are intermediate
     * levels. This is only final after {@link #writeIndexBlocks(FSDataOutputStream)} has been
     * called. The initial value accounts for the root level, and will be increased to two as soon
     * as we find out there is a leaf-level in {@link #blockWritten(long, int, int)}.
     */
    private int numLevels = 1;

    private HFileBlock.Writer blockWriter;
    private byte[] firstKey = null;

    /**
     * The total number of leaf-level entries, i.e. entries referenced by leaf-level blocks. For the
     * data block index this is equal to the number of data blocks.
     */
    private long totalNumEntries;

    /** Total compressed size of all index blocks. */
    private long totalBlockOnDiskSize;

    /** Total uncompressed size of all index blocks. */
    private long totalBlockUncompressedSize;

    /** The maximum size guideline of all multi-level index blocks. */
    private int maxChunkSize;

    /** The minimum number of entries in each index block. */
    private int minIndexNumEntries;

    /** Whether we require this block index to always be single-level. */
    private boolean singleLevelOnly;

    /** CacheConfig, or null if cache-on-write is disabled */
    private CacheConfig cacheConf;

    /** Name to use for computing cache keys */
    private String nameForCaching;

    /** Type of encoding used for index blocks in HFile */
    private HFileIndexBlockEncoder indexBlockEncoder;

    /** Creates a single-level block index writer */
    public BlockIndexWriter() {
      this(null, null, null, null);
      singleLevelOnly = true;
    }

    /**
     * Creates a multi-level block index writer.
     * @param blockWriter the block writer to use to write index blocks
     * @param cacheConf   used to determine when and how a block should be cached-on-write.
     */
    public BlockIndexWriter(HFileBlock.Writer blockWriter, CacheConfig cacheConf,
      String nameForCaching, HFileIndexBlockEncoder indexBlockEncoder) {
      if ((cacheConf == null) != (nameForCaching == null)) {
        throw new IllegalArgumentException(
          "Block cache and file name for " + "caching must be both specified or both null");
      }

      this.blockWriter = blockWriter;
      this.cacheConf = cacheConf;
      this.nameForCaching = nameForCaching;
      this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
      this.minIndexNumEntries = HFileBlockIndex.DEFAULT_MIN_INDEX_NUM_ENTRIES;
      this.indexBlockEncoder =
        indexBlockEncoder != null ? indexBlockEncoder : NoOpIndexBlockEncoder.INSTANCE;
    }

    public void setMaxChunkSize(int maxChunkSize) {
      if (maxChunkSize <= 0) {
        throw new IllegalArgumentException("Invalid maximum index block size");
      }
      this.maxChunkSize = maxChunkSize;
    }

    public void setMinIndexNumEntries(int minIndexNumEntries) {
      if (minIndexNumEntries <= 1) {
        throw new IllegalArgumentException("Invalid minimum index entry count, should be >= 2");
      }
      this.minIndexNumEntries = minIndexNumEntries;
    }
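
    /*
     * A hedged sketch of how a caller might wire these setters to the configuration keys declared
     * at the top of this class (the variable names conf, blockWriter, cacheConf, name and encoder
     * are illustrative only):
     *
     *   BlockIndexWriter writer = new BlockIndexWriter(blockWriter, cacheConf, name, encoder);
     *   writer.setMaxChunkSize(conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE));
     *   writer.setMinIndexNumEntries(
     *     conf.getInt(MIN_INDEX_NUM_ENTRIES_KEY, DEFAULT_MIN_INDEX_NUM_ENTRIES));
     */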

    /**
     * Writes the root level and intermediate levels of the block index into the output stream,
     * generating the tree from bottom up. Assumes that the leaf level has been inline-written to
     * the disk if there is enough data for more than one leaf block. We iterate by breaking the
     * current level of the block index, starting with the index of all leaf-level blocks, into
     * chunks small enough to be written to disk, and generate its parent level, until we end up
     * with a level small enough to become the root level. If the leaf level is not large enough,
     * there is no inline block index anymore, so we only write that level of block index to disk as
     * the root level.
     * @param out FSDataOutputStream
     * @return position at which we entered the root-level index.
     */
    public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
      if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
        throw new IOException("Trying to write a multi-level block index, " + "but there are "
          + curInlineChunk.getNumEntries() + " entries in the " + "last inline chunk.");
      }

      // We need to get mid-key metadata before we create intermediate
      // indexes and overwrite the root chunk.
      byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata() : null;

      if (curInlineChunk != null) {
        while (
          rootChunk.getRootSize() > maxChunkSize
            // HBASE-16288: if firstKey is larger than maxChunkSize we will loop indefinitely
            && rootChunk.getNumEntries() > minIndexNumEntries
            // Sanity check: we will not hit this, since (minIndexNumEntries ^ 16) blocks can be
            // addressed.
            && numLevels < 16
        ) {
          rootChunk = writeIntermediateLevel(out, rootChunk);
          numLevels += 1;
        }
      }

      // write the root level
      long rootLevelIndexPos = out.getPos();

      {
        DataOutput blockStream = blockWriter.startWriting(BlockType.ROOT_INDEX);
        indexBlockEncoder.encode(rootChunk, true, blockStream);
        if (midKeyMetadata != null) blockStream.write(midKeyMetadata);
        blockWriter.writeHeaderAndData(out);
        if (cacheConf != null) {
          cacheConf.getBlockCache().ifPresent(cache -> {
            HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
            cache.cacheBlock(new BlockCacheKey(nameForCaching, rootLevelIndexPos, true,
              blockForCaching.getBlockType()), blockForCaching);
            // Index blocks always go to LRU, which then converts any off-heap buffer to on-heap,
            // so we need to release any off-heap buffers now to avoid leak
            blockForCaching.release();
          });
        }
      }

      // Add root index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize += blockWriter.getUncompressedSizeWithoutHeader();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
          + rootLevelIndexPos + ", " + rootChunk.getNumEntries() + " root-level entries, "
          + totalNumEntries + " total entries, "
          + Strings.humanReadableInt(this.totalBlockOnDiskSize) + " on-disk size, "
          + Strings.humanReadableInt(totalBlockUncompressedSize) + " total uncompressed size.");
      }
      return rootLevelIndexPos;
    }

    /**
     * Writes the block index data as a single level only. Does not do any block framing.
     * @param out         the buffered output stream to write the index to. Typically a stream
     *                    writing into an {@link HFile} block.
     * @param description a short description of the index being written. Used in a log message.
     */
    public void writeSingleLevelIndex(DataOutput out, String description) throws IOException {
      expectNumLevels(1);

      if (!singleLevelOnly) throw new IOException("Single-level mode is turned off");

      if (rootChunk.getNumEntries() > 0)
        throw new IOException("Root-level entries already added in " + "single-level mode");

      rootChunk = curInlineChunk;
      curInlineChunk = new BlockIndexChunkImpl();

      if (LOG.isTraceEnabled()) {
        LOG.trace("Wrote a single-level " + description + " index with " + rootChunk.getNumEntries()
          + " entries, " + rootChunk.getRootSize() + " bytes");
      }
      indexBlockEncoder.encode(rootChunk, true, out);
    }

    /**
     * Split the current level of the block index into intermediate index blocks of permitted size
     * and write those blocks to disk. Return the next level of the block index referencing those
     * intermediate-level blocks.
     * @param currentLevel the current level of the block index, such as a chunk referencing all
     *                     leaf-level index blocks
     * @return the parent level block index, which becomes the root index after a few (usually zero)
     *         iterations
     */
    private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
      BlockIndexChunk currentLevel) throws IOException {
      // Entries referencing intermediate-level blocks we are about to create.
      BlockIndexChunk parent = new BlockIndexChunkImpl();

      // The current intermediate-level block index chunk.
      BlockIndexChunk curChunk = new BlockIndexChunkImpl();

      for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
        curChunk.add(currentLevel.getBlockKey(i), currentLevel.getBlockOffset(i),
          currentLevel.getOnDiskDataSize(i));

        // HBASE-16288: We have to have at least minIndexNumEntries(16) items in the index so that
        // we won't end up with too-many levels for an index with very large rowKeys. Also, if the
        // first key is larger than maxChunkSize this will cause infinite recursion.
        if (i >= minIndexNumEntries && curChunk.getRootSize() >= maxChunkSize) {
          writeIntermediateBlock(out, parent, curChunk);
        }
      }

      if (curChunk.getNumEntries() > 0) {
        writeIntermediateBlock(out, parent, curChunk);
      }

      return parent;
    }

    private void writeIntermediateBlock(FSDataOutputStream out, BlockIndexChunk parent,
      BlockIndexChunk curChunk) throws IOException {
      long beginOffset = out.getPos();
      DataOutputStream dos = blockWriter.startWriting(BlockType.INTERMEDIATE_INDEX);
      indexBlockEncoder.encode(curChunk, false, dos);
      byte[] curFirstKey = curChunk.getBlockKey(0);
      blockWriter.writeHeaderAndData(out);

      if (getCacheOnWrite()) {
        cacheConf.getBlockCache().ifPresent(cache -> {
          HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
          cache.cacheBlock(
            new BlockCacheKey(nameForCaching, beginOffset, true, blockForCaching.getBlockType()),
            blockForCaching);
        });
      }

      // Add intermediate index block size
      totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
      totalBlockUncompressedSize += blockWriter.getUncompressedSizeWithoutHeader();

      // OFFSET is the beginning offset of the chunk of block index entries.
      // SIZE is the total byte size of the chunk of block index entries
      // + the secondary index size
      // FIRST_KEY is the first key in the chunk of block index
      // entries.
      parent.add(curFirstKey, beginOffset, blockWriter.getOnDiskSizeWithHeader());

      // clear current block index chunk
      curChunk.clear();
      curFirstKey = null;
    }

    /** Returns how many block index entries there are in the root level */
    public final int getNumRootEntries() {
      return rootChunk.getNumEntries();
    }

    /** Returns the number of levels in this block index. */
    public int getNumLevels() {
      return numLevels;
    }

    private void expectNumLevels(int expectedNumLevels) {
      if (numLevels != expectedNumLevels) {
        throw new IllegalStateException("Number of block index levels is " + numLevels
          + " but is expected to be " + expectedNumLevels);
      }
    }

    /**
     * Whether there is an inline block ready to be written. In general, we write a leaf-level
     * index block as an inline block as soon as its size as serialized in the non-root format
     * reaches a certain threshold.
     */
    @Override
    public boolean shouldWriteBlock(boolean closing) {
      if (singleLevelOnly) {
        throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
      }

      if (curInlineChunk == null) {
        throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been "
          + "called with closing=true and then called again?");
      }

      if (curInlineChunk.getNumEntries() == 0) {
        return false;
      }

      // We do have some entries in the current inline chunk.
      if (closing) {
        if (rootChunk.getNumEntries() == 0) {
          // We did not add any leaf-level blocks yet. Instead of creating a
          // leaf level with one block, move these entries to the root level.

          expectNumLevels(1);
          rootChunk = curInlineChunk;
          curInlineChunk = null; // Disallow adding any more index entries.
          return false;
        }

        return true;
      } else {
        return curInlineChunk.getNonRootSize() >= maxChunkSize;
      }
    }

    /**
     * Write out the current inline index block. Inline blocks are non-root blocks, so the non-root
     * index format is used.
     */
    @Override
    public void writeInlineBlock(DataOutput out) throws IOException {
      if (singleLevelOnly) throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

      // Write the inline block index to the output stream in the non-root
      // index block format.
      indexBlockEncoder.encode(curInlineChunk, false, out);

      // Save the first key of the inline block so that we can add it to the
      // parent-level index.
      firstKey = curInlineChunk.getBlockKey(0);

      // Start a new inline index block
      curInlineChunk.clear();
    }

    /**
     * Called after an inline block has been written so that we can add an entry referring to that
     * block to the parent-level index.
     */
    @Override
    public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
      // Add leaf index block size
      totalBlockOnDiskSize += onDiskSize;
      totalBlockUncompressedSize += uncompressedSize;

      if (singleLevelOnly) throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);

1283      if (firstKey == null) {
1284        throw new IllegalStateException(
1285          "Trying to add second-level index " + "entry with offset=" + offset + " and onDiskSize="
1286            + onDiskSize + "but the first key was not set in writeInlineBlock");
1287      }
1288
1289      if (rootChunk.getNumEntries() == 0) {
1290        // We are writing the first leaf block, so increase index level.
1291        expectNumLevels(1);
1292        numLevels = 2;
1293      }
1294
1295      // Add another entry to the second-level index. Include the number of
1296      // entries in all previous leaf-level chunks for mid-key calculation.
1297      rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
1298      firstKey = null;
1299    }

    @Override
    public BlockType getInlineBlockType() {
      return BlockType.LEAF_INDEX;
    }

    /**
     * Add one index entry to the current leaf-level block. When the leaf-level block gets large
     * enough, it will be flushed to disk as an inline block.
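     * <p>
     * An illustrative call per finished data block (the key variables, offsets, and sizes below
     * are made-up values, not taken from this class):
     *
     * <pre>{@code
     * indexWriter.addEntry(firstKeyOfBlock, 0L, 4096); // first data block
     * indexWriter.addEntry(firstKeyOfNextBlock, 4096L, 4100); // second data block
     * }</pre>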
     * @param firstKey      the first key of the data block
     * @param blockOffset   the offset of the data block
     * @param blockDataSize the on-disk size of the data block ({@link HFile} format version 2), or
     *                      the uncompressed size of the data block ({@link HFile} format version
     *                      1).
     */
    public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
      curInlineChunk.add(firstKey, blockOffset, blockDataSize);
      ++totalNumEntries;
    }

    /**
     * @throws IOException if we happened to write a multi-level index.
     */
    public void ensureSingleLevel() throws IOException {
      if (numLevels > 1) {
        throw new IOException(
          "Wrote a " + numLevels + "-level index with " + rootChunk.getNumEntries()
            + " root-level entries, but this is expected to be a single-level block index.");
      }
    }

    /**
     * @return true if we are using cache-on-write. This is configured by the caller of the
     *         constructor by either passing a valid block cache or null.
     */
    @Override
    public boolean getCacheOnWrite() {
      return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
    }

    /**
     * The total uncompressed size of the root index block, intermediate-level index blocks, and
     * leaf-level index blocks.
     * @return the total uncompressed size of all index blocks
     */
    public long getTotalUncompressedSize() {
      return totalBlockUncompressedSize;
    }

  }

  /**
   * A single chunk of the block index in the process of writing. The data in this chunk can become
   * a leaf-level, intermediate-level, or root index block.
   */
  static class BlockIndexChunkImpl implements BlockIndexChunk {

    /** First keys of the key range corresponding to each index entry. */
    private final List<byte[]> blockKeys = new ArrayList<>();

    /** Block offset in backing stream. */
    private final List<Long> blockOffsets = new ArrayList<>();

    /** On-disk data sizes of lower-level data or index blocks. */
    private final List<Integer> onDiskDataSizes = new ArrayList<>();

    /**
     * The cumulative number of sub-entries, i.e. entries in deeper-level index blocks.
     * numSubEntriesAt[i] is the number of sub-entries in the blocks corresponding to this chunk's
     * entries #0 through #i inclusively.
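     * <p>
     * For example, if this chunk's three entries point to leaf blocks holding 3, 4, and 5
     * sub-entries respectively, then numSubEntriesAt = [3, 7, 12].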
     */
    private final List<Long> numSubEntriesAt = new ArrayList<>();

    /**
     * The offset of the next entry to be added, relative to the end of the "secondary index" in the
     * "non-root" format representation of this index chunk. This is the next value to be added to
     * the secondary index.
     */
    private int curTotalNonRootEntrySize = 0;

    /**
     * The accumulated size of this chunk if stored in the root index format.
     */
    private int curTotalRootSize = 0;

    /**
     * The "secondary index" used for binary search over variable-length records in a "non-root"
     * format block. These offsets are relative to the end of this secondary index.
     */
    private final List<Integer> secondaryIndexOffsetMarks = new ArrayList<>();

    /**
     * Adds a new entry to this block index chunk.
     * @param firstKey              the first key in the block pointed to by this entry
     * @param blockOffset           the offset of the next-level block pointed to by this entry
     * @param onDiskDataSize        the on-disk data size of the block pointed to by this entry,
     *                              including header size
     * @param curTotalNumSubEntries if this chunk is the root index chunk under construction, this
     *                              specifies the current total number of sub-entries in all
     *                              leaf-level chunks, including the one corresponding to the
     *                              second-level entry being added.
     */
    @Override
    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
      long curTotalNumSubEntries) {
      // Record the offset for the secondary index
      secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
      curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD + firstKey.length;

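      // Accumulate the root-format size: each root entry stores the block offset (long, 8
      // bytes), the on-disk size (int, 4 bytes), and the first key preceded by its vint-encoded
      // length. For example, a 17-byte first key costs 1 + 17 key bytes, i.e. a 30-byte entry.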
      curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
        + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;

      blockKeys.add(firstKey);
      blockOffsets.add(blockOffset);
      onDiskDataSizes.add(onDiskDataSize);

      if (curTotalNumSubEntries != -1) {
        numSubEntriesAt.add(curTotalNumSubEntries);

        // Make sure the parallel arrays are in sync.
        if (numSubEntriesAt.size() != blockKeys.size()) {
          throw new IllegalStateException("Only have key/value count stats for "
            + numSubEntriesAt.size() + " block index entries out of " + blockKeys.size());
        }
      }
    }

    /**
     * The same as {@link #add(byte[], long, int, long)} but does not keep track of the number of
     * sub-entries (the key/value count). Used for single-level indexes.
     * @see #add(byte[], long, int, long)
     */
    @Override
    public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
      add(firstKey, blockOffset, onDiskDataSize, -1);
    }

    @Override
    public void clear() {
      blockKeys.clear();
      blockOffsets.clear();
      onDiskDataSizes.clear();
      secondaryIndexOffsetMarks.clear();
      numSubEntriesAt.clear();
      curTotalNonRootEntrySize = 0;
      curTotalRootSize = 0;
    }

    /**
     * Finds the entry corresponding to the deeper-level index block containing the given
     * deeper-level entry (a "sub-entry"), assuming a global 0-based ordering of sub-entries.
     * <p>
     * <i>Implementation note.</i> We are looking for i such that
     * {@code numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i]}, because the deeper-level block #i
     * (0-based) contains sub-entries {@code numSubEntriesAt[i - 1]} through
     * {@code numSubEntriesAt[i] - 1}, assuming a global 0-based ordering of sub-entries. i is by
     * definition the insertion point of k in numSubEntriesAt.
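     * <p>
     * For example, with numSubEntriesAt = [3, 7, 12], sub-entry k = 5 belongs to entry #1 (the
     * insertion point of 5), while k = 7 yields an exact match at index 1 and therefore belongs
     * to entry #2.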
     * @param k sub-entry index, from 0 to the total number of sub-entries - 1
     * @return the 0-based index of the entry corresponding to the given sub-entry
     */
    @Override
    public int getEntryBySubEntry(long k) {
      // We define mid-key as the key corresponding to k'th sub-entry
      // (0-based).

      int i = Collections.binarySearch(numSubEntriesAt, k);

      // Exact match: numSubEntriesAt[i] = k. This means chunks #0 through
      // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
      // is in the (i + 1)'th chunk.
      if (i >= 0) {
        return i + 1;
      }

      // Inexact match. Return the insertion point.
      return -i - 1;
    }

    /**
     * Used when writing the root block index of a multi-level block index. Serializes additional
     * information allowing the mid-key to be identified efficiently.
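     * <p>
     * The serialized metadata is a fixed {@code MID_KEY_METADATA_SIZE} bytes: the offset of the
     * leaf block containing the mid-key (long), that block's on-disk size (int), and the 0-based
     * index of the mid-key sub-entry within that block (int).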
     * @return a few serialized fields for finding the mid-key
     * @throws IOException if metadata for computing the mid-key could not be created
     */
    @Override
    public byte[] getMidKeyMetadata() throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream(MID_KEY_METADATA_SIZE);
      DataOutputStream baosDos = new DataOutputStream(baos);
      long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
      if (totalNumSubEntries == 0) {
        throw new IOException("No leaf-level entries, mid-key unavailable");
      }
      long midKeySubEntry = (totalNumSubEntries - 1) / 2;
      int midKeyEntry = getEntryBySubEntry(midKeySubEntry);

      baosDos.writeLong(blockOffsets.get(midKeyEntry));
      baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));

      long numSubEntriesBefore = midKeyEntry > 0 ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
      long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
      if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE) {
        throw new IOException("Could not identify mid-key index within the "
          + "leaf-level block containing mid-key: out of range (" + subEntryWithinEntry
          + ", numSubEntriesBefore=" + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
          + ")");
      }

      baosDos.writeInt((int) subEntryWithinEntry);

      if (baosDos.size() != MID_KEY_METADATA_SIZE) {
        throw new IOException("Could not write mid-key metadata: size=" + baosDos.size()
          + ", correct size: " + MID_KEY_METADATA_SIZE);
      }

      // Close just to be good citizens, although this has no effect.
      baos.close();

      return baos.toByteArray();
    }

    /** Returns the size of this chunk if stored in the non-root index block format */
    @Override
    public int getNonRootSize() {
      return Bytes.SIZEOF_INT // Number of entries
        + Bytes.SIZEOF_INT * (blockKeys.size() + 1) // Secondary index
        + curTotalNonRootEntrySize; // All entries
    }

    @Override
    public int getCurTotalNonRootEntrySize() {
      return curTotalNonRootEntrySize;
    }

    @Override
    public List<byte[]> getBlockKeys() {
      return blockKeys;
    }

    @Override
    public List<Integer> getSecondaryIndexOffsetMarks() {
      return secondaryIndexOffsetMarks;
    }

    /** Returns the size of this chunk if stored in the root index block format */
    @Override
    public int getRootSize() {
      return curTotalRootSize;
    }

    /** Returns the number of entries in this block index chunk */
    public int getNumEntries() {
      return blockKeys.size();
    }

    public byte[] getBlockKey(int i) {
      return blockKeys.get(i);
    }

    public long getBlockOffset(int i) {
      return blockOffsets.get(i);
    }

    public int getOnDiskDataSize(int i) {
      return onDiskDataSizes.get(i);
    }

    public long getCumulativeNumKV(int i) {
      if (i < 0) {
        return 0;
      }
      return numSubEntriesAt.get(i);
    }

  }

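  /**
   * Returns the maximum index block ("chunk") size, as configured via {@link #MAX_CHUNK_SIZE_KEY}
   * (defaults to {@link #DEFAULT_MAX_CHUNK_SIZE}). A sketch of overriding the default, assuming
   * nothing beyond a plain {@link Configuration}:
   *
   * <pre>{@code
   * Configuration conf = new Configuration();
   * conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, 64 * 1024); // 64 KB index chunks
   * int maxChunkSize = HFileBlockIndex.getMaxChunkSize(conf); // -> 65536
   * }</pre>
   */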
  public static int getMaxChunkSize(Configuration conf) {
    return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
  }

  public static int getMinIndexNumEntries(Configuration conf) {
    return conf.getInt(MIN_INDEX_NUM_ENTRIES_KEY, DEFAULT_MIN_INDEX_NUM_ENTRIES);
  }
}